tonytanger commented on code in PR #27755:
URL: https://github.com/apache/beam/pull/27755#discussion_r1283378423
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java:
##########
@@ -325,26 +328,26 @@ public ProcessContinuation run(
Optional<Instant> maybeWatermark =
getNewWatermark(streamPartitionsWithWatermark, newPartitions);
maybeWatermark.ifPresent(metadataTableDao::updateDetectNewPartitionWatermark);
- // Using NewPartitions and StreamPartitions, evaluate partitions that are possibly not being
- // streamed. This isn't perfect because there may be partitions moving between
- // StreamPartitions and NewPartitions while scanning the metadata table. Also, this does not
- // include NewPartitions marked as deleted from a previous DNP run not yet processed by RCSP.
- List<ByteStringRange> existingPartitions =
- streamPartitionsWithWatermark.stream()
- .map(StreamPartitionWithWatermark::getPartition)
- .collect(Collectors.toList());
- existingPartitions.addAll(outputtedNewPartitions);
- List<ByteStringRange> missingStreamPartitions =
- getMissingPartitionsFromEntireKeySpace(existingPartitions);
- orphanedMetadataCleaner.addMissingPartitions(missingStreamPartitions);
- partitionReconciler.addMissingPartitions(missingStreamPartitions);
- }
-
- // Only start reconciling after the pipeline has been running for a while.
- if (tracker.currentRestriction().getFrom() > 50) {
- processReconcilerPartitions(
- receiver, watermarkEstimator, initialPipelineState.getStartTime());
- cleanUpOrphanedMetadata();
+ // Only start reconciling after the pipeline has been running for a while.
+ if (tracker.currentRestriction().getFrom() > 50) {
Review Comment:
InitialRestriction is set to 0 and it increments once per processElement
call. The purpose is to let the partitions output by DetectNewPartitions be
picked up by the next step, ReadChangeStreamPartition. Often there are
thousands to hundreds of thousands of partitions output, and we just want to
give ReadChangeStreamPartition some time to start processing them. This
reduces the noise the reconciler can create during the initial startup period
of a pipeline. It is more of a precaution than a strict requirement. We
haven't measured precisely what 50 corresponds to in real time.
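
To make the gating concrete, here is a minimal standalone sketch of the idea, not the actual Beam code. `WarmUpGate`, `WARM_UP_RUNS`, `shouldReconcile`, and `countReconcilingRuns` are hypothetical names made up for illustration; the only things taken from the diff are the starting value 0, the once-per-invocation increment, and the `> 50` comparison.

```java
public class WarmUpGate {
    // Hypothetical constant mirroring the literal 50 in the diff.
    public static final long WARM_UP_RUNS = 50;

    /** Returns true once the restriction's start position has passed the warm-up threshold. */
    public static boolean shouldReconcile(long restrictionFrom) {
        return restrictionFrom > WARM_UP_RUNS;
    }

    /** Simulates repeated invocations and counts how many of them would reconcile. */
    public static int countReconcilingRuns(int totalRuns) {
        long from = 0; // the initial restriction starts at 0
        int reconciled = 0;
        for (int i = 0; i < totalRuns; i++) {
            if (shouldReconcile(from)) {
                reconciled++;
            }
            from++; // assumed to advance once per invocation
        }
        return reconciled;
    }

    public static void main(String[] args) {
        // Invocations see from = 0..59; only 51..59 pass the gate.
        System.out.println(countReconcilingRuns(60)); // prints 9
    }
}
```

The gate counts invocations rather than wall-clock time, so how long the warm-up lasts in practice depends on how often the step is rescheduled.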