[FLINK-2124] [streaming] Fix behavior of FromElementsFunction when T isn't Serializable
This change rewrites FromElementsFunction to work with arbitrary element types. The elements are serialized to a byte array (using a TypeSerializer) when the FromElementsFunction is constructed, and deserialized again when run() is called. Because the function then holds only the serializer and the serialized bytes, it can itself be Java-serialized even when T does not implement java.io.Serializable.
This closes #848.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4a030c3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4a030c3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4a030c3 Branch: refs/heads/master Commit: e4a030c307597ff7ebf7aa7cc4435947820db3a0 Parents: 0c49891 Author: Johannes Reifferscheid <[email protected]> Authored: Wed Jun 17 15:35:37 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Jul 8 20:28:40 2015 +0200 ---------------------------------------------------------------------- .../environment/StreamExecutionEnvironment.java | 6 +- .../functions/source/FromElementsFunction.java | 70 ++++++++++++++++---- .../flink/streaming/api/SourceFunctionTest.java | 18 +++-- .../api/scala/StreamExecutionEnvironment.scala | 4 +- 4 files changed, 74 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 5e7be8d..f98a9f0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -473,7 +473,7 @@ public abstract class StreamExecutionEnvironment { TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]); - SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); + SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data); return addSource(function, "Elements source").returns(typeInfo); } @@ -504,7 +504,7 @@ public abstract class StreamExecutionEnvironment { } TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next()); - SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); + SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data); checkCollection(data, typeInfo.getTypeClass()); return addSource(function, "Collection Source").returns(typeInfo); @@ -529,7 +529,7 @@ public abstract class StreamExecutionEnvironment { throw new IllegalArgumentException("Collection must not be empty"); } - SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); + SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data); checkCollection(data, typeInfo.getTypeClass()); return addSource(function, "Collection Source").returns(typeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index 
736cc73..63eb0ad 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -17,37 +17,81 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.Arrays; -import java.util.Collection; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.Iterator; public class FromElementsFunction<T> implements SourceFunction<T> { private static final long serialVersionUID = 1L; - private Iterable<T> iterable; + private final TypeSerializer<T> serializer; + private final byte[] elements; private volatile boolean isRunning = true; - public FromElementsFunction(T... elements) { - this.iterable = Arrays.asList(elements); - } + public FromElementsFunction(TypeSerializer<T> serializer, final T... 
elements) { + this(serializer, new Iterable<T>() { + @Override + public Iterator<T> iterator() { + return new Iterator<T>() { + int index = 0; + + @Override + public boolean hasNext() { + return index < elements.length; + } + + @Override + public T next() { + return elements[index++]; + } - public FromElementsFunction(Collection<T> elements) { - this.iterable = elements; + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }); } - public FromElementsFunction(Iterable<T> elements) { - this.iterable = elements; + public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos)); + + try { + for (T element : elements) { + serializer.serialize(element, wrapper); + } + } catch (IOException e) { + // ByteArrayOutputStream doesn't throw IOExceptions when written to + } + // closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn doesn't do anything. 
+ + this.serializer = serializer; + this.elements = baos.toByteArray(); } @Override public void run(SourceContext<T> ctx) throws Exception { - Iterator<T> it = iterable.iterator(); + T value = serializer.createInstance(); + ByteArrayInputStream bais = new ByteArrayInputStream(elements); + DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais)); - while (isRunning && it.hasNext()) { - ctx.collect(it.next()); + while (isRunning && bais.available() > 0) { + value = serializer.deserialize(value, input); + ctx.collect(value); } + // closing the DataInputStream would just close the ByteArrayInputStream, which doesn't do anything } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java index 7a78205..f0fe63d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java @@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock; import java.util.Arrays; import java.util.List; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.util.SourceFunctionUtil; @@ -33,18 +35,22 @@ public class SourceFunctionTest { @Test public void fromElementsTest() 
throws Exception { List<Integer> expectedList = Arrays.asList(1, 2, 3); - List<Integer> actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>( - 1, - 2, - 3)); + List<Integer> actualList = SourceFunctionUtil.runSourceFunction(CommonTestUtils.createCopySerializable( + new FromElementsFunction<Integer>( + IntSerializer.INSTANCE, + 1, + 2, + 3))); assertEquals(expectedList, actualList); } @Test public void fromCollectionTest() throws Exception { List<Integer> expectedList = Arrays.asList(1, 2, 3); - List<Integer> actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>( - Arrays.asList(1, 2, 3))); + List<Integer> actualList = SourceFunctionUtil.runSourceFunction( + CommonTestUtils.createCopySerializable(new FromElementsFunction<Integer>( + IntSerializer.INSTANCE, + Arrays.asList(1, 2, 3)))); assertEquals(expectedList, actualList); } http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 968e07f..7215a4d 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -284,8 +284,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { require(data != null, "Data must not be null.") val typeInfo = implicitly[TypeInformation[T]] - val sourceFunction = new 
FromElementsFunction[T](scala.collection.JavaConversions - .asJavaCollection(data)) + val sourceFunction = new FromElementsFunction[T](typeInfo.createSerializer(getConfig), + scala.collection.JavaConversions.asJavaCollection(data)) javaEnv.addSource(sourceFunction).returns(typeInfo) }
