scwhittle commented on code in PR #33566:
URL: https://github.com/apache/beam/pull/33566#discussion_r1916233326


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/WatermarkCache.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
+
+import com.google.cloud.Timestamp;
+import javax.annotation.Nullable;
+
+public interface WatermarkCache {
+  @Nullable
+  Timestamp getUnfinishedMinWatermark();

Review Comment:
   Improve the comment? It could be based on the DAO comment:
    /**
      * Fetches the earliest partition watermark from the partition metadata table that is not in a
      * {@link State#FINISHED} state.
      *
      * @return the earliest partition watermark which is not in a {@link State#FINISHED} state.
      */
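To make the suggested contract concrete, here is a minimal in-memory sketch of what `getUnfinishedMinWatermark()` is expected to return. Assumptions: `PartitionRow` and the `State` enum are hypothetical stand-ins for rows of the partition metadata table (the states are assumed to mirror `PartitionMetadata.State`), and `java.time.Instant` replaces `com.google.cloud.Timestamp` so it compiles with the JDK alone. The real DAO presumably answers this with a query against Spanner rather than a client-side scan.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical stand-in for PartitionMetadata.State.
enum State { CREATED, SCHEDULED, RUNNING, FINISHED }

// Hypothetical stand-in for one row of the partition metadata table.
final class PartitionRow {
  final String token;
  final State state;
  final Instant watermark;

  PartitionRow(String token, State state, Instant watermark) {
    this.token = token;
    this.state = state;
    this.watermark = watermark;
  }
}

final class UnfinishedMinWatermark {
  private UnfinishedMinWatermark() {}

  /**
   * Returns the earliest watermark among partitions that are not FINISHED, or null when there
   * are no such partitions (which is why the interface method is @Nullable).
   */
  static Instant compute(List<PartitionRow> rows) {
    Instant min = null;
    for (PartitionRow row : rows) {
      if (row.state == State.FINISHED) {
        continue; // finished partitions no longer hold back the watermark
      }
      if (min == null || row.watermark.isBefore(min)) {
        min = row.watermark;
      }
    }
    return min;
  }
}
```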
   
   



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
+
+import com.google.cloud.Timestamp;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.joda.time.Duration;
+
+public class AsyncWatermarkCache implements WatermarkCache {
+
+  private static final Object MIN_WATERMARK_KEY = new Object();
+  private final LoadingCache<Object, Optional<Timestamp>> cache;
+
+  public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
+    this.cache =
+        CacheBuilder.newBuilder()
+            .refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
+            .build(
+                CacheLoader.asyncReloading(
+                    CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())),
+                    Executors.newSingleThreadExecutor()));

Review Comment:
   If you're using a dedicated executor, you might as well set a thread name.
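A minimal sketch of a named single-thread executor using only `java.util.concurrent`; the thread name prefix and the helper class are hypothetical. Since Beam vendors Guava, `ThreadFactoryBuilder` (`setNameFormat`, `setDaemon`) would achieve the same in fewer lines.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class WatermarkRefreshExecutors {
  private WatermarkRefreshExecutors() {}

  // Hypothetical prefix; any name that identifies the refresh thread in a thread dump works.
  static final String THREAD_NAME_PREFIX = "watermark-cache-refresh-";

  static ExecutorService newNamedSingleThreadExecutor() {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory =
        runnable -> {
          // The suffix only changes if the executor has to replace a dead thread.
          Thread thread = new Thread(runnable, THREAD_NAME_PREFIX + counter.getAndIncrement());
          // Daemon, so a lingering refresh thread does not keep the JVM alive.
          thread.setDaemon(true);
          return thread;
        };
    return Executors.newSingleThreadExecutor(factory);
  }
}
```

The result could then be passed as the second argument of `CacheLoader.asyncReloading(...)` in place of `Executors.newSingleThreadExecutor()`.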



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1594,6 +1595,8 @@ public abstract static class ReadChangeStream
     @Deprecated
     abstract @Nullable Double getTraceSampleProbability();
 
+    abstract @Nullable Duration getWatermarkRefreshRate();

Review Comment:
   Do we need to set a default value for this when returning the builder (see where the RPC priority default is set)? I thought the AutoValue builder complained about unset fields, but perhaps nullable ones default to null? In either case, it might be better to handle the defaults consistently across the different fields.
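For reference, an AutoValue builder leaves an unset `@Nullable` property as null and only makes `build()` fail for unset non-nullable properties. A hand-written sketch of the "seed the default in the factory method" pattern follows; `ReadConfig` and the one-second default are hypothetical placeholders, not SpannerIO code or values.

```java
import java.time.Duration;

// Hypothetical, hand-written analogue of an AutoValue transform and its builder.
final class ReadConfig {
  // Placeholder default for illustration only; not a value taken from SpannerIO.
  static final Duration DEFAULT_WATERMARK_REFRESH_RATE = Duration.ofSeconds(1);

  private final Duration watermarkRefreshRate;

  private ReadConfig(Duration watermarkRefreshRate) {
    this.watermarkRefreshRate = watermarkRefreshRate;
  }

  // Never null: the default is seeded in builder(), so consumers need no null checks.
  Duration getWatermarkRefreshRate() {
    return watermarkRefreshRate;
  }

  // Plays the role of the static factory method that seeds every default in one place.
  static Builder builder() {
    return new Builder().setWatermarkRefreshRate(DEFAULT_WATERMARK_REFRESH_RATE);
  }

  Builder toBuilder() {
    return new Builder().setWatermarkRefreshRate(watermarkRefreshRate);
  }

  static final class Builder {
    private Duration watermarkRefreshRate;

    Builder setWatermarkRefreshRate(Duration rate) {
      if (rate == null) {
        throw new NullPointerException("watermarkRefreshRate");
      }
      this.watermarkRefreshRate = rate;
      return this;
    }

    ReadConfig build() {
      // Like AutoValue, reject a missing non-nullable property at build time.
      if (watermarkRefreshRate == null) {
        throw new IllegalStateException("watermarkRefreshRate was never set");
      }
      return new ReadConfig(watermarkRefreshRate);
    }
  }
}
```

With this shape, the property can stay non-nullable and be handled the same way as the other defaulted fields.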



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/WatermarkCache.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
+
+import com.google.cloud.Timestamp;
+import javax.annotation.Nullable;
+
+public interface WatermarkCache {

Review Comment:
   add @FunctionalInterface annotation?
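A sketch of the annotated interface. Lambdas already work for any single-abstract-method interface; the annotation makes javac reject a second abstract method later, which protects lambda-based implementations such as test stubs. `java.time.Instant` replaces `com.google.cloud.Timestamp` and `@Nullable` is omitted so this compiles with the JDK alone; `WatermarkCaches.fixed` is a hypothetical helper.

```java
import java.time.Instant;

@FunctionalInterface
interface WatermarkCache {
  /** Earliest watermark of any partition not in FINISHED state, or null if there is none. */
  Instant getUnfinishedMinWatermark();
}

final class WatermarkCaches {
  private WatermarkCaches() {}

  // A fixed-value cache, e.g. for unit tests of code that consumes a WatermarkCache.
  static WatermarkCache fixed(Instant watermark) {
    return () -> watermark;
  }
}
```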



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java:
##########
@@ -151,7 +152,8 @@ public void testReadSpannerChangeStreamImpl(TestPipeline testPipeline, String ro
                     .withMetadataDatabase(ENV.getMetadataDatabaseId())
                     .withMetadataTable(metadataTableName)
                     .withInclusiveStartAt(startAt)
-                    .withInclusiveEndAt(endAt))
+                    .withInclusiveEndAt(endAt)
+                    .withWatermarkRefreshRate(Duration.standardSeconds(1)))

Review Comment:
   It seems this could be removed in favor of the default once one is set.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
+import org.joda.time.Duration;
+
+public class CacheFactory implements Serializable {
+
+  private static final long serialVersionUID = -8722905670370252723L;
+  private static final Map<Long, WatermarkCache> WATERMARK_CACHE = new ConcurrentHashMap<>();
+  private static final AtomicLong CACHE_ID = new AtomicLong();
+  private final DaoFactory daoFactory;
+  private final @Nullable Duration refreshRate;
+
+  public CacheFactory(DaoFactory daoFactory, @Nullable Duration watermarkRefreshRate) {
+    this.daoFactory = daoFactory;
+    this.refreshRate = watermarkRefreshRate;
+  }
+
+  public static long generateCacheId() {

Review Comment:
   I wonder if this could be internal to the cache factory to simplify usage?
   
   It seems that
   final long cacheId = CACHE_ID.getAndIncrement();
   could be a member of the CacheFactory itself, so each factory has its own id. The id is then serialized with it, and when the factory is deserialized (as part of the serialized DoFn), the multiple deserialized factories will have the same id and share the cache.
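A minimal sketch of that idea, assuming the id is assigned in a field initializer. Java deserialization does not run the field initializers or constructors of a `Serializable` class, so each copy keeps the serialized id. `SharedCacheFactory` and `WatermarkCacheStub` are hypothetical stand-ins for `CacheFactory` and the real cache; the `DaoFactory` dependency is reduced to a refresh rate.

```java
import java.io.Serializable;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the real (non-serializable) WatermarkCache.
final class WatermarkCacheStub {
  final Duration refreshRate;

  WatermarkCacheStub(Duration refreshRate) {
    this.refreshRate = refreshRate;
  }
}

final class SharedCacheFactory implements Serializable {
  private static final long serialVersionUID = 1L;

  // Per-JVM registry and id counter; static fields are not serialized.
  private static final Map<Long, WatermarkCacheStub> CACHES = new ConcurrentHashMap<>();
  private static final AtomicLong NEXT_ID = new AtomicLong();

  // Assigned once when the factory is constructed and serialized along with it, so every
  // deserialized copy (e.g. one per DoFn instance on a worker) carries the same id.
  private final long cacheId = NEXT_ID.getAndIncrement();
  private final Duration refreshRate;

  SharedCacheFactory(Duration refreshRate) {
    this.refreshRate = refreshRate;
  }

  // No separate generateCacheId() call is needed; the lookup is keyed internally.
  WatermarkCacheStub getWatermarkCache() {
    return CACHES.computeIfAbsent(cacheId, id -> new WatermarkCacheStub(refreshRate));
  }
}
```

Because the counter is per JVM, ids are only unique among factories constructed in the same JVM; that holds when factories are built once at pipeline construction and only deserialized on workers.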



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
+import org.joda.time.Duration;
+
+public class CacheFactory implements Serializable {

Review Comment:
   Can you add a unit test verifying that the caching works as expected?
   
   For the test, you could create an instance and serialize it, then deserialize it multiple times and verify that the cache is shared by the different deserialized instances.
   
   You could also have one where you create differently configured instances, serialize them both, and then check that they don't share a cache.
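A standalone sketch of the two suggested checks, written without JUnit so it runs with the JDK alone. `FakeCacheFactory` is a hypothetical, reduced stand-in for `CacheFactory`; in the Beam repo these would naturally become JUnit test methods against the real class, and Beam's `SerializableUtils` helpers could replace the hand-rolled serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for CacheFactory, reduced to what the checks need.
final class FakeCacheFactory implements Serializable {
  private static final long serialVersionUID = 1L;
  private static final Map<Long, Object> CACHES = new ConcurrentHashMap<>();
  private static final AtomicLong NEXT_ID = new AtomicLong();

  private final long cacheId = NEXT_ID.getAndIncrement();
  private final long refreshMillis; // stands in for the configured refresh rate

  FakeCacheFactory(long refreshMillis) {
    this.refreshMillis = refreshMillis;
  }

  Object getCache() {
    return CACHES.computeIfAbsent(cacheId, id -> new Object());
  }
}

final class CacheFactorySharingChecks {
  private CacheFactorySharingChecks() {}

  static byte[] serialize(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static FakeCacheFactory deserialize(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (FakeCacheFactory) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // Serialize once, deserialize twice: distinct copies that must return the same cache.
  static boolean copiesShareCache() {
    byte[] data = serialize(new FakeCacheFactory(1_000));
    FakeCacheFactory first = deserialize(data);
    FakeCacheFactory second = deserialize(data);
    return first != second && first.getCache() == second.getCache();
  }

  // Two differently configured instances, each round-tripped: they must not share a cache.
  static boolean separateInstancesDoNotShareCache() {
    FakeCacheFactory a = deserialize(serialize(new FakeCacheFactory(1_000)));
    FakeCacheFactory b = deserialize(serialize(new FakeCacheFactory(5_000)));
    return a.getCache() != b.getCache();
  }
}
```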



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
