This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 80ba916e94d fix retry scenario for query to table materialization 
(#36912)
80ba916e94d is described below

commit 80ba916e94d5ff28fc33a16d38861057eb492313
Author: RadosÅ‚aw Stankiewicz <[email protected]>
AuthorDate: Wed Nov 26 20:35:44 2025 +0100

    fix retry scenario for query to table materialization (#36912)
    
    * fix retry scenario for query to table materialization
    
    * fix retry scenario for query to table materialization
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  2 ++
 .../gcp/bigquery/BigQueryStorageQuerySource.java   | 22 ++++++++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7aef1bd1ce0..69b9c62ceea 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -979,6 +979,8 @@ public class BigQueryIO {
                   getParseFn(),
                   getOutputCoder(),
                   getBigQueryServices());
+          // due to retry, table may already exist, remove it to ensure 
correctness
+          
querySource.removeDestinationIfExists(options.as(BigQueryOptions.class));
           Table queryResultTable = 
querySource.getTargetTable(options.as(BigQueryOptions.class));
 
           BigQueryStorageTableSource<T> output =
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index a2350ef19a7..07c3273c293 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.services.bigquery.model.JobStatistics;
@@ -25,6 +26,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.cloud.bigquery.storage.v1.DataFormat;
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
@@ -188,4 +190,24 @@ class BigQueryStorageQuerySource<T> extends 
BigQueryStorageSourceBase<T> {
   protected @Nullable String getTargetTableId(BigQueryOptions options) throws 
Exception {
     return null;
   }
+
+  void removeDestinationIfExists(BigQueryOptions options) throws Exception {
+    DatasetService datasetService = 
bqServices.getDatasetService(options.as(BigQueryOptions.class));
+    String project = queryTempProject;
+    if (project == null) {
+      project =
+          options.as(BigQueryOptions.class).getBigQueryProject() == null
+              ? options.as(BigQueryOptions.class).getProject()
+              : options.as(BigQueryOptions.class).getBigQueryProject();
+    }
+    String tempTableID =
+        BigQueryResourceNaming.createJobIdPrefix(
+            options.getJobName(), stepUuid, 
BigQueryResourceNaming.JobType.QUERY);
+    TableReference tempTableReference =
+        createTempTableReference(project, tempTableID, 
Optional.ofNullable(queryTempDataset));
+    Table destTable = datasetService.getTable(tempTableReference);
+    if (destTable != null) {
+      datasetService.deleteTable(tempTableReference);
+    }
+  }
 }

Reply via email to