This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 077589acb98 Fix PostRelease Nightly Snapshot job (#34055)
077589acb98 is described below
commit 077589acb98ce768da2238ae6fc8df3da5a9c797
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Sun Feb 23 19:00:14 2025 +0400
Fix PostRelease Nightly Snapshot job (#34055)
* Refactor mobilegaming groovy scripts
* Add retry policy
---
.../game/utils/WriteWindowedToBigQuery.java | 4 +-
.../src/main/groovy/MobileGamingCommands.groovy | 2 +-
.../main/groovy/mobilegaming-java-dataflow.groovy | 59 +++++++++++++++++-----
.../main/groovy/mobilegaming-java-direct.groovy | 53 ++++++++++++++-----
4 files changed, 90 insertions(+), 28 deletions(-)
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 37bd8176015..36fa18a34e0 100644
---
a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++
b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -64,7 +65,8 @@ public class WriteWindowedToBigQuery<T> extends
WriteToBigQuery<T> {
.to(getTable(projectId, datasetId, tableName))
.withSchema(getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
return PDone.in(teamAndScore.getPipeline());
}
}
diff --git a/release/src/main/groovy/MobileGamingCommands.groovy
b/release/src/main/groovy/MobileGamingCommands.groovy
index eeac968f576..197cbd7a1cd 100644
--- a/release/src/main/groovy/MobileGamingCommands.groovy
+++ b/release/src/main/groovy/MobileGamingCommands.groovy
@@ -30,7 +30,7 @@ class MobileGamingCommands {
SparkRunner: "spark-runner",
FlinkRunner: "flink-runner"]
- public static final EXECUTION_TIMEOUT_IN_MINUTES = 60
+ public static final EXECUTION_TIMEOUT_IN_MINUTES = 80
// Lists used to verify team names generated in the LeaderBoard example.
// This list should be kept sync with COLORS in
org.apache.beam.examples.complete.game.injector.Injector.
diff --git a/release/src/main/groovy/mobilegaming-java-dataflow.groovy
b/release/src/main/groovy/mobilegaming-java-dataflow.groovy
index 60853d5542f..2ead5e11a3c 100644
--- a/release/src/main/groovy/mobilegaming-java-dataflow.groovy
+++ b/release/src/main/groovy/mobilegaming-java-dataflow.groovy
@@ -66,16 +66,53 @@ class LeaderBoardRunner {
def run(runner, TestScripts t, MobileGamingCommands mobileGamingCommands,
boolean useStreamingEngine) {
t.intent("Running: LeaderBoard example on DataflowRunner" +
(useStreamingEngine ? " with Streaming Engine" : ""))
- t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DataflowRunner_user")
- t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DataflowRunner_team")
+
+ def dataset = t.bqDataset()
+ def userTable = "leaderboard_DataflowRunner_user"
+ def teamTable = "leaderboard_DataflowRunner_team"
+ def userSchema = [
+ "user:STRING",
+ "total_score:INTEGER",
+ "processing_time:STRING"
+ ].join(",")
+ def teamSchema = [
+ "team:STRING",
+ "total_score:INTEGER",
+ "window_start:STRING",
+ "processing_time:STRING",
+ "timing:STRING"
+ ].join(",")
+
+ // Remove existing tables if they exist
+ String tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name
FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
+
+ if (tables.contains(userTable)) {
+ t.run("bq rm -f -t ${dataset}.${userTable}")
+ }
+ if (tables.contains(teamTable)) {
+ t.run("bq rm -f -t ${dataset}.${teamTable}")
+ }
+
// It will take couple seconds to clean up tables.
// This loop makes sure tables are completely deleted before running the
pipeline
- String tables = ""
- while ({
+ tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM
${dataset}.INFORMATION_SCHEMA.TABLES'")
+ while (tables.contains(userTable) || tables.contains(teamTable)) {
sleep(3000)
- tables = t.run("bq query SELECT table_id FROM
${t.bqDataset()}.__TABLES_SUMMARY__")
- tables.contains("leaderboard_${}_user") ||
tables.contains("leaderboard_${runner}_team")
- }());
+ tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM
${dataset}.INFORMATION_SCHEMA.TABLES'")
+ }
+
+ t.intent("Creating table: ${userTable}")
+ t.run("bq mk --table ${dataset}.${userTable} ${userSchema}")
+ t.intent("Creating table: ${teamTable}")
+ t.run("bq mk --table ${dataset}.${teamTable} ${teamSchema}")
+
+ // Verify that the tables have been created successfully
+ tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM
${dataset}.INFORMATION_SCHEMA.TABLES'")
+ while (!tables.contains(userTable) || !tables.contains(teamTable)) {
+ sleep(3000)
+ tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM
${dataset}.INFORMATION_SCHEMA.TABLES'")
+ }
+ println "Tables ${userTable} and ${teamTable} created successfully."
def InjectorThread = Thread.start() {
t.run(mobileGamingCommands.createInjectorCommand())
@@ -99,11 +136,9 @@ class LeaderBoardRunner {
String query_result = ""
while ((System.currentTimeMillis() - startTime) / 60000 <
mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
try {
- tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM
${t.bqDataset()}.INFORMATION_SCHEMA.TABLES"
- if (tables.contains("leaderboard_${runner}_user") &&
tables.contains("leaderboard_${runner}_team")) {
- query_result = t.run """bq query --batch "SELECT user FROM
[${t.gcpProject()}:${
- t.bqDataset()
- }.leaderboard_${runner}_user] LIMIT 10\""""
+ tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM
${dataset}.INFORMATION_SCHEMA.TABLES"
+ if (tables.contains(userTable) && tables.contains(teamTable)) {
+ query_result = t.run """bq query --batch "SELECT user FROM
[${dataset}.${userTable}] LIMIT 10\""""
if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)) {
isSuccess = true
break
diff --git a/release/src/main/groovy/mobilegaming-java-direct.groovy
b/release/src/main/groovy/mobilegaming-java-direct.groovy
index 8622a8a4a6c..34eab4c0076 100644
--- a/release/src/main/groovy/mobilegaming-java-direct.groovy
+++ b/release/src/main/groovy/mobilegaming-java-direct.groovy
@@ -62,16 +62,41 @@ t.success("HourlyTeamScore successfully run on
DirectRunners.")
* */
t.intent("Running: LeaderBoard example on DirectRunner")
-t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DirectRunner_user")
-t.run("bq rm -f -t ${t.bqDataset()}.leaderboard_DirectRunner_team")
-// It will take couple seconds to clean up tables.
-// This loop makes sure tables are completely deleted before running the
pipeline
-String tables = ""
-while({
+
+def dataset = t.bqDataset()
+def userTable = "leaderboard_DirectRunner_user"
+def teamTable = "leaderboard_DirectRunner_team"
+def userSchema = [
+ "user:STRING",
+ "total_score:INTEGER",
+ "processing_time:STRING"
+].join(",")
+def teamSchema = [
+ "team:STRING",
+ "total_score:INTEGER",
+ "window_start:STRING",
+ "processing_time:STRING",
+ "timing:STRING"
+].join(",")
+
+String tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM
${dataset}.INFORMATION_SCHEMA.TABLES'")
+
+if (!tables.contains(userTable)) {
+ t.intent("Creating table: ${userTable}")
+ t.run("bq mk --table ${dataset}.${userTable} ${userSchema}")
+}
+if (!tables.contains(teamTable)) {
+ t.intent("Creating table: ${teamTable}")
+ t.run("bq mk --table ${dataset}.${teamTable} ${teamSchema}")
+}
+
+// Verify that the tables have been created
+tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM
${dataset}.INFORMATION_SCHEMA.TABLES'")
+while (!tables.contains(userTable) || !tables.contains(teamTable)) {
sleep(3000)
- tables = t.run ("bq query SELECT table_id FROM
${t.bqDataset()}.__TABLES_SUMMARY__")
- tables.contains("leaderboard_${runner}_user") ||
tables.contains("leaderboard_${runner}_team")
-}());
+ tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM
${dataset}.INFORMATION_SCHEMA.TABLES'")
+}
+println "Tables ${userTable} and ${teamTable} created successfully."
def InjectorThread = Thread.start() {
t.run(mobileGamingCommands.createInjectorCommand())
@@ -86,12 +111,12 @@ def LeaderBoardThread = Thread.start() {
def startTime = System.currentTimeMillis()
def isSuccess = false
String query_result = ""
-while((System.currentTimeMillis() - startTime)/60000 <
mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
+while ((System.currentTimeMillis() - startTime)/60000 <
mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
try {
- tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM
${t.bqDataset()}.INFORMATION_SCHEMA.TABLES"
- if(tables.contains("leaderboard_${runner}_user") &&
tables.contains("leaderboard_${runner}_team")) {
- query_result = t.run """bq query --batch "SELECT user FROM
[${t.gcpProject()}.${t.bqDataset()}.leaderboard_${runner}_user] LIMIT 10\""""
- if(t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){
+ tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM
${dataset}.INFORMATION_SCHEMA.TABLES"
+ if (tables.contains(userTable) && tables.contains(teamTable)) {
+ query_result = t.run """bq query --batch "SELECT user FROM
[${dataset}.${userTable}] LIMIT 10\""""
+ if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){
isSuccess = true
break
}