scwhittle commented on code in PR #32474:
URL: https://github.com/apache/beam/pull/32474#discussion_r1878156969


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -62,6 +63,7 @@ public class QueryChangeStreamAction {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(QueryChangeStreamAction.class);
   private static final Duration BUNDLE_FINALIZER_TIMEOUT = 
Duration.standardMinutes(5);
+  private static final Duration RESTRICTION_TRACKER_TIMEOUT = 
Duration.standardSeconds(40);

Review Comment:
   Can you comment on where this timeout comes from?  I.e., what is the RPC 
deadline we are trying to avoid hitting that can currently cause an exception?  
If the deadline is O(minutes), we might want to increase this to give the DoFn 
the ability to catch up more if it is behind.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import java.util.function.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** An interrupter for restriction tracker of type T. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   Can you remove this suppression and add `@Nullable` annotations where needed?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import java.util.function.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** An interrupter for restriction tracker of type T. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class RestrictionInterrupter<T> {
+  private T lastAttemptedPosition;
+
+  private Supplier<Instant> timeSupplier;
+  private Instant softDeadline;

Review Comment:
   nit: `softDeadline` can be declared `final`.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+public class RestrictionInterrupterTest {
+
+  @Test
+  public void testTryInterrupt() {
+    RestrictionInterrupter<Integer> interrupter =
+        new RestrictionInterrupter<Integer>(
+            () -> Instant.ofEpochSecond(0), Duration.standardSeconds(30));
+    interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(10));
+    assertFalse(interrupter.tryInterrupt(1));
+    interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(15));
+    assertFalse(interrupter.tryInterrupt(2));
+    interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(30));
+    assertFalse(interrupter.tryInterrupt(3));
+    interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40));
+    assertFalse(interrupter.tryInterrupt(3));

Review Comment:
   Add a comment to help future test readers:
   
   // Though the deadline has passed, the same position as previously accepted 
is not interrupted.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+public class RestrictionInterrupterTest {
+
+  @Test
+  public void testTryInterrupt() {
+    RestrictionInterrupter<Integer> interrupter =
+        new RestrictionInterrupter<Integer>(
+            () -> Instant.ofEpochSecond(0), Duration.standardSeconds(30));
+    interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(10));
+    assertFalse(interrupter.tryInterrupt(1));
+    interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(15));
+    assertFalse(interrupter.tryInterrupt(2));
+    interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(30));
+    assertFalse(interrupter.tryInterrupt(3));
+    interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40));
+    assertFalse(interrupter.tryInterrupt(3));
+    assertTrue(interrupter.tryInterrupt(4));
+    assertTrue(interrupter.tryInterrupt(5));
+    interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50));

Review Comment:
   Could also verify that, even if the clock is non-monotonic, once true is 
returned it always returns true.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import java.util.function.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** An interrupter for restriction tracker of type T. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class RestrictionInterrupter<T> {
+  private T lastAttemptedPosition;
+
+  private Supplier<Instant> timeSupplier;
+  private Instant softDeadline;
+  private boolean hasInterrupted = true;
+
+  /**
+   * Sets a soft timeout from now for processing new positions. After the 
timeout the tryInterrupt
+   * will start returning true indicating an early exit from processing.
+   */
+  public static <T> RestrictionInterrupter<T> withSoftTimeout(Duration 
timeout) {
+    return new RestrictionInterrupter<T>(() -> Instant.now(), timeout);
+  }
+
+  RestrictionInterrupter(Supplier<Instant> timeSupplier, Duration timeout) {
+    this.timeSupplier = timeSupplier;
+    this.softDeadline = this.timeSupplier.get().plus(timeout);
+    hasInterrupted = false;
+  }
+
+  @VisibleForTesting
+  void setTimeSupplier(Supplier<Instant> timeSupplier) {
+    this.timeSupplier = timeSupplier;
+  }
+
+  /**
+   * Returns true if the restriction tracker should be interrupted in claiming 
new positions.
+   *
+   * <ol>
+   *   <li>If soft deadline hasn't been reached always returns false.
+   *   <li>If soft deadline has been reached but we haven't processed any 
positions returns false.
+   *   <li>If soft deadline has been reached but the new position is the same 
as the last attempted
+   *       position returns false.
+   *   <li>If soft deadline has been reached and the new position differs from 
the last attempted
+   *       position returns true.
+   * </ol>
+   *
+   * @return {@code true} if the position processing should continue, {@code 
false} if the soft
+   *     deadline has been reached and we have fully processed the previous 
position.
+   */
+  public boolean tryInterrupt(T position) {
+    if (hasInterrupted) {
+      return true;
+    }
+    if (lastAttemptedPosition == null) {
+      lastAttemptedPosition = position;
+      return false;
+    }
+
+    hasInterrupted |=

Review Comment:
   nit: the flow seems clearer if the position is checked first, separately:
   
   if (position.equals(lastAttemptedPosition)) return false;
   lastAttemptedPosition = position;
   
   hasInterrupted |= timeSupplier.get().isAfter(softDeadline);
   return hasInterrupted;
   



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java:
##########
@@ -105,6 +106,7 @@ public Optional<ProcessContinuation> run(
       PartitionMetadata partition,
       ChildPartitionsRecord record,
       RestrictionTracker<TimestampRange, Timestamp> tracker,
+      RestrictionInterrupter<Timestamp> interrupter,

Review Comment:
   Please add the new `interrupter` parameter to the method's doc comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to