pabloem commented on code in PR #25459:
URL: https://github.com/apache/beam/pull/25459#discussion_r1108893903
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java:
##########
@@ -100,6 +106,79 @@ public Optional<DoFn.ProcessContinuation> run(
DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator,
boolean shouldDebug) {
+ if (record instanceof Heartbeat) {
+ Heartbeat heartbeat = (Heartbeat) record;
+ StreamProgress streamProgress =
+ new StreamProgress(
+ heartbeat.getChangeStreamContinuationToken(), heartbeat.getLowWatermark());
+ final Instant watermark = TimestampConverter.toInstant(heartbeat.getLowWatermark());
+ watermarkEstimator.setWatermark(watermark);
+
+ if (shouldDebug) {
+ LOG.info(
+ "RCSP {}: Heartbeat partition: {} token: {} watermark: {}",
+ formatByteStringRange(partitionRecord.getPartition()),
+ formatByteStringRange(heartbeat.getChangeStreamContinuationToken().getPartition()),
+ heartbeat.getChangeStreamContinuationToken().getToken(),
+ heartbeat.getLowWatermark());
+ }
+ // If the tracker fails to claim the streamProgress, it most likely means the runner initiated
+ // a checkpoint. See {@link
+ // org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker}
+ // for more information regarding runner-initiated checkpoints.
+ if (!tracker.tryClaim(streamProgress)) {
+ if (shouldDebug) {
+ LOG.info(
+ "RCSP {}: Failed to claim heart beat tracker",
+ formatByteStringRange(partitionRecord.getPartition()));
+ }
+ return Optional.of(DoFn.ProcessContinuation.stop());
+ }
+ metrics.incHeartbeatCount();
+ } else if (record instanceof ChangeStreamMutation) {
+ ChangeStreamMutation changeStreamMutation = (ChangeStreamMutation) record;
+ final Instant watermark =
+ TimestampConverter.toInstant(changeStreamMutation.getLowWatermark());
+ watermarkEstimator.setWatermark(watermark);
+ // Build a new StreamProgress with the continuation token to be claimed.
+ ChangeStreamContinuationToken changeStreamContinuationToken =
+ new ChangeStreamContinuationToken(
+ Range.ByteStringRange.create(
+ partitionRecord.getPartition().getStart(),
+ partitionRecord.getPartition().getEnd()),
+ changeStreamMutation.getToken());
+ StreamProgress streamProgress =
+ new StreamProgress(changeStreamContinuationToken, changeStreamMutation.getLowWatermark());
+ // If the tracker fails to claim the streamProgress, it most likely means the runner initiated
+ // a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
+ // runner-initiated checkpoints.
+ if (!tracker.tryClaim(streamProgress)) {
+ if (shouldDebug) {
+ LOG.info(
+ "RCSP {}: Failed to claim data change tracker",
+ formatByteStringRange(partitionRecord.getPartition()));
+ }
+ return Optional.of(DoFn.ProcessContinuation.stop());
+ }
+ if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
+ metrics.incChangeStreamMutationGcCounter();
+ } else if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
+ metrics.incChangeStreamMutationUserCounter();
+ }
+ Instant delay = TimestampConverter.toInstant(changeStreamMutation.getCommitTimestamp());
+ metrics.updateProcessingDelayFromCommitTimestamp(
+ Instant.now().getMillis() - delay.getMillis());
+
+ KV<ByteString, ChangeStreamMutation> outputRecord =
+ KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+ // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+ // limits the ability to window on commit time of any data changes. It is still possible to
+ // window on processing time.
+ receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);
Review Comment:
Wouldn't the `getLowWatermark` value be a better measure of the timestamp for the record? The watermark is currently advancing, so users may still add windowing.

Rather than setting the timestamp to EPOCH, I think it would be best to keep the watermark stuck at EPOCH but give the records proper timestamps.
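To illustrate why the output timestamp matters for windowing, here is a minimal, dependency-free sketch (it does not use the Beam API). It assumes 60-second fixed windows with zero offset, mirroring the window-start arithmetic of Beam's `FixedWindows`, and represents each record only by its timestamp in milliseconds. The class name, window size, and timestamp values are hypothetical. When every record is output at `Instant.EPOCH` (0 ms), all records fall into one window. With per-record event timestamps they spread across windows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowingSketch {
  // Hypothetical window size: 60 seconds, in milliseconds.
  static final long WINDOW_SIZE_MS = 60_000L;

  // Start of the fixed (zero-offset) window that contains timestampMs.
  static long windowStart(long timestampMs) {
    return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
  }

  // Groups records, represented only by their timestamps, by window start.
  static Map<Long, List<Long>> assignToWindows(List<Long> timestampsMs) {
    Map<Long, List<Long>> windows = new TreeMap<>();
    for (long ts : timestampsMs) {
      windows.computeIfAbsent(windowStart(ts), k -> new ArrayList<>()).add(ts);
    }
    return windows;
  }

  public static void main(String[] args) {
    // Behaviour in the PR as written: every record is output at Instant.EPOCH.
    List<Long> epochTimestamps = List.of(0L, 0L, 0L);
    // Hypothetical per-record event timestamps (ms since epoch).
    List<Long> recordTimestamps =
        List.of(1_676_000_040_000L, 1_676_000_070_000L, 1_676_000_100_000L);

    // Prints "EPOCH timestamps -> 1 window(s)"
    System.out.println(
        "EPOCH timestamps -> " + assignToWindows(epochTimestamps).size() + " window(s)");
    // Prints "Per-record timestamps -> 2 window(s)"
    System.out.println(
        "Per-record timestamps -> " + assignToWindows(recordTimestamps).size() + " window(s)");
  }
}
```

The first two hypothetical timestamps are 30 seconds apart and share a window. The third starts a new one. With EPOCH timestamps, event-time windowing cannot separate the records at all.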