Copilot commented on code in PR #38883:
URL: https://github.com/apache/beam/pull/38883#discussion_r3392127956


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destination)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destination.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, destination.getProjectId(), 
destination.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException | InterruptedException e) {
+                  throw new RuntimeException(e);
+                }

Review Comment:
   Catching `InterruptedException` and wrapping it in `RuntimeException` without 
restoring the interrupt flag loses the thread’s interrupted status (the status is 
cleared when `InterruptedException` is thrown), which can cause subtle issues in 
runners/executors. Prefer catching `InterruptedException` separately, calling 
`Thread.currentThread().interrupt()`, and then either rethrowing (or wrapping) 
so upstream cancellation behavior is preserved. The same pattern appears in the 
`pollJob(...)` lambda as well.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);

Review Comment:
   `cloneTable(...)` normalizes the clone source with 
`withDefaultProject(...)`, but it does not normalize the `destination`. If the 
destination `TableReference` ever lacks a project ID, `jobProjectId` will be null 
and `getDatasetLocation(...)` will be called with a null project, producing invalid 
job references or failures. Consider applying `withDefaultProject(...)` to `destination` 
at the start of `cloneTable(...)` and using the normalized reference throughout 
(job project ID, dataset location lookup, and copy destination).



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -221,6 +226,34 @@ public String toString() {
     }
   }
 
+  /** Returns the same clone source for every table. */
+  static class ConstantCloneSourceDestinations<T, DestinationT>
+      extends DelegatingDynamicDestinations<T, DestinationT> {
+    private final ValueProvider<String> jsonCloneSource;
+
+    ConstantCloneSourceDestinations(
+        DynamicDestinations<T, DestinationT> inner, ValueProvider<String> 
jsonCloneSource) {
+      super(inner);
+      Preconditions.checkArgumentNotNull(jsonCloneSource, "jsonCloneSource can 
not be null");

Review Comment:
   The error message uses “can not”, which should be “cannot” in standard 
English (and Beam messages typically use “cannot”). The same wording also appears 
in `getCloneSource(...)`. Updating both improves consistency and professionalism of 
surfaced validation errors.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destination)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destination.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, destination.getProjectId(), 
destination.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException | InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                return null;
+              },
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+              },
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);

Review Comment:
   `JobReference` construction is duplicated in each `PendingJob` lambda. 
Consider extracting a small helper (e.g., `jobRef(jobId, jobProjectId, 
bqLocation)`) to reduce duplication and make future changes (adding fields, 
changing location logic) less error-prone.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -221,6 +226,34 @@ public String toString() {
     }
   }
 
+  /** Returns the same clone source for every table. */
+  static class ConstantCloneSourceDestinations<T, DestinationT>
+      extends DelegatingDynamicDestinations<T, DestinationT> {
+    private final ValueProvider<String> jsonCloneSource;
+
+    ConstantCloneSourceDestinations(
+        DynamicDestinations<T, DestinationT> inner, ValueProvider<String> 
jsonCloneSource) {
+      super(inner);
+      Preconditions.checkArgumentNotNull(jsonCloneSource, "jsonCloneSource can 
not be null");
+      this.jsonCloneSource = jsonCloneSource;
+    }
+
+    @Override
+    public TableReference getCloneSource(DestinationT destination) {
+      String jsonCloneSource = this.jsonCloneSource.get();
+      checkArgument(jsonCloneSource != null, "jsonCloneSource can not be 
null");
+      return BigQueryHelpers.fromJsonString(jsonCloneSource, 
TableReference.class);
+    }

Review Comment:
   `getCloneSource(...)` reparses the same JSON into a `TableReference` on 
every call. Since this wrapper is explicitly “constant”, consider memoizing the 
parsed `TableReference` after the first successful `ValueProvider.get()` (e.g., 
store it in a transient field) to avoid repeated JSON parsing in hot paths.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destination)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destination.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, destination.getProjectId(), 
destination.getDatasetId());

Review Comment:
   `cloneTable(...)` normalizes the clone source with 
`withDefaultProject(...)`, but it does not normalize the `destination`. If the 
destination `TableReference` ever lacks a project ID, `jobProjectId` will be null 
and `getDatasetLocation(...)` will be called with a null project, producing invalid 
job references or failures. Consider applying `withDefaultProject(...)` to `destination` 
at the start of `cloneTable(...)` and using the normalized reference throughout 
(job project ID, dataset location lookup, and copy destination).



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destination)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destination.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, destination.getProjectId(), 
destination.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException | InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                return null;
+              },
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);

Review Comment:
   `JobReference` construction is duplicated in each of the three `PendingJob` 
lambdas (start, poll, and get). Consider extracting a small helper (e.g., 
`jobRef(jobId, jobProjectId, bqLocation)`) to reduce the duplication and make 
future changes (adding fields, changing location logic) less error-prone.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destination)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destination.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, destination.getProjectId(), 
destination.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException | InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                return null;
+              },
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+              },
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  return jobService.getJob(jobRef);
+                } catch (IOException | InterruptedException e) {
+                  throw new RuntimeException(e);
+                }

Review Comment:
   Catching `InterruptedException` and wrapping it in `RuntimeException` loses 
the thread’s interrupted status. Blocking methods typically clear that status 
before throwing `InterruptedException`, and nothing here restores it. This can 
cause subtle issues in runners/executors. Prefer catching 
`InterruptedException` separately, calling `Thread.currentThread().interrupt()`, 
and then either rethrowing or wrapping it so upstream cancellation behavior is 
preserved.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destination)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destination.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, destination.getProjectId(), 
destination.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException | InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                return null;
+              },
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }

Review Comment:
   Catching `InterruptedException` and wrapping it in `RuntimeException` loses 
the thread’s interrupted status. Blocking methods typically clear that status 
before throwing `InterruptedException`, and nothing here restores it. This can 
cause subtle issues in runners/executors. Prefer catching 
`InterruptedException` separately, calling `Thread.currentThread().interrupt()`, 
and then either rethrowing or wrapping it so upstream cancellation behavior is 
preserved.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -221,6 +226,34 @@ public String toString() {
     }
   }
 
+  /** Returns the same clone source for every table. */
+  static class ConstantCloneSourceDestinations<T, DestinationT>
+      extends DelegatingDynamicDestinations<T, DestinationT> {
+    private final ValueProvider<String> jsonCloneSource;
+
+    ConstantCloneSourceDestinations(
+        DynamicDestinations<T, DestinationT> inner, ValueProvider<String> 
jsonCloneSource) {
+      super(inner);
+      Preconditions.checkArgumentNotNull(jsonCloneSource, "jsonCloneSource can 
not be null");
+      this.jsonCloneSource = jsonCloneSource;
+    }
+
+    @Override
+    public TableReference getCloneSource(DestinationT destination) {
+      String jsonCloneSource = this.jsonCloneSource.get();
+      checkArgument(jsonCloneSource != null, "jsonCloneSource can not be 
null");

Review Comment:
   The error messages here (both in the constructor and in `getCloneSource`) 
use “can not”, which should be “cannot” in standard English; Beam messages 
typically use “cannot”. Updating both improves the consistency and 
professionalism of surfaced validation errors.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destination)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destination.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, destination.getProjectId(), 
destination.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);

Review Comment:
   `JobReference` construction is duplicated in each `PendingJob` lambda. 
Consider extracting a small helper (e.g., `jobRef(jobId, jobProjectId, 
bqLocation)`) to reduce duplication and make future changes (adding fields, 
changing location logic) less error-prone.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to