Repository: incubator-beam Updated Branches: refs/heads/master db47c63ab -> 3879db036
[BEAM-283] finalize CheckpointMarks upon completed checkpoint Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf14e809 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf14e809 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf14e809 Branch: refs/heads/master Commit: cf14e809d4a790c407ab7c3cf1c90bb436a86dc9 Parents: c403675 Author: Maximilian Michels <[email protected]> Authored: Fri Sep 16 17:04:22 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Fri Sep 16 20:48:52 2016 +0200 ---------------------------------------------------------------------- .../streaming/io/UnboundedSourceWrapper.java | 57 ++++++++++++++++++-- .../streaming/UnboundedSourceWrapperTest.java | 7 +++ 2 files changed, 61 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cf14e809/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 7fdc816..64cf703 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -22,6 +22,8 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; import java.io.ByteArrayInputStream; import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.coders.Coder; @@ -38,6 +40,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; @@ -54,7 +57,7 @@ import org.slf4j.LoggerFactory; public class UnboundedSourceWrapper< OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends RichParallelSourceFunction<WindowedValue<OutputT>> - implements Triggerable, StoppableFunction, Checkpointed<byte[]> { + implements Triggerable, StoppableFunction, Checkpointed<byte[]>, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); @@ -106,6 +109,15 @@ public class UnboundedSourceWrapper< private transient StreamSource.ManualWatermarkContext<WindowedValue<OutputT>> context; /** + * Pending checkpoints which have not been acknowledged yet. + */ + private transient LinkedHashMap<Long, List<CheckpointMarkT>> pendingCheckpoints; + /** + * Keep a maximum of 32 checkpoints for {@code CheckpointMark.finalizeCheckpoint()}. + */ + private static final int MAX_NUMBER_PENDING_CHECKPOINTS = 32; + + /** * When restoring from a snapshot we put the restored sources/checkpoint marks here * and open in {@link #open(Configuration)}. */ @@ -159,6 +171,8 @@ public class UnboundedSourceWrapper< localSplitSources = new ArrayList<>(); localReaders = new ArrayList<>(); + pendingCheckpoints = new LinkedHashMap<>(); + if (restoredState != null) { // restore the splitSources from the checkpoint to ensure consistent ordering @@ -324,7 +338,7 @@ public class UnboundedSourceWrapper< } @Override - public byte[] snapshotState(long l, long l1) throws Exception { + public byte[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { if (checkpointCoder == null) { // no checkpoint coder available in this source @@ -335,7 +349,8 @@ public class UnboundedSourceWrapper< // than we have a correct mapping of checkpoints to sources when // restoring List<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpoints = - new ArrayList<>(); + new ArrayList<>(localSplitSources.size()); + List<CheckpointMarkT> checkpointMarks = new ArrayList<>(localSplitSources.size()); for (int i = 0; i < localSplitSources.size(); i++) { UnboundedSource<OutputT, CheckpointMarkT> source = localSplitSources.get(i); @@ -343,6 +358,7 @@ public class UnboundedSourceWrapper< @SuppressWarnings("unchecked") CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark(); + checkpointMarks.add(mark); KV<UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> kv = KV.of(source, mark); checkpoints.add(kv); @@ -351,6 +367,18 @@ public class UnboundedSourceWrapper< try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { checkpointCoder.encode(checkpoints, baos, Coder.Context.OUTER); return baos.toByteArray(); + } finally { + // cleanup old pending checkpoints and add new checkpoint + int diff = pendingCheckpoints.size() - MAX_NUMBER_PENDING_CHECKPOINTS; + if (diff >= 0) { + for (Iterator<Long> iterator = pendingCheckpoints.keySet().iterator(); + diff >= 0; + diff--) { + iterator.next(); + iterator.remove(); + } + } + pendingCheckpoints.put(checkpointId, checkpointMarks); } } @@ -411,4 +439,27 @@ public class UnboundedSourceWrapper< public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() { return localSplitSources; } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + + List<CheckpointMarkT> checkpointMarks = pendingCheckpoints.get(checkpointId); + + if (checkpointMarks != null) { + + // remove old checkpoints including the current one + Iterator<Long> iterator = pendingCheckpoints.keySet().iterator(); + long currentId; + do { + currentId = iterator.next(); + iterator.remove(); + } while (currentId != checkpointId); + + // confirm all marks + for (CheckpointMarkT mark : checkpointMarks) { + mark.finalizeCheckpoint(); + } + + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cf14e809/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 0cc584e..e728653 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -206,6 +207,12 @@ public class UnboundedSourceWrapperTest { // draw a snapshot byte[] snapshot = flinkWrapper.snapshotState(0, 0); + // test that finalizeCheckpoint on CheckpointMark is called + final ArrayList<Integer> finalizeList = new ArrayList<>(); + TestCountingSource.setFinalizeTracker(finalizeList); + flinkWrapper.notifyCheckpointComplete(0); + assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size()); + // create a completely new source but restore from the snapshot TestCountingSource restoredSource = new TestCountingSource(numElements); UnboundedSourceWrapper<
