[FLINK-2330] [streaming] Make FromElementsFunction checkpointable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e451a4ae Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e451a4ae Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e451a4ae Branch: refs/heads/master Commit: e451a4ae00d84c2ed1aae1bec6cc2c43caa2bf90 Parents: 28713a2 Author: Stephan Ewen <[email protected]> Authored: Wed Jul 8 19:02:59 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Jul 8 20:28:41 2015 +0200 ---------------------------------------------------------------------- .../functions/source/FromElementsFunction.java | 74 ++++++++++++++++++-- .../api/functions/FromElementsFunctionTest.java | 74 ++++++++++++++++++++ .../api/functions/ListSourceContext.java | 16 +++++ 3 files changed, 158 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index 394fa77..28544ee 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -39,7 +40,7 @@ import java.util.Collection; * * @param <T> The type of elements returned by this function. */ -public class FromElementsFunction<T> implements SourceFunction<T> { +public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedAsynchronously<Integer> { private static final long serialVersionUID = 1L; @@ -52,6 +53,12 @@ public class FromElementsFunction<T> implements SourceFunction<T> { /** The number of serialized elements */ private final int numElements; + /** The number of elements emitted already */ + private volatile int numElementsEmitted; + + /** The number of elements to skip initially */ + private volatile int numElementsToSkip; + /** Flag to make the source cancelable */ private volatile boolean isRunning = true; @@ -83,10 +90,29 @@ public class FromElementsFunction<T> implements SourceFunction<T> { @Override public void run(SourceContext<T> ctx) throws Exception { ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized); - DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais)); - - int numEmitted = 0; - while (isRunning && numEmitted++ < numElements) { + final DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais)); + + // if we are restored from a checkpoint and need to skip elements, skip them now. + int toSkip = numElementsToSkip; + if (toSkip > 0) { + try { + while (toSkip > 0) { + serializer.deserialize(input); + toSkip--; + } + } + catch (Exception e) { + throw new IOException("Failed to deserialize an element from the source. " + + "If you are using user-defined serialization (Value and Writable types), check the " + + "serialization functions.\nSerializer is " + serializer); + } + + this.numElementsEmitted = this.numElementsToSkip; + } + + final Object lock = ctx.getCheckpointLock(); + + while (isRunning && numElementsEmitted < numElements) { T next; try { next = serializer.deserialize(input); @@ -97,7 +123,10 @@ public class FromElementsFunction<T> implements SourceFunction<T> { "serialization functions.\nSerializer is " + serializer); } - ctx.collect(next); + synchronized (lock) { + ctx.collect(next); + numElementsEmitted++; + } } } @@ -105,6 +134,39 @@ public class FromElementsFunction<T> implements SourceFunction<T> { public void cancel() { isRunning = false; } + + + /** + * Gets the number of elements produced in total by this function. + * + * @return The number of elements produced in total. + */ + public int getNumElements() { + return numElements; + } + + /** + * Gets the number of elements emitted so far. + * + * @return The number of elements emitted so far. + */ + public int getNumElementsEmitted() { + return numElementsEmitted; + } + + // ------------------------------------------------------------------------ + // Checkpointing + // ------------------------------------------------------------------------ + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) { + return this.numElementsEmitted; + } + + @Override + public void restoreState(Integer state) { + this.numElementsToSkip = state; + } // ------------------------------------------------------------------------ // Utilities http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java index db91b33..9c3653b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java @@ -21,13 +21,17 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.types.Value; import org.apache.flink.util.ExceptionUtils; + import org.junit.Test; import java.io.IOException; @@ -125,6 +129,76 @@ public class FromElementsFunctionTest { } } + @Test + public void testCheckpointAndRestore() { + try { + final int NUM_ELEMENTS = 10000; + + List<Integer> data = new ArrayList<Integer>(NUM_ELEMENTS); + List<Integer> result = new ArrayList<Integer>(NUM_ELEMENTS); + + for (int i = 0; i < NUM_ELEMENTS; i++) { + data.add(i); + } + + final FromElementsFunction<Integer> source = new FromElementsFunction<Integer>(IntSerializer.INSTANCE, data); + final FromElementsFunction<Integer> sourceCopy = CommonTestUtils.createCopySerializable(source); + + final SourceFunction.SourceContext<Integer> ctx = new ListSourceContext<Integer>(result, 2L); + + final Throwable[] error = new Throwable[1]; + + // run the source asynchronously + Thread runner = new Thread() { + @Override + public void run() { + try { + source.run(ctx); + } + catch (Throwable t) { + error[0] = t; + } + } + }; + runner.start(); + + // wait for a bit + Thread.sleep(1000); + + // make a checkpoint + int count; + List<Integer> checkpointData = new ArrayList<Integer>(NUM_ELEMENTS); + + synchronized (ctx.getCheckpointLock()) { + count = source.snapshotState(566, System.currentTimeMillis()); + checkpointData.addAll(result); + } + + // cancel the source + source.cancel(); + runner.join(); + + // check for errors + if (error[0] != null) { + System.err.println("Error in asynchronous source runner"); + error[0].printStackTrace(); + fail("Error in asynchronous source runner"); + } + + // recovery run + SourceFunction.SourceContext<Integer> newCtx = new ListSourceContext<Integer>(checkpointData); + sourceCopy.restoreState(count); + + sourceCopy.run(newCtx); + + assertEquals(data, checkpointData); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + // ------------------------------------------------------------------------ // Test Types http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java index e718633..f241955 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java @@ -33,14 +33,30 @@ public class ListSourceContext<T> implements SourceFunction.SourceContext<T> { private final List<T> target; + private final long delay; + public ListSourceContext(List<T> target) { + this(target, 0L); + } + + public ListSourceContext(List<T> target, long delay) { this.target = target; + this.delay = delay; } @Override public void collect(T element) { target.add(element); + + if (delay > 0) { + try { + Thread.sleep(delay); + } + catch (InterruptedException e) { + // ignore + } + } } @Override
