gemini-code-assist[bot] commented on code in PR #38883:
URL: https://github.com/apache/beam/pull/38883#discussion_r3413854579
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,197 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference destinationWithDefaultProject = withDefaultProject(options,
destination);
+ TableReference source =
+ withDefaultProject(options, cloneSource,
destinationWithDefaultProject.getProjectId());
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destinationWithDefaultProject)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destinationWithDefaultProject.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService,
+ destinationWithDefaultProject.getProjectId(),
+ destinationWithDefaultProject.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source,
destinationWithDefaultProject);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ createJobReference(jobId.getJobId(), jobProjectId,
bqLocation);
+ try {
+ jobService.startCopyJob(jobRef, copyConfig);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ return null;
+ },
+ jobId -> {
+ JobReference jobRef =
+ createJobReference(jobId.getJobId(), jobProjectId,
bqLocation);
+ try {
+ return jobService.pollJob(jobRef,
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ },
Review Comment:

The lambda expression for `pollJob` does not catch `IOException`. Since
`jobService.pollJob` throws `IOException` (a checked exception) and
`SerializableFunction.apply` does not declare any checked exceptions, this will
cause a compilation failure. Please catch `IOException` and wrap it in a
`RuntimeException`, similar to how it is handled in the `getJob` and
`startCopyJob` lambdas.
```java
jobId -> {
JobReference jobRef =
createJobReference(jobId.getJobId(), jobProjectId,
bqLocation);
try {
return jobService.pollJob(jobRef,
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
},
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -221,6 +227,40 @@ public String toString() {
}
}
+ /** Returns the same clone source for every table. */
+ static class ConstantCloneSourceDestinations<T, DestinationT>
+ extends DelegatingDynamicDestinations<T, DestinationT> {
+ private final ValueProvider<String> jsonCloneSource;
+ private transient @Nullable TableReference cloneSource;
Review Comment:

The `cloneSource` field is lazily initialized in `getCloneSource` but is not
marked as `volatile`. Since `DynamicDestinations` instances can be accessed
concurrently by multiple worker threads, the lack of `volatile` or other
synchronization can lead to unsafe publication (e.g., another thread reading a
partially constructed `TableReference` object). Marking the field as `volatile`
ensures safe publication across threads.
```suggestion
private transient volatile @Nullable TableReference cloneSource;
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -442,16 +487,12 @@ private MatchTableDynamicDestinations(
throw new IllegalStateException("pipeline options cannot be
null");
}
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- if (tableReference.getProjectId() == null) {
- tableReference.setProjectId(
- bqOptions.getBigQueryProject() == null
- ? bqOptions.getProject()
- : bqOptions.getBigQueryProject());
- }
+ TableReference tableReferenceToFetch =
+ withDefaultProject(bqOptions, tableReference,
defaultProjectId);
Review Comment:

Once `CreateTableHelpers.withDefaultProject` is made package-private, call it
here instead of the duplicated local version to improve maintainability.
```suggestion
TableReference tableReferenceToFetch =
CreateTableHelpers.withDefaultProject(bqOptions,
tableReference, defaultProjectId);
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,197 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference destinationWithDefaultProject = withDefaultProject(options,
destination);
+ TableReference source =
+ withDefaultProject(options, cloneSource,
destinationWithDefaultProject.getProjectId());
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destinationWithDefaultProject)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destinationWithDefaultProject.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService,
+ destinationWithDefaultProject.getProjectId(),
+ destinationWithDefaultProject.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source,
destinationWithDefaultProject);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ createJobReference(jobId.getJobId(), jobProjectId,
bqLocation);
+ try {
+ jobService.startCopyJob(jobRef, copyConfig);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ return null;
+ },
+ jobId -> {
+ JobReference jobRef =
+ createJobReference(jobId.getJobId(), jobProjectId,
bqLocation);
+ try {
+ return jobService.pollJob(jobRef,
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ },
+ jobId -> {
+ JobReference jobRef =
+ createJobReference(jobId.getJobId(), jobProjectId,
bqLocation);
+ try {
+ return jobService.getJob(jobRef);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ },
+ BatchLoads.DEFAULT_MAX_RETRY_JOBS,
+ jobIdPrefix);
+ cloneJob.runJob();
+ new BigQueryHelpers.PendingJobManager()
+ .addPendingJob(
+ cloneJob,
+ job -> {
+ LOG.info("Created BigQuery clone table {} from {}", tableSpec,
source);
+ return null;
+ })
+ .waitForDone();
+ }
+ }
+
+ private static JobReference createJobReference(String jobId, String
projectId, String location) {
+ return new
JobReference().setProjectId(projectId).setJobId(jobId).setLocation(location);
+ }
+
+ private static TableReference withDefaultProject(
+ BigQueryOptions options, TableReference tableReference) {
+ return withDefaultProject(options, tableReference, null);
+ }
+
+ private static TableReference withDefaultProject(
+ BigQueryOptions options, TableReference tableReference, @Nullable String
defaultProjectId) {
Review Comment:

The `withDefaultProject` helper method is duplicated between
`CreateTableHelpers` and `DynamicDestinationsHelpers`. Since both classes
reside in the same package (`org.apache.beam.sdk.io.gcp.bigquery`), we can make
this method package-private (by removing `private`) and reuse it in
`DynamicDestinationsHelpers` to avoid code duplication.
```suggestion
static TableReference withDefaultProject(
BigQueryOptions options, TableReference tableReference, @Nullable
String defaultProjectId) {
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -463,6 +504,25 @@ private MatchTableDynamicDestinations(
return null;
}
+ private static TableReference withDefaultProject(
+ BigQueryOptions options, TableReference tableReference, @Nullable
String defaultProjectId) {
+ TableReference updated = tableReference.clone();
+ if (Strings.isNullOrEmpty(updated.getProjectId())) {
+ if (defaultProjectId != null && !defaultProjectId.isEmpty()) {
+ updated.setProjectId(defaultProjectId);
+ } else {
+ @Nullable String projectId = options.getBigQueryProject();
+ if (projectId == null || projectId.isEmpty()) {
+ projectId = options.getProject();
+ }
+ if (projectId != null && !projectId.isEmpty()) {
+ updated.setProjectId(projectId);
+ }
+ }
+ }
+ return updated;
+ }
Review Comment:

This `withDefaultProject` method is identical to the one in
`CreateTableHelpers`. Since both classes are in the same package, we can remove
this duplicated method entirely and call
`CreateTableHelpers.withDefaultProject` instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]