This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fd5e212737a Fix PostRelease Nightly Snapshot (#33798)
fd5e212737a is described below
commit fd5e212737afd8bc12e63b72c3aefd797147b56b
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Thu Jan 30 17:48:20 2025 +0400
Fix PostRelease Nightly Snapshot (#33798)
* Update mobile gaming groovy scripts
* Add retry
---
.../complete/game/utils/WriteToBigQuery.java | 4 +++-
.../main/groovy/mobilegaming-java-dataflow.groovy | 21 +++++++++++++--------
.../src/main/groovy/mobilegaming-java-direct.groovy | 17 +++++++++++------
3 files changed, 27 insertions(+), 15 deletions(-)
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index dadc974e62c..eef4bc93268 100644
---
a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++
b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -129,7 +130,8 @@ public class WriteToBigQuery<InputT> extends
PTransform<PCollection<InputT>, PDo
.to(getTable(projectId, datasetId, tableName))
.withSchema(getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
return PDone.in(teamAndScore.getPipeline());
}
diff --git a/release/src/main/groovy/mobilegaming-java-dataflow.groovy
b/release/src/main/groovy/mobilegaming-java-dataflow.groovy
index bb0b76bd675..60853d5542f 100644
--- a/release/src/main/groovy/mobilegaming-java-dataflow.groovy
+++ b/release/src/main/groovy/mobilegaming-java-dataflow.groovy
@@ -98,15 +98,20 @@ class LeaderBoardRunner {
def isSuccess = false
String query_result = ""
while ((System.currentTimeMillis() - startTime) / 60000 <
mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
- tables = t.run "bq query SELECT table_id FROM
${t.bqDataset()}.__TABLES_SUMMARY__"
- if (tables.contains("leaderboard_${runner}_user") &&
tables.contains("leaderboard_${runner}_team")) {
- query_result = t.run """bq query --batch "SELECT user FROM
[${t.gcpProject()}:${
- t.bqDataset()
- }.leaderboard_${runner}_user] LIMIT 10\""""
- if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)) {
- isSuccess = true
- break
+ try {
+ tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM
${t.bqDataset()}.INFORMATION_SCHEMA.TABLES"
+ if (tables.contains("leaderboard_${runner}_user") &&
tables.contains("leaderboard_${runner}_team")) {
+ query_result = t.run """bq query --batch "SELECT user FROM
[${t.gcpProject()}:${
+ t.bqDataset()
+ }.leaderboard_${runner}_user] LIMIT 10\""""
+ if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)) {
+ isSuccess = true
+ break
+ }
}
+ } catch (Exception e) {
+ println "Warning: Exception while checking tables: ${e.message}"
+ println "Retrying..."
}
println "Waiting for pipeline to produce more results..."
sleep(60000) // wait for 1 min
diff --git a/release/src/main/groovy/mobilegaming-java-direct.groovy
b/release/src/main/groovy/mobilegaming-java-direct.groovy
index 3c6f4ca01a6..8622a8a4a6c 100644
--- a/release/src/main/groovy/mobilegaming-java-direct.groovy
+++ b/release/src/main/groovy/mobilegaming-java-direct.groovy
@@ -87,13 +87,18 @@ def startTime = System.currentTimeMillis()
def isSuccess = false
String query_result = ""
while((System.currentTimeMillis() - startTime)/60000 <
mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
- tables = t.run "bq query SELECT table_id FROM
${t.bqDataset()}.__TABLES_SUMMARY__"
- if(tables.contains("leaderboard_${runner}_user") &&
tables.contains("leaderboard_${runner}_team")){
- query_result = t.run """bq query --batch "SELECT user FROM
[${t.gcpProject()}:${t.bqDataset()}.leaderboard_${runner}_user] LIMIT 10\""""
- if(t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){
- isSuccess = true
- break
+ try {
+ tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM
${t.bqDataset()}.INFORMATION_SCHEMA.TABLES"
+ if(tables.contains("leaderboard_${runner}_user") &&
tables.contains("leaderboard_${runner}_team")) {
+ query_result = t.run """bq query --batch "SELECT user FROM
[${t.gcpProject()}.${t.bqDataset()}.leaderboard_${runner}_user] LIMIT 10\""""
+ if(t.seeAnyOf(mobileGamingCommands.COLORS, query_result)){
+ isSuccess = true
+ break
+ }
}
+ } catch (Exception e) {
+ println "Warning: Exception while checking tables: ${e.message}"
+ println "Retrying..."
}
println "Waiting for pipeline to produce more results..."
sleep(60000) // wait for 1 min