gemini-code-assist[bot] commented on code in PR #38883:
URL: https://github.com/apache/beam/pull/38883#discussion_r3392128824
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -501,6 +541,15 @@ public TableDestination getTable(DestinationT destination)
{
if (existingTable == null
|| existingTable.getSchema() == null
|| existingTable.getSchema().isEmpty()) {
+ @Nullable TableReference cloneSource =
super.getCloneSource(destination);
+ if (cloneSource != null) {
+ @Nullable Table cloneSourceTable = getBigQueryTable(cloneSource);
+ if (cloneSourceTable != null
+ && cloneSourceTable.getSchema() != null
+ && !cloneSourceTable.getSchema().isEmpty()) {
+ return cloneSourceTable.getSchema();
+ }
+ }
Review Comment:

Similar to the above, if the clone source table reference does not specify a
project ID, calling `getBigQueryTable(cloneSource)` can fail. We should default
the project ID to the destination table's project ID to ensure it is fully
qualified.
```java
@Nullable TableReference cloneSource =
super.getCloneSource(destination);
if (cloneSource != null) {
TableReference cloneSourceWithProject = cloneSource.clone();
if (cloneSourceWithProject.getProjectId() == null ||
cloneSourceWithProject.getProjectId().isEmpty()) {
cloneSourceWithProject.setProjectId(super.getTable(destination).getTableReference().getProjectId());
}
@Nullable Table cloneSourceTable =
getBigQueryTable(cloneSourceWithProject);
if (cloneSourceTable != null
&& cloneSourceTable.getSchema() != null
&& !cloneSourceTable.getSchema().isEmpty()) {
return cloneSourceTable.getSchema();
}
}
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -478,15 +511,22 @@ private static boolean nextBackOff(Sleeper sleeper,
BackOff backoff)
public TableDestination getTable(DestinationT destination) {
TableDestination wrappedDestination = super.getTable(destination);
Table existingTable =
getBigQueryTable(wrappedDestination.getTableReference());
+ Table tableToMatch = existingTable;
+ if (tableToMatch == null) {
+ @Nullable TableReference cloneSource =
super.getCloneSource(destination);
+ if (cloneSource != null) {
+ tableToMatch = getBigQueryTable(cloneSource);
+ }
+ }
Review Comment:

If the clone source table reference does not specify a project ID (e.g., if
it was parsed from a table spec like `dataset.table`), calling
`getBigQueryTable(cloneSource)` directly can fail with a `NullPointerException`
or API error because the project ID is missing. We should default the project
ID to the destination table's project ID to ensure it is fully qualified before
querying.
```java
if (tableToMatch == null) {
@Nullable TableReference cloneSource =
super.getCloneSource(destination);
if (cloneSource != null) {
TableReference cloneSourceWithProject = cloneSource.clone();
if (cloneSourceWithProject.getProjectId() == null ||
cloneSourceWithProject.getProjectId().isEmpty()) {
cloneSourceWithProject.setProjectId(wrappedDestination.getTableReference().getProjectId());
}
tableToMatch = getBigQueryTable(cloneSourceWithProject);
}
}
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -3771,6 +3809,16 @@ public WriteResult expand(PCollection<T> input) {
.collect(Collectors.toList())),
"No more than one of jsonSchema, schemaFromView, or
dynamicDestinations may be set");
+ List<?> allTableCreationMetadataArgs =
+ Lists.newArrayList(getJsonSchema(), getSchemaFromView(),
getJsonCloneSourceTableRef());
+ checkArgument(
+ 2
+ > Iterables.size(
+ allTableCreationMetadataArgs.stream()
+ .filter(Predicates.notNull()::apply)
+ .collect(Collectors.toList())),
+ "No more than one of jsonSchema, schemaFromView, or cloneSource may
be set");
Review Comment:

This check can be simplified and made more readable by using Java 8 Streams
directly, avoiding the creation of unnecessary Guava lists and iterators.
```java
long activeCreationMetadataCount =
java.util.stream.Stream.of(getJsonSchema(), getSchemaFromView(),
getJsonCloneSourceTableRef())
.filter(x -> x != null)
.count();
checkArgument(
activeCreationMetadataCount <= 1,
"No more than one of jsonSchema, schemaFromView, or cloneSource
may be set");
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destination)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destination.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, destination.getProjectId(),
destination.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ jobService.startCopyJob(jobRef, copyConfig);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:

When catching `InterruptedException`, it is a standard Java best practice to
restore the interrupted status of the thread by calling
`Thread.currentThread().interrupt()`. This ensures that higher-level call
stacks or the runner are aware of the cancellation/interruption.
```suggestion
try {
jobService.startCopyJob(jobRef, copyConfig);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destination)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destination.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, destination.getProjectId(),
destination.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ jobService.startCopyJob(jobRef, copyConfig);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ },
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ return jobService.pollJob(jobRef,
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ },
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ return jobService.getJob(jobRef);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:

Restore the interrupted status of the thread when catching
`InterruptedException` to ensure the thread's interrupted state is preserved.
```java
try {
return jobService.getJob(jobRef);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destination)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destination.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, destination.getProjectId(),
destination.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ jobService.startCopyJob(jobRef, copyConfig);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ },
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ return jobService.pollJob(jobRef,
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:

Restore the interrupted status of the thread when catching
`InterruptedException` to ensure the thread's interrupted state is preserved.
```java
try {
return jobService.pollJob(jobRef,
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]