[ 
https://issues.apache.org/jira/browse/FLINK-1687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552049#comment-14552049
 ] 

ASF GitHub Bot commented on FLINK-1687:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/521#discussion_r30683935
  
    --- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 ---
    @@ -19,396 +19,867 @@
     package org.apache.flink.streaming.api.scala
     
     import com.esotericsoftware.kryo.Serializer
    +import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
TypeExtractor}
     import org.apache.flink.api.scala.ClosureCleaner
    +import org.apache.flink.streaming.api.datastream.DataStreamSource
     import 
org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => 
JavaEnv}
     import 
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
    -import 
org.apache.flink.streaming.api.functions.source.{FromElementsFunction, 
SourceFunction}
    -import org.apache.flink.util.Collector
    +import org.apache.flink.streaming.api.functions.source._
    +import org.apache.flink.streaming.api.operators.StreamSource
    +import org.apache.flink.types.StringValue
    +import org.apache.flink.util.{Collector, SplittableIterator}
     
     import scala.reflect.ClassTag
     
     class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     
    -  /**
    -   * Sets the parallelism for operations executed through this environment.
    -   * Setting a parallelism of x here will cause all operators (such as 
join, map, reduce) to run
    -   * with x parallel instances. This value can be overridden by specific 
operations using
    -   * [[DataStream.setParallelism]].
    -   * @deprecated Please use [[setParallelism]]
    -   */
    -  @deprecated
    -  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
    -    javaEnv.setParallelism(degreeOfParallelism)
    -  }
    -
    -  /**
    -   * Sets the parallelism for operations executed through this environment.
    -   * Setting a parallelism of x here will cause all operators (such as 
join, map, reduce) to run
    -   * with x parallel instances. This value can be overridden by specific 
operations using
    -   * [[DataStream.setParallelism]].
    -   */
    -  def setParallelism(parallelism: Int): Unit = {
    -    javaEnv.setParallelism(parallelism)
    -  }
    -
    -  /**
    -   * Returns the default parallelism for this execution environment. Note 
that this
    -   * value can be overridden by individual operations using 
[[DataStream.setParallelism]]
    -   * @deprecated Please use [[getParallelism]]
    -   */
    -  @deprecated
    -  def getDegreeOfParallelism = javaEnv.getParallelism
    -
    -  /**
    -   * Returns the default parallelism for this execution environment. Note 
that this
    -   * value can be overridden by individual operations using 
[[DataStream.setParallelism]]
    -   */
    -  def getParallelism = javaEnv.getParallelism
    -
    -  /**
    -   * Sets the maximum time frequency (milliseconds) for the flushing of the
    -   * output buffers. By default the output buffers flush frequently to 
provide
    -   * low latency and to aid smooth developer experience. Setting the 
parameter
    -   * can result in three logical modes:
    -   *
    -   * <ul>
    -   * <li>
    -   * A positive integer triggers flushing periodically by that integer</li>
    -   * <li>
    -   * 0 triggers flushing after every record thus minimizing latency</li>
    -   * <li>
    -   * -1 triggers flushing only when the output buffer is full thus 
maximizing
    -   * throughput</li>
    -   * </ul>
    -   *
    -   */
    -  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
    -    javaEnv.setBufferTimeout(timeoutMillis)
    -    this
    -  }
    -
    -  /**
    -   * Gets the default buffer timeout set for this environment
    -   */
    -  def getBufferTimout: Long = javaEnv.getBufferTimeout()
    -
    -  /**
    -   * Method for enabling fault-tolerance. Activates monitoring and backup 
of streaming
    -   * operator states. Time interval between state checkpoints is specified 
in in millis.
    -   *
    -   * Setting this option assumes that the job is used in production and 
thus if not stated
    -   * explicitly otherwise with calling with the
    -   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} 
method in case of
    -   * failure the job will be resubmitted to the cluster indefinitely.
    -   */
    -  def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
    -    javaEnv.enableCheckpointing(interval)
    -    this
    -  }
    -
    -  /**
    -   * Method for enabling fault-tolerance. Activates monitoring and backup 
of streaming
    -   * operator states. Time interval between state checkpoints is specified 
in in millis.
    -   *
    -   * Setting this option assumes that the job is used in production and 
thus if not stated
    -   * explicitly otherwise with calling with the
    -   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} 
method in case of
    -   * failure the job will be resubmitted to the cluster indefinitely.
    -   */
    -  def enableCheckpointing() : StreamExecutionEnvironment = {
    -    javaEnv.enableCheckpointing()
    -    this
    -  }
    -  
    -  /**
    -   * Disables operator chaining for streaming operators. Operator chaining
    -   * allows non-shuffle operations to be co-located in the same thread 
fully
    -   * avoiding serialization and de-serialization.
    -   * 
    -   */
    -  def disableOperatorChaning(): StreamExecutionEnvironment = {
    -    javaEnv.disableOperatorChaning()
    -    this
    -  }
    -
    -  /**
    -   * Sets the number of times that failed tasks are re-executed. A value 
of zero
    -   * effectively disables fault tolerance. A value of "-1" indicates that 
the system
    -   * default value (as defined in the configuration) should be used.
    -   */
    -  def setNumberOfExecutionRetries(numRetries: Int): Unit = {
    -    javaEnv.setNumberOfExecutionRetries(numRetries)
    -  }
    -
    -  /**
    -   * Gets the number of times the system will try to re-execute failed 
tasks. A value
    -   * of "-1" indicates that the system default value (as defined in the 
configuration)
    -   * should be used.
    -   */
    -  def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
    -
    -
    -  /**
    -   * Registers the given type with the serializer at the 
[[KryoSerializer]].
    -   *
    -   * Note that the serializer instance must be serializable (as defined by 
java.io.Serializable),
    -   * because it may be distributed to the worker nodes by java 
serialization.
    -   */
    -  def registerTypeWithKryoSerializer(clazz: Class[_], serializer: 
Serializer[_]): Unit = {
    -    javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
    -  }
    -
    -  /**
    -   * Registers the given type with the serializer at the 
[[KryoSerializer]].
    -   */
    -  def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ 
<: Serializer[_]]) {
    -    javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
    -  }
    -
    -
    -  /**
    -   * Registers a default serializer for the given class and its 
sub-classes at Kryo.
    -   */
    -  def registerDefaultKryoSerializer(clazz: Class[_], serializer: Class[_ 
<: Serializer[_]]) {
    -    javaEnv.addDefaultKryoSerializer(clazz, serializer)
    -  }
    -
    -  /**
    -   * Registers a default serializer for the given class and its 
sub-classes at Kryo.
    -   *
    -   * Note that the serializer instance must be serializable (as defined by 
java.io.Serializable),
    -   * because it may be distributed to the worker nodes by java 
serialization.
    -   */
    -  def registerDefaultKryoSerializer(clazz: Class[_], serializer: 
Serializer[_]): Unit = {
    -    javaEnv.addDefaultKryoSerializer(clazz, serializer)
    -  }
    -
    -  /**
    -   * Registers the given type with the serialization stack. If the type is 
eventually
    -   * serialized as a POJO, then the type is registered with the POJO 
serializer. If the
    -   * type ends up being serialized with Kryo, then it will be registered 
at Kryo to make
    -   * sure that only tags are written.
    -   *
    -   */
    -  def registerType(typeClass: Class[_]) {
    -    javaEnv.registerType(typeClass)
    -  }
    -
    -  /**
    -   * Creates a DataStream that represents the Strings produced by reading 
the
    -   * given file line wise. The file will be read with the system's default
    -   * character set.
    -   *
    -   */
    -  def readTextFile(filePath: String): DataStream[String] =
    -    javaEnv.readTextFile(filePath)
    -
    -  /**
    -   * Creates a DataStream that contains the contents of file created while
    -   * system watches the given path. The file will be read with the system's
    -   * default character set. The user can check the monitoring interval in 
milliseconds,
    -   * and the way file modifications are handled. By default it checks for 
only new files
    -   * every 100 milliseconds.
    -   *
    -   */
    -  def readFileStream(StreamPath: String, intervalMillis: Long = 100, 
watchType: WatchType = 
    -    WatchType.ONLY_NEW_FILES): DataStream[String] =
    -    javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
    -
    -  /**
    -   * Creates a new DataStream that contains the strings received infinitely
    -   * from socket. Received strings are decoded by the system's default
    -   * character set.
    -   *
    -   */
    -  def socketTextStream(hostname: String, port: Int, delimiter: Char): 
DataStream[String] =
    -    javaEnv.socketTextStream(hostname, port, delimiter)
    -
    -  /**
    -   * Creates a new DataStream that contains the strings received infinitely
    -   * from socket. Received strings are decoded by the system's default
    -   * character set, uses '\n' as delimiter.
    -   *
    -   */
    -  def socketTextStream(hostname: String, port: Int): DataStream[String] =
    -    javaEnv.socketTextStream(hostname, port)
    -
    -  /**
    -   * Creates a new DataStream that contains a sequence of numbers.
    -   *
    -   */
    -  def generateSequence(from: Long, to: Long): DataStream[Long] = {
    -    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
    -      asInstanceOf[DataStream[Long]]
    -  }
    -
    -  /**
    -   * Creates a DataStream that contains the given elements. The elements 
must all be of the
    -   * same type and must be serializable.
    -   *
    -   * * Note that this operation will result in a non-parallel data source, 
i.e. a data source with
    -   * a parallelism of one.
    -   */
    -  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] 
= {
    -    val typeInfo = implicitly[TypeInformation[T]]
    -    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
    -  }
    -
    -  /**
    -   * Creates a DataStream from the given non-empty [[Seq]]. The elements 
need to be serializable
    -   * because the framework may move the elements into the cluster if 
needed.
    -   *
    -   * Note that this operation will result in a non-parallel data source, 
i.e. a data source with
    -   * a parallelism of one.
    -   */
    -  def fromCollection[T: ClassTag: TypeInformation](
    -    data: Seq[T]): DataStream[T] = {
    -    require(data != null, "Data must not be null.")
    -    val typeInfo = implicitly[TypeInformation[T]]
    -
    -    val sourceFunction = new 
FromElementsFunction[T](scala.collection.JavaConversions
    -        .asJavaCollection(data))
    -        
    -    javaEnv.addSource(sourceFunction).returns(typeInfo)
    -  }
    -
    -  /**
    -   * Create a DataStream using a user defined source function for arbitrary
    -   * source functionality. By default sources have a parallelism of 1. 
    -   * To enable parallel execution, the user defined source should 
implement 
    -   * ParallelSourceFunction or extend RichParallelSourceFunction. 
    -   * In these cases the resulting source will have the parallelism of the 
environment. 
    -   * To change this afterwards call DataStreamSource.setParallelism(int)
    -   *
    -   */
    -  def addSource[T: ClassTag: TypeInformation](function: 
SourceFunction[T]): DataStream[T] = {
    -    require(function != null, "Function must not be null.")
    -    val cleanFun = StreamExecutionEnvironment.clean(function)
    -    val typeInfo = implicitly[TypeInformation[T]]
    -    javaEnv.addSource(cleanFun).returns(typeInfo)
    -  }
    -  
    -   /**
    -   * Create a DataStream using a user defined source function for arbitrary
    -   * source functionality.
    -   *
    -   */
    -  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => 
Unit): DataStream[T] = {
    -    require(function != null, "Function must not be null.")
    -    val sourceFunction = new SourceFunction[T] {
    -      val cleanFun = StreamExecutionEnvironment.clean(function)
    -      override def run(out: Collector[T]) {
    -        cleanFun(out)
    -      }
    -      override def cancel() = {}
    -    }
    -    addSource(sourceFunction)
    -  }
    -
    -  /**
    -   * Triggers the program execution. The environment will execute all 
parts of
    -   * the program that have resulted in a "sink" operation. Sink operations 
are
    -   * for example printing results or forwarding them to a message queue.
    -   * <p>
    -   * The program execution will be logged and displayed with a generated
    -   * default name.
    -   *
    -   */
    -  def execute() = javaEnv.execute()
    -
    -  /**
    -   * Triggers the program execution. The environment will execute all 
parts of
    -   * the program that have resulted in a "sink" operation. Sink operations 
are
    -   * for example printing results or forwarding them to a message queue.
    -   * <p>
    -   * The program execution will be logged and displayed with the provided 
name
    -   *
    -   */
    -  def execute(jobName: String) = javaEnv.execute(jobName)
    -
    -  /**
    -   * Creates the plan with which the system will execute the program, and
    -   * returns it as a String using a JSON representation of the execution 
data
    -   * flow graph. Note that this needs to be called, before the plan is
    -   * executed.
    -   *
    -   */
    -  def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
    +   /**
    +    * Sets the parallelism for operations executed through this 
environment.
    +    * Setting a parallelism of x here will cause all operators (such as 
join, map, reduce) to run
    +    * with x parallel instances. This value can be overridden by specific 
operations using
    +    * [[DataStream.setParallelism]].
    +    * @deprecated Please use [[setParallelism]]
    +    */
    +   @deprecated
    +   def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
    +           javaEnv.setParallelism(degreeOfParallelism)
    +   }
    +
    +   /**
    +    * Sets the parallelism for operations executed through this 
environment.
    +    * Setting a parallelism of x here will cause all operators (such as 
join, map, reduce) to run
    +    * with x parallel instances. This value can be overridden by specific 
operations using
    +    * [[DataStream.setParallelism]].
    +    */
    +   def setParallelism(parallelism: Int): Unit = {
    +           javaEnv.setParallelism(parallelism)
    +   }
    +
    +   /**
    +    * Returns the default parallelism for this execution environment. Note 
that this
    +    * value can be overridden by individual operations using 
[[DataStream.setParallelism]]
    +    * @deprecated Please use [[getParallelism]]
    +    */
    +   @deprecated
    +   def getDegreeOfParallelism = javaEnv.getParallelism
    +
    +   /**
    +    * Returns the default parallelism for this execution environment. Note 
that this
    +    * value can be overridden by individual operations using 
[[DataStream.setParallelism]]
    +    */
    +   def getParallelism = javaEnv.getParallelism
    +
    +   /**
    +    * Sets the maximum time frequency (milliseconds) for the flushing of 
the
    +    * output buffers. By default the output buffers flush frequently to 
provide
    +    * low latency and to aid smooth developer experience. Setting the 
parameter
    +    * can result in three logical modes:
    +    *
    +    * <ul>
    +    * <li>
    +    * A positive integer triggers flushing periodically by that 
integer</li>
    +    * <li>
    +    * 0 triggers flushing after every record thus minimizing latency</li>
    +    * <li>
    +    * -1 triggers flushing only when the output buffer is full thus 
maximizing
    +    * throughput</li>
    +    * </ul>
    +    *
    +    */
    +   def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = 
{
    +           javaEnv.setBufferTimeout(timeoutMillis)
    +           this
    +   }
    +
    +   /**
    +    * Gets the default buffer timeout set for this environment
    +    */
    +   def getBufferTimout: Long = javaEnv.getBufferTimeout()
    +
    +   /**
    +    * Method for enabling fault-tolerance. Activates monitoring and backup 
of streaming
    +    * operator states. Time interval between state checkpoints is 
specified in in millis.
    +    *
    +    * Setting this option assumes that the job is used in production and 
thus if not stated
    +    * explicitly otherwise with calling with the
    +    * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} 
method in case of
    +    * failure the job will be resubmitted to the cluster indefinitely.
    +    */
    +   def enableCheckpointing(interval: Long): StreamExecutionEnvironment = {
    +           javaEnv.enableCheckpointing(interval)
    +           this
    +   }
    +
    +   /**
    +    * Method for enabling fault-tolerance. Activates monitoring and backup 
of streaming
    +    * operator states. Time interval between state checkpoints is 
specified in in millis.
    +    *
    +    * Setting this option assumes that the job is used in production and 
thus if not stated
    +    * explicitly otherwise with calling with the
    +    * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} 
method in case of
    +    * failure the job will be resubmitted to the cluster indefinitely.
    +    */
    +   def enableCheckpointing(): StreamExecutionEnvironment = {
    +           javaEnv.enableCheckpointing()
    +           this
    +   }
    +
    +   /**
    +    * Disables operator chaining for streaming operators. Operator chaining
    +    * allows non-shuffle operations to be co-located in the same thread 
fully
    +    * avoiding serialization and de-serialization.
    +    *
    +    */
    +   def disableOperatorChaning(): StreamExecutionEnvironment = {
    +           javaEnv.disableOperatorChaning()
    +           this
    +   }
    +
    +   /**
    +    * Sets the number of times that failed tasks are re-executed. A value 
of zero
    +    * effectively disables fault tolerance. A value of "-1" indicates that 
the system
    +    * default value (as defined in the configuration) should be used.
    +    */
    +   def setNumberOfExecutionRetries(numRetries: Int): Unit = {
    +           javaEnv.setNumberOfExecutionRetries(numRetries)
    +   }
    +
    +   /**
    +    * Gets the number of times the system will try to re-execute failed 
tasks. A value
    +    * of "-1" indicates that the system default value (as defined in the 
configuration)
    +    * should be used.
    +    */
    +   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
    +
    +
    +   /**
    +    * Registers the given type with the serializer at the 
[[KryoSerializer]].
    +    *
    +    * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
    +    * because it may be distributed to the worker nodes by java 
serialization.
    +    */
    +   def registerTypeWithKryoSerializer(clazz: Class[_], serializer: 
Serializer[_]): Unit = {
    +           javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
    +   }
    +
    +   /**
    +    * Registers the given type with the serializer at the 
[[KryoSerializer]].
    +    */
    +   def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ 
<: Serializer[_]]) {
    +           javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
    +   }
    +
    +
    +   /**
    +    * Registers a default serializer for the given class and its 
sub-classes at Kryo.
    +    */
    +   def registerDefaultKryoSerializer(clazz: Class[_], serializer: Class[_ 
<: Serializer[_]]) {
    +           javaEnv.addDefaultKryoSerializer(clazz, serializer)
    +   }
    +
    +   /**
    +    * Registers a default serializer for the given class and its 
sub-classes at Kryo.
    +    *
    +    * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
    +    * because it may be distributed to the worker nodes by java 
serialization.
    +    */
    +   def registerDefaultKryoSerializer(clazz: Class[_], serializer: 
Serializer[_]): Unit = {
    +           javaEnv.addDefaultKryoSerializer(clazz, serializer)
    +   }
    +
    +   /**
    +    * Registers the given type with the serialization stack. If the type 
is eventually
    +    * serialized as a POJO, then the type is registered with the POJO 
serializer. If the
    +    * type ends up being serialized with Kryo, then it will be registered 
at Kryo to make
    +    * sure that only tags are written.
    +    *
    +    */
    +   def registerType(typeClass: Class[_]) {
    +           javaEnv.registerType(typeClass)
    +   }
    +
    +   /**
    +    * Creates a data stream that represents the Strings produced by 
reading the given file line wise. The file will be
    +    * read with the system's default character set.
    +    *
    +    * @param filePath
    +    * The path of the file, as a URI (e.g., "file:///some/local/file" or 
"hdfs://host:port/file/path").
    +    * @return The data stream that represents the data read from the given 
file as text lines
    +    */
    +   def readTextFile(filePath: String): DataStream[String] =
    +           javaEnv.readTextFile(filePath)
    +
    +   /**
    +    * Creates a data stream that represents the Strings produced by 
reading the given file line wise. The {@link
    +    * java.nio.charset.Charset} with the given name will be used to read 
the files.
    +    *
    +    * @param filePath
    +    * The path of the file, as a URI (e.g., "file:///some/local/file" or 
"hdfs://host:port/file/path")
    +    * @param charsetName
    +    * The name of the character set used to read the file
    +    * @return The data stream that represents the data read from the given 
file as text lines
    +    */
    +   def readTextFile(filePath: String, charsetName: String): 
DataStream[String] =
    +           javaEnv.readTextFile(filePath, charsetName)
    +
    +   /**
    +    * Creates a data stream that contains the contents of file created 
while system watches the given path. The file
    +    * will be read with the system's default character set.
    +    *
    +    * @param streamPath
    +    * The path of the file, as a URI (e.g., "file:///some/local/file" or 
"hdfs://host:port/file/path/")
    +    * @param intervalMillis
    +    * The interval of file watching in milliseconds
    +    * @param watchType
    +    * The watch type of file stream. When watchType is
    +    * { @link 
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#ONLY_NEW_FILES},
    +    * the system processes only new files.
    +    * { @link 
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#REPROCESS_WITH_APPENDED}
    +    * means that the system re-processes all contents of appended file.
    +    * { @link 
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#PROCESS_ONLY_APPENDED}
    +    * means that the system processes only appended contents of files.
    +    * @return The DataStream containing the given directory.
    +    */
    +   def readFileStream(streamPath: String, intervalMillis: Long = 100, 
watchType: WatchType =
    +   WatchType.ONLY_NEW_FILES): DataStream[String] =
    +           javaEnv.readFileStream(streamPath, intervalMillis, watchType)
    +
    +   /**
    +    * Creates a data stream that represents the strings produced by 
reading the given file line wise. This method is
    +    * similar to {@link #readTextFile(String)}, but it produces a data 
stream with mutable {@link StringValue}
    +    * objects,
    +    * rather than Java Strings. StringValues can be used to tune 
implementations to be less object and garbage
    +    * collection heavy.
    +    * <p/>
    +    * The file will be read with the system's default character set.
    +    *
    +    * @param filePath
    +    * The path of the file, as a URI (e.g., "file:///some/local/file" or 
"hdfs://host:port/file/path")
    +    * @return A data stream that represents the data read from the given 
file as text lines
    +    */
    +   def readTextFileWithValue(filePath: String): DataStream[StringValue] =
    +           javaEnv.readTextFileWithValue(filePath)
    +
    +   /**
    +    * Creates a data stream that represents the Strings produced by 
reading the given file line wise. This method is
    +    * similar to {@link #readTextFile(String, String)}, but it produces a 
data stream with mutable {@link StringValue}
    +    * objects, rather than Java Strings. StringValues can be used to tune 
implementations to be less object and
    +    * garbage
    +    * collection heavy.
    +    * <p/>
    +    * The {@link java.nio.charset.Charset} with the given name will be 
used to read the files.
    +    *
    +    * @param filePath
    +    * The path of the file, as a URI (e.g., "file:///some/local/file" or 
"hdfs://host:port/file/path")
    +    * @param charsetName
    +    * The name of the character set used to read the file
    +    * @param skipInvalidLines
    +    * A flag to indicate whether to skip lines that cannot be read with 
the given character set
    +    * @return A data stream that represents the data read from the given 
file as text lines
    +    */
    +   def readTextFileWithValue(filePath: String, charsetName: String,
    +           skipInvalidLines: Boolean): DataStream[StringValue] =
    +           javaEnv.readTextFileWithValue(filePath, charsetName, 
skipInvalidLines)
    +
    +   /**
    +    * Reads the given file with the given imput format.
    +    *
    +    * @param filePath
    +    * The path of the file, as a URI (e.g., "file:///some/local/file" or 
"hdfs://host:port/file/path")
    +    * @param inputFormat
    +    * The input format used to create the data stream
    +    * @return The data stream that represents the data read from the given 
file
    +    */
    +   def readFile[T: ClassTag : TypeInformation](inputFormat: 
FileInputFormat[T],
    +           filePath: String): DataStream[T] = {
    +           javaEnv.readFile(inputFormat, filePath)
    +   }
    +
    +   /**
    +    * Creates a data stream that represents the primitive type produced by 
reading the given file line wise.
    +    *
    +    * @param filePath
    +    * The path of the file, as a URI (e.g., "file:///some/local/file" or 
"hdfs://host:port/file/path")
    +    * @param typeClass
    +    * The primitive type class to be read
    +    * @return A data stream that represents the data read from the given 
file as primitive type
    +    */
    +   def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: 
String, typeClass: Class[T]): DataStream[T] = {
    +           javaEnv.readFileOfPrimitives(filePath, typeClass)
    +   }
    +
    +   /**
    +    * Creates a data stream that represents the primitive type produced by 
reading the given file in delimited way.
    +    *
    +    * @param filePath
    +    * The path of the file, as a URI (e.g., "file:///some/local/file" or 
"hdfs://host:port/file/path")
    +    * @param delimiter
    +    * The delimiter of the given file
    +    * @param typeClass
    +    * The primitive type class to be read
    +    * @return A data stream that represents the data read from the given 
file as primitive type.
    +    */
    +   def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: 
String, delimiter: String, typeClass: Class[T]): DataStream[T] = {
    +           javaEnv.readFileOfPrimitives(filePath, delimiter, typeClass)
    +   }
    +
    +   /**
    +    * Creates a data stream from the given {@link 
org.apache.hadoop.mapred.FileInputFormat}. A {@link
    +    * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
    +    */
    +   def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat: 
org.apache.hadoop.mapred.FileInputFormat[K, V],
    +           key: Class[K], value: Class[V], inputPath: String): 
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
    +           javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
    +   }
    +
    +   /**
    +    * Creates a data stream from the given {@link 
org.apache.hadoop.mapred.FileInputFormat}. A {@link
    +    * org.apache.hadoop.mapred.JobConf} with the given inputPath is 
created.
    +    */
    +   def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat: 
org.apache.hadoop.mapred.FileInputFormat[K, V],
    +           key: Class[K], value: Class[V], inputPath: String, job: 
org.apache.hadoop.mapred.JobConf): 
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
    +           javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
    +   }
    +
    +   /**
    +    * Creates a data stream from the given {@link 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A {@link
    +    * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
    +    */
    +   def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat: 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat[K, V],
    +           key: Class[K], value: Class[V], inputPath: String, job: 
org.apache.hadoop.mapreduce.Job): 
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
    +           javaEnv.readHadoopFile(mapredInputFormat, key, value, 
inputPath, job)
    +   }
    +
    +   /**
    +    * Creates a data stream from the given {@link 
org.apache.hadoop.mapreduce.InputFormat}.
    +    */
    +   def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat: 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat[K, V],
    +           key: Class[K], value: Class[V], inputPath: String): 
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
    +           javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
    +   }
    +
    +   /**
    +    * Creates a new data stream that contains the strings received 
infinitely from a socket. Received strings are
    +    * decoded by the system's default character set. On the termination of 
the socket server connection retries can be
    +    * initiated.
    +    * <p/>
    +    * Let us note that the socket itself does not report on abort and as a 
consequence retries are only initiated when
    +    * the socket was gracefully terminated.
    +    *
    +    * @param hostname
    +    * The host name which a server socket binds
    +    * @param port
    +    * The port number which a server socket binds. A port number of 0 
means that the port number is automatically
    +    * allocated.
    +    * @param delimiter
    +    * A character which splits received strings into records
    +    * @param maxRetry
    +    * The maximal retry interval in seconds while the program waits for a 
socket that is temporarily down.
    +    * Reconnection is initiated every second. A number of 0 means that the 
reader is immediately terminated,
    +    * while
    +    * a    negative value ensures retrying forever.
    +    * @return A data stream containing the strings received from the socket
    +    */
    +   def socketTextStream(hostname: String, port: Int, delimiter: Char, 
maxRetry: Long): DataStream[String] =
    +           javaEnv.socketTextStream(hostname, port, delimiter, maxRetry)
    +
    +   /**
    +    * Creates a new data stream that contains the strings received 
infinitely from a socket. Received strings are
    +    * decoded by the system's default character set. The reader is 
terminated immediately when the socket is down.
    +    *
    +    * @param hostname
    +    * The host name which a server socket binds
    +    * @param port
    +    * The port number which a server socket binds. A port number of 0 
means that the port number is automatically
    +    * allocated.
    +    * @param delimiter
    +    * A character which splits received strings into records
    +    * @return A data stream containing the strings received from the socket
    +    */
    +   def socketTextStream(hostname: String, port: Int, delimiter: Char): 
DataStream[String] =
    +           javaEnv.socketTextStream(hostname, port, delimiter)
    +
    +   /**
    +    * Creates a new data stream that contains the strings received 
infinitely from a socket. Received strings are
    +    * decoded by the system's default character set, using'\n' as 
delimiter. The reader is terminated immediately when
    +    * the socket is down.
    +    *
    +    * @param hostname
    +    * The host name which a server socket binds
    +    * @param port
    +    * The port number which a server socket binds. A port number of 0 
means that the port number is automatically
    +    * allocated.
    +    * @return A data stream containing the strings received from the socket
    +    */
    +   def socketTextStream(hostname: String, port: Int): DataStream[String] =
    +           javaEnv.socketTextStream(hostname, port)
    +
    +   //  <K, V > DataStreamSource[Tuple2[K, V]] 
createHadoopInput(mapredInputFormat: InputFormat[K, V], key: Class[K], value: 
Class[V], job: JobConf) {
    +   //    val hadoopInputFormat: HadoopInputFormat[K, V] = new 
HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
    +   //    return createInput(hadoopInputFormat, 
TypeExtractor.getInputFormatTypes(hadoopInputFormat), "Hadoop " + "Input 
source")
    +   //  }
    +
    +   /**
    +    * Creates a data stream from the given {@link 
org.apache.hadoop.mapred.InputFormat}.
    +    */
    +   def createHadoopInput[K, V: ClassTag : 
TypeInformation](mapredInputFormat: org.apache.hadoop.mapred.InputFormat[K, V],
    +           key: Class[K], value: Class[V],
    +           job: org.apache.hadoop.mapred.JobConf): 
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] =
    +           javaEnv.createHadoopInput(mapredInputFormat, key, value, job)
    +
    +   /**
    +    * Creates a data stream from the given {@link 
org.apache.hadoop.mapred.InputFormat}.
    +    */
    +   def createHadoopInput[K, V: ClassTag : 
TypeInformation](mapredInputFormat: org.apache.hadoop.mapreduce.InputFormat[K, 
V],
    +           key: Class[K], value: Class[V], inputPath: String, job: 
org.apache.hadoop.mapreduce.Job) =
    +           javaEnv.createHadoopInput(mapredInputFormat, key, value, job)
    +
    +   /**
    +    * Generic method to create an input data stream with {@link 
org.apache.flink.api.common.io.InputFormat}. The data stream will not be 
immediately
    +    * created - instead, this method returns a data stream that will be 
lazily created from the input format once the
    +    * program is executed.
    +    * <p/>
    +    * Since all data streams need specific information about their types, 
this method needs to determine the type of
    +    * the data produced by the input format. It will attempt to determine 
the data type by reflection, unless the
    +    * input
    +    * format implements the {@link org.apache.flink.api.java.typeutils 
.ResultTypeQueryable} interface. In the latter
    +    * case, this method will invoke the {@link 
org.apache.flink.api.java.typeutils
    +    * .ResultTypeQueryable#getProducedType()} method to determine data 
type produced by the input format.
    +    *
    +    * @param inputFormat
    +    * The input format used to create the data stream
    +    * @return The data stream that represents the data created by the 
input format
    +    */
    +   def createInput[T: ClassTag : TypeInformation](inputFormat: 
InputFormat[T, _]): DataStream[T] =
    +           javaEnv.createInput(inputFormat)
    +
    +   /**
    +    * Generic method to create an input data stream with {@link 
org.apache.flink.api.common.io.InputFormat}. The data stream will not be 
immediately
    +    * created - instead, this method returns a data stream that will be 
lazily created from the input format once the
    +    * program is executed.
    +    * <p/>
    +    * The data stream is typed to the given TypeInformation. This method 
is intended for input formats where the
    +    * return
    +    * type cannot be determined by reflection analysis, and that do not 
implement the {@link
    +    * org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
    +    *
    +    * @param inputFormat
    +    * The input format used to create the data stream
    +    * @return The data stream that represents the data created by the 
input format
    +    */
    +   def createInput[T: ClassTag : TypeInformation](inputFormat: 
InputFormat[T, _],
    +           typeInfo: TypeInformation[T]): DataStream[T] =
    +           javaEnv.createInput(inputFormat, typeInfo)
    +
    +   //  private[flink] def createInput[T: ClassTag: 
TypeInformation](inputFormat: InputFormat[T,_],
    +   //    typeInfo: TypeInformation[T], sourceName: String): DataStream[T] 
= {
    +   //    val function = new FileSourceFunction[T](inputFormat, typeInfo)
    +   //    val returnStream = javaEnv.addSource(function, 
sourceName).returns(typeInfo)
    +   //    javaEnv.getStreamGraph.setInputFormat(returnStream.getId, 
inputFormat)
    +   //    returnStream
    +   //  }
    +
    +   /**
    +    * Creates a new data stream that contains a sequence of numbers. The 
data stream will be created in parallel, so
    +    * there is no guarantee about the oder of the elements.
    +    *
    +    * @param from
    +    * The number to start at (inclusive)
    +    * @param to
    +    * The number to stop at (inclusive)
    +    * @return A data stream, containing all number in the [from, to] 
interval
    +    */
    +   def generateSequence(from: Long, to: Long): DataStream[Long] = {
    +           new DataStream[java.lang.Long](javaEnv.generateSequence(from, 
to)).
    +                   asInstanceOf[DataStream[Long]]
    +   }
    +
    +   /**
    +    * Creates a new data stream that contains the given elements. The 
elements must all be of the same type, for
    +    * example, all of the {@link String} or {@link Integer}. The sequence 
of elements must not be empty. Furthermore,
    +    * the elements must be serializable (as defined in {@link 
java.io.Serializable}), because the execution
    +    * environment
    +    * may ship the elements into the cluster.
    +    * <p/>
    +    * The framework will try and determine the exact type from the 
elements. In case of generic elements, it may be
    +    * necessary to manually supply the type information via {@link 
#fromCollection(java.util.Collection,
    +    * org.apache.flink.api.common.typeinfo.TypeInformation)}.
    +    * <p/>
    +    * Note that this operation will result in a non-parallel data stream 
source, i.e. a data stream source with a
    +    * degree of parallelism one.
    +    *
    +    * @param data
    +    * The array of elements to create the data stream from.
    +    * @return The data stream representing the given array of elements
    +    */
    +   def fromElements[T: ClassTag : TypeInformation](data: T*): 
DataStream[T] = {
    +           val typeInfo = implicitly[TypeInformation[T]]
    +           fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
    +   }
    +
    +   /**
    +    * Creates a data stream from the given non-empty collection. The type 
of the data stream is that of the
    +    * elements in
    +    * the collection. The elements need to be serializable (as defined by 
{@link java.io.Serializable}), because the
    +    * framework may move the elements into the cluster if needed.
    +    * <p/>
    +    * The framework will try and determine the exact type from the 
collection elements. In case of generic
    +    * elements, it
    +    * may be necessary to manually supply the type information via {@link 
#fromCollection(java.util.Collection,
    +    * org.apache.flink.api.common.typeinfo.TypeInformation)}.
    +    * <p/>
    +    * Note that this operation will result in a non-parallel data stream 
source, i.e. a data stream source with a
    +    * degree of parallelism one.
    +    *
    +    * @param data
    +    * The collection of elements to create the data stream from
    +    * @return The data stream representing the given collection
    +    */
    +   def fromCollection[T: ClassTag : TypeInformation](
    +           data: Seq[T]): DataStream[T] = {
    +           require(data != null, "Data must not be null.")
    +           val typeInfo = implicitly[TypeInformation[T]]
    +
    +           val sourceFunction = new 
FromElementsFunction[T](scala.collection.JavaConversions
    +                   .asJavaCollection(data))
    +
    +           javaEnv.addSource(sourceFunction, "Collection 
source").returns(typeInfo)
    +   }
    +
    +   /**
    +    * Creates a data stream from the given non-empty collection. The type 
of the data stream is the type given by
    +    * typeInfo. The elements need to be serializable (as defined by {@link 
java.io.Serializable}), because the
    +    * framework may move the elements into the cluster if needed.
    +    * <p/>
    +    * Note that this operation will result in a non-parallel data stream 
source, i.e. a data stream source with a
    +    * degree of parallelism one.
    +    *
    +    * @param data
    +    * The collection of elements to create the data stream from
    +    * @param typeInfo
    +    * The TypeInformation for the produced data stream
    +    * @return The data stream representing the given collection
    +    */
    +   def fromCollection[T: ClassTag : TypeInformation](
    +           data: Seq[T], typeInfo: TypeInformation[T]): DataStream[T] = {
    +           require(data != null, "Data must not be null.")
    +
    +           val sourceFunction = new 
FromElementsFunction[T](scala.collection.JavaConversions
    +                   .asJavaCollection(data))
    +
    +           javaEnv.addSource(sourceFunction, "Collection 
source").returns(typeInfo)
    +   }
    +
    +   /**
    +    * Creates a data stream from the given iterator. Because the iterator 
will remain unmodified until the actual
    +    * execution happens, the type of data returned by the iterator must be 
given explicitly in the form of the type
    +    * class (this is due to the fact that the Java compiler erases the 
generic type information).
    +    * <p/>
    +    * The iterator must be serializable (as defined in {@link 
java.io.Serializable}), because the framework may
    +    * move it
    +    * to a remote environment, if needed.
    +    * <p/>
    +    * Note that this operation will result in a non-parallel data stream 
source, i.e. a data stream source with a
    +    * degree of parallelism of one.
    +    *
    +    * @param data
    +    * The iterator of elements to create the data stream from
    +    * @param typeClass
    +    * The class of the data produced by the iterator. Must not be a 
generic class.
    +    * @return The data stream representing the elements in the iterator
    +    * @see #fromCollection(java.util.Iterator, 
org.apache.flink.api.common.typeinfo.TypeInformation)
    +    */
    +   def fromCollection[T: ClassTag : TypeInformation](
    +           data: Iterator[T], typeClass: Class[T]): DataStream[T] = {
    +           fromCollection(data, TypeExtractor.getForClass(typeClass))
    +   }
    +
    +   /**
    +    * Creates a data stream from the given iterator. Because the iterator 
will remain unmodified until the actual
    +    * execution happens, the type of data returned by the iterator must be 
given explicitly in the form of the type
    +    * information. This method is useful for cases where the type is 
generic. In that case, the type class (as
    +    * given in
    +    * {@link #fromCollection(java.util.Iterator, Class)} does not supply 
all type information.
    +    * <p/>
    +    * The iterator must be serializable (as defined in {@link 
java.io.Serializable}), because the framework may
    +    * move it
    +    * to a remote environment, if needed.
    +    * <p/>
    +    * Note that this operation will result in a non-parallel data stream 
source, i.e. a data stream source with a
    +    * degree of parallelism one.
    +    *
    +    * @param data
    +    * The iterator of elements to create the data stream from
    +    * @param typeInfo
    +    * The TypeInformation for the produced data stream
    +    * @return The data stream representing the elements in the iterator
    +    */
    +   def fromCollection[T: ClassTag : TypeInformation](
    +           data: Iterator[T], typeInfo: TypeInformation[T]): DataStream[T] 
= {
    +           require(data != null, "Data must not be null.")
    +           if (!data.isInstanceOf[java.io.Serializable]) {
    +                   throw new IllegalArgumentException("The iterator must 
be serializable.")
    +           }
    +
    +           val sourceFunction = new 
FromIteratorFunction[T](scala.collection.JavaConversions
    +                   .asJavaIterator(data))
    +
    +           javaEnv.addSource(sourceFunction, "Collection 
source").returns(typeInfo)
    +   }
    +
    +   /**
    +    * Creates a new data stream that contains elements in the iterator. 
The iterator is splittable, allowing the
    +    * framework to create a parallel data stream source that returns the 
elements in the iterator. The iterator
    +    * must be
    +    * serializable (as defined in {@link java.io.Serializable}, because 
the execution environment may ship the
    +    * elements
    +    * into the cluster.
    +    * <p/>
    +    * Because the iterator will remain unmodified until the actual 
execution happens, the type of data returned by the
    +    * iterator must be given explicitly in the form of the type class 
(this is due to the fact that the Java compiler
    +    * erases the generic type information).
    +    *
    +    * @param iterator
    +    * The iterator that produces the elements of the data stream
    +    * @param typeClass
    +    * The class of the data produced by the iterator. Must not be a 
generic class.
    +    * @return A data stream representing the elements in the iterator
    +    */
    +   def fromParallelCollection[T: ClassTag : TypeInformation](iterator: 
SplittableIterator[T],
    +           typeClass: Class[T]): DataStream[T] = {
    +           javaEnv.fromParallelCollection(iterator, typeClass)
    +   }
    +
    +   /**
    +    * Creates a new data stream that contains elements in the iterator. 
The iterator is splittable, allowing the
    +    * framework to create a parallel data stream source that returns the 
elements in the iterator. The iterator
    +    * must be
    +    * serializable (as defined in {@link java.io.Serializable}, because 
the execution environment may ship the
    +    * elements
    +    * into the cluster.
    +    * <p/>
    +    * Because the iterator will remain unmodified until the actual 
execution happens, the type of data returned by the
    +    * iterator must be given explicitly in the form of the type 
information. This method is useful for cases where the
    +    * type is generic. In that case, the type class (as given in {@link 
#fromParallelCollection(SplittableIterator,
    +    * Class)} does not supply all type information.
    +    *
    +    * @param iterator
    +    * The iterator that produces the elements of the data stream
    +    * @param typeInfo
    +    * The TypeInformation for the produced data stream.
    +    * @return A data stream representing the elements in the iterator
    +    */
    +   def fromParallelCollection[T: ClassTag : TypeInformation](iterator: 
SplittableIterator[T],
    +           typeInfo: TypeInformation[T]): DataStream[T] = {
    +           javaEnv.fromParallelCollection(iterator, typeInfo)
    +   }
    +
    +   // private helper for passing different names
    +   private[flink] def fromParallelCollection[T: ClassTag : 
TypeInformation](iterator: SplittableIterator[T],
    +           typeInfo: TypeInformation[T], operatorName: String): 
DataStream[T] = {
    +           javaEnv.addSource(new FromIteratorFunction[T](iterator), 
operatorName).returns(typeInfo)
    +   }
    +
    +
    +   /**
    +    * Ads a data source with a custom type information thus opening a 
{@link org.apache.flink.streaming.api
    +    * .datastream.DataStream}. Only in very special cases does the user 
need to support type information. Otherwise
    +    * use
    +    * {@link 
#addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
    +    * <p/>
    +    * By default sources have a parallelism of 1. To enable parallel 
execution, the user defined source should
    +    * implement {@link 
org.apache.flink.streaming.api.function.source.ParallelSourceFunction} or 
extend {@link
    +    * 
org.apache.flink.streaming.api.function.source.RichParallelSourceFunction}. In 
these cases the resulting source
    +    * will have the parallelism of the environment. To change this 
afterwards call {@link
    +    * 
org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
    +    *
    +    * @param function
    +    * the user defined function
    +    * @return the data stream constructed
    +    */
    +   def addSource[T: ClassTag : TypeInformation](function: 
SourceFunction[T]): DataStream[T] = {
    +           require(function != null, "Function must not be null.")
    +           val cleanFun = StreamExecutionEnvironment.clean(function)
    +           javaEnv.addSource(cleanFun)
    +   }
    +
    +
    +   /**
    +    * Ads a data source with a custom type information thus opening a 
{@link org.apache.flink.streaming.api
    +    * .datastream.DataStream}. Only in very special cases does the user 
need to support type information. Otherwise
    +    * use
    +    * {@link 
#addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
    +    *
    +    * @param function
    +    * the user defined function
    +    * @param sourceName
    +    * Name of the data source
    +    * @return the data stream constructed
    +    */
    +   def addSource[T: ClassTag : TypeInformation](function: 
SourceFunction[T],
    +           sourceName: String): DataStream[T] = {
    +           require(function != null, "Function must not be null.")
    +           val typeInfo: TypeInformation[T] =
    +                   if (function.isInstanceOf[ResultTypeQueryable[T]]) {
    +                           
function.asInstanceOf[ResultTypeQueryable[T]].getProducedType
    +                   }
    +                   else {
    +                           
TypeExtractor.createTypeInfo(classOf[SourceFunction[T]], function.getClass, 0, 
null, null)
    +                   }
    +
    +           val isParallel = 
function.isInstanceOf[ParallelSourceFunction[T]]
    +           val cleanFun = StreamExecutionEnvironment.clean(function)
    +           val sourceOperator = new StreamSource[T](cleanFun)
    +           new DataStreamSource[T](this.javaEnv, sourceName, typeInfo, 
sourceOperator,
    +                   isParallel, sourceName)
    +   }
    +
    +   //      @SuppressWarnings("unchecked")
    +   //      private <OUT> DataStreamSource<OUT> 
addSource(SourceFunction<OUT> function,
    +   //              TypeInformation<OUT> typeInfo, String sourceName) {
    +   //
    +   //                      if (typeInfo == null) {
    +   //                              if (function instanceof 
GenericSourceFunction) {
    +   //                                      typeInfo = 
((GenericSourceFunction<OUT>) function).getType();
    +   //                              } else {
    +   //                                      typeInfo = 
TypeExtractor.createTypeInfo(SourceFunction.class,
    +   //                                      function.getClass(), 0, null, 
null);
    +   //                              }
    +   //                      }
    +   //
    +   //                      boolean isParallel = function instanceof 
ParallelSourceFunction;
    +   //
    +   //                      ClosureCleaner.clean(function, true);
    +   //                      StreamOperator<OUT, OUT> sourceOperator = new 
StreamSource<OUT>(function);
    +   //
    +   //      return new DataStreamSource<OUT>(this, sourceName, typeInfo, 
sourceOperator,
    +   //              isParallel, sourceName);
    +   //      }
    +
    +   /**
    +    * Create a DataStream using a user defined source function for 
arbitrary
    +    * source functionality.
    +    *
    +    */
    +   def addSource[T: ClassTag : TypeInformation](function: Collector[T] => 
Unit): DataStream[T] = {
    +           require(function != null, "Function must not be null.")
    +           val sourceFunction = new SourceFunction[T] {
    +                   val cleanFun = 
StreamExecutionEnvironment.clean(function)
    +
    +                   override def run(out: Collector[T]) {
    +                           cleanFun(out)
    +                   }
    +
    +                   override def cancel() = {}
    +           }
    +           addSource(sourceFunction)
    +   }
    +
    +   /**
    +    * Triggers the program execution. The environment will execute all 
parts of
    +    * the program that have resulted in a "sink" operation. Sink 
operations are
    +    * for example printing results or forwarding them to a message queue.
    +    * <p>
    +    * The program execution will be logged and displayed with a generated
    +    * default name.
    +    *
    +    */
    +   def execute() = javaEnv.execute()
    +
    +   /**
    +    * Triggers the program execution. The environment will execute all 
parts of
    +    * the program that have resulted in a "sink" operation. Sink 
operations are
    +    * for example printing results or forwarding them to a message queue.
    +    * <p>
    +    * The program execution will be logged and displayed with the provided 
name
    +    *
    +    */
    +   def execute(jobName: String) = javaEnv.execute(jobName)
    +
    +   /**
    +    * Creates the plan with which the system will execute the program, and
    +    * returns it as a String using a JSON representation of the execution 
data
    +    * flow graph. Note that this needs to be called, before the plan is
    +    * executed.
    +    *
    +    */
    +   def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
    --- End diff --
    
    This change is altering the indentation from spaces to tabs. In scala we 
are using spaces ;)


> Streaming file source/sink API is not in sync with the batch API
> ----------------------------------------------------------------
>
>                 Key: FLINK-1687
>                 URL: https://issues.apache.org/jira/browse/FLINK-1687
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Gábor Hermann
>            Assignee: Péter Szabó
>
> Streaming environment is missing file inputs like readFile, readCsvFile and 
> also the more general createInput function, and outputs like writeAsCsv and 
> write. Streaming and batch API should be consistent.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to