[ 
https://issues.apache.org/jira/browse/FLINK-2124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592987#comment-14592987
 ] 

ASF GitHub Bot commented on FLINK-2124:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/848#discussion_r32801136
  
    --- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
 ---
    @@ -17,37 +17,81 @@
     
     package org.apache.flink.streaming.api.functions.source;
     
    -import java.util.Arrays;
    -import java.util.Collection;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
    +import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
     import java.util.Iterator;
     
     public class FromElementsFunction<T> implements SourceFunction<T> {
        
        private static final long serialVersionUID = 1L;
     
    -   private Iterable<T> iterable;
    +   private final TypeSerializer<T> serializer;
    +   private final byte[] elements;
     
        private volatile boolean isRunning = true;
     
    -   public FromElementsFunction(T... elements) {
    -           this.iterable = Arrays.asList(elements);
    -   }
    +   public FromElementsFunction(TypeSerializer<T> serializer, final T... 
elements) {
    +           this(serializer, new Iterable<T>() {
    +                   @Override
    +                   public Iterator<T> iterator() {
    +                           return new Iterator<T>() {
    +                                   int index = 0;
    +
    +                                   @Override
    +                                   public boolean hasNext() {
    +                                           return index < elements.length;
    +                                   }
    +
    +                                   @Override
    +                                   public T next() {
    +                                           return elements[index++];
    +                                   }
     
    -   public FromElementsFunction(Collection<T> elements) {
    -           this.iterable = elements;
    +                                   @Override
    +                                   public void remove() {
    +                                           throw new 
UnsupportedOperationException();
    +                                   }
    +                           };
    +                   }
    +           });
        }
     
    -   public FromElementsFunction(Iterable<T> elements) {
    -           this.iterable = elements;
    +   public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> 
elements) {
    +           ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +           OutputViewDataOutputStreamWrapper wrapper = new 
OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
    +
    +           try {
    +                   for (T element : elements) {
    +                           serializer.serialize(element, wrapper);
    +                   }
    +           } catch (IOException e) {
    +                   // ByteArrayOutputStream doesn't throw IOExceptions 
when written to
    --- End diff --
    
    I think not forwarding the exception here is a bad idea.


> FromElementsFunction is not really Serializable
> -----------------------------------------------
>
>                 Key: FLINK-2124
>                 URL: https://issues.apache.org/jira/browse/FLINK-2124
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>            Assignee: Johannes Reifferscheid
>
> The function stores an Iterable of T. T is not necessarily Serializable, and 
> Iterable is also not necessarily Serializable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to