tonytanger commented on code in PR #25459:
URL: https://github.com/apache/beam/pull/25459#discussion_r1110203693


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java:
##########
@@ -100,6 +106,79 @@ public Optional<DoFn.ProcessContinuation> run(
       DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
       boolean shouldDebug) {
+    if (record instanceof Heartbeat) {
+      Heartbeat heartbeat = (Heartbeat) record;
+      StreamProgress streamProgress =
+          new StreamProgress(
+              heartbeat.getChangeStreamContinuationToken(), heartbeat.getLowWatermark());
+      final Instant watermark = TimestampConverter.toInstant(heartbeat.getLowWatermark());
+      watermarkEstimator.setWatermark(watermark);
+
+      if (shouldDebug) {
+        LOG.info(
+            "RCSP {}: Heartbeat partition: {} token: {} watermark: {}",
+            formatByteStringRange(partitionRecord.getPartition()),
+            formatByteStringRange(heartbeat.getChangeStreamContinuationToken().getPartition()),
+            heartbeat.getChangeStreamContinuationToken().getToken(),
+            heartbeat.getLowWatermark());
+      }
+      // If the tracker fails to claim the streamProgress, it most likely means the runner initiated
+      // a checkpoint. See {@link
+      // org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
+      // for more information regarding runner initiated checkpoints.
+      if (!tracker.tryClaim(streamProgress)) {
+        if (shouldDebug) {
+          LOG.info(
+              "RCSP {}: Failed to claim heartbeat tracker",
+              formatByteStringRange(partitionRecord.getPartition()));
+        }
+        return Optional.of(DoFn.ProcessContinuation.stop());
+      }
+      metrics.incHeartbeatCount();
+    } else if (record instanceof ChangeStreamMutation) {
+      ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation) record;
+      final Instant watermark =
+          TimestampConverter.toInstant(changeStreamMutation.getLowWatermark());
+      watermarkEstimator.setWatermark(watermark);
+      // Build a new StreamProgress with the continuation token to be claimed.
+      ChangeStreamContinuationToken changeStreamContinuationToken =
+          new ChangeStreamContinuationToken(
+              Range.ByteStringRange.create(
+                  partitionRecord.getPartition().getStart(),
+                  partitionRecord.getPartition().getEnd()),
+              changeStreamMutation.getToken());
+      StreamProgress streamProgress =
+          new StreamProgress(changeStreamContinuationToken, changeStreamMutation.getLowWatermark());
+      // If the tracker fails to claim the streamProgress, it most likely means the runner initiated
+      // a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
+      // runner initiated checkpoints.
+      if (!tracker.tryClaim(streamProgress)) {
+        if (shouldDebug) {
+          LOG.info(
+              "RCSP {}: Failed to claim data change tracker",
+              formatByteStringRange(partitionRecord.getPartition()));
+        }
+        return Optional.of(DoFn.ProcessContinuation.stop());
+      }
+      if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
+        metrics.incChangeStreamMutationGcCounter();
+      } else if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
+        metrics.incChangeStreamMutationUserCounter();
+      }
+      Instant delay = TimestampConverter.toInstant(changeStreamMutation.getCommitTimestamp());
+      metrics.updateProcessingDelayFromCommitTimestamp(
+          Instant.now().getMillis() - delay.getMillis());
+
+      KV<ByteString, ChangeStreamMutation> outputRecord =
+          KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);

Review Comment:
   We considered not advancing the watermark, but I think a watermark that never advances gives the confusing signal that the pipeline is not progressing.
   
   `getCommitTimestamp` is the best measure of the timestamp of the record.
   
   In the future, we are considering adding an option that lets the user choose to output the records with the `commitTimestamp`.
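   To make the trade-off concrete, here is a minimal plain-Java sketch. `OutputTimestampMode`, `outputTimestamp`, and `fixedWindowStart` are hypothetical names, not Beam APIs. The connector itself uses Joda-Time's `org.joda.time.Instant` and Beam's windowing; this sketch uses `java.time` and a simplified zero-offset fixed-window calculation so it stays self-contained. It shows that with `Instant.EPOCH` every record lands in the same event-time window, whereas stamping records with the commit timestamp would spread them across windows by commit time.
   
   ```java
   import java.time.Duration;
   import java.time.Instant;
   
   // Hypothetical sketch: nothing here is an existing Beam or Bigtable API.
   class OutputTimestampSketch {
   
     // Hypothetical user-facing option for choosing the output timestamp.
     enum OutputTimestampMode {
       EPOCH,           // behavior in this PR: every record is emitted at Instant.EPOCH
       COMMIT_TIMESTAMP // possible future option: emit at the mutation's commit timestamp
     }
   
     // Returns the event-time timestamp a record would be emitted with.
     static Instant outputTimestamp(OutputTimestampMode mode, Instant commitTimestamp) {
       return mode == OutputTimestampMode.COMMIT_TIMESTAMP ? commitTimestamp : Instant.EPOCH;
     }
   
     // Simplified zero-offset fixed-window assignment: start = ts - (ts mod size).
     static Instant fixedWindowStart(Instant ts, Duration size) {
       long millis = ts.toEpochMilli();
       long sizeMillis = size.toMillis();
       return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
     }
   
     public static void main(String[] args) {
       Duration oneMinute = Duration.ofMinutes(1);
       Instant[] commits = {
         Instant.parse("2023-02-15T12:00:05Z"), Instant.parse("2023-02-15T12:03:40Z")
       };
       // Print which one-minute window each record would fall into under each mode.
       for (OutputTimestampMode mode : OutputTimestampMode.values()) {
         for (Instant commit : commits) {
           Instant ts = outputTimestamp(mode, commit);
           System.out.println(
               mode + " " + commit + " -> window starting " + fixedWindowStart(ts, oneMinute));
         }
       }
     }
   }
   ```
   
   Under `EPOCH`, both records map to the window starting at `1970-01-01T00:00:00Z`, which is why event-time windowing on commit time is not possible today; under `COMMIT_TIMESTAMP`, they map to separate windows starting at 12:00 and 12:03.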



-- 
This is an automated message from the Apache Git Service.