Abacn commented on code in PR #27755:
URL: https://github.com/apache/beam/pull/27755#discussion_r1283331908
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconciler.java:
##########
@@ -100,6 +101,7 @@ public PartitionReconciler(MetadataTableDao metadataTableDao, ChangeStreamMetric
* @param missingPartitions partitions not being streamed.
*/
public void addMissingPartitions(List<ByteStringRange> missingPartitions) {
+ hasAddedMissingPartitions = true;
Review Comment:
I see this controls getPartitionsToReconcile's behavior. Currently, once it is
set to true, it is never reset. Should this flag be reset somewhere after
partitionsToReconcile have been handled correctly?
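To make the question concrete, here is a hypothetical sketch of one option: clear the flag at the point where the pending partitions are handed off for reconciliation. It assumes the flag only needs to gate the batch reported since the last hand-off. ReconcilerSketch and its methods are illustrative names, not the PR's code. Partitions are plain strings instead of ByteStringRange, and the sketch ignores any other state the real reconciler tracks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for PartitionReconciler (not the PR's code).
// Partitions are plain strings instead of ByteStringRange.
class ReconcilerSketch {
  private final List<String> missingPartitions = new ArrayList<>();
  // Set when a DNP run reports missing partitions; cleared once they are handed off.
  private boolean hasAddedMissingPartitions = false;

  void addMissingPartitions(List<String> partitions) {
    hasAddedMissingPartitions = true;
    missingPartitions.clear();
    missingPartitions.addAll(partitions);
  }

  boolean hasAddedMissingPartitions() {
    return hasAddedMissingPartitions;
  }

  // Returns the pending partitions and resets the flag, so a later call only
  // sees partitions reported after this one.
  List<String> getPartitionsToReconcile() {
    if (!hasAddedMissingPartitions) {
      return new ArrayList<>();
    }
    List<String> result = new ArrayList<>(missingPartitions);
    missingPartitions.clear();
    hasAddedMissingPartitions = false;
    return result;
  }
}
```

Whether resetting on hand-off is correct depends on whether reconciliation can fail and need a retry. If it can, the reset would have to move to wherever success is confirmed.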
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java:
##########
@@ -325,26 +328,26 @@ public ProcessContinuation run(
Optional<Instant> maybeWatermark =
getNewWatermark(streamPartitionsWithWatermark, newPartitions);
maybeWatermark.ifPresent(metadataTableDao::updateDetectNewPartitionWatermark);
- // Using NewPartitions and StreamPartitions, evaluate partitions that are possibly not being
- // streamed. This isn't perfect because there may be partitions moving between
- // StreamPartitions and NewPartitions while scanning the metadata table. Also, this does not
- // include NewPartitions marked as deleted from a previous DNP run not yet processed by RCSP.
- List<ByteStringRange> existingPartitions =
- streamPartitionsWithWatermark.stream()
- .map(StreamPartitionWithWatermark::getPartition)
- .collect(Collectors.toList());
- existingPartitions.addAll(outputtedNewPartitions);
- List<ByteStringRange> missingStreamPartitions =
- getMissingPartitionsFromEntireKeySpace(existingPartitions);
- orphanedMetadataCleaner.addMissingPartitions(missingStreamPartitions);
- partitionReconciler.addMissingPartitions(missingStreamPartitions);
- }
-
- // Only start reconciling after the pipeline has been running for a while.
- if (tracker.currentRestriction().getFrom() > 50) {
- processReconcilerPartitions(
- receiver, watermarkEstimator, initialPipelineState.getStartTime());
- cleanUpOrphanedMetadata();
+ // Only start reconciling after the pipeline has been running for a while.
+ if (tracker.currentRestriction().getFrom() > 50) {
Review Comment:
This assumes that the restriction having been incremented past 50 means "the
pipeline has been running for a while". IIRC, there is in general no guaranteed
correspondence between the restriction (a long) and real time. What is the
purpose here, and has it been tested how robust this condition is?
CC: @johnjcasey for SDF expertise.
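If the intent is "wait until the pipeline has been up for some time", here is a hypothetical alternative to consider. It gates on elapsed wall-clock time since the pipeline start, which the diff already passes around as initialPipelineState.getStartTime(). ReconcileGate and the warm-up duration are illustrative, not Beam API. The sketch uses java.time for self-containment, whereas Beam code typically uses Joda-Time.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical helper (not Beam API): decides whether reconciliation may start
// based on elapsed wall-clock time instead of the restriction's counter value.
class ReconcileGate {
  private final Instant pipelineStart;
  private final Duration warmUp;
  // Injectable clock so the condition can be unit-tested deterministically.
  private final Supplier<Instant> clock;

  ReconcileGate(Instant pipelineStart, Duration warmUp, Supplier<Instant> clock) {
    this.pipelineStart = pipelineStart;
    this.warmUp = warmUp;
    this.clock = clock;
  }

  // True once at least `warmUp` has elapsed since pipelineStart.
  boolean shouldReconcile() {
    return !clock.get().isBefore(pipelineStart.plus(warmUp));
  }
}
```

The injected clock makes the behavior easy to test at the boundary, which is harder to do with a restriction-count threshold.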