gemini-code-assist[bot] commented on code in PR #38883:
URL: https://github.com/apache/beam/pull/38883#discussion_r3413854579


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,197 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference destinationWithDefaultProject = withDefaultProject(options, 
destination);
+    TableReference source =
+        withDefaultProject(options, cloneSource, 
destinationWithDefaultProject.getProjectId());
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destinationWithDefaultProject)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destinationWithDefaultProject.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService,
+            destinationWithDefaultProject.getProjectId(),
+            destinationWithDefaultProject.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, 
destinationWithDefaultProject);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    createJobReference(jobId.getJobId(), jobProjectId, 
bqLocation);
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  throw new RuntimeException(e);
+                }
+                return null;
+              },
+              jobId -> {
+                JobReference jobRef =
+                    createJobReference(jobId.getJobId(), jobProjectId, 
bqLocation);
+                try {
+                  return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  throw new RuntimeException(e);
+                }
+              },

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
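   The lambda expression for `pollJob` does not catch `IOException`. If 
`jobService.pollJob` declares `IOException` (a checked exception), then, since 
`SerializableFunction.apply` does not declare any checked exceptions, this will 
cause a compilation failure; in that case, catch `IOException` and wrap it in a 
`RuntimeException`, similar to how it is handled in the `getJob` and 
`startCopyJob` lambdas. Please confirm against the `JobService.pollJob` 
declaration first: if it declares only `InterruptedException`, the current code 
already compiles, and adding `catch (IOException e)` would itself be a compile 
error, because Java rejects a catch clause for a checked exception that the 
`try` block cannot throw.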
   
   ```java
                 jobId -> {
                   JobReference jobRef = 
                       createJobReference(jobId.getJobId(), jobProjectId, 
bqLocation);
                   try {
                     return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
                   } catch (IOException e) {
                     throw new RuntimeException(e);
                   } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     throw new RuntimeException(e);
                   }
                 },
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -221,6 +227,40 @@ public String toString() {
     }
   }
 
+  /** Returns the same clone source for every table. */
+  static class ConstantCloneSourceDestinations<T, DestinationT>
+      extends DelegatingDynamicDestinations<T, DestinationT> {
+    private final ValueProvider<String> jsonCloneSource;
+    private transient @Nullable TableReference cloneSource;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `cloneSource` field is lazily initialized in `getCloneSource` but is not 
marked as `volatile`. Since `DynamicDestinations` instances can be accessed 
concurrently by multiple worker threads, the lack of `volatile` or other 
synchronization can lead to unsafe publication (e.g., another thread observing 
a partially constructed `TableReference` object). Marking the field as 
`volatile` ensures safe publication across threads.
   
   ```suggestion
       private transient volatile @Nullable TableReference cloneSource;
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -442,16 +487,12 @@ private MatchTableDynamicDestinations(
               throw new IllegalStateException("pipeline options cannot be 
null");
             }
             BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-            if (tableReference.getProjectId() == null) {
-              tableReference.setProjectId(
-                  bqOptions.getBigQueryProject() == null
-                      ? bqOptions.getProject()
-                      : bqOptions.getBigQueryProject());
-            }
+            TableReference tableReferenceToFetch =
+                withDefaultProject(bqOptions, tableReference, 
defaultProjectId);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
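   Use the `CreateTableHelpers.withDefaultProject` helper instead of the 
duplicated local version to improve maintainability. Note that this suggestion 
only compiles once that helper is made package-private; in this PR it is still 
declared `private static`.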
   
   ```suggestion
               TableReference tableReferenceToFetch =
                   CreateTableHelpers.withDefaultProject(bqOptions, 
tableReference, defaultProjectId);
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,197 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference destinationWithDefaultProject = withDefaultProject(options, 
destination);
+    TableReference source =
+        withDefaultProject(options, cloneSource, 
destinationWithDefaultProject.getProjectId());
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destinationWithDefaultProject)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destinationWithDefaultProject.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService,
+            destinationWithDefaultProject.getProjectId(),
+            destinationWithDefaultProject.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, 
destinationWithDefaultProject);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    createJobReference(jobId.getJobId(), jobProjectId, 
bqLocation);
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  throw new RuntimeException(e);
+                }
+                return null;
+              },
+              jobId -> {
+                JobReference jobRef =
+                    createJobReference(jobId.getJobId(), jobProjectId, 
bqLocation);
+                try {
+                  return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  throw new RuntimeException(e);
+                }
+              },
+              jobId -> {
+                JobReference jobRef =
+                    createJobReference(jobId.getJobId(), jobProjectId, 
bqLocation);
+                try {
+                  return jobService.getJob(jobRef);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  throw new RuntimeException(e);
+                }
+              },
+              BatchLoads.DEFAULT_MAX_RETRY_JOBS,
+              jobIdPrefix);
+      cloneJob.runJob();
+      new BigQueryHelpers.PendingJobManager()
+          .addPendingJob(
+              cloneJob,
+              job -> {
+                LOG.info("Created BigQuery clone table {} from {}", tableSpec, 
source);
+                return null;
+              })
+          .waitForDone();
+    }
+  }
+
+  private static JobReference createJobReference(String jobId, String 
projectId, String location) {
+    return new 
JobReference().setProjectId(projectId).setJobId(jobId).setLocation(location);
+  }
+
+  private static TableReference withDefaultProject(
+      BigQueryOptions options, TableReference tableReference) {
+    return withDefaultProject(options, tableReference, null);
+  }
+
+  private static TableReference withDefaultProject(
+      BigQueryOptions options, TableReference tableReference, @Nullable String 
defaultProjectId) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `withDefaultProject` helper method is duplicated between 
`CreateTableHelpers` and `DynamicDestinationsHelpers`. Since both classes 
reside in the same package (`org.apache.beam.sdk.io.gcp.bigquery`), we can make 
this method package-private (by removing `private`) and reuse it in 
`DynamicDestinationsHelpers` to avoid code duplication.
   
   ```suggestion
     static TableReference withDefaultProject(
         BigQueryOptions options, TableReference tableReference, @Nullable 
String defaultProjectId) {
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -463,6 +504,25 @@ private MatchTableDynamicDestinations(
       return null;
     }
 
+    private static TableReference withDefaultProject(
+        BigQueryOptions options, TableReference tableReference, @Nullable 
String defaultProjectId) {
+      TableReference updated = tableReference.clone();
+      if (Strings.isNullOrEmpty(updated.getProjectId())) {
+        if (defaultProjectId != null && !defaultProjectId.isEmpty()) {
+          updated.setProjectId(defaultProjectId);
+        } else {
+          @Nullable String projectId = options.getBigQueryProject();
+          if (projectId == null || projectId.isEmpty()) {
+            projectId = options.getProject();
+          }
+          if (projectId != null && !projectId.isEmpty()) {
+            updated.setProjectId(projectId);
+          }
+        }
+      }
+      return updated;
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This `withDefaultProject` method duplicates the one in 
`CreateTableHelpers`. Since both classes are in the same package, once the 
`CreateTableHelpers` version is made package-private we can remove this 
duplicated method entirely and call `CreateTableHelpers.withDefaultProject` 
instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to