[FLINK-2330] [streaming] Make FromElementsFunction checkpointable

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e451a4ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e451a4ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e451a4ae

Branch: refs/heads/master
Commit: e451a4ae00d84c2ed1aae1bec6cc2c43caa2bf90
Parents: 28713a2
Author: Stephan Ewen <[email protected]>
Authored: Wed Jul 8 19:02:59 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Jul 8 20:28:41 2015 +0200

----------------------------------------------------------------------
 .../functions/source/FromElementsFunction.java  | 74 ++++++++++++++++++--
 .../api/functions/FromElementsFunctionTest.java | 74 ++++++++++++++++++++
 .../api/functions/ListSourceContext.java        | 16 +++++
 3 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 394fa77..28544ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -39,7 +40,7 @@ import java.util.Collection;
  * 
  * @param <T> The type of elements returned by this function.
  */
-public class FromElementsFunction<T> implements SourceFunction<T> {
+public class FromElementsFunction<T> implements SourceFunction<T>, 
CheckpointedAsynchronously<Integer> {
        
        private static final long serialVersionUID = 1L;
 
@@ -52,6 +53,12 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
        /** The number of serialized elements */
        private final int numElements;
 
+       /** The number of elements emitted already */
+       private volatile int numElementsEmitted;
+
+       /** The number of elements to skip initially */
+       private volatile int numElementsToSkip;
+       
        /** Flag to make the source cancelable */
        private volatile boolean isRunning = true;
 
@@ -83,10 +90,29 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
        @Override
        public void run(SourceContext<T> ctx) throws Exception {
                ByteArrayInputStream bais = new 
ByteArrayInputStream(elementsSerialized);
-               DataInputView input = new InputViewDataInputStreamWrapper(new 
DataInputStream(bais));
-
-               int numEmitted = 0;
-               while (isRunning && numEmitted++ < numElements) {
+               final DataInputView input = new 
InputViewDataInputStreamWrapper(new DataInputStream(bais));
+               
+               // if we are restored from a checkpoint and need to skip 
elements, skip them now.
+               int toSkip = numElementsToSkip;
+               if (toSkip > 0) {
+                       try {
+                               while (toSkip > 0) {
+                                       serializer.deserialize(input);
+                                       toSkip--;
+                               }
+                       }
+                       catch (Exception e) {
+                               throw new IOException("Failed to deserialize an 
element from the source. " +
+                                               "If you are using user-defined 
serialization (Value and Writable types), check the " +
+                                               "serialization 
functions.\nSerializer is " + serializer);
+                       }
+                       
+                       this.numElementsEmitted = this.numElementsToSkip;
+               }
+               
+               final Object lock = ctx.getCheckpointLock();
+               
+               while (isRunning && numElementsEmitted < numElements) {
                        T next;
                        try {
                                next = serializer.deserialize(input);
@@ -97,7 +123,10 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
                                                "serialization 
functions.\nSerializer is " + serializer);
                        }
                        
-                       ctx.collect(next);
+                       synchronized (lock) {
+                               ctx.collect(next);
+                               numElementsEmitted++;
+                       }
                }
        }
 
@@ -105,6 +134,39 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
        public void cancel() {
                isRunning = false;
        }
+
+
+       /**
+        * Gets the number of elements produced in total by this function.
+        * 
+        * @return The number of elements produced in total.
+        */
+       public int getNumElements() {
+               return numElements;
+       }
+
+       /**
+        * Gets the number of elements emitted so far.
+        * 
+        * @return The number of elements emitted so far.
+        */
+       public int getNumElementsEmitted() {
+               return numElementsEmitted;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Checkpointing
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
+               return this.numElementsEmitted;
+       }
+
+       @Override
+       public void restoreState(Integer state) {
+               this.numElementsToSkip = state;
+       }
        
        // 
------------------------------------------------------------------------
        //  Utilities

http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
index db91b33..9c3653b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
@@ -21,13 +21,17 @@ package org.apache.flink.streaming.api.functions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -125,6 +129,76 @@ public class FromElementsFunctionTest {
                }
        }
        
+       @Test
+       public void testCheckpointAndRestore() {
+               try {
+                       final int NUM_ELEMENTS = 10000;
+                       
+                       List<Integer> data = new 
ArrayList<Integer>(NUM_ELEMENTS);
+                       List<Integer> result = new 
ArrayList<Integer>(NUM_ELEMENTS);
+                       
+                       for (int i = 0; i < NUM_ELEMENTS; i++) {
+                               data.add(i);
+                       }
+                       
+                       final FromElementsFunction<Integer> source = new 
FromElementsFunction<Integer>(IntSerializer.INSTANCE, data);
+                       final FromElementsFunction<Integer> sourceCopy = 
CommonTestUtils.createCopySerializable(source);
+                       
+                       final SourceFunction.SourceContext<Integer> ctx = new 
ListSourceContext<Integer>(result, 2L);
+                       
+                       final Throwable[] error = new Throwable[1];
+                       
+                       // run the source asynchronously
+                       Thread runner = new Thread() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               source.run(ctx);
+                                       }
+                                       catch (Throwable t) {
+                                               error[0] = t;
+                                       }
+                               }
+                       };
+                       runner.start();
+                       
+                       // wait for a bit 
+                       Thread.sleep(1000);
+                       
+                       // make a checkpoint
+                       int count;
+                       List<Integer> checkpointData = new 
ArrayList<Integer>(NUM_ELEMENTS);
+                       
+                       synchronized (ctx.getCheckpointLock()) {
+                               count = source.snapshotState(566, 
System.currentTimeMillis());
+                               checkpointData.addAll(result);
+                       }
+                       
+                       // cancel the source
+                       source.cancel();
+                       runner.join();
+                       
+                       // check for errors
+                       if (error[0] != null) {
+                               System.err.println("Error in asynchronous 
source runner");
+                               error[0].printStackTrace();
+                               fail("Error in asynchronous source runner");
+                       }
+                       
+                       // recovery run
+                       SourceFunction.SourceContext<Integer> newCtx = new 
ListSourceContext<Integer>(checkpointData);
+                       sourceCopy.restoreState(count);
+                       
+                       sourceCopy.run(newCtx);
+                       
+                       assertEquals(data, checkpointData);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
        
        // 
------------------------------------------------------------------------
        //  Test Types

http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
index e718633..f241955 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
@@ -33,14 +33,30 @@ public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
        
        private final List<T> target;
 
+       private final long delay;
+       
        
        public ListSourceContext(List<T> target) {
+               this(target, 0L);
+       }
+
+       public ListSourceContext(List<T> target, long delay) {
                this.target = target;
+               this.delay = delay;
        }
 
        @Override
        public void collect(T element) {
                target.add(element);
+               
+               if (delay > 0) {
+                       try {
+                               Thread.sleep(delay);
+                       }
+                       catch (InterruptedException e) {
+                               // ignore
+                       }
+               }
        }
 
        @Override

Reply via email to