Copilot commented on code in PR #38883:
URL: https://github.com/apache/beam/pull/38883#discussion_r3392127956
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destination)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destination.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, destination.getProjectId(),
destination.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ jobService.startCopyJob(jobRef, copyConfig);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
Catching `InterruptedException` and wrapping it in a `RuntimeException` without
restoring the interrupt flag loses the thread’s interrupted status (the flag is
already cleared by the time the exception is thrown), which can cause subtle
issues in runners/executors. Prefer catching `InterruptedException` separately,
calling `Thread.currentThread().interrupt()`, and then rethrowing or wrapping
the exception so upstream cancellation behavior is preserved.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
Review Comment:
`cloneTable(...)` normalizes the clone source with
`withDefaultProject(...)`, but it does not normalize the `destination`. If the
destination `TableReference` ever lacks a project ID, `jobProjectId` will be
null and `getDatasetLocation(...)` will be called with a null project ID,
leading to invalid job references and failures. Consider applying
`withDefaultProject(...)` to `destination` at the start of `cloneTable(...)` and
using the normalized reference throughout (job project ID, dataset location, and
copy destination).
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -221,6 +226,34 @@ public String toString() {
}
}
+ /** Returns the same clone source for every table. */
+ static class ConstantCloneSourceDestinations<T, DestinationT>
+ extends DelegatingDynamicDestinations<T, DestinationT> {
+ private final ValueProvider<String> jsonCloneSource;
+
+ ConstantCloneSourceDestinations(
+ DynamicDestinations<T, DestinationT> inner, ValueProvider<String>
jsonCloneSource) {
+ super(inner);
+ Preconditions.checkArgumentNotNull(jsonCloneSource, "jsonCloneSource can
not be null");
Review Comment:
The error message uses “can not”, which should be “cannot” in standard
English (and Beam messages typically use “cannot”). Updating this improves
consistency and professionalism of surfaced validation errors.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destination)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destination.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, destination.getProjectId(),
destination.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ jobService.startCopyJob(jobRef, copyConfig);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ },
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ return jobService.pollJob(jobRef,
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ },
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
Review Comment:
`JobReference` construction is duplicated in each `PendingJob` lambda.
Consider extracting a small helper (e.g., `jobRef(jobId, jobProjectId,
bqLocation)`) to reduce duplication and make future changes (adding fields,
changing location logic) less error-prone.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -221,6 +226,34 @@ public String toString() {
}
}
+ /** Returns the same clone source for every table. */
+ static class ConstantCloneSourceDestinations<T, DestinationT>
+ extends DelegatingDynamicDestinations<T, DestinationT> {
+ private final ValueProvider<String> jsonCloneSource;
+
+ ConstantCloneSourceDestinations(
+ DynamicDestinations<T, DestinationT> inner, ValueProvider<String>
jsonCloneSource) {
+ super(inner);
+ Preconditions.checkArgumentNotNull(jsonCloneSource, "jsonCloneSource can
not be null");
+ this.jsonCloneSource = jsonCloneSource;
+ }
+
+ @Override
+ public TableReference getCloneSource(DestinationT destination) {
+ String jsonCloneSource = this.jsonCloneSource.get();
+ checkArgument(jsonCloneSource != null, "jsonCloneSource can not be
null");
+ return BigQueryHelpers.fromJsonString(jsonCloneSource,
TableReference.class);
+ }
Review Comment:
`getCloneSource(...)` reparses the same JSON into a `TableReference` on
every call. Since this wrapper is explicitly “constant”, consider memoizing the
parsed `TableReference` after the first successful `ValueProvider.get()` (e.g.,
store it in a transient field) to avoid repeated JSON parsing in hot paths.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destination)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destination.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, destination.getProjectId(),
destination.getDatasetId());
Review Comment:
`cloneTable(...)` normalizes the clone source with
`withDefaultProject(...)`, but it does not normalize the `destination`. If the
destination `TableReference` ever lacks a project ID, `jobProjectId` will be
null and `getDatasetLocation(...)` will be called with a null project ID,
leading to invalid job references and failures. Consider applying
`withDefaultProject(...)` to `destination` at the start of `cloneTable(...)` and
using the normalized reference throughout (job project ID, dataset location, and
copy destination).
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destination)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destination.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, destination.getProjectId(),
destination.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ jobService.startCopyJob(jobRef, copyConfig);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ },
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
Review Comment:
`JobReference` construction is duplicated in each `PendingJob` lambda.
Consider extracting a small helper (e.g., `jobRef(jobId, jobProjectId,
bqLocation)`) to reduce duplication and make future changes (adding fields,
changing location logic) less error-prone.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destination)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destination.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, destination.getProjectId(),
destination.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ jobService.startCopyJob(jobRef, copyConfig);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ },
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ return jobService.pollJob(jobRef,
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ },
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ return jobService.getJob(jobRef);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
Catching `InterruptedException` and wrapping it in `RuntimeException` without
restoring the interrupt flag loses the thread’s interrupted status (the flag is
already cleared when the exception is thrown), which can cause subtle issues in
runners/executors. Prefer catching `InterruptedException` separately, calling
`Thread.currentThread().interrupt()`, and then rethrowing or wrapping the
exception so upstream cancellation behavior is preserved.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destination)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destination.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, destination.getProjectId(),
destination.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ jobService.startCopyJob(jobRef, copyConfig);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ },
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
+ try {
+ return jobService.pollJob(jobRef,
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
Catching `InterruptedException` and wrapping it in `RuntimeException` without
restoring the interrupt flag loses the thread’s interrupted status (the flag is
already cleared when the exception is thrown), which can cause subtle issues in
runners/executors. Prefer catching `InterruptedException` separately, calling
`Thread.currentThread().interrupt()`, and then rethrowing or wrapping the
exception so upstream cancellation behavior is preserved.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java:
##########
@@ -221,6 +226,34 @@ public String toString() {
}
}
+ /** Returns the same clone source for every table. */
+ static class ConstantCloneSourceDestinations<T, DestinationT>
+ extends DelegatingDynamicDestinations<T, DestinationT> {
+ private final ValueProvider<String> jsonCloneSource;
+
+ ConstantCloneSourceDestinations(
+ DynamicDestinations<T, DestinationT> inner, ValueProvider<String>
jsonCloneSource) {
+ super(inner);
+ Preconditions.checkArgumentNotNull(jsonCloneSource, "jsonCloneSource can
not be null");
+ this.jsonCloneSource = jsonCloneSource;
+ }
+
+ @Override
+ public TableReference getCloneSource(DestinationT destination) {
+ String jsonCloneSource = this.jsonCloneSource.get();
+ checkArgument(jsonCloneSource != null, "jsonCloneSource can not be
null");
Review Comment:
The error message uses “can not”, which should be “cannot” in standard
English (and Beam messages typically use “cannot”). Updating this improves
consistency and professionalism of surfaced validation errors.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java:
##########
@@ -162,67 +176,179 @@ private static void tryCreateTable(
if (datasetService.getTable(
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
- TableSchema tableSchema = schemaSupplier.get();
- @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
- Preconditions.checkArgumentNotNull(
- tableSchema,
- "Unless create disposition is %s, a schema must be specified, i.e.
"
- + "DynamicDestinations.getSchema() may not return null. "
- + "However, create disposition is %s, and "
- + " %s returned null for destination %s",
- CreateDisposition.CREATE_NEVER,
- createDisposition,
- tableDestination);
- Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
-
- if (tableConstraints != null) {
- table = table.setTableConstraints(tableConstraints);
- }
+ @Nullable TableReference cloneSource = cloneSourceSupplier.get();
+ if (cloneSource != null) {
+ cloneTable(
+ options, datasetService, bqServices, cloneSource,
tableReference, kmsKey, tableSpec);
+ } else {
+ TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
+ Preconditions.checkArgumentNotNull(
+ tableSchema,
+ "Unless create disposition is %s, a schema or clone source must
be specified, i.e. "
+ + "DynamicDestinations.getSchema() and
DynamicDestinations.getCloneSource() may "
+ + "not both return null. However, create disposition is %s,
and "
+ + "no schema or clone source was returned for destination
%s",
+ CreateDisposition.CREATE_NEVER,
+ createDisposition,
+ tableDestination);
+ Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
- String tableDescription = tableDestination.getTableDescription();
- if (tableDescription != null) {
- table = table.setDescription(tableDescription);
- }
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
- TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
- if (timePartitioning != null) {
- table.setTimePartitioning(timePartitioning);
- }
+ String tableDescription = tableDestination.getTableDescription();
+ if (tableDescription != null) {
+ table = table.setDescription(tableDescription);
+ }
- Clustering clustering = tableDestination.getClustering();
- if (clustering != null) {
- table.setClustering(clustering);
- }
+ TimePartitioning timePartitioning =
tableDestination.getTimePartitioning();
+ if (timePartitioning != null) {
+ table.setTimePartitioning(timePartitioning);
+ }
- if (kmsKey != null) {
- table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
- if (bigLakeConfiguration != null) {
- TableReference ref = table.getTableReference();
- table.setBiglakeConfiguration(
- new BigLakeConfiguration()
- .setTableFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
- .setFileFormat(
-
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
- .setConnectionId(
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
- .setStorageUri(
- String.format(
- "%s/%s/%s/%s",
-
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
- ref.getProjectId(),
- ref.getDatasetId(),
- ref.getTableId())));
+ Clustering clustering = tableDestination.getClustering();
+ if (clustering != null) {
+ table.setClustering(clustering);
+ }
+
+ if (kmsKey != null) {
+ table.setEncryptionConfiguration(new
EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+ if (bigLakeConfiguration != null) {
+ TableReference ref = table.getTableReference();
+ table.setBiglakeConfiguration(
+ new BigLakeConfiguration()
+ .setTableFormat(
+ MoreObjects.firstNonNull(
+ bigLakeConfiguration.get("tableFormat"),
"iceberg"))
+ .setFileFormat(
+
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
+ .setConnectionId(
+
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
+ .setStorageUri(
+ String.format(
+ "%s/%s/%s/%s",
+ Preconditions.checkArgumentNotNull(
+ bigLakeConfiguration.get(STORAGE_URI)),
+ ref.getProjectId(),
+ ref.getDatasetId(),
+ ref.getTableId())));
+ }
+ datasetService.createTable(table);
}
- datasetService.createTable(table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
createdTables.add(tableSpec);
}
+ private static void cloneTable(
+ BigQueryOptions options,
+ DatasetService datasetService,
+ BigQueryServices bqServices,
+ TableReference cloneSource,
+ TableReference destination,
+ @Nullable String kmsKey,
+ String tableSpec)
+ throws Exception {
+ TableReference source = withDefaultProject(options, cloneSource);
+ JobConfigurationTableCopy copyConfig =
+ new JobConfigurationTableCopy()
+ .setSourceTables(Collections.singletonList(source))
+ .setDestinationTable(destination)
+ .setOperationType("CLONE")
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name());
+ if (kmsKey != null) {
+ copyConfig.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
+
+ String jobProjectId = destination.getProjectId();
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, destination.getProjectId(),
destination.getDatasetId());
+ String jobIdPrefix = createCloneJobIdPrefix(options, source, destination);
+ try (JobService jobService = bqServices.getJobService(options)) {
+ BigQueryHelpers.PendingJob cloneJob =
+ new BigQueryHelpers.PendingJob(
+ jobId -> {
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(jobProjectId)
+ .setJobId(jobId.getJobId())
+ .setLocation(bqLocation);
Review Comment:
`JobReference` construction is duplicated in each `PendingJob` lambda.
Consider extracting a small helper (e.g., `jobRef(jobId, jobProjectId,
bqLocation)`) to reduce duplication and make future changes (adding fields,
changing location logic) less error-prone.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]