[FLINK-2124] [streaming] Fix behavior of FromElementsFunction when T isn't Serializable

This change rewrites FromElementsFunction to work with arbitrary types. The elements
are serialized to a byte array (using a TypeSerializer) when the FromElementsFunction is
constructed, and deserialized when run() is called. As a result, the element type no
longer needs to implement java.io.Serializable.

This closes #848


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4a030c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4a030c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4a030c3

Branch: refs/heads/master
Commit: e4a030c307597ff7ebf7aa7cc4435947820db3a0
Parents: 0c49891
Author: Johannes Reifferscheid <[email protected]>
Authored: Wed Jun 17 15:35:37 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java |  6 +-
 .../functions/source/FromElementsFunction.java  | 70 ++++++++++++++++----
 .../flink/streaming/api/SourceFunctionTest.java | 18 +++--
 .../api/scala/StreamExecutionEnvironment.scala  |  4 +-
 4 files changed, 74 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5e7be8d..f98a9f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -473,7 +473,7 @@ public abstract class StreamExecutionEnvironment {
 
                TypeInformation<OUT> typeInfo = 
TypeExtractor.getForObject(data[0]);
 
-               SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(data);
+               SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
 
                return addSource(function, "Elements source").returns(typeInfo);
        }
@@ -504,7 +504,7 @@ public abstract class StreamExecutionEnvironment {
                }
 
                TypeInformation<OUT> typeInfo = 
TypeExtractor.getForObject(data.iterator().next());
-               SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(data);
+               SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
                checkCollection(data, typeInfo.getTypeClass());
 
                return addSource(function, "Collection 
Source").returns(typeInfo);
@@ -529,7 +529,7 @@ public abstract class StreamExecutionEnvironment {
                        throw new IllegalArgumentException("Collection must not 
be empty");
                }
 
-               SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(data);
+               SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
                checkCollection(data, typeInfo.getTypeClass());
 
                return addSource(function, "Collection 
Source").returns(typeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 736cc73..63eb0ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -17,37 +17,81 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import java.util.Arrays;
-import java.util.Collection;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Iterator;
 
 public class FromElementsFunction<T> implements SourceFunction<T> {
        
        private static final long serialVersionUID = 1L;
 
-       private Iterable<T> iterable;
+       private final TypeSerializer<T> serializer;
+       private final byte[] elements;
 
        private volatile boolean isRunning = true;
 
-       public FromElementsFunction(T... elements) {
-               this.iterable = Arrays.asList(elements);
-       }
+       public FromElementsFunction(TypeSerializer<T> serializer, final T... 
elements) {
+               this(serializer, new Iterable<T>() {
+                       @Override
+                       public Iterator<T> iterator() {
+                               return new Iterator<T>() {
+                                       int index = 0;
+
+                                       @Override
+                                       public boolean hasNext() {
+                                               return index < elements.length;
+                                       }
+
+                                       @Override
+                                       public T next() {
+                                               return elements[index++];
+                                       }
 
-       public FromElementsFunction(Collection<T> elements) {
-               this.iterable = elements;
+                                       @Override
+                                       public void remove() {
+                                               throw new 
UnsupportedOperationException();
+                                       }
+                               };
+                       }
+               });
        }
 
-       public FromElementsFunction(Iterable<T> elements) {
-               this.iterable = elements;
+       public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> 
elements) {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               OutputViewDataOutputStreamWrapper wrapper = new 
OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+
+               try {
+                       for (T element : elements) {
+                               serializer.serialize(element, wrapper);
+                       }
+               } catch (IOException e) {
+                       // ByteArrayOutputStream doesn't throw IOExceptions 
when written to
+               }
+               // closing the DataOutputStream would just flush the 
ByteArrayOutputStream, which in turn doesn't do anything.
+
+               this.serializer = serializer;
+               this.elements = baos.toByteArray();
        }
 
        @Override
        public void run(SourceContext<T> ctx) throws Exception {
-               Iterator<T> it = iterable.iterator();
+               T value = serializer.createInstance();
+               ByteArrayInputStream bais = new ByteArrayInputStream(elements);
+               DataInputView input = new InputViewDataInputStreamWrapper(new 
DataInputStream(bais));
 
-               while (isRunning && it.hasNext()) {
-                       ctx.collect(it.next());
+               while (isRunning && bais.available() > 0) {
+                       value = serializer.deserialize(value, input);
+                       ctx.collect(value);
                }
+               // closing the DataInputStream would just close the 
ByteArrayInputStream, which doesn't do anything
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
index 7a78205..f0fe63d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.util.SourceFunctionUtil;
@@ -33,18 +35,22 @@ public class SourceFunctionTest {
        @Test
        public void fromElementsTest() throws Exception {
                List<Integer> expectedList = Arrays.asList(1, 2, 3);
-               List<Integer> actualList = 
SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>(
-                               1,
-                               2,
-                               3));
+               List<Integer> actualList = 
SourceFunctionUtil.runSourceFunction(CommonTestUtils.createCopySerializable(
+                               new FromElementsFunction<Integer>(
+                                               IntSerializer.INSTANCE,
+                                               1,
+                                               2,
+                                               3)));
                assertEquals(expectedList, actualList);
        }
 
        @Test
        public void fromCollectionTest() throws Exception {
                List<Integer> expectedList = Arrays.asList(1, 2, 3);
-               List<Integer> actualList = 
SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>(
-                               Arrays.asList(1, 2, 3)));
+               List<Integer> actualList = SourceFunctionUtil.runSourceFunction(
+                               CommonTestUtils.createCopySerializable(new 
FromElementsFunction<Integer>(
+                                               IntSerializer.INSTANCE,
+                                               Arrays.asList(1, 2, 3))));
                assertEquals(expectedList, actualList);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 968e07f..7215a4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -284,8 +284,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     require(data != null, "Data must not be null.")
     val typeInfo = implicitly[TypeInformation[T]]
 
-    val sourceFunction = new 
FromElementsFunction[T](scala.collection.JavaConversions
-      .asJavaCollection(data))
+    val sourceFunction = new 
FromElementsFunction[T](typeInfo.createSerializer(getConfig),
+      scala.collection.JavaConversions.asJavaCollection(data))
 
     javaEnv.addSource(sourceFunction).returns(typeInfo)
   }

Reply via email to