Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/521#discussion_r30683935
--- Diff:
flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
---
@@ -19,396 +19,867 @@
package org.apache.flink.streaming.api.scala
import com.esotericsoftware.kryo.Serializer
+import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable,
TypeExtractor}
import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.streaming.api.datastream.DataStreamSource
import
org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment =>
JavaEnv}
import
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
-import
org.apache.flink.streaming.api.functions.source.{FromElementsFunction,
SourceFunction}
-import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.functions.source._
+import org.apache.flink.streaming.api.operators.StreamSource
+import org.apache.flink.types.StringValue
+import org.apache.flink.util.{Collector, SplittableIterator}
import scala.reflect.ClassTag
class StreamExecutionEnvironment(javaEnv: JavaEnv) {
- /**
- * Sets the parallelism for operations executed through this environment.
- * Setting a parallelism of x here will cause all operators (such as
join, map, reduce) to run
- * with x parallel instances. This value can be overridden by specific
operations using
- * [[DataStream.setParallelism]].
- * @deprecated Please use [[setParallelism]]
- */
- @deprecated
- def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
- javaEnv.setParallelism(degreeOfParallelism)
- }
-
- /**
- * Sets the parallelism for operations executed through this environment.
- * Setting a parallelism of x here will cause all operators (such as
join, map, reduce) to run
- * with x parallel instances. This value can be overridden by specific
operations using
- * [[DataStream.setParallelism]].
- */
- def setParallelism(parallelism: Int): Unit = {
- javaEnv.setParallelism(parallelism)
- }
-
- /**
- * Returns the default parallelism for this execution environment. Note
that this
- * value can be overridden by individual operations using
[[DataStream.setParallelism]]
- * @deprecated Please use [[getParallelism]]
- */
- @deprecated
- def getDegreeOfParallelism = javaEnv.getParallelism
-
- /**
- * Returns the default parallelism for this execution environment. Note
that this
- * value can be overridden by individual operations using
[[DataStream.setParallelism]]
- */
- def getParallelism = javaEnv.getParallelism
-
- /**
- * Sets the maximum time frequency (milliseconds) for the flushing of the
- * output buffers. By default the output buffers flush frequently to
provide
- * low latency and to aid smooth developer experience. Setting the
parameter
- * can result in three logical modes:
- *
- * <ul>
- * <li>
- * A positive integer triggers flushing periodically by that integer</li>
- * <li>
- * 0 triggers flushing after every record thus minimizing latency</li>
- * <li>
- * -1 triggers flushing only when the output buffer is full thus
maximizing
- * throughput</li>
- * </ul>
- *
- */
- def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
- javaEnv.setBufferTimeout(timeoutMillis)
- this
- }
-
- /**
- * Gets the default buffer timeout set for this environment
- */
- def getBufferTimout: Long = javaEnv.getBufferTimeout()
-
- /**
- * Method for enabling fault-tolerance. Activates monitoring and backup
of streaming
- * operator states. Time interval between state checkpoints is specified
in in millis.
- *
- * Setting this option assumes that the job is used in production and
thus if not stated
- * explicitly otherwise with calling with the
- * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
method in case of
- * failure the job will be resubmitted to the cluster indefinitely.
- */
- def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
- javaEnv.enableCheckpointing(interval)
- this
- }
-
- /**
- * Method for enabling fault-tolerance. Activates monitoring and backup
of streaming
- * operator states. Time interval between state checkpoints is specified
in in millis.
- *
- * Setting this option assumes that the job is used in production and
thus if not stated
- * explicitly otherwise with calling with the
- * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
method in case of
- * failure the job will be resubmitted to the cluster indefinitely.
- */
- def enableCheckpointing() : StreamExecutionEnvironment = {
- javaEnv.enableCheckpointing()
- this
- }
-
- /**
- * Disables operator chaining for streaming operators. Operator chaining
- * allows non-shuffle operations to be co-located in the same thread
fully
- * avoiding serialization and de-serialization.
- *
- */
- def disableOperatorChaning(): StreamExecutionEnvironment = {
- javaEnv.disableOperatorChaning()
- this
- }
-
- /**
- * Sets the number of times that failed tasks are re-executed. A value
of zero
- * effectively disables fault tolerance. A value of "-1" indicates that
the system
- * default value (as defined in the configuration) should be used.
- */
- def setNumberOfExecutionRetries(numRetries: Int): Unit = {
- javaEnv.setNumberOfExecutionRetries(numRetries)
- }
-
- /**
- * Gets the number of times the system will try to re-execute failed
tasks. A value
- * of "-1" indicates that the system default value (as defined in the
configuration)
- * should be used.
- */
- def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
-
-
- /**
- * Registers the given type with the serializer at the
[[KryoSerializer]].
- *
- * Note that the serializer instance must be serializable (as defined by
java.io.Serializable),
- * because it may be distributed to the worker nodes by java
serialization.
- */
- def registerTypeWithKryoSerializer(clazz: Class[_], serializer:
Serializer[_]): Unit = {
- javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
- }
-
- /**
- * Registers the given type with the serializer at the
[[KryoSerializer]].
- */
- def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_
<: Serializer[_]]) {
- javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
- }
-
-
- /**
- * Registers a default serializer for the given class and its
sub-classes at Kryo.
- */
- def registerDefaultKryoSerializer(clazz: Class[_], serializer: Class[_
<: Serializer[_]]) {
- javaEnv.addDefaultKryoSerializer(clazz, serializer)
- }
-
- /**
- * Registers a default serializer for the given class and its
sub-classes at Kryo.
- *
- * Note that the serializer instance must be serializable (as defined by
java.io.Serializable),
- * because it may be distributed to the worker nodes by java
serialization.
- */
- def registerDefaultKryoSerializer(clazz: Class[_], serializer:
Serializer[_]): Unit = {
- javaEnv.addDefaultKryoSerializer(clazz, serializer)
- }
-
- /**
- * Registers the given type with the serialization stack. If the type is
eventually
- * serialized as a POJO, then the type is registered with the POJO
serializer. If the
- * type ends up being serialized with Kryo, then it will be registered
at Kryo to make
- * sure that only tags are written.
- *
- */
- def registerType(typeClass: Class[_]) {
- javaEnv.registerType(typeClass)
- }
-
- /**
- * Creates a DataStream that represents the Strings produced by reading
the
- * given file line wise. The file will be read with the system's default
- * character set.
- *
- */
- def readTextFile(filePath: String): DataStream[String] =
- javaEnv.readTextFile(filePath)
-
- /**
- * Creates a DataStream that contains the contents of file created while
- * system watches the given path. The file will be read with the system's
- * default character set. The user can check the monitoring interval in
milliseconds,
- * and the way file modifications are handled. By default it checks for
only new files
- * every 100 milliseconds.
- *
- */
- def readFileStream(StreamPath: String, intervalMillis: Long = 100,
watchType: WatchType =
- WatchType.ONLY_NEW_FILES): DataStream[String] =
- javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
-
- /**
- * Creates a new DataStream that contains the strings received infinitely
- * from socket. Received strings are decoded by the system's default
- * character set.
- *
- */
- def socketTextStream(hostname: String, port: Int, delimiter: Char):
DataStream[String] =
- javaEnv.socketTextStream(hostname, port, delimiter)
-
- /**
- * Creates a new DataStream that contains the strings received infinitely
- * from socket. Received strings are decoded by the system's default
- * character set, uses '\n' as delimiter.
- *
- */
- def socketTextStream(hostname: String, port: Int): DataStream[String] =
- javaEnv.socketTextStream(hostname, port)
-
- /**
- * Creates a new DataStream that contains a sequence of numbers.
- *
- */
- def generateSequence(from: Long, to: Long): DataStream[Long] = {
- new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
- asInstanceOf[DataStream[Long]]
- }
-
- /**
- * Creates a DataStream that contains the given elements. The elements
must all be of the
- * same type and must be serializable.
- *
- * * Note that this operation will result in a non-parallel data source,
i.e. a data source with
- * a parallelism of one.
- */
- def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T]
= {
- val typeInfo = implicitly[TypeInformation[T]]
- fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
- }
-
- /**
- * Creates a DataStream from the given non-empty [[Seq]]. The elements
need to be serializable
- * because the framework may move the elements into the cluster if
needed.
- *
- * Note that this operation will result in a non-parallel data source,
i.e. a data source with
- * a parallelism of one.
- */
- def fromCollection[T: ClassTag: TypeInformation](
- data: Seq[T]): DataStream[T] = {
- require(data != null, "Data must not be null.")
- val typeInfo = implicitly[TypeInformation[T]]
-
- val sourceFunction = new
FromElementsFunction[T](scala.collection.JavaConversions
- .asJavaCollection(data))
-
- javaEnv.addSource(sourceFunction).returns(typeInfo)
- }
-
- /**
- * Create a DataStream using a user defined source function for arbitrary
- * source functionality. By default sources have a parallelism of 1.
- * To enable parallel execution, the user defined source should
implement
- * ParallelSourceFunction or extend RichParallelSourceFunction.
- * In these cases the resulting source will have the parallelism of the
environment.
- * To change this afterwards call DataStreamSource.setParallelism(int)
- *
- */
- def addSource[T: ClassTag: TypeInformation](function:
SourceFunction[T]): DataStream[T] = {
- require(function != null, "Function must not be null.")
- val cleanFun = StreamExecutionEnvironment.clean(function)
- val typeInfo = implicitly[TypeInformation[T]]
- javaEnv.addSource(cleanFun).returns(typeInfo)
- }
-
- /**
- * Create a DataStream using a user defined source function for arbitrary
- * source functionality.
- *
- */
- def addSource[T: ClassTag: TypeInformation](function: Collector[T] =>
Unit): DataStream[T] = {
- require(function != null, "Function must not be null.")
- val sourceFunction = new SourceFunction[T] {
- val cleanFun = StreamExecutionEnvironment.clean(function)
- override def run(out: Collector[T]) {
- cleanFun(out)
- }
- override def cancel() = {}
- }
- addSource(sourceFunction)
- }
-
- /**
- * Triggers the program execution. The environment will execute all
parts of
- * the program that have resulted in a "sink" operation. Sink operations
are
- * for example printing results or forwarding them to a message queue.
- * <p>
- * The program execution will be logged and displayed with a generated
- * default name.
- *
- */
- def execute() = javaEnv.execute()
-
- /**
- * Triggers the program execution. The environment will execute all
parts of
- * the program that have resulted in a "sink" operation. Sink operations
are
- * for example printing results or forwarding them to a message queue.
- * <p>
- * The program execution will be logged and displayed with the provided
name
- *
- */
- def execute(jobName: String) = javaEnv.execute(jobName)
-
- /**
- * Creates the plan with which the system will execute the program, and
- * returns it as a String using a JSON representation of the execution
data
- * flow graph. Note that this needs to be called, before the plan is
- * executed.
- *
- */
- def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
+ /**
+ * Sets the parallelism for operations executed through this
environment.
+ * Setting a parallelism of x here will cause all operators (such as
join, map, reduce) to run
+ * with x parallel instances. This value can be overridden by specific
operations using
+ * [[DataStream.setParallelism]].
+ * @deprecated Please use [[setParallelism]]
+ */
+ @deprecated
+ def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
+ javaEnv.setParallelism(degreeOfParallelism)
+ }
+
+ /**
+ * Sets the parallelism for operations executed through this
environment.
+ * Setting a parallelism of x here will cause all operators (such as
join, map, reduce) to run
+ * with x parallel instances. This value can be overridden by specific
operations using
+ * [[DataStream.setParallelism]].
+ */
+ def setParallelism(parallelism: Int): Unit = {
+ javaEnv.setParallelism(parallelism)
+ }
+
+ /**
+ * Returns the default parallelism for this execution environment. Note
that this
+ * value can be overridden by individual operations using
[[DataStream.setParallelism]]
+ * @deprecated Please use [[getParallelism]]
+ */
+ @deprecated
+ def getDegreeOfParallelism = javaEnv.getParallelism
+
+ /**
+ * Returns the default parallelism for this execution environment. Note
that this
+ * value can be overridden by individual operations using
[[DataStream.setParallelism]]
+ */
+ def getParallelism = javaEnv.getParallelism
+
+ /**
+ * Sets the maximum time frequency (milliseconds) for the flushing of
the
+ * output buffers. By default the output buffers flush frequently to
provide
+ * low latency and to aid smooth developer experience. Setting the
parameter
+ * can result in three logical modes:
+ *
+ * <ul>
+ * <li>
+ * A positive integer triggers flushing periodically by that
integer</li>
+ * <li>
+ * 0 triggers flushing after every record thus minimizing latency</li>
+ * <li>
+ * -1 triggers flushing only when the output buffer is full thus
maximizing
+ * throughput</li>
+ * </ul>
+ *
+ */
+ def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment =
{
+ javaEnv.setBufferTimeout(timeoutMillis)
+ this
+ }
+
+ /**
+ * Gets the default buffer timeout set for this environment
+ */
+ def getBufferTimout: Long = javaEnv.getBufferTimeout()
+
+ /**
+ * Method for enabling fault-tolerance. Activates monitoring and backup
of streaming
+ * operator states. Time interval between state checkpoints is
specified in in millis.
+ *
+ * Setting this option assumes that the job is used in production and
thus if not stated
+ * explicitly otherwise with calling with the
+ * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
method in case of
+ * failure the job will be resubmitted to the cluster indefinitely.
+ */
+ def enableCheckpointing(interval: Long): StreamExecutionEnvironment = {
+ javaEnv.enableCheckpointing(interval)
+ this
+ }
+
+ /**
+ * Method for enabling fault-tolerance. Activates monitoring and backup
of streaming
+ * operator states. Time interval between state checkpoints is
specified in in millis.
+ *
+ * Setting this option assumes that the job is used in production and
thus if not stated
+ * explicitly otherwise with calling with the
+ * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
method in case of
+ * failure the job will be resubmitted to the cluster indefinitely.
+ */
+ def enableCheckpointing(): StreamExecutionEnvironment = {
+ javaEnv.enableCheckpointing()
+ this
+ }
+
+ /**
+ * Disables operator chaining for streaming operators. Operator chaining
+ * allows non-shuffle operations to be co-located in the same thread
fully
+ * avoiding serialization and de-serialization.
+ *
+ */
+ def disableOperatorChaning(): StreamExecutionEnvironment = {
+ javaEnv.disableOperatorChaning()
+ this
+ }
+
+ /**
+ * Sets the number of times that failed tasks are re-executed. A value
of zero
+ * effectively disables fault tolerance. A value of "-1" indicates that
the system
+ * default value (as defined in the configuration) should be used.
+ */
+ def setNumberOfExecutionRetries(numRetries: Int): Unit = {
+ javaEnv.setNumberOfExecutionRetries(numRetries)
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed
tasks. A value
+ * of "-1" indicates that the system default value (as defined in the
configuration)
+ * should be used.
+ */
+ def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
+
+
+ /**
+ * Registers the given type with the serializer at the
[[KryoSerializer]].
+ *
+ * Note that the serializer instance must be serializable (as defined
by java.io.Serializable),
+ * because it may be distributed to the worker nodes by java
serialization.
+ */
+ def registerTypeWithKryoSerializer(clazz: Class[_], serializer:
Serializer[_]): Unit = {
+ javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
+ }
+
+ /**
+ * Registers the given type with the serializer at the
[[KryoSerializer]].
+ */
+ def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_
<: Serializer[_]]) {
+ javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
+ }
+
+
+ /**
+ * Registers a default serializer for the given class and its
sub-classes at Kryo.
+ */
+ def registerDefaultKryoSerializer(clazz: Class[_], serializer: Class[_
<: Serializer[_]]) {
+ javaEnv.addDefaultKryoSerializer(clazz, serializer)
+ }
+
+ /**
+ * Registers a default serializer for the given class and its
sub-classes at Kryo.
+ *
+ * Note that the serializer instance must be serializable (as defined
by java.io.Serializable),
+ * because it may be distributed to the worker nodes by java
serialization.
+ */
+ def registerDefaultKryoSerializer(clazz: Class[_], serializer:
Serializer[_]): Unit = {
+ javaEnv.addDefaultKryoSerializer(clazz, serializer)
+ }
+
+ /**
+ * Registers the given type with the serialization stack. If the type
is eventually
+ * serialized as a POJO, then the type is registered with the POJO
serializer. If the
+ * type ends up being serialized with Kryo, then it will be registered
at Kryo to make
+ * sure that only tags are written.
+ *
+ */
+ def registerType(typeClass: Class[_]) {
+ javaEnv.registerType(typeClass)
+ }
+
+ /**
+ * Creates a data stream that represents the Strings produced by
reading the given file line wise. The file will be
+ * read with the system's default character set.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path").
+ * @return The data stream that represents the data read from the given
file as text lines
+ */
+ def readTextFile(filePath: String): DataStream[String] =
+ javaEnv.readTextFile(filePath)
+
+ /**
+ * Creates a data stream that represents the Strings produced by
reading the given file line wise. The {@link
+ * java.nio.charset.Charset} with the given name will be used to read
the files.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @param charsetName
+ * The name of the character set used to read the file
+ * @return The data stream that represents the data read from the given
file as text lines
+ */
+ def readTextFile(filePath: String, charsetName: String):
DataStream[String] =
+ javaEnv.readTextFile(filePath, charsetName)
+
+ /**
+ * Creates a data stream that contains the contents of file created
while system watches the given path. The file
+ * will be read with the system's default character set.
+ *
+ * @param streamPath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path/")
+ * @param intervalMillis
+ * The interval of file watching in milliseconds
+ * @param watchType
+ * The watch type of file stream. When watchType is
+ * { @link
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#ONLY_NEW_FILES},
+ * the system processes only new files.
+ * { @link
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#REPROCESS_WITH_APPENDED}
+ * means that the system re-processes all contents of appended file.
+ * { @link
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#PROCESS_ONLY_APPENDED}
+ * means that the system processes only appended contents of files.
+ * @return The DataStream containing the given directory.
+ */
+ def readFileStream(streamPath: String, intervalMillis: Long = 100,
watchType: WatchType =
+ WatchType.ONLY_NEW_FILES): DataStream[String] =
+ javaEnv.readFileStream(streamPath, intervalMillis, watchType)
+
+ /**
+ * Creates a data stream that represents the strings produced by
reading the given file line wise. This method is
+ * similar to {@link #readTextFile(String)}, but it produces a data
stream with mutable {@link StringValue}
+ * objects,
+ * rather than Java Strings. StringValues can be used to tune
implementations to be less object and garbage
+ * collection heavy.
+ * <p/>
+ * The file will be read with the system's default character set.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @return A data stream that represents the data read from the given
file as text lines
+ */
+ def readTextFileWithValue(filePath: String): DataStream[StringValue] =
+ javaEnv.readTextFileWithValue(filePath)
+
+ /**
+ * Creates a data stream that represents the Strings produced by
reading the given file line wise. This method is
+ * similar to {@link #readTextFile(String, String)}, but it produces a
data stream with mutable {@link StringValue}
+ * objects, rather than Java Strings. StringValues can be used to tune
implementations to be less object and
+ * garbage
+ * collection heavy.
+ * <p/>
+ * The {@link java.nio.charset.Charset} with the given name will be
used to read the files.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @param charsetName
+ * The name of the character set used to read the file
+ * @param skipInvalidLines
+ * A flag to indicate whether to skip lines that cannot be read with
the given character set
+ * @return A data stream that represents the data read from the given
file as text lines
+ */
+ def readTextFileWithValue(filePath: String, charsetName: String,
+ skipInvalidLines: Boolean): DataStream[StringValue] =
+ javaEnv.readTextFileWithValue(filePath, charsetName,
skipInvalidLines)
+
+ /**
+ * Reads the given file with the given imput format.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @return The data stream that represents the data read from the given
file
+ */
+ def readFile[T: ClassTag : TypeInformation](inputFormat:
FileInputFormat[T],
+ filePath: String): DataStream[T] = {
+ javaEnv.readFile(inputFormat, filePath)
+ }
+
+ /**
+ * Creates a data stream that represents the primitive type produced by
reading the given file line wise.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @param typeClass
+ * The primitive type class to be read
+ * @return A data stream that represents the data read from the given
file as primitive type
+ */
+ def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath:
String, typeClass: Class[T]): DataStream[T] = {
+ javaEnv.readFileOfPrimitives(filePath, typeClass)
+ }
+
+ /**
+ * Creates a data stream that represents the primitive type produced by
reading the given file in delimited way.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @param delimiter
+ * The delimiter of the given file
+ * @param typeClass
+ * The primitive type class to be read
+ * @return A data stream that represents the data read from the given
file as primitive type.
+ */
+ def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath:
String, delimiter: String, typeClass: Class[T]): DataStream[T] = {
+ javaEnv.readFileOfPrimitives(filePath, delimiter, typeClass)
+ }
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapred.FileInputFormat}. A {@link
+ * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+ */
+ def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat:
org.apache.hadoop.mapred.FileInputFormat[K, V],
+ key: Class[K], value: Class[V], inputPath: String):
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
+ javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
+ }
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapred.FileInputFormat}. A {@link
+ * org.apache.hadoop.mapred.JobConf} with the given inputPath is
created.
+ */
+ def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat:
org.apache.hadoop.mapred.FileInputFormat[K, V],
+ key: Class[K], value: Class[V], inputPath: String, job:
org.apache.hadoop.mapred.JobConf):
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
+ javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
+ }
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A {@link
+ * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+ */
+ def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat:
org.apache.hadoop.mapreduce.lib.input.FileInputFormat[K, V],
+ key: Class[K], value: Class[V], inputPath: String, job:
org.apache.hadoop.mapreduce.Job):
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
+ javaEnv.readHadoopFile(mapredInputFormat, key, value,
inputPath, job)
+ }
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapreduce.InputFormat}.
+ */
+ def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat:
org.apache.hadoop.mapreduce.lib.input.FileInputFormat[K, V],
+ key: Class[K], value: Class[V], inputPath: String):
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
+ javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
+ }
+
+ /**
+ * Creates a new data stream that contains the strings received
infinitely from a socket. Received strings are
+ * decoded by the system's default character set. On the termination of
the socket server connection retries can be
+ * initiated.
+ * <p/>
+ * Let us note that the socket itself does not report on abort and as a
consequence retries are only initiated when
+ * the socket was gracefully terminated.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0
means that the port number is automatically
+ * allocated.
+ * @param delimiter
+ * A character which splits received strings into records
+ * @param maxRetry
+ * The maximal retry interval in seconds while the program waits for a
socket that is temporarily down.
+ * Reconnection is initiated every second. A number of 0 means that the
reader is immediately terminated,
+ * while
+ * a negative value ensures retrying forever.
+ * @return A data stream containing the strings received from the socket
+ */
+ def socketTextStream(hostname: String, port: Int, delimiter: Char,
maxRetry: Long): DataStream[String] =
+ javaEnv.socketTextStream(hostname, port, delimiter, maxRetry)
+
+ /**
+ * Creates a new data stream that contains the strings received
infinitely from a socket. Received strings are
+ * decoded by the system's default character set. The reader is
terminated immediately when the socket is down.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0
means that the port number is automatically
+ * allocated.
+ * @param delimiter
+ * A character which splits received strings into records
+ * @return A data stream containing the strings received from the socket
+ */
+ def socketTextStream(hostname: String, port: Int, delimiter: Char):
DataStream[String] =
+ javaEnv.socketTextStream(hostname, port, delimiter)
+
+ /**
+ * Creates a new data stream that contains the strings received
infinitely from a socket. Received strings are
+ * decoded by the system's default character set, using'\n' as
delimiter. The reader is terminated immediately when
+ * the socket is down.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0
means that the port number is automatically
+ * allocated.
+ * @return A data stream containing the strings received from the socket
+ */
+ def socketTextStream(hostname: String, port: Int): DataStream[String] =
+ javaEnv.socketTextStream(hostname, port)
+
+ // <K, V > DataStreamSource[Tuple2[K, V]]
createHadoopInput(mapredInputFormat: InputFormat[K, V], key: Class[K], value:
Class[V], job: JobConf) {
+ // val hadoopInputFormat: HadoopInputFormat[K, V] = new
HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+ // return createInput(hadoopInputFormat,
TypeExtractor.getInputFormatTypes(hadoopInputFormat), "Hadoop " + "Input
source")
+ // }
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapred.InputFormat}.
+ */
+ def createHadoopInput[K, V: ClassTag :
TypeInformation](mapredInputFormat: org.apache.hadoop.mapred.InputFormat[K, V],
+ key: Class[K], value: Class[V],
+ job: org.apache.hadoop.mapred.JobConf):
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] =
+ javaEnv.createHadoopInput(mapredInputFormat, key, value, job)
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapred.InputFormat}.
+ */
+ def createHadoopInput[K, V: ClassTag :
TypeInformation](mapredInputFormat: org.apache.hadoop.mapreduce.InputFormat[K,
V],
+ key: Class[K], value: Class[V], inputPath: String, job:
org.apache.hadoop.mapreduce.Job) =
+ javaEnv.createHadoopInput(mapredInputFormat, key, value, job)
+
+ /**
+ * Generic method to create an input data stream with {@link
org.apache.flink.api.common.io.InputFormat}. The data stream will not be
immediately
+ * created - instead, this method returns a data stream that will be
lazily created from the input format once the
+ * program is executed.
+ * <p/>
+ * Since all data streams need specific information about their types,
this method needs to determine the type of
+ * the data produced by the input format. It will attempt to determine
the data type by reflection, unless the
+ * input
+ * format implements the {@link org.apache.flink.api.java.typeutils
.ResultTypeQueryable} interface. In the latter
+ * case, this method will invoke the {@link
org.apache.flink.api.java.typeutils
+ * .ResultTypeQueryable#getProducedType()} method to determine data
type produced by the input format.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @return The data stream that represents the data created by the
input format
+ */
+ def createInput[T: ClassTag : TypeInformation](inputFormat:
InputFormat[T, _]): DataStream[T] =
+ javaEnv.createInput(inputFormat)
+
+ /**
+ * Generic method to create an input data stream with {@link
org.apache.flink.api.common.io.InputFormat}. The data stream will not be
immediately
+ * created - instead, this method returns a data stream that will be
lazily created from the input format once the
+ * program is executed.
+ * <p/>
+ * The data stream is typed to the given TypeInformation. This method
is intended for input formats where the
+ * return
+ * type cannot be determined by reflection analysis, and that do not
implement the {@link
+ * org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @return The data stream that represents the data created by the
input format
+ */
+ def createInput[T: ClassTag : TypeInformation](inputFormat:
InputFormat[T, _],
+ typeInfo: TypeInformation[T]): DataStream[T] =
+ javaEnv.createInput(inputFormat, typeInfo)
+
+ // private[flink] def createInput[T: ClassTag:
TypeInformation](inputFormat: InputFormat[T,_],
+ // typeInfo: TypeInformation[T], sourceName: String): DataStream[T]
= {
+ // val function = new FileSourceFunction[T](inputFormat, typeInfo)
+ // val returnStream = javaEnv.addSource(function,
sourceName).returns(typeInfo)
+ // javaEnv.getStreamGraph.setInputFormat(returnStream.getId,
inputFormat)
+ // returnStream
+ // }
+
+ /**
+ * Creates a new data stream that contains a sequence of numbers. The
data stream will be created in parallel, so
+ * there is no guarantee about the oder of the elements.
+ *
+ * @param from
+ * The number to start at (inclusive)
+ * @param to
+ * The number to stop at (inclusive)
+ * @return A data stream, containing all number in the [from, to]
interval
+ */
+ def generateSequence(from: Long, to: Long): DataStream[Long] = {
+ new DataStream[java.lang.Long](javaEnv.generateSequence(from,
to)).
+ asInstanceOf[DataStream[Long]]
+ }
+
+ /**
+ * Creates a new data stream that contains the given elements. The
elements must all be of the same type, for
+ * example, all of the {@link String} or {@link Integer}. The sequence
of elements must not be empty. Furthermore,
+ * the elements must be serializable (as defined in {@link
java.io.Serializable}), because the execution
+ * environment
+ * may ship the elements into the cluster.
+ * <p/>
+ * The framework will try and determine the exact type from the
elements. In case of generic elements, it may be
+ * necessary to manually supply the type information via {@link
#fromCollection(java.util.Collection,
+ * org.apache.flink.api.common.typeinfo.TypeInformation)}.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream
source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The array of elements to create the data stream from.
+ * @return The data stream representing the given array of elements
+ */
+ def fromElements[T: ClassTag : TypeInformation](data: T*):
DataStream[T] = {
+ val typeInfo = implicitly[TypeInformation[T]]
+ fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+ }
+
+ /**
+ * Creates a data stream from the given non-empty collection. The type
of the data stream is that of the
+ * elements in
+ * the collection. The elements need to be serializable (as defined by
{@link java.io.Serializable}), because the
+ * framework may move the elements into the cluster if needed.
+ * <p/>
+ * The framework will try and determine the exact type from the
collection elements. In case of generic
+ * elements, it
+ * may be necessary to manually supply the type information via {@link
#fromCollection(java.util.Collection,
+ * org.apache.flink.api.common.typeinfo.TypeInformation)}.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream
source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The collection of elements to create the data stream from
+ * @return The data stream representing the given collection
+ */
+ def fromCollection[T: ClassTag : TypeInformation](
+ data: Seq[T]): DataStream[T] = {
+ require(data != null, "Data must not be null.")
+ val typeInfo = implicitly[TypeInformation[T]]
+
+ val sourceFunction = new
FromElementsFunction[T](scala.collection.JavaConversions
+ .asJavaCollection(data))
+
+ javaEnv.addSource(sourceFunction, "Collection
source").returns(typeInfo)
+ }
+
+ /**
+ * Creates a data stream from the given non-empty collection. The type
of the data stream is the type given by
+ * typeInfo. The elements need to be serializable (as defined by {@link
java.io.Serializable}), because the
+ * framework may move the elements into the cluster if needed.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream
source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The collection of elements to create the data stream from
+ * @param typeInfo
+ * The TypeInformation for the produced data stream
+ * @return The data stream representing the given collection
+ */
+ def fromCollection[T: ClassTag : TypeInformation](
+ data: Seq[T], typeInfo: TypeInformation[T]): DataStream[T] = {
+ require(data != null, "Data must not be null.")
+
+ val sourceFunction = new
FromElementsFunction[T](scala.collection.JavaConversions
+ .asJavaCollection(data))
+
+ javaEnv.addSource(sourceFunction, "Collection
source").returns(typeInfo)
+ }
+
+ /**
+ * Creates a data stream from the given iterator. Because the iterator
will remain unmodified until the actual
+ * execution happens, the type of data returned by the iterator must be
given explicitly in the form of the type
+ * class (this is due to the fact that the Java compiler erases the
generic type information).
+ * <p/>
+ * The iterator must be serializable (as defined in {@link
java.io.Serializable}), because the framework may
+ * move it
+ * to a remote environment, if needed.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream
source, i.e. a data stream source with a
+ * degree of parallelism of one.
+ *
+ * @param data
+ * The iterator of elements to create the data stream from
+ * @param typeClass
+ * The class of the data produced by the iterator. Must not be a
generic class.
+ * @return The data stream representing the elements in the iterator
+ * @see #fromCollection(java.util.Iterator,
org.apache.flink.api.common.typeinfo.TypeInformation)
+ */
+ def fromCollection[T: ClassTag : TypeInformation](
+ data: Iterator[T], typeClass: Class[T]): DataStream[T] = {
+ fromCollection(data, TypeExtractor.getForClass(typeClass))
+ }
+
+ /**
+ * Creates a data stream from the given iterator. Because the iterator
will remain unmodified until the actual
+ * execution happens, the type of data returned by the iterator must be
given explicitly in the form of the type
+ * information. This method is useful for cases where the type is
generic. In that case, the type class (as
+ * given in
+ * {@link #fromCollection(java.util.Iterator, Class)} does not supply
all type information.
+ * <p/>
+ * The iterator must be serializable (as defined in {@link
java.io.Serializable}), because the framework may
+ * move it
+ * to a remote environment, if needed.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream
source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The iterator of elements to create the data stream from
+ * @param typeInfo
+ * The TypeInformation for the produced data stream
+ * @return The data stream representing the elements in the iterator
+ */
+ def fromCollection[T: ClassTag : TypeInformation](
+ data: Iterator[T], typeInfo: TypeInformation[T]): DataStream[T]
= {
+ require(data != null, "Data must not be null.")
+ if (!data.isInstanceOf[java.io.Serializable]) {
+ throw new IllegalArgumentException("The iterator must
be serializable.")
+ }
+
+ val sourceFunction = new
FromIteratorFunction[T](scala.collection.JavaConversions
+ .asJavaIterator(data))
+
+ javaEnv.addSource(sourceFunction, "Collection
source").returns(typeInfo)
+ }
+
+ /**
+ * Creates a new data stream that contains elements in the iterator.
The iterator is splittable, allowing the
+ * framework to create a parallel data stream source that returns the
elements in the iterator. The iterator
+ * must be
+ * serializable (as defined in {@link java.io.Serializable}, because
the execution environment may ship the
+ * elements
+ * into the cluster.
+ * <p/>
+ * Because the iterator will remain unmodified until the actual
execution happens, the type of data returned by the
+ * iterator must be given explicitly in the form of the type class
(this is due to the fact that the Java compiler
+ * erases the generic type information).
+ *
+ * @param iterator
+ * The iterator that produces the elements of the data stream
+ * @param typeClass
+ * The class of the data produced by the iterator. Must not be a
generic class.
+ * @return A data stream representing the elements in the iterator
+ */
+ def fromParallelCollection[T: ClassTag : TypeInformation](iterator:
SplittableIterator[T],
+ typeClass: Class[T]): DataStream[T] = {
+ javaEnv.fromParallelCollection(iterator, typeClass)
+ }
+
+ /**
+ * Creates a new data stream that contains elements in the iterator.
The iterator is splittable, allowing the
+ * framework to create a parallel data stream source that returns the
elements in the iterator. The iterator
+ * must be
+ * serializable (as defined in {@link java.io.Serializable}, because
the execution environment may ship the
+ * elements
+ * into the cluster.
+ * <p/>
+ * Because the iterator will remain unmodified until the actual
execution happens, the type of data returned by the
+ * iterator must be given explicitly in the form of the type
information. This method is useful for cases where the
+ * type is generic. In that case, the type class (as given in {@link
#fromParallelCollection(SplittableIterator,
+ * Class)} does not supply all type information.
+ *
+ * @param iterator
+ * The iterator that produces the elements of the data stream
+ * @param typeInfo
+ * The TypeInformation for the produced data stream.
+ * @return A data stream representing the elements in the iterator
+ */
+ def fromParallelCollection[T: ClassTag : TypeInformation](iterator:
SplittableIterator[T],
+ typeInfo: TypeInformation[T]): DataStream[T] = {
+ javaEnv.fromParallelCollection(iterator, typeInfo)
+ }
+
+ // private helper for passing different names
+ private[flink] def fromParallelCollection[T: ClassTag :
TypeInformation](iterator: SplittableIterator[T],
+ typeInfo: TypeInformation[T], operatorName: String):
DataStream[T] = {
+ javaEnv.addSource(new FromIteratorFunction[T](iterator),
operatorName).returns(typeInfo)
+ }
+
+
+ /**
+ * Ads a data source with a custom type information thus opening a
{@link org.apache.flink.streaming.api
+ * .datastream.DataStream}. Only in very special cases does the user
need to support type information. Otherwise
+ * use
+ * {@link
#addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
+ * <p/>
+ * By default sources have a parallelism of 1. To enable parallel
execution, the user defined source should
+ * implement {@link
org.apache.flink.streaming.api.function.source.ParallelSourceFunction} or
extend {@link
+ *
org.apache.flink.streaming.api.function.source.RichParallelSourceFunction}. In
these cases the resulting source
+ * will have the parallelism of the environment. To change this
afterwards call {@link
+ *
org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
+ *
+ * @param function
+ * the user defined function
+ * @return the data stream constructed
+ */
+ def addSource[T: ClassTag : TypeInformation](function:
SourceFunction[T]): DataStream[T] = {
+ require(function != null, "Function must not be null.")
+ val cleanFun = StreamExecutionEnvironment.clean(function)
+ javaEnv.addSource(cleanFun)
+ }
+
+
+ /**
+ * Ads a data source with a custom type information thus opening a
{@link org.apache.flink.streaming.api
+ * .datastream.DataStream}. Only in very special cases does the user
need to support type information. Otherwise
+ * use
+ * {@link
#addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
+ *
+ * @param function
+ * the user defined function
+ * @param sourceName
+ * Name of the data source
+ * @return the data stream constructed
+ */
+ def addSource[T: ClassTag : TypeInformation](function:
SourceFunction[T],
+ sourceName: String): DataStream[T] = {
+ require(function != null, "Function must not be null.")
+ val typeInfo: TypeInformation[T] =
+ if (function.isInstanceOf[ResultTypeQueryable[T]]) {
+
function.asInstanceOf[ResultTypeQueryable[T]].getProducedType
+ }
+ else {
+
TypeExtractor.createTypeInfo(classOf[SourceFunction[T]], function.getClass, 0,
null, null)
+ }
+
+ val isParallel =
function.isInstanceOf[ParallelSourceFunction[T]]
+ val cleanFun = StreamExecutionEnvironment.clean(function)
+ val sourceOperator = new StreamSource[T](cleanFun)
+ new DataStreamSource[T](this.javaEnv, sourceName, typeInfo,
sourceOperator,
+ isParallel, sourceName)
+ }
+
+ // @SuppressWarnings("unchecked")
+ // private <OUT> DataStreamSource<OUT>
addSource(SourceFunction<OUT> function,
+ // TypeInformation<OUT> typeInfo, String sourceName) {
+ //
+ // if (typeInfo == null) {
+ // if (function instanceof
GenericSourceFunction) {
+ // typeInfo =
((GenericSourceFunction<OUT>) function).getType();
+ // } else {
+ // typeInfo =
TypeExtractor.createTypeInfo(SourceFunction.class,
+ // function.getClass(), 0, null,
null);
+ // }
+ // }
+ //
+ // boolean isParallel = function instanceof
ParallelSourceFunction;
+ //
+ // ClosureCleaner.clean(function, true);
+ // StreamOperator<OUT, OUT> sourceOperator = new
StreamSource<OUT>(function);
+ //
+ // return new DataStreamSource<OUT>(this, sourceName, typeInfo,
sourceOperator,
+ // isParallel, sourceName);
+ // }
+
+ /**
+ * Create a DataStream using a user defined source function for
arbitrary
+ * source functionality.
+ *
+ */
+ def addSource[T: ClassTag : TypeInformation](function: Collector[T] =>
Unit): DataStream[T] = {
+ require(function != null, "Function must not be null.")
+ val sourceFunction = new SourceFunction[T] {
+ val cleanFun =
StreamExecutionEnvironment.clean(function)
+
+ override def run(out: Collector[T]) {
+ cleanFun(out)
+ }
+
+ override def cancel() = {}
+ }
+ addSource(sourceFunction)
+ }
+
+ /**
+ * Triggers the program execution. The environment will execute all
parts of
+ * the program that have resulted in a "sink" operation. Sink
operations are
+ * for example printing results or forwarding them to a message queue.
+ * <p>
+ * The program execution will be logged and displayed with a generated
+ * default name.
+ *
+ */
+ def execute() = javaEnv.execute()
+
+ /**
+ * Triggers the program execution. The environment will execute all
parts of
+ * the program that have resulted in a "sink" operation. Sink
operations are
+ * for example printing results or forwarding them to a message queue.
+ * <p>
+ * The program execution will be logged and displayed with the provided
name
+ *
+ */
+ def execute(jobName: String) = javaEnv.execute(jobName)
+
+ /**
+ * Creates the plan with which the system will execute the program, and
+ * returns it as a String using a JSON representation of the execution
data
+ * flow graph. Note that this needs to be called, before the plan is
+ * executed.
+ *
+ */
+ def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
--- End diff --
This change is altering the indentation from spaces to tabs. In scala we
are using spaces ;)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---