[ 
https://issues.apache.org/jira/browse/FLINK-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14591480#comment-14591480
 ] 

ASF GitHub Bot commented on FLINK-2124:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/848#discussion_r32708806
  
    --- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
 ---
    @@ -17,37 +17,71 @@
     
     package org.apache.flink.streaming.api.functions.source;
     
    -import java.util.Arrays;
    -import java.util.Collection;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
    +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
    +
    +import java.io.*;
     import java.util.Iterator;
     
     public class FromElementsFunction<T> implements SourceFunction<T> {
        
        private static final long serialVersionUID = 1L;
     
    -   private Iterable<T> iterable;
    +   private final TypeSerializer<T> serializer;
    +   private final byte[] elements;
     
        private volatile boolean isRunning = true;
     
    -   public FromElementsFunction(T... elements) {
    -           this.iterable = Arrays.asList(elements);
    -   }
    +   public FromElementsFunction(TypeSerializer<T> serializer, final T... 
elements) {
    +           this(serializer, new Iterable<T>() {
    +                   @Override
    +                   public Iterator<T> iterator() {
    +                           return new Iterator<T>() {
    +                                   int index = 0;
    +
    +                                   @Override
    +                                   public boolean hasNext() {
    +                                           return index < elements.length;
    +                                   }
     
    -   public FromElementsFunction(Collection<T> elements) {
    -           this.iterable = elements;
    +                                   @Override
    +                                   public T next() {
    +                                           return elements[index++];
    +                                   }
    +                           };
    +                   }
    +           });
        }
     
    -   public FromElementsFunction(Iterable<T> elements) {
    -           this.iterable = elements;
    +   public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> 
elements) {
    +           ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +           OutputViewDataOutputStreamWrapper wrapper = new 
OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
    +
    +           try {
    +                   for (T element : elements)
    +                           serializer.serialize(element, wrapper);
    +           } catch (IOException e) {
    +                   // ByteArrayOutputStream doesn't throw IOExceptions 
when written to
    +           }
    +           // closing the DataOutputStream would just flush the 
ByteArrayOutputStream, which in turn doesn't do anything.
    +
    +           this.serializer = serializer;
    +           this.elements = baos.toByteArray();
        }
     
        @Override
        public void run(SourceContext<T> ctx) throws Exception {
    -           Iterator<T> it = iterable.iterator();
    +           T value = serializer.createInstance();
    +           ByteArrayInputStream bais = new ByteArrayInputStream(elements);
    +           DataInputView input = new InputViewDataInputStreamWrapper(new 
DataInputStream(bais));
     
    -           while (isRunning && it.hasNext()) {
    -                   ctx.collect(it.next());
    +           while (isRunning && bais.available() > 0) {
    +                   value = serializer.deserialize(value, input);
    +                   ctx.collect(value);
                }
    +           // closing the DataOutputStream would just close the 
ByteArrayInputStream, which doesn't do anything
    --- End diff --
    
    I guess you meant the `DataInputStream`.


> FromElementsFunction is not really Serializable
> -----------------------------------------------
>
>                 Key: FLINK-2124
>                 URL: https://issues.apache.org/jira/browse/FLINK-2124
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>
> The function stores an Iterable of T. T is not necessarily Serializable, 
> and Iterable is also not necessarily Serializable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to