acrites commented on code in PR #36456:
URL: https://github.com/apache/beam/pull/36456#discussion_r2417983905


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -528,14 +528,23 @@ public Void updateToFinished(String partitionToken) {
     }
 
     /**
-     * Update the partition watermark to the given timestamp.
+     * Update the partition watermark to the given timestamp if the given timestamp is larger than
+     * the existing value.
      *
      * @param partitionToken the partition unique identifier
      * @param watermark the new partition watermark
      * @return the commit timestamp of the read / write transaction
      */
     public Void updateWatermark(String partitionToken, Timestamp watermark) {
-      transaction.buffer(createUpdateMetadataWatermarkMutationFrom(partitionToken, watermark));
+      final Struct partition = getPartition(partitionToken);
+      if (partition == null) {
+        LOG.debug("Partiton {} cannot find.", partitionToken);
+        return null;
+      }
+      final Timestamp currentWatermark = partition.getTimestamp(COLUMN_WATERMARK);
+      if (watermark.compareTo(currentWatermark) > 0) {

Review Comment:
   Where does this comparison run, given that it's not part of the transaction?
Does it happen atomically along with the read and write? Or are we getting a
snapshot, comparing it on the user worker, and then writing the value? I'm
asking because we need to make sure that if two different processes try to
update the watermark for the same partition, they won't race with each other.


