dmvk commented on a change in pull request #11478:
URL: https://github.com/apache/beam/pull/11478#discussion_r412714540



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -170,44 +201,48 @@ public void checkpointCompleted(long checkpointId) throws 
Exception {
     }
   }
 
-  private void addToBeAcknowledgedCheckpoint(long checkpointId, String 
internalId)
-      throws Exception {
+  private void addToBeAcknowledgedCheckpoint(long checkpointId, int 
internalId) throws Exception {
     notYetAcknowledgedSnapshots.addAll(
-        Collections.singletonList(new CheckpointElement(internalId, 
checkpointId)));
+        Collections.singletonList(new CheckpointIdentifier(internalId, 
checkpointId)));
   }
 
-  private List<CheckpointElement> removeToBeAcknowledgedCheckpoints(long 
checkpointId)
+  private List<CheckpointIdentifier> gatherToBeAcknowledgedCheckpoints(long 
checkpointId)
       throws Exception {
-    List<CheckpointElement> toBeAcknowledged = new ArrayList<>();
-    List<CheckpointElement> checkpoints = new ArrayList<>();
-    for (CheckpointElement element : notYetAcknowledgedSnapshots.get()) {
+    List<CheckpointIdentifier> toBeAcknowledged = new ArrayList<>();
+    List<CheckpointIdentifier> remaining = new ArrayList<>();
+    for (CheckpointIdentifier element : notYetAcknowledgedSnapshots.get()) {
       if (element.checkpointId <= checkpointId) {
         toBeAcknowledged.add(element);
       } else {
-        checkpoints.add(element);
+        remaining.add(element);
       }
     }
-    notYetAcknowledgedSnapshots.update(checkpoints);
+    notYetAcknowledgedSnapshots.update(remaining);
     // Sort by checkpoint id to preserve order
     toBeAcknowledged.sort(Comparator.comparingLong(o -> o.checkpointId));
     return toBeAcknowledged;
   }
 
-  private static String generateNewId() {
-    return UUID.randomUUID().toString();
+  private int rotateAndGetStateIndex() {
+    currentStateIndex = (currentStateIndex + 1) % numCheckpointBuffers;
+    return currentStateIndex;
+  }
+
+  private int getStateIndex() {
+    return currentStateIndex;
   }
 
   /** Constructs a new instance of BufferingElementsHandler with a provided 
state namespace. */
   private interface BufferingElementsHandlerFactory {
-    BufferingElementsHandler get(String stateId) throws Exception;
+    BufferingElementsHandler get(int stateIndex) throws Exception;
   }
 
-  private static class CheckpointElement {
+  static class CheckpointIdentifier {

Review comment:
       nit: `@VisibleForTesting`

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -143,15 +174,15 @@ public void checkpoint(long checkpointId) throws 
Exception {
     // We are about to get checkpointed. The elements buffered thus far
     // have to be added to the global CheckpointElement state which will
     // be used to emit elements later when this checkpoint is acknowledged.
-    addToBeAcknowledgedCheckpoint(checkpointId, currentStateId);
-    currentStateId = generateNewId();
-    currentBufferingElementsHandler = 
bufferingElementsHandlerFactory.get(currentStateId);
+    addToBeAcknowledgedCheckpoint(checkpointId, getStateIndex());
+    int newStateIndex = rotateAndGetStateIndex();
+    currentBufferingElementsHandler = 
bufferingElementsHandlerFactory.get(newStateIndex);
   }
 
   /** Should be called when a checkpoint is completed. */
   public void checkpointCompleted(long checkpointId) throws Exception {

Review comment:
       just to check if I understand this correctly:
    
   - We are guaranteed that checkpoints complete in sequential order.
   - We might get gaps in case of failed / expired checkpoints, so any 
completed checkpoint must complete all prior checkpoints.

##########
File path: 
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link BufferingDoFnRunner}.
+ *
+ * <p>For more tests see:
+ *
+ * <p>- {@link org.apache.beam.runners.flink.FlinkRequiresStableInputTest}
+ *
+ * <p>-{@link 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperatorTest}
+ *
+ * <p>- {@link BufferedElementsTest}
+ */
+public class BufferingDoFnRunnerTest {

Review comment:
       👍




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to