dedocibula commented on code in PR #35042:
URL: https://github.com/apache/beam/pull/35042#discussion_r2111482619


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -178,47 +179,57 @@ public List<String> findAllTableIndexes() {
    *
    * @return the earliest partition watermark which is not in a {@link State#FINISHED} state.
    */
-  public @Nullable Timestamp getUnfinishedMinWatermark() {
+  public @Nullable Timestamp getUnfinishedMinWatermark(Optional<Timestamp> since) {

Review Comment:
   Can we make the `since` timestamp required (non-optional) and have the
   caller pass the exact timestamp? I think it would be preferable for
   timestamp filtering to come from a single place.
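
   A minimal sketch of what a required `since` parameter could look like. This is not the PR's code: `InMemoryWatermarkDao` is a hypothetical in-memory stand-in for the DAO, `java.time.Instant` replaces the Spanner `Timestamp` type so the example has no dependencies, and the "at or after `since`" filter semantics are an assumption.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory stand-in for PartitionMetadataDao (illustration only).
final class InMemoryWatermarkDao {
  private final List<Instant> unfinishedWatermarks;

  InMemoryWatermarkDao(List<Instant> unfinishedWatermarks) {
    this.unfinishedWatermarks = unfinishedWatermarks;
  }

  // `since` is required, so the caller alone decides the lower bound.
  // Returns null when no unfinished watermark is at or after `since`.
  Instant getUnfinishedMinWatermark(Instant since) {
    return unfinishedWatermarks.stream()
        .filter(w -> !w.isBefore(since))
        .min(Comparator.naturalOrder())
        .orElse(null);
  }
}
```

   With this shape, a caller that has no lower bound yet would pass its own sentinel (for example `Instant.MIN` here) instead of the DAO interpreting an empty `Optional`.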



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java:
##########
@@ -42,13 +42,23 @@ public class AsyncWatermarkCache implements WatermarkCache {
   private static final Object MIN_WATERMARK_KEY = new Object();
   private final LoadingCache<Object, Optional<Timestamp>> cache;
 
+  private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;
+
   public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
     this.cache =

Review Comment:
   Can we add a comment here explaining how the cache loading works? Namely,
   let's cover:
   - what happens when there is no partition data in the metadata table
   - what happens if the call fails (deadline exceeded or a Spanner exception)
   - what happens in the happy case
   
   Let's also call out that all reload operations on this key are serialized
   because of the single-threaded asyncReloadingExecutor.
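
   To make the three cases concrete, here is a plain-JDK model, offered as a sketch rather than a description of the PR's behaviour. It assumes the semantics Guava documents for refreshes (a failed refresh keeps the previous value) and uses a single-threaded executor so reloads run one at a time. `SerializedReloadSketch` and its `query` supplier are hypothetical names, and `java.time.Instant` stands in for the Spanner `Timestamp`.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Plain-JDK model of a single-key cache whose reloads are serialized.
final class SerializedReloadSketch {
  // One daemon worker thread: reload tasks are executed strictly one after another.
  private final ExecutorService reloadExecutor =
      Executors.newSingleThreadExecutor(
          r -> {
            Thread t = new Thread(r, "watermark-reload");
            t.setDaemon(true);
            return t;
          });
  // Hypothetical stand-in for the DAO query; may return null or throw.
  private final Supplier<Instant> query;
  private volatile Optional<Instant> cached = Optional.empty();

  SerializedReloadSketch(Supplier<Instant> query) {
    this.query = query;
  }

  // Submits one reload and waits for it, so callers can observe the result.
  void refreshBlocking() {
    Runnable task = this::reload;
    try {
      reloadExecutor.submit(task).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  private void reload() {
    try {
      // No partition data: the query yields null, stored as Optional.empty().
      // Happy case: the new watermark replaces the cached one.
      cached = Optional.ofNullable(query.get());
    } catch (RuntimeException e) {
      // Failed call (e.g. deadline exceeded): keep the previously cached value.
    }
  }

  Optional<Instant> get() {
    return cached;
  }
}
```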



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java:
##########
@@ -460,19 +460,27 @@ private void mockGetPartitionsAfter(Timestamp timestamp, ResultSet getPartitionR
   }
 
   private void mockGetWatermark(Timestamp watermark) {
+    final String minWatermark = "min_watermark";

Review Comment:
   Shall we add more interesting test cases, such as throwing exceptions,
   returning null, or returning an older value?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java:
##########
@@ -42,13 +42,23 @@ public class AsyncWatermarkCache implements WatermarkCache {
   private static final Object MIN_WATERMARK_KEY = new Object();
   private final LoadingCache<Object, Optional<Timestamp>> cache;
 
+  private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;
+
   public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
     this.cache =
         CacheBuilder.newBuilder()
             .refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
             .build(
                 CacheLoader.asyncReloading(
-                    CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())),
+                    CacheLoader.from(
+                        key -> {
+                          Timestamp unfinishedMinTimes =

Review Comment:
   Shall we wrap this value with Optional.ofNullable here, since we have to do
   that anyway?
   
   Also, should we assign unfinishedMinTimes to lastCachedMinWatermark only if
   it is after the current value?
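
   A sketch of both suggestions together, under stated assumptions: `MonotonicWatermarkSketch` and `onLoaded` are hypothetical names, `java.time.Instant` stands in for the Spanner `Timestamp`, and `Instant.MIN` plays the role of `Timestamp.MIN_VALUE`. The loaded value is wrapped once, and the stored watermark only ever moves forward.

```java
import java.time.Instant;
import java.util.Optional;

// Illustration only: not the PR's AsyncWatermarkCache.
final class MonotonicWatermarkSketch {
  private Instant lastCachedMinWatermark = Instant.MIN;

  // Wraps a possibly-null query result once, then advances the stored
  // watermark only when the loaded value is strictly after it.
  synchronized Optional<Instant> onLoaded(Instant unfinishedMin) {
    Optional<Instant> loaded = Optional.ofNullable(unfinishedMin);
    loaded
        .filter(ts -> ts.isAfter(lastCachedMinWatermark))
        .ifPresent(ts -> lastCachedMinWatermark = ts);
    return loaded;
  }

  synchronized Instant lastCachedMinWatermark() {
    return lastCachedMinWatermark;
  }
}
```

   In this sketch a null or older result leaves `lastCachedMinWatermark` unchanged, which is the guard the second question asks about.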



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
