dmvk commented on a change in pull request #11478:
URL: https://github.com/apache/beam/pull/11478#discussion_r412714540
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -170,44 +201,48 @@ public void checkpointCompleted(long checkpointId) throws
Exception {
}
}
- private void addToBeAcknowledgedCheckpoint(long checkpointId, String
internalId)
- throws Exception {
+ private void addToBeAcknowledgedCheckpoint(long checkpointId, int
internalId) throws Exception {
notYetAcknowledgedSnapshots.addAll(
- Collections.singletonList(new CheckpointElement(internalId,
checkpointId)));
+ Collections.singletonList(new CheckpointIdentifier(internalId,
checkpointId)));
}
- private List<CheckpointElement> removeToBeAcknowledgedCheckpoints(long
checkpointId)
+ private List<CheckpointIdentifier> gatherToBeAcknowledgedCheckpoints(long
checkpointId)
throws Exception {
- List<CheckpointElement> toBeAcknowledged = new ArrayList<>();
- List<CheckpointElement> checkpoints = new ArrayList<>();
- for (CheckpointElement element : notYetAcknowledgedSnapshots.get()) {
+ List<CheckpointIdentifier> toBeAcknowledged = new ArrayList<>();
+ List<CheckpointIdentifier> remaining = new ArrayList<>();
+ for (CheckpointIdentifier element : notYetAcknowledgedSnapshots.get()) {
if (element.checkpointId <= checkpointId) {
toBeAcknowledged.add(element);
} else {
- checkpoints.add(element);
+ remaining.add(element);
}
}
- notYetAcknowledgedSnapshots.update(checkpoints);
+ notYetAcknowledgedSnapshots.update(remaining);
// Sort by checkpoint id to preserve order
toBeAcknowledged.sort(Comparator.comparingLong(o -> o.checkpointId));
return toBeAcknowledged;
}
- private static String generateNewId() {
- return UUID.randomUUID().toString();
+ private int rotateAndGetStateIndex() {
+ currentStateIndex = (currentStateIndex + 1) % numCheckpointBuffers;
+ return currentStateIndex;
+ }
+
+ private int getStateIndex() {
+ return currentStateIndex;
}
/** Constructs a new instance of BufferingElementsHandler with a provided
state namespace. */
private interface BufferingElementsHandlerFactory {
- BufferingElementsHandler get(String stateId) throws Exception;
+ BufferingElementsHandler get(int stateIndex) throws Exception;
}
- private static class CheckpointElement {
+ static class CheckpointIdentifier {
Review comment:
nit: annotate with `@VisibleForTesting`, since the class is no longer `private`.
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -143,15 +174,15 @@ public void checkpoint(long checkpointId) throws
Exception {
// We are about to get checkpointed. The elements buffered thus far
// have to be added to the global CheckpointElement state which will
// be used to emit elements later when this checkpoint is acknowledged.
- addToBeAcknowledgedCheckpoint(checkpointId, currentStateId);
- currentStateId = generateNewId();
- currentBufferingElementsHandler =
bufferingElementsHandlerFactory.get(currentStateId);
+ addToBeAcknowledgedCheckpoint(checkpointId, getStateIndex());
+ int newStateIndex = rotateAndGetStateIndex();
+ currentBufferingElementsHandler =
bufferingElementsHandlerFactory.get(newStateIndex);
}
/** Should be called when a checkpoint is completed. */
public void checkpointCompleted(long checkpointId) throws Exception {
Review comment:
Just to check that I understand this correctly:
- We are guaranteed that checkpoints complete in sequential order.
- We might get gaps in case of failed / expired checkpoints, so any
completed checkpoint must also acknowledge all prior, not-yet-acknowledged checkpoints.
##########
File path:
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link BufferingDoFnRunner}.
+ *
+ * <p>For more tests see:
+ *
+ * <p>- {@link org.apache.beam.runners.flink.FlinkRequiresStableInputTest}
+ *
+ * <p>-{@link
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperatorTest}
+ *
+ * <p>- {@link BufferedElementsTest}
+ */
+public class BufferingDoFnRunnerTest {
Review comment:
👍
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]