[
https://issues.apache.org/jira/browse/FLINK-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591480#comment-14591480
]
ASF GitHub Bot commented on FLINK-2124:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/848#discussion_r32708806
--- Diff:
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
---
@@ -17,37 +17,71 @@
package org.apache.flink.streaming.api.functions.source;
-import java.util.Arrays;
-import java.util.Collection;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+
+import java.io.*;
import java.util.Iterator;
public class FromElementsFunction<T> implements SourceFunction<T> {
private static final long serialVersionUID = 1L;
- private Iterable<T> iterable;
+ private final TypeSerializer<T> serializer;
+ private final byte[] elements;
private volatile boolean isRunning = true;
- public FromElementsFunction(T... elements) {
- this.iterable = Arrays.asList(elements);
- }
+ public FromElementsFunction(TypeSerializer<T> serializer, final T...
elements) {
+ this(serializer, new Iterable<T>() {
+ @Override
+ public Iterator<T> iterator() {
+ return new Iterator<T>() {
+ int index = 0;
+
+ @Override
+ public boolean hasNext() {
+ return index < elements.length;
+ }
- public FromElementsFunction(Collection<T> elements) {
- this.iterable = elements;
+ @Override
+ public T next() {
+ return elements[index++];
+ }
+ };
+ }
+ });
}
- public FromElementsFunction(Iterable<T> elements) {
- this.iterable = elements;
+ public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T>
elements) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputViewDataOutputStreamWrapper wrapper = new
OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+
+ try {
+ for (T element : elements)
+ serializer.serialize(element, wrapper);
+ } catch (IOException e) {
+ // ByteArrayOutputStream doesn't throw IOExceptions
when written to
+ }
+ // closing the DataOutputStream would just flush the
ByteArrayOutputStream, which in turn doesn't do anything.
+
+ this.serializer = serializer;
+ this.elements = baos.toByteArray();
}
@Override
public void run(SourceContext<T> ctx) throws Exception {
- Iterator<T> it = iterable.iterator();
+ T value = serializer.createInstance();
+ ByteArrayInputStream bais = new ByteArrayInputStream(elements);
+ DataInputView input = new InputViewDataInputStreamWrapper(new
DataInputStream(bais));
- while (isRunning && it.hasNext()) {
- ctx.collect(it.next());
+ while (isRunning && bais.available() > 0) {
+ value = serializer.deserialize(value, input);
+ ctx.collect(value);
}
+ // closing the DataOutputStream would just close the
ByteArrayInputStream, which doesn't do anything
--- End diff --
I guess you meant the `DataInputStream`.
> FromElementsFunction is not really Serializable
> -----------------------------------------------
>
> Key: FLINK-2124
> URL: https://issues.apache.org/jira/browse/FLINK-2124
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Reporter: Aljoscha Krettek
>
> The function stores an Iterable of T. T is not necessarily Serializable, and
> Iterable is also not necessarily Serializable.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)