This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 077589acb98 Fix PostRelease Nightly Snapshot job (#34055)
077589acb98 is described below

commit 077589acb98ce768da2238ae6fc8df3da5a9c797
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Sun Feb 23 19:00:14 2025 +0400

    Fix PostRelease Nightly Snapshot job (#34055)
    
    * Refactor mobilegaming groovy scripts
    
    * Add retry policy
---
 .../game/utils/WriteWindowedToBigQuery.java        |  4 +-
 .../src/main/groovy/MobileGamingCommands.groovy    |  2 +-
 .../main/groovy/mobilegaming-java-dataflow.groovy  | 59 +++++++++++++++++-----
 .../main/groovy/mobilegaming-java-direct.groovy    | 53 ++++++++++++++-----
 4 files changed, 90 insertions(+), 28 deletions(-)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 37bd8176015..36fa18a34e0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -64,7 +65,8 @@ public class WriteWindowedToBigQuery<T> extends WriteToBigQuery<T> {
                 .to(getTable(projectId, datasetId, tableName))
                 .withSchema(getSchema())
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
     return PDone.in(teamAndScore.getPipeline());
   }
 }
diff --git a/release/src/main/groovy/MobileGamingCommands.groovy b/release/src/main/groovy/MobileGamingCommands.groovy
index eeac968f576..197cbd7a1cd 100644
--- a/release/src/main/groovy/MobileGamingCommands.groovy
+++ b/release/src/main/groovy/MobileGamingCommands.groovy
@@ -30,7 +30,7 @@ class MobileGamingCommands {
     SparkRunner: "spark-runner",
     FlinkRunner: "flink-runner"]
 
-  public static final EXECUTION_TIMEOUT_IN_MINUTES = 60
+  public static final EXECUTION_TIMEOUT_IN_MINUTES = 80
 
   // Lists used to verify team names generated in the LeaderBoard example.
   // This list should be kept sync with COLORS in org.apache.beam.examples.complete.game.injector.Injector.
diff --git a/release/src/main/groovy/mobilegaming-java-dataflow.groovy b/release/src/main/groovy/mobilegaming-java-dataflow.groovy
index 60853d5542f..2ead5e11a3c 100644
--- a/release/src/main/groovy/mobilegaming-java-dataflow.groovy
+++ b/release/src/main/groovy/mobilegaming-java-dataflow.groovy
@@ -66,16 +66,53 @@ class LeaderBoardRunner {
   def run(runner, TestScripts t, MobileGamingCommands mobileGamingCommands, boolean useStreamingEngine) {
     t.intent("Running: LeaderBoard example on DataflowRunner" +
             (useStreamingEngine ? " with Streaming Engine" : ""))
-    t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DataflowRunner_user")
-    t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DataflowRunner_team")
+
+    def dataset = t.bqDataset()
+    def userTable = "leaderboard_DataflowRunner_user"
+    def teamTable = "leaderboard_DataflowRunner_team"
+    def userSchema = [
+            "user:STRING",
+            "total_score:INTEGER",
+            "processing_time:STRING"
+    ].join(",")
+    def teamSchema = [
+            "team:STRING",
+            "total_score:INTEGER",
+            "window_start:STRING",
+            "processing_time:STRING",
+            "timing:STRING"
+    ].join(",")
+
+    // Remove existing tables if they exist
+    String tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
+
+    if (tables.contains(userTable)) {
+      t.run("bq rm -f -t ${dataset}.${userTable}")
+    }
+    if (tables.contains(teamTable)) {
+      t.run("bq rm -f -t ${dataset}.${teamTable}")
+    }
+
     // It will take couple seconds to clean up tables.
     // This loop makes sure tables are completely deleted before running the pipeline
-    String tables = ""
-    while ({
+    tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
+    while (tables.contains(userTable) || tables.contains(teamTable)) {
       sleep(3000)
-      tables = t.run("bq query SELECT table_id FROM ${t.bqDataset()}.__TABLES_SUMMARY__")
-      tables.contains("leaderboard_${}_user") || tables.contains("leaderboard_${runner}_team")
-    }());
+      tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
+    }
+
+    t.intent("Creating table: ${userTable}")
+    t.run("bq mk --table ${dataset}.${userTable} ${userSchema}")
+    t.intent("Creating table: ${teamTable}")
+    t.run("bq mk --table ${dataset}.${teamTable} ${teamSchema}")
+
+    // Verify that the tables have been created successfully
+    tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
+    while (!tables.contains(userTable) || !tables.contains(teamTable)) {
+      sleep(3000)
+      tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
+    }
+    println "Tables ${userTable} and ${teamTable} created successfully."
 
     def InjectorThread = Thread.start() {
       t.run(mobileGamingCommands.createInjectorCommand())
@@ -99,11 +136,9 @@ class LeaderBoardRunner {
     String query_result = ""
     while ((System.currentTimeMillis() - startTime) / 60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
       try {
-        tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${t.bqDataset()}.INFORMATION_SCHEMA.TABLES"
-        if (tables.contains("leaderboard_${runner}_user") && tables.contains("leaderboard_${runner}_team")) {
-          query_result = t.run """bq query --batch "SELECT user FROM [${t.gcpProject()}:${
-            t.bqDataset()
-          }.leaderboard_${runner}_user] LIMIT 10\""""
+        tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES"
+        if (tables.contains(userTable) && tables.contains(teamTable)) {
+          query_result = t.run """bq query --batch "SELECT user FROM [${dataset}.${userTable}] LIMIT 10\""""
           if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)) {
             isSuccess = true
             break
diff --git a/release/src/main/groovy/mobilegaming-java-direct.groovy b/release/src/main/groovy/mobilegaming-java-direct.groovy
index 8622a8a4a6c..34eab4c0076 100644
--- a/release/src/main/groovy/mobilegaming-java-direct.groovy
+++ b/release/src/main/groovy/mobilegaming-java-direct.groovy
@@ -62,16 +62,41 @@ t.success("HourlyTeamScore successfully run on DirectRunners.")
  * */
 
 t.intent("Running: LeaderBoard example on DirectRunner")
-t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DirectRunner_user")
-t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DirectRunner_team")
-// It will take couple seconds to clean up tables.
-// This loop makes sure tables are completely deleted before running the pipeline
-String tables = ""
-while({
+
+def dataset = t.bqDataset()
+def userTable = "leaderboard_DirectRunner_user"
+def teamTable = "leaderboard_DirectRunner_team"
+def userSchema = [
+        "user:STRING",
+        "total_score:INTEGER",
+        "processing_time:STRING"
+].join(",")
+def teamSchema = [
+        "team:STRING",
+        "total_score:INTEGER",
+        "window_start:STRING",
+        "processing_time:STRING",
+        "timing:STRING"
+].join(",")
+
+String tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
+
+if (!tables.contains(userTable)) {
+  t.intent("Creating table: ${userTable}")
+  t.run("bq mk --table ${dataset}.${userTable} ${userSchema}")
+}
+if (!tables.contains(teamTable)) {
+  t.intent("Creating table: ${teamTable}")
+  t.run("bq mk --table ${dataset}.${teamTable} ${teamSchema}")
+}
+
+// Verify that the tables have been created
+tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
+while (!tables.contains(userTable) || !tables.contains(teamTable)) {
   sleep(3000)
-  tables = t.run ("bq query SELECT table_id FROM ${t.bqDataset()}.__TABLES_SUMMARY__")
-  tables.contains("leaderboard_${runner}_user") || tables.contains("leaderboard_${runner}_team")
-}());
+  tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
+}
+println "Tables ${userTable} and ${teamTable} created successfully."
 
 def InjectorThread = Thread.start() {
   t.run(mobileGamingCommands.createInjectorCommand())
@@ -86,12 +111,12 @@ def LeaderBoardThread = Thread.start() {
 def startTime = System.currentTimeMillis()
 def isSuccess = false
 String query_result = ""
-while((System.currentTimeMillis() - startTime)/60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
+while ((System.currentTimeMillis() - startTime)/60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
   try {
-    tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${t.bqDataset()}.INFORMATION_SCHEMA.TABLES"
-    if(tables.contains("leaderboard_${runner}_user") && tables.contains("leaderboard_${runner}_team")) {
-      query_result = t.run """bq query --batch "SELECT user FROM [${t.gcpProject()}.${t.bqDataset()}.leaderboard_${runner}_user] LIMIT 10\""""
-      if(t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){
+    tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES"
+    if (tables.contains(userTable) && tables.contains(teamTable)) {
+      query_result = t.run """bq query --batch "SELECT user FROM [${dataset}.${userTable}] LIMIT 10\""""
+      if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){
         isSuccess = true
         break
       }

Reply via email to