gemini-code-assist[bot] commented on code in PR #38883:
URL: https://github.com/apache/beam/pull/38883#discussion_r3392128824


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -501,6 +541,15 @@ public TableDestination getTable(DestinationT destination) 
{
       if (existingTable == null
           || existingTable.getSchema() == null
           || existingTable.getSchema().isEmpty()) {
+        @Nullable TableReference cloneSource = 
super.getCloneSource(destination);
+        if (cloneSource != null) {
+          @Nullable Table cloneSourceTable = getBigQueryTable(cloneSource);
+          if (cloneSourceTable != null
+              && cloneSourceTable.getSchema() != null
+              && !cloneSourceTable.getSchema().isEmpty()) {
+            return cloneSourceTable.getSchema();
+          }
+        }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   As with the other `getBigQueryTable(cloneSource)` call in this file, if the 
clone source table reference does not specify a project ID, the lookup can fail 
with a `NullPointerException` or an API error. We should default the project ID 
to the destination table's project ID so that the reference is fully qualified 
before querying.
   
   ```java
           @Nullable TableReference cloneSource = 
super.getCloneSource(destination);
           if (cloneSource != null) {
             TableReference cloneSourceWithProject = cloneSource.clone();
             if (cloneSourceWithProject.getProjectId() == null || 
cloneSourceWithProject.getProjectId().isEmpty()) {
               
cloneSourceWithProject.setProjectId(super.getTable(destination).getTableReference().getProjectId());
             }
             @Nullable Table cloneSourceTable = 
getBigQueryTable(cloneSourceWithProject);
             if (cloneSourceTable != null
                 && cloneSourceTable.getSchema() != null
                 && !cloneSourceTable.getSchema().isEmpty()) {
               return cloneSourceTable.getSchema();
             }
           }
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -478,15 +511,22 @@ private static boolean nextBackOff(Sleeper sleeper, 
BackOff backoff)
     public TableDestination getTable(DestinationT destination) {
       TableDestination wrappedDestination = super.getTable(destination);
       Table existingTable = 
getBigQueryTable(wrappedDestination.getTableReference());
+      Table tableToMatch = existingTable;
+      if (tableToMatch == null) {
+        @Nullable TableReference cloneSource = 
super.getCloneSource(destination);
+        if (cloneSource != null) {
+          tableToMatch = getBigQueryTable(cloneSource);
+        }
+      }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   If the clone source table reference does not specify a project ID (e.g., if 
it was parsed from a table spec like `dataset.table`), calling 
`getBigQueryTable(cloneSource)` directly can fail with a `NullPointerException` 
or API error because the project ID is missing. We should default the project 
ID to the destination table's project ID to ensure it is fully qualified before 
querying.
   
   ```java
         if (tableToMatch == null) {
           @Nullable TableReference cloneSource = 
super.getCloneSource(destination);
           if (cloneSource != null) {
             TableReference cloneSourceWithProject = cloneSource.clone();
             if (cloneSourceWithProject.getProjectId() == null || 
cloneSourceWithProject.getProjectId().isEmpty()) {
               
cloneSourceWithProject.setProjectId(wrappedDestination.getTableReference().getProjectId());
             }
             tableToMatch = getBigQueryTable(cloneSourceWithProject);
           }
         }
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -3771,6 +3809,16 @@ public WriteResult expand(PCollection<T> input) {
                       .collect(Collectors.toList())),
           "No more than one of jsonSchema, schemaFromView, or 
dynamicDestinations may be set");
 
+      List<?> allTableCreationMetadataArgs =
+          Lists.newArrayList(getJsonSchema(), getSchemaFromView(), 
getJsonCloneSourceTableRef());
+      checkArgument(
+          2
+              > Iterables.size(
+                  allTableCreationMetadataArgs.stream()
+                      .filter(Predicates.notNull()::apply)
+                      .collect(Collectors.toList())),
+          "No more than one of jsonSchema, schemaFromView, or cloneSource may 
be set");

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This check can be simplified and made more readable by using Java 8 Streams 
directly, avoiding the creation of unnecessary Guava lists and iterators.
   
   ```java
         long activeCreationMetadataCount = 
java.util.stream.Stream.of(getJsonSchema(), getSchemaFromView(), 
getJsonCloneSourceTableRef())
             .filter(x -> x != null)
             .count();
         checkArgument(
             activeCreationMetadataCount <= 1,
             "No more than one of jsonSchema, schemaFromView, or cloneSource 
may be set");
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destination)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destination.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, destination.getProjectId(), 
destination.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException | InterruptedException e) {
+                  throw new RuntimeException(e);
+                }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   When catching `InterruptedException`, it is standard Java practice to 
restore the thread's interrupted status by calling 
`Thread.currentThread().interrupt()` before rethrowing. This ensures that 
callers higher up the stack, including the runner, can still observe the 
interruption.
   
   ```suggestion
                   try {
                     jobService.startCopyJob(jobRef, copyConfig);
                   } catch (IOException e) {
                     throw new RuntimeException(e);
                   } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     throw new RuntimeException(e);
                   }
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destination)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destination.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, destination.getProjectId(), 
destination.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException | InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                return null;
+              },
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+              },
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  return jobService.getJob(jobRef);
+                } catch (IOException | InterruptedException e) {
+                  throw new RuntimeException(e);
+                }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   When catching `InterruptedException`, restore the thread's interrupted 
status by calling `Thread.currentThread().interrupt()` before rethrowing, so 
that code higher up the call stack can still observe the interruption.
   
   ```java
                   try {
                     return jobService.getJob(jobRef);
                   } catch (IOException e) {
                     throw new RuntimeException(e);
                   } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     throw new RuntimeException(e);
                   }
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
       if (datasetService.getTable(
               tableReference, Collections.emptyList(), 
DatasetService.TableMetadataView.BASIC)
           == null) {
-        TableSchema tableSchema = schemaSupplier.get();
-        @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
-        Preconditions.checkArgumentNotNull(
-            tableSchema,
-            "Unless create disposition is %s, a schema must be specified, i.e. 
"
-                + "DynamicDestinations.getSchema() may not return null. "
-                + "However, create disposition is %s, and "
-                + " %s returned null for destination %s",
-            CreateDisposition.CREATE_NEVER,
-            createDisposition,
-            tableDestination);
-        Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
-
-        if (tableConstraints != null) {
-          table = table.setTableConstraints(tableConstraints);
-        }
+        @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+        if (cloneSource != null) {
+          cloneTable(
+              options, datasetService, bqServices, cloneSource, 
tableReference, kmsKey, tableSpec);
+        } else {
+          TableSchema tableSchema = schemaSupplier.get();
+          @Nullable TableConstraints tableConstraints = 
tableConstraintsSupplier.get();
+          Preconditions.checkArgumentNotNull(
+              tableSchema,
+              "Unless create disposition is %s, a schema or clone source must 
be specified, i.e. "
+                  + "DynamicDestinations.getSchema() and 
DynamicDestinations.getCloneSource() may "
+                  + "not both return null. However, create disposition is %s, 
and "
+                  + "no schema or clone source was returned for destination 
%s",
+              CreateDisposition.CREATE_NEVER,
+              createDisposition,
+              tableDestination);
+          Table table = new 
Table().setTableReference(tableReference).setSchema(tableSchema);
 
-        String tableDescription = tableDestination.getTableDescription();
-        if (tableDescription != null) {
-          table = table.setDescription(tableDescription);
-        }
+          if (tableConstraints != null) {
+            table = table.setTableConstraints(tableConstraints);
+          }
 
-        TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
-        if (timePartitioning != null) {
-          table.setTimePartitioning(timePartitioning);
-        }
+          String tableDescription = tableDestination.getTableDescription();
+          if (tableDescription != null) {
+            table = table.setDescription(tableDescription);
+          }
 
-        Clustering clustering = tableDestination.getClustering();
-        if (clustering != null) {
-          table.setClustering(clustering);
-        }
+          TimePartitioning timePartitioning = 
tableDestination.getTimePartitioning();
+          if (timePartitioning != null) {
+            table.setTimePartitioning(timePartitioning);
+          }
 
-        if (kmsKey != null) {
-          table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
-        }
-        if (bigLakeConfiguration != null) {
-          TableReference ref = table.getTableReference();
-          table.setBiglakeConfiguration(
-              new BigLakeConfiguration()
-                  .setTableFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
-                  .setFileFormat(
-                      
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
-                  .setConnectionId(
-                      
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
-                  .setStorageUri(
-                      String.format(
-                          "%s/%s/%s/%s",
-                          
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
-                          ref.getProjectId(),
-                          ref.getDatasetId(),
-                          ref.getTableId())));
+          Clustering clustering = tableDestination.getClustering();
+          if (clustering != null) {
+            table.setClustering(clustering);
+          }
+
+          if (kmsKey != null) {
+            table.setEncryptionConfiguration(new 
EncryptionConfiguration().setKmsKeyName(kmsKey));
+          }
+          if (bigLakeConfiguration != null) {
+            TableReference ref = table.getTableReference();
+            table.setBiglakeConfiguration(
+                new BigLakeConfiguration()
+                    .setTableFormat(
+                        MoreObjects.firstNonNull(
+                            bigLakeConfiguration.get("tableFormat"), 
"iceberg"))
+                    .setFileFormat(
+                        
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+                    .setConnectionId(
+                        
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+                    .setStorageUri(
+                        String.format(
+                            "%s/%s/%s/%s",
+                            Preconditions.checkArgumentNotNull(
+                                bigLakeConfiguration.get(STORAGE_URI)),
+                            ref.getProjectId(),
+                            ref.getDatasetId(),
+                            ref.getTableId())));
+          }
+          datasetService.createTable(table);
         }
-        datasetService.createTable(table);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     createdTables.add(tableSpec);
   }
 
+  private static void cloneTable(
+      BigQueryOptions options,
+      DatasetService datasetService,
+      BigQueryServices bqServices,
+      TableReference cloneSource,
+      TableReference destination,
+      @Nullable String kmsKey,
+      String tableSpec)
+      throws Exception {
+    TableReference source = withDefaultProject(options, cloneSource);
+    JobConfigurationTableCopy copyConfig =
+        new JobConfigurationTableCopy()
+            .setSourceTables(Collections.singletonList(source))
+            .setDestinationTable(destination)
+            .setOperationType("CLONE")
+            .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+    if (kmsKey != null) {
+      copyConfig.setDestinationEncryptionConfiguration(
+          new EncryptionConfiguration().setKmsKeyName(kmsKey));
+    }
+
+    String jobProjectId = destination.getProjectId();
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, destination.getProjectId(), 
destination.getDatasetId());
+    String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+    try (JobService jobService = bqServices.getJobService(options)) {
+      BigQueryHelpers.PendingJob cloneJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException | InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                return null;
+              },
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(jobProjectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   When catching `InterruptedException`, restore the thread's interrupted 
status by calling `Thread.currentThread().interrupt()` before rethrowing, so 
that code higher up the call stack can still observe the interruption.
   
   ```java
                   try {
                     return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
                   } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     throw new RuntimeException(e);
                   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to