Abacn commented on code in PR #27755:
URL: https://github.com/apache/beam/pull/27755#discussion_r1283331908


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/reconciler/PartitionReconciler.java:
##########
@@ -100,6 +101,7 @@ public PartitionReconciler(MetadataTableDao metadataTableDao, ChangeStreamMetric
    * @param missingPartitions partitions not being streamed.
    */
   public void addMissingPartitions(List<ByteStringRange> missingPartitions) {
+    hasAddedMissingPartitions = true;

Review Comment:
   I see this flag controls the behavior of getPartitionsToReconcile. Currently,
once it is set to true, it is never set back to false. Should it be reset
somewhere after partitionsToReconcile have been handled?
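
   To illustrate the kind of reset being asked about, here is a minimal sketch.
It assumes the flag only gates whether getPartitionsToReconcile returns
anything; `ReconcilerFlagSketch` and its use of `String` in place of
`ByteStringRange` are hypothetical stand-ins, not the actual PartitionReconciler
code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the reconciler; not the actual PartitionReconciler. */
class ReconcilerFlagSketch {
  private final List<String> missingPartitions = new ArrayList<>();
  private boolean hasAddedMissingPartitions = false;

  /** Records the latest set of missing partitions and marks them as unprocessed. */
  void addMissingPartitions(List<String> partitions) {
    missingPartitions.clear();
    missingPartitions.addAll(partitions);
    hasAddedMissingPartitions = true;
  }

  /**
   * Returns the pending partitions once, then resets the flag so that a later
   * call without a fresh addMissingPartitions() returns an empty list.
   */
  List<String> getPartitionsToReconcile() {
    if (!hasAddedMissingPartitions) {
      return Collections.emptyList();
    }
    List<String> result = new ArrayList<>(missingPartitions);
    hasAddedMissingPartitions = false; // assumed reset point
    return result;
  }
}
```

   Whether the reset belongs in the getter or in the caller after reconciliation
succeeds depends on how failures should be retried; the sketch only shows the
first option.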



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java:
##########
@@ -325,26 +328,26 @@ public ProcessContinuation run(
       Optional<Instant> maybeWatermark =
           getNewWatermark(streamPartitionsWithWatermark, newPartitions);
       
maybeWatermark.ifPresent(metadataTableDao::updateDetectNewPartitionWatermark);
-      // Using NewPartitions and StreamPartitions, evaluate partitions that are possibly not being
-      // streamed. This isn't perfect because there may be partitions moving between
-      // StreamPartitions and NewPartitions while scanning the metadata table. Also, this does not
-      // include NewPartitions marked as deleted from a previous DNP run not yet processed by RCSP.
-      List<ByteStringRange> existingPartitions =
-          streamPartitionsWithWatermark.stream()
-              .map(StreamPartitionWithWatermark::getPartition)
-              .collect(Collectors.toList());
-      existingPartitions.addAll(outputtedNewPartitions);
-      List<ByteStringRange> missingStreamPartitions =
-          getMissingPartitionsFromEntireKeySpace(existingPartitions);
-      orphanedMetadataCleaner.addMissingPartitions(missingStreamPartitions);
-      partitionReconciler.addMissingPartitions(missingStreamPartitions);
-    }
-
-    // Only start reconciling after the pipeline has been running for a while.
-    if (tracker.currentRestriction().getFrom() > 50) {
-      processReconcilerPartitions(
-          receiver, watermarkEstimator, initialPipelineState.getStartTime());
-      cleanUpOrphanedMetadata();
+      // Only start reconciling after the pipeline has been running for a while.
+      if (tracker.currentRestriction().getFrom() > 50) {

Review Comment:
   This assumes that the restriction reaching 50 means the "pipeline has been
running for a while". IIRC, there is no guaranteed correspondence between the
restriction (a long counter) and real time in general. What is the purpose of
this check, and has it been tested how robust the condition is?
   
   CC: @johnjcasey for SDF expertise.
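
   For comparison, a guard based on elapsed wall-clock time could look like the
sketch below. It assumes the pipeline start time is available (the diff passes
`initialPipelineState.getStartTime()` to processReconcilerPartitions);
`ReconcileGateSketch`, `hasRunLongEnough`, and the five-minute threshold in the
usage note are hypothetical, and `java.time` is used here only to keep the
example self-contained.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper; not part of the Beam codebase. */
class ReconcileGateSketch {
  /**
   * Returns true once at least minUptime of wall-clock time has passed since
   * pipelineStart, independent of how many times the restriction has advanced.
   */
  static boolean hasRunLongEnough(Instant pipelineStart, Instant now, Duration minUptime) {
    return Duration.between(pipelineStart, now).compareTo(minUptime) >= 0;
  }
}
```

   A call such as `hasRunLongEnough(startTime, Instant.now(),
Duration.ofMinutes(5))` would then replace the `getFrom() > 50` comparison, if
elapsed time is indeed what the condition is meant to capture.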


