This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch release-2.70
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.70 by this push:
new e075a50101a fix retry scenario for query to table materialization
(#36912) (#36916)
e075a50101a is described below
commit e075a50101ace82386cb20637721e310c29f53f7
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Wed Nov 26 21:43:15 2025 +0100
fix retry scenario for query to table materialization (#36912) (#36916)
* fix retry scenario for query to table materialization
* fix retry scenario for query to table materialization
---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 ++
.../gcp/bigquery/BigQueryStorageQuerySource.java | 22 ++++++++++++++++++++++
2 files changed, 24 insertions(+)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7aef1bd1ce0..69b9c62ceea 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -979,6 +979,8 @@ public class BigQueryIO {
getParseFn(),
getOutputCoder(),
getBigQueryServices());
+ // due to retry, table may already exist, remove it to ensure correctness
+
querySource.removeDestinationIfExists(options.as(BigQueryOptions.class));
Table queryResultTable =
querySource.getTargetTable(options.as(BigQueryOptions.class));
BigQueryStorageTableSource<T> output =
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index a2350ef19a7..07c3273c293 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static
org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.bigquery.model.JobStatistics;
@@ -25,6 +26,7 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import java.io.IOException;
import java.io.ObjectInputStream;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
@@ -188,4 +190,24 @@ class BigQueryStorageQuerySource<T> extends
BigQueryStorageSourceBase<T> {
protected @Nullable String getTargetTableId(BigQueryOptions options) throws
Exception {
return null;
}
+
+  /**
+   * Deletes the temporary query destination table when it is already present.
+   *
+   * <p>The table reference is rebuilt from the resolved project, a job-id prefix derived from the
+   * job name and {@code stepUuid}, and the optional {@code queryTempDataset}. The caller invokes
+   * this before materializing the query so that a table left over from a retried attempt does not
+   * affect the result.
+   *
+   * @param options pipeline options supplying the job name and the fallback project
+   * @throws Exception if looking up or deleting the table fails
+   */
+  void removeDestinationIfExists(BigQueryOptions options) throws Exception {
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    // NOTE(review): confirm whether DatasetService must be closed after use; not visible here.
+    DatasetService tables = bqServices.getDatasetService(bqOptions);
+
+    // Project precedence: queryTempProject, then the BigQuery project, then the pipeline project.
+    String targetProject = queryTempProject;
+    if (targetProject == null) {
+      String bigQueryProject = bqOptions.getBigQueryProject();
+      if (bigQueryProject != null) {
+        targetProject = bigQueryProject;
+      } else {
+        // Only consult the general project option when nothing more specific is set.
+        targetProject = bqOptions.getProject();
+      }
+    }
+
+    String jobIdPrefix =
+        BigQueryResourceNaming.createJobIdPrefix(
+            options.getJobName(), stepUuid, BigQueryResourceNaming.JobType.QUERY);
+    TableReference staleTableRef =
+        createTempTableReference(targetProject, jobIdPrefix, Optional.ofNullable(queryTempDataset));
+
+    // Nothing to clean up when the lookup finds no table.
+    if (tables.getTable(staleTableRef) == null) {
+      return;
+    }
+    tables.deleteTable(staleTableRef);
+  }
}