ahmedabu98 commented on code in PR #30023:
URL: https://github.com/apache/beam/pull/30023#discussion_r1461881181
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java:
##########
@@ -95,237 +83,376 @@ public WriteRename(
CreateDisposition createDisposition,
int maxRetryJobs,
@Nullable String kmsKey,
- @Nullable ValueProvider<String> loadJobProjectId) {
+ @Nullable ValueProvider<String> loadJobProjectId,
+ PCollectionView<String> copyJobIdPrefixView) {
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
this.firstPaneWriteDisposition = writeDisposition;
this.firstPaneCreateDisposition = createDisposition;
this.maxRetryJobs = maxRetryJobs;
this.kmsKey = kmsKey;
this.loadJobProjectId = loadJobProjectId;
+ this.copyJobIdPrefixView = copyJobIdPrefixView;
}
- @StartBundle
- public void startBundle(StartBundleContext c) {
- pendingJobs.clear();
+ @Override
+ public PCollection<TableDestination> expand(
+ PCollection<Iterable<KV<TableDestination, Result>>> input) {
+ return input
+ .apply(
+ "WriteRename",
+ ParDo.of(
+ new WriteRenameFn(
+ bqServices,
+ jobIdToken,
+ firstPaneWriteDisposition,
+ firstPaneCreateDisposition,
+ maxRetryJobs,
+ kmsKey,
+ loadJobProjectId))
+ .withSideInputs(copyJobIdPrefixView))
+ // We apply a fusion break here to ensure that the temp file renaming won't attempt to
+ // rename a temp file is then deleted in the TempTableCleanupFn
Review Comment:
```suggestion
      // We apply a fusion break here to ensure that on retries, the temp table renaming won't attempt to
      // rename a temp table that was previously deleted in TempTableCleanupFn
```
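   For anyone skimming the thread, here is a minimal, self-contained sketch of the fusion-break pattern this comment describes (class, step, and table names are illustrative, not the PR's actual code): `Reshuffle.viaRandomKey()` checkpoints the upstream results, so a retry of the downstream cleanup stage does not re-execute the rename stage against temp tables that cleanup already deleted.

   ```java
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.ParDo;
   import org.apache.beam.sdk.transforms.Reshuffle;
   import org.apache.beam.sdk.values.PCollection;

   public class FusionBreakSketch {
     public static void main(String[] args) {
       Pipeline p = Pipeline.create();

       // Stand-in for the copy-job/rename stage that finalizes tables.
       PCollection<String> renamed =
           p.apply(Create.of("dataset.temp_0001", "dataset.temp_0002"))
               .apply(
                   "WriteRename",
                   ParDo.of(
                       new DoFn<String, String>() {
                         @ProcessElement
                         public void process(@Element String table, OutputReceiver<String> out) {
                           out.output(table); // pretend a copy job was issued here
                         }
                       }));

       renamed
           // The fusion break: Reshuffle materializes the rename results, so if the
           // cleanup stage below is retried, the rename stage above is not re-run
           // against temp tables that cleanup already deleted.
           .apply(Reshuffle.viaRandomKey())
           .apply(
               "RemoveTempTables",
               ParDo.of(
                   new DoFn<String, Void>() {
                     @ProcessElement
                     public void process(@Element String table) {
                       // pretend the temp table is deleted here
                     }
                   }));

       p.run().waitUntilFinish();
     }
   }
   ```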
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java:
##########
@@ -95,237 +83,376 @@ public WriteRename(
CreateDisposition createDisposition,
int maxRetryJobs,
@Nullable String kmsKey,
- @Nullable ValueProvider<String> loadJobProjectId) {
+ @Nullable ValueProvider<String> loadJobProjectId,
+ PCollectionView<String> copyJobIdPrefixView) {
this.bqServices = bqServices;
this.jobIdToken = jobIdToken;
this.firstPaneWriteDisposition = writeDisposition;
this.firstPaneCreateDisposition = createDisposition;
this.maxRetryJobs = maxRetryJobs;
this.kmsKey = kmsKey;
this.loadJobProjectId = loadJobProjectId;
+ this.copyJobIdPrefixView = copyJobIdPrefixView;
}
- @StartBundle
- public void startBundle(StartBundleContext c) {
- pendingJobs.clear();
+ @Override
+ public PCollection<TableDestination> expand(
+ PCollection<Iterable<KV<TableDestination, Result>>> input) {
+ return input
+ .apply(
+ "WriteRename",
+ ParDo.of(
+ new WriteRenameFn(
+ bqServices,
+ jobIdToken,
+ firstPaneWriteDisposition,
+ firstPaneCreateDisposition,
+ maxRetryJobs,
+ kmsKey,
+ loadJobProjectId))
+ .withSideInputs(copyJobIdPrefixView))
+ // We apply a fusion break here to ensure that the temp file renaming won't attempt to
+ // rename a temp file is then deleted in the TempTableCleanupFn
+ .apply(Reshuffle.viaRandomKey())
+ .apply("RemoveTempTables", ParDo.of(new TempTableCleanupFn(bqServices)))
+ .setCoder(TableDestinationCoder.of());
}
- @Teardown
- public void onTeardown() {
- try {
- if (datasetService != null) {
- datasetService.close();
- datasetService = null;
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
+ public static class PendingJobData implements Serializable {
+
+ final BigQueryHelpers.PendingJob retryJob;
+ final TableDestination tableDestination;
+ final List<String> tempTables;
+ final BoundedWindow window;
+
+ public PendingJobData(
+ BigQueryHelpers.PendingJob retryJob,
+ TableDestination tableDestination,
+ List<String> tempTables,
+ BoundedWindow window) {
+ this.retryJob = retryJob;
+ this.tableDestination = tableDestination;
+ this.tempTables = tempTables;
+ this.window = window;
}
}
- @ProcessElement
- public void processElement(
- @Element Iterable<KV<TableDestination, WriteTables.Result>> element,
- ProcessContext c,
- BoundedWindow window)
- throws Exception {
- Multimap<TableDestination, WriteTables.Result> tempTables = ArrayListMultimap.create();
- for (KV<TableDestination, WriteTables.Result> entry : element) {
- tempTables.put(entry.getKey(), entry.getValue());
+ public static class WriteRenameFn
+ extends DoFn<
+ Iterable<KV<TableDestination, WriteTables.Result>>, KV<TableDestination, List<String>>> {
+ private static final Logger LOG = LoggerFactory.getLogger(WriteRenameFn.class);
+
+ private final BigQueryServices bqServices;
+ private final PCollectionView<String> jobIdToken;
+
+ // In the triggered scenario, the user-supplied create and write dispositions only apply to the
+ // first trigger pane, as that's when the table is created. Subsequent loads should always
+ // append to the table, and so use CREATE_NEVER and WRITE_APPEND dispositions respectively.
+ private final WriteDisposition firstPaneWriteDisposition;
+ private final CreateDisposition firstPaneCreateDisposition;
+ private final int maxRetryJobs;
+ private final @Nullable String kmsKey;
+ private final @Nullable ValueProvider<String> loadJobProjectId;
+ private transient @Nullable DatasetService datasetService;
+
+ // All pending copy jobs.
+ private List<PendingJobData> pendingJobs = Lists.newArrayList();
+
+ public WriteRenameFn(
+ BigQueryServices bqServices,
+ PCollectionView<String> jobIdToken,
+ WriteDisposition writeDisposition,
+ CreateDisposition createDisposition,
+ int maxRetryJobs,
+ @Nullable String kmsKey,
+ @Nullable ValueProvider<String> loadJobProjectId) {
+ this.bqServices = bqServices;
+ this.jobIdToken = jobIdToken;
+ this.firstPaneWriteDisposition = writeDisposition;
+ this.firstPaneCreateDisposition = createDisposition;
+ this.maxRetryJobs = maxRetryJobs;
+ this.kmsKey = kmsKey;
+ this.loadJobProjectId = loadJobProjectId;
+ }
+
+ @StartBundle
+ public void startBundle(StartBundleContext c) {
+ pendingJobs.clear();
}
- for (Map.Entry<TableDestination, Collection<WriteTables.Result>> entry :
- tempTables.asMap().entrySet()) {
- // Process each destination table.
- // Do not copy if no temp tables are provided.
- if (!entry.getValue().isEmpty()) {
- pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window));
+
+ @Teardown
+ public void onTeardown() {
+ try {
+ if (datasetService != null) {
+ datasetService.close();
+ datasetService = null;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element Iterable<KV<TableDestination, WriteTables.Result>> element,
+ ProcessContext c,
+ BoundedWindow window)
+ throws Exception {
+ Multimap<TableDestination, WriteTables.Result> tempTables = ArrayListMultimap.create();
+ for (KV<TableDestination, WriteTables.Result> entry : element) {
+ tempTables.put(entry.getKey(), entry.getValue());
+ }
+ for (Map.Entry<TableDestination, Collection<WriteTables.Result>> entry :
+ tempTables.asMap().entrySet()) {
+ // Process each destination table.
+ // Do not copy if no temp tables are provided.
+ if (!entry.getValue().isEmpty()) {
+ pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window));
+ }
}
}
- }
- @FinishBundle
- public void finishBundle(FinishBundleContext c) throws Exception {
- DatasetService datasetService =
- getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
- PendingJobManager jobManager = new PendingJobManager();
- for (PendingJobData pendingJob : pendingJobs) {
- jobManager.addPendingJob(
- pendingJob.retryJob,
- j -> {
- try {
- if (pendingJob.tableDestination.getTableDescription() != null) {
- TableReference ref = pendingJob.tableDestination.getTableReference();
- datasetService.patchTableDescription(
- ref.clone()
- .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
- pendingJob.tableDestination.getTableDescription());
+ @FinishBundle
+ public void finishBundle(FinishBundleContext c) throws Exception {
+ DatasetService datasetService =
+ getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
+ PendingJobManager jobManager = new PendingJobManager();
+ for (PendingJobData pendingJob : pendingJobs) {
+ jobManager.addPendingJob(
+ pendingJob.retryJob,
+ j -> {
+ try {
+ if (pendingJob.tableDestination.getTableDescription() != null) {
+ TableReference ref = pendingJob.tableDestination.getTableReference();
+ datasetService.patchTableDescription(
+ ref.clone()
+ .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
+ pendingJob.tableDestination.getTableDescription());
+ }
+ c.output(
+ KV.of(pendingJob.tableDestination, pendingJob.tempTables),
+ pendingJob.window.maxTimestamp(),
+ pendingJob.window);
+ return null;
+ } catch (IOException | InterruptedException e) {
+ return e;
}
- c.output(
- pendingJob.tableDestination, pendingJob.window.maxTimestamp(), pendingJob.window);
- removeTemporaryTables(datasetService, pendingJob.tempTables);
- return null;
- } catch (IOException | InterruptedException e) {
- return e;
- }
- });
+ });
+ }
+ jobManager.waitForDone();
}
- jobManager.waitForDone();
- }
- private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
- if (datasetService == null) {
- datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+ private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+ if (datasetService == null) {
+ datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+ }
+ return datasetService;
}
- return datasetService;
- }
- private PendingJobData startWriteRename(
- TableDestination finalTableDestination,
- Iterable<WriteTables.Result> tempTableNames,
- ProcessContext c,
- BoundedWindow window)
- throws Exception {
- // The pane may have advanced either here due to triggering or due to an upstream trigger. We
- // check the upstream
- // trigger to handle the case where an earlier pane triggered the single-partition path. If this
- // happened, then the
- // table will already exist so we want to append to the table.
- WriteTables.@Nullable Result firstTempTable = Iterables.getFirst(tempTableNames, null);
- boolean isFirstPane =
- firstTempTable != null && firstTempTable.isFirstPane() && c.pane().isFirst();
- WriteDisposition writeDisposition =
- isFirstPane ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
- CreateDisposition createDisposition =
- isFirstPane ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
- List<TableReference> tempTables =
- StreamSupport.stream(tempTableNames.spliterator(), false)
- .map(
- result ->
- BigQueryHelpers.fromJsonString(result.getTableName(), TableReference.class))
- .collect(Collectors.toList());
-
- // Make sure each destination table gets a unique job id.
- String jobIdPrefix =
- BigQueryResourceNaming.createJobIdWithDestination(
- c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex());
-
- BigQueryHelpers.PendingJob retryJob =
- startCopy(
- bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
- getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
- jobIdPrefix,
- finalTableDestination.getTableReference(),
- tempTables,
- writeDisposition,
- createDisposition,
- kmsKey,
- loadJobProjectId);
- return new PendingJobData(retryJob, finalTableDestination, tempTables, window);
- }
+ private PendingJobData startWriteRename(
+ TableDestination finalTableDestination,
+ Iterable<WriteTables.Result> tempTableNames,
+ ProcessContext c,
+ BoundedWindow window)
+ throws Exception {
+ // The pane may have advanced either here due to triggering or due to an upstream trigger. We
+ // check the upstream trigger to handle the case where an earlier pane triggered the
+ // single-partition path. If this happened, then the table will already exist so we want to
+ // append to the table.
+ WriteTables.@Nullable Result firstTempTable = Iterables.getFirst(tempTableNames, null);
+ boolean isFirstPane =
+ firstTempTable != null && firstTempTable.isFirstPane() && c.pane().isFirst();
+ WriteDisposition writeDisposition =
+ isFirstPane ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+ CreateDisposition createDisposition =
+ isFirstPane ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
+ List<TableReference> tempTables =
+ StreamSupport.stream(tempTableNames.spliterator(), false)
+ .map(
+ result ->
+ BigQueryHelpers.fromJsonString(result.getTableName(), TableReference.class))
+ .collect(Collectors.toList());
- private BigQueryHelpers.PendingJob startCopy(
- JobService jobService,
- DatasetService datasetService,
- String jobIdPrefix,
- TableReference ref,
- List<TableReference> tempTables,
- WriteDisposition writeDisposition,
- CreateDisposition createDisposition,
- @Nullable String kmsKey,
- @Nullable ValueProvider<String> loadJobProjectId) {
- JobConfigurationTableCopy copyConfig =
- new JobConfigurationTableCopy()
- .setSourceTables(tempTables)
- .setDestinationTable(ref)
- .setWriteDisposition(writeDisposition.name())
- .setCreateDisposition(createDisposition.name());
- if (kmsKey != null) {
- copyConfig.setDestinationEncryptionConfiguration(
- new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ List<String> tempTableStrings =
+ StreamSupport.stream(tempTableNames.spliterator(), false)
+ .map(Result::getTableName)
+ .collect(Collectors.toList());
+ ;
Review Comment:
   Why do we convert from TableReference to strings here then back to TableReference in `TempTableCleanupFn`? Might be worth adding a comment here to clarify.
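   One plausible reason (a guess, worth confirming in the PR): shipping the temp tables across the fusion break as plain strings keeps the coder trivial, since TableReference isn't straightforward to encode directly, whereas `List<String>` works with the standard string coder. A minimal round-trip sketch using the public `BigQueryHelpers` JSON helpers this file already calls; the project/dataset/table names are made up:

   ```java
   import com.google.api.services.bigquery.model.TableReference;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;

   public class TempTableJsonRoundTrip {
     public static void main(String[] args) {
       // Hypothetical temp table; all names here are illustrative.
       TableReference ref =
           new TableReference()
               .setProjectId("my-project")
               .setDatasetId("tmp_dataset")
               .setTableId("beam_load_temp_0001");

       // What the upstream step appears to do: encode as a JSON string so the
       // value can cross the Reshuffle with a plain string coder.
       String json = BigQueryHelpers.toJsonString(ref);

       // What TempTableCleanupFn then does on the other side before deleting.
       TableReference parsed = BigQueryHelpers.fromJsonString(json, TableReference.class);

       System.out.println(parsed.getTableId()); // beam_load_temp_0001
     }
   }
   ```

   If that is indeed the rationale, a one-line comment at the conversion site would answer the question for future readers.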
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]