Repository: incubator-beam
Updated Branches:
  refs/heads/master db47c63ab -> 3879db036


[BEAM-283] finalize CheckpointMarks upon completed checkpoint


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf14e809
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf14e809
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf14e809

Branch: refs/heads/master
Commit: cf14e809d4a790c407ab7c3cf1c90bb436a86dc9
Parents: c403675
Author: Maximilian Michels <[email protected]>
Authored: Fri Sep 16 17:04:22 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri Sep 16 20:48:52 2016 +0200

----------------------------------------------------------------------
 .../streaming/io/UnboundedSourceWrapper.java    | 57 ++++++++++++++++++--
 .../streaming/UnboundedSourceWrapperTest.java   |  7 +++
 2 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cf14e809/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 7fdc816..64cf703 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -22,6 +22,8 @@ import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
@@ -38,6 +40,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -54,7 +57,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedSourceWrapper<
     OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     extends RichParallelSourceFunction<WindowedValue<OutputT>>
-    implements Triggerable, StoppableFunction, Checkpointed<byte[]> {
+    implements Triggerable, StoppableFunction, Checkpointed<byte[]>, 
CheckpointListener {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
@@ -106,6 +109,15 @@ public class UnboundedSourceWrapper<
   private transient 
StreamSource.ManualWatermarkContext<WindowedValue<OutputT>> context;
 
   /**
+   * Pending checkpoints which have not been acknowledged yet.
+   */
+  private transient LinkedHashMap<Long, List<CheckpointMarkT>> 
pendingCheckpoints;
+  /**
+   * Keep a maximum of 32 checkpoints for {@code 
CheckpointMark.finalizeCheckpoint()}.
+   */
+  private static final int MAX_NUMBER_PENDING_CHECKPOINTS = 32;
+
+  /**
    * When restoring from a snapshot we put the restored sources/checkpoint 
marks here
    * and open in {@link #open(Configuration)}.
    */
@@ -159,6 +171,8 @@ public class UnboundedSourceWrapper<
     localSplitSources = new ArrayList<>();
     localReaders = new ArrayList<>();
 
+    pendingCheckpoints = new LinkedHashMap<>();
+
     if (restoredState != null) {
 
       // restore the splitSources from the checkpoint to ensure consistent 
ordering
@@ -324,7 +338,7 @@ public class UnboundedSourceWrapper<
   }
 
   @Override
-  public byte[] snapshotState(long l, long l1) throws Exception {
+  public byte[] snapshotState(long checkpointId, long checkpointTimestamp) 
throws Exception {
 
     if (checkpointCoder == null) {
       // no checkpoint coder available in this source
@@ -335,7 +349,8 @@ public class UnboundedSourceWrapper<
     // than we have a correct mapping of checkpoints to sources when
     // restoring
     List<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, 
CheckpointMarkT>> checkpoints =
-        new ArrayList<>();
+        new ArrayList<>(localSplitSources.size());
+    List<CheckpointMarkT> checkpointMarks = new 
ArrayList<>(localSplitSources.size());
 
     for (int i = 0; i < localSplitSources.size(); i++) {
       UnboundedSource<OutputT, CheckpointMarkT> source = 
localSplitSources.get(i);
@@ -343,6 +358,7 @@ public class UnboundedSourceWrapper<
 
       @SuppressWarnings("unchecked")
       CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
+      checkpointMarks.add(mark);
       KV<UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> kv =
           KV.of(source, mark);
       checkpoints.add(kv);
@@ -351,6 +367,18 @@ public class UnboundedSourceWrapper<
     try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
       checkpointCoder.encode(checkpoints, baos, Coder.Context.OUTER);
       return baos.toByteArray();
+    } finally {
+      // cleanup old pending checkpoints and add new checkpoint
+      int diff = pendingCheckpoints.size() - MAX_NUMBER_PENDING_CHECKPOINTS;
+      if (diff >= 0) {
+        for (Iterator<Long> iterator = pendingCheckpoints.keySet().iterator();
+             diff >= 0;
+             diff--) {
+          iterator.next();
+          iterator.remove();
+        }
+      }
+      pendingCheckpoints.put(checkpointId, checkpointMarks);
     }
   }
 
@@ -411,4 +439,27 @@ public class UnboundedSourceWrapper<
   public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> 
getLocalSplitSources() {
     return localSplitSources;
   }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    // CheckpointListener callback: finalize the marks snapshotState() stored for checkpointId.
+    List<CheckpointMarkT> checkpointMarks = 
pendingCheckpoints.get(checkpointId);
+    // null if this id was never recorded here, or was already evicted or completed
+    if (checkpointMarks != null) {
+      // Pending ids are iterated in insertion order (LinkedHashMap); drop this id and every
+      // id inserted before it. Marks of those older ids are discarded WITHOUT finalization.
+      Iterator<Long> iterator = pendingCheckpoints.keySet().iterator();
+      long currentId;
+      do {
+        currentId = iterator.next();
+        iterator.remove();
+      } while (currentId != checkpointId);
+
+      // finalize every CheckpointMark captured for this checkpoint (one per local split source)
+      for (CheckpointMarkT mark : checkpointMarks) {
+        mark.finalizeCheckpoint();
+      }
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cf14e809/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 0cc584e..e728653 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -206,6 +207,12 @@ public class UnboundedSourceWrapperTest {
     // draw a snapshot
     byte[] snapshot = flinkWrapper.snapshotState(0, 0);
 
+    // test that finalizeCheckpoint on CheckpointMark is called
+    final ArrayList<Integer> finalizeList = new ArrayList<>();
+    TestCountingSource.setFinalizeTracker(finalizeList);
+    flinkWrapper.notifyCheckpointComplete(0);
+    assertEquals(flinkWrapper.getLocalSplitSources().size(), 
finalizeList.size());
+
     // create a completely new source but restore from the snapshot
     TestCountingSource restoredSource = new TestCountingSource(numElements);
     UnboundedSourceWrapper<

Reply via email to