stankiewicz commented on code in PR #37535:
URL: https://github.com/apache/beam/pull/37535#discussion_r3190216345
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1024,25 +1062,208 @@ public String apply(BigQueryDynamicReadDescriptor
input) {
.apply("Checkpoint", Redistribute.byKey());
PCollectionTuple resultTuple =
- addJobId
- .apply("Create streams", ParDo.of(new
CreateBoundedSourceForTable()))
+ addJobId.apply(
+ "Create streams",
+ ParDo.of(new CreateBoundedSourceForTable(cleanupInfoTag))
+ .withOutputTags(streamTag, TupleTagList.of(cleanupInfoTag)));
+
+ PCollection<KV<String, BigQueryStorageStreamSource<T>>> streams =
+ resultTuple
+ .get(streamTag)
.setCoder(
- SerializableCoder.of(new
TypeDescriptor<BigQueryStorageStreamSource<T>>() {}))
- .apply("Redistribute", Redistribute.arbitrarily())
- .apply(
- "Read Streams with storage read api",
- ParDo.of(
- new TypedRead.ReadTableSource<T>(
- rowTag, getParseFn(), getBadRecordRouter()))
- .withOutputTags(rowTag,
TupleTagList.of(BAD_RECORD_TAG)));
+ KvCoder.of(
+ StringUtf8Coder.of(),
+ SerializableCoder.of(
+ new TypeDescriptor<BigQueryStorageStreamSource<T>>()
{})))
+ .apply("Redistribute", Redistribute.arbitrarily());
+
+ PCollectionTuple readResultTuple =
+ streams.apply(
+ "Read Streams with storage read api",
+ ParDo.of(
+ new ReadDynamicStreamSource<T>(
+ rowTag, getParseFn(), getBadRecordRouter(),
cleanupInfoTag))
+ .withOutputTags(rowTag,
TupleTagList.of(BAD_RECORD_TAG).and(cleanupInfoTag)));
+
+ PCollection<KV<String, CleanupOperationMessage>> cleanupMessages1 =
+ resultTuple
+ .get(cleanupInfoTag)
+ .setCoder(
+ KvCoder.of(
+ StringUtf8Coder.of(),
SerializableCoder.of(CleanupOperationMessage.class)));
+
+ PCollection<KV<String, CleanupOperationMessage>> cleanupMessages2 =
+ readResultTuple
+ .get(cleanupInfoTag)
+ .setCoder(
+ KvCoder.of(
+ StringUtf8Coder.of(),
SerializableCoder.of(CleanupOperationMessage.class)));
+
+ PCollectionList.of(cleanupMessages1)
+ .and(cleanupMessages2)
+ .apply(Flatten.pCollections())
+ .apply("CleanupTempTables", ParDo.of(new
CleanupTempTableDoFn(getBigQueryServices())));
+
getBadRecordErrorHandler()
.addErrorCollection(
-
resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
- return resultTuple.get(rowTag).setCoder(getOutputCoder());
+ readResultTuple
+ .get(BAD_RECORD_TAG)
+ .setCoder(BadRecord.getCoder(input.getPipeline())));
+ return readResultTuple.get(rowTag).setCoder(getOutputCoder());
}
}
/** Implementation of {@link BigQueryIO#read()}. */
+ static class CleanupInfo implements Serializable {
+ private final String projectId;
+ private final String datasetId;
+ private final String tableId;
+ private final boolean datasetCreatedByBeam;
+ private final int totalStreams;
+
+ public CleanupInfo(TableReference tableRef, boolean datasetCreatedByBeam,
int totalStreams) {
+ if (tableRef != null) {
+ this.projectId = tableRef.getProjectId();
+ this.datasetId = tableRef.getDatasetId();
+ this.tableId = tableRef.getTableId();
+ } else {
+ this.projectId = null;
+ this.datasetId = null;
+ this.tableId = null;
+ }
+ this.datasetCreatedByBeam = datasetCreatedByBeam;
+ this.totalStreams = totalStreams;
+ }
+
+ public TableReference getTableReference() {
+ if (projectId == null || datasetId == null || tableId == null) {
+ return null;
+ }
+ return new TableReference()
+ .setProjectId(projectId)
+ .setDatasetId(datasetId)
+ .setTableId(tableId);
+ }
+
+ public boolean isDatasetCreatedByBeam() {
+ return datasetCreatedByBeam;
+ }
+
+ public int getTotalStreams() {
+ return totalStreams;
+ }
+ }
+
+ static class CleanupOperationMessage implements Serializable {
+ private final @Nullable CleanupInfo cleanupInfo;
+ private final boolean isStreamCompletion;
+
+ private CleanupOperationMessage(@Nullable CleanupInfo cleanupInfo, boolean
isStreamCompletion) {
+ this.cleanupInfo = cleanupInfo;
+ this.isStreamCompletion = isStreamCompletion;
+ }
+
+ public static CleanupOperationMessage streamComplete() {
+ return new CleanupOperationMessage(null, true);
+ }
+
+ public static CleanupOperationMessage initialize(CleanupInfo cleanupInfo) {
+ return new CleanupOperationMessage(cleanupInfo, false);
+ }
+
+ public @Nullable CleanupInfo getCleanupInfo() {
+ return cleanupInfo;
+ }
+
+ public boolean isStreamCompletion() {
+ return isStreamCompletion;
+ }
+ }
+
+ static class CleanupTempTableDoFn extends DoFn<KV<String,
CleanupOperationMessage>, Void> {
+ private final BigQueryServices bqServices;
+ private static final Logger LOG =
LoggerFactory.getLogger(CleanupTempTableDoFn.class);
+
+ @StateId("cleanupInfo")
+ private final StateSpec<ValueState<CleanupInfo>> cleanupInfoSpec =
StateSpecs.value();
+
+ @StateId("completedStreams")
+ private final StateSpec<ValueState<Integer>> completedStreamsSpec =
StateSpecs.value();
+
+ CleanupTempTableDoFn(BigQueryServices bqServices) {
+ this.bqServices = bqServices;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<String, CleanupOperationMessage> element,
+ @StateId("cleanupInfo") ValueState<CleanupInfo> cleanupInfoState,
Review Comment:
I have the test suite running; I will run several comparison tests with various
frequencies and table sizes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]