[ https://issues.apache.org/jira/browse/FLINK-1687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552049#comment-14552049 ]
ASF GitHub Bot commented on FLINK-1687:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/521#discussion_r30683935
--- Diff: flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---
@@ -19,396 +19,867 @@
package org.apache.flink.streaming.api.scala
import com.esotericsoftware.kryo.Serializer
+import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable,
TypeExtractor}
import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.streaming.api.datastream.DataStreamSource
import
org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment =>
JavaEnv}
import
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
-import
org.apache.flink.streaming.api.functions.source.{FromElementsFunction,
SourceFunction}
-import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.functions.source._
+import org.apache.flink.streaming.api.operators.StreamSource
+import org.apache.flink.types.StringValue
+import org.apache.flink.util.{Collector, SplittableIterator}
import scala.reflect.ClassTag
class StreamExecutionEnvironment(javaEnv: JavaEnv) {
- /**
- * Sets the parallelism for operations executed through this environment.
- * Setting a parallelism of x here will cause all operators (such as
join, map, reduce) to run
- * with x parallel instances. This value can be overridden by specific
operations using
- * [[DataStream.setParallelism]].
- * @deprecated Please use [[setParallelism]]
- */
- @deprecated
- def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
- javaEnv.setParallelism(degreeOfParallelism)
- }
-
- /**
- * Sets the parallelism for operations executed through this environment.
- * Setting a parallelism of x here will cause all operators (such as
join, map, reduce) to run
- * with x parallel instances. This value can be overridden by specific
operations using
- * [[DataStream.setParallelism]].
- */
- def setParallelism(parallelism: Int): Unit = {
- javaEnv.setParallelism(parallelism)
- }
-
- /**
- * Returns the default parallelism for this execution environment. Note
that this
- * value can be overridden by individual operations using
[[DataStream.setParallelism]]
- * @deprecated Please use [[getParallelism]]
- */
- @deprecated
- def getDegreeOfParallelism = javaEnv.getParallelism
-
- /**
- * Returns the default parallelism for this execution environment. Note
that this
- * value can be overridden by individual operations using
[[DataStream.setParallelism]]
- */
- def getParallelism = javaEnv.getParallelism
-
- /**
- * Sets the maximum time frequency (milliseconds) for the flushing of the
- * output buffers. By default the output buffers flush frequently to
provide
- * low latency and to aid smooth developer experience. Setting the
parameter
- * can result in three logical modes:
- *
- * <ul>
- * <li>
- * A positive integer triggers flushing periodically by that integer</li>
- * <li>
- * 0 triggers flushing after every record thus minimizing latency</li>
- * <li>
- * -1 triggers flushing only when the output buffer is full thus
maximizing
- * throughput</li>
- * </ul>
- *
- */
- def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
- javaEnv.setBufferTimeout(timeoutMillis)
- this
- }
-
- /**
- * Gets the default buffer timeout set for this environment
- */
- def getBufferTimout: Long = javaEnv.getBufferTimeout()
-
- /**
- * Method for enabling fault-tolerance. Activates monitoring and backup
of streaming
- * operator states. Time interval between state checkpoints is specified
in in millis.
- *
- * Setting this option assumes that the job is used in production and
thus if not stated
- * explicitly otherwise with calling with the
- * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
method in case of
- * failure the job will be resubmitted to the cluster indefinitely.
- */
- def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
- javaEnv.enableCheckpointing(interval)
- this
- }
-
- /**
- * Method for enabling fault-tolerance. Activates monitoring and backup
of streaming
- * operator states. Time interval between state checkpoints is specified
in in millis.
- *
- * Setting this option assumes that the job is used in production and
thus if not stated
- * explicitly otherwise with calling with the
- * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)}
method in case of
- * failure the job will be resubmitted to the cluster indefinitely.
- */
- def enableCheckpointing() : StreamExecutionEnvironment = {
- javaEnv.enableCheckpointing()
- this
- }
-
- /**
- * Disables operator chaining for streaming operators. Operator chaining
- * allows non-shuffle operations to be co-located in the same thread
fully
- * avoiding serialization and de-serialization.
- *
- */
- def disableOperatorChaning(): StreamExecutionEnvironment = {
- javaEnv.disableOperatorChaning()
- this
- }
-
- /**
- * Sets the number of times that failed tasks are re-executed. A value
of zero
- * effectively disables fault tolerance. A value of "-1" indicates that
the system
- * default value (as defined in the configuration) should be used.
- */
- def setNumberOfExecutionRetries(numRetries: Int): Unit = {
- javaEnv.setNumberOfExecutionRetries(numRetries)
- }
-
- /**
- * Gets the number of times the system will try to re-execute failed
tasks. A value
- * of "-1" indicates that the system default value (as defined in the
configuration)
- * should be used.
- */
- def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
-
-
- /**
- * Registers the given type with the serializer at the
[[KryoSerializer]].
- *
- * Note that the serializer instance must be serializable (as defined by
java.io.Serializable),
- * because it may be distributed to the worker nodes by java
serialization.
- */
- def registerTypeWithKryoSerializer(clazz: Class[_], serializer:
Serializer[_]): Unit = {
- javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
- }
-
- /**
- * Registers the given type with the serializer at the
[[KryoSerializer]].
- */
- def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_
<: Serializer[_]]) {
- javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
- }
-
-
- /**
- * Registers a default serializer for the given class and its
sub-classes at Kryo.
- */
- def registerDefaultKryoSerializer(clazz: Class[_], serializer: Class[_
<: Serializer[_]]) {
- javaEnv.addDefaultKryoSerializer(clazz, serializer)
- }
-
- /**
- * Registers a default serializer for the given class and its
sub-classes at Kryo.
- *
- * Note that the serializer instance must be serializable (as defined by
java.io.Serializable),
- * because it may be distributed to the worker nodes by java
serialization.
- */
- def registerDefaultKryoSerializer(clazz: Class[_], serializer:
Serializer[_]): Unit = {
- javaEnv.addDefaultKryoSerializer(clazz, serializer)
- }
-
- /**
- * Registers the given type with the serialization stack. If the type is
eventually
- * serialized as a POJO, then the type is registered with the POJO
serializer. If the
- * type ends up being serialized with Kryo, then it will be registered
at Kryo to make
- * sure that only tags are written.
- *
- */
- def registerType(typeClass: Class[_]) {
- javaEnv.registerType(typeClass)
- }
-
- /**
- * Creates a DataStream that represents the Strings produced by reading
the
- * given file line wise. The file will be read with the system's default
- * character set.
- *
- */
- def readTextFile(filePath: String): DataStream[String] =
- javaEnv.readTextFile(filePath)
-
- /**
- * Creates a DataStream that contains the contents of file created while
- * system watches the given path. The file will be read with the system's
- * default character set. The user can check the monitoring interval in
milliseconds,
- * and the way file modifications are handled. By default it checks for
only new files
- * every 100 milliseconds.
- *
- */
- def readFileStream(StreamPath: String, intervalMillis: Long = 100,
watchType: WatchType =
- WatchType.ONLY_NEW_FILES): DataStream[String] =
- javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
-
- /**
- * Creates a new DataStream that contains the strings received infinitely
- * from socket. Received strings are decoded by the system's default
- * character set.
- *
- */
- def socketTextStream(hostname: String, port: Int, delimiter: Char):
DataStream[String] =
- javaEnv.socketTextStream(hostname, port, delimiter)
-
- /**
- * Creates a new DataStream that contains the strings received infinitely
- * from socket. Received strings are decoded by the system's default
- * character set, uses '\n' as delimiter.
- *
- */
- def socketTextStream(hostname: String, port: Int): DataStream[String] =
- javaEnv.socketTextStream(hostname, port)
-
- /**
- * Creates a new DataStream that contains a sequence of numbers.
- *
- */
- def generateSequence(from: Long, to: Long): DataStream[Long] = {
- new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
- asInstanceOf[DataStream[Long]]
- }
-
- /**
- * Creates a DataStream that contains the given elements. The elements
must all be of the
- * same type and must be serializable.
- *
- * * Note that this operation will result in a non-parallel data source,
i.e. a data source with
- * a parallelism of one.
- */
- def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T]
= {
- val typeInfo = implicitly[TypeInformation[T]]
- fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
- }
-
- /**
- * Creates a DataStream from the given non-empty [[Seq]]. The elements
need to be serializable
- * because the framework may move the elements into the cluster if
needed.
- *
- * Note that this operation will result in a non-parallel data source,
i.e. a data source with
- * a parallelism of one.
- */
- def fromCollection[T: ClassTag: TypeInformation](
- data: Seq[T]): DataStream[T] = {
- require(data != null, "Data must not be null.")
- val typeInfo = implicitly[TypeInformation[T]]
-
- val sourceFunction = new
FromElementsFunction[T](scala.collection.JavaConversions
- .asJavaCollection(data))
-
- javaEnv.addSource(sourceFunction).returns(typeInfo)
- }
-
- /**
- * Create a DataStream using a user defined source function for arbitrary
- * source functionality. By default sources have a parallelism of 1.
- * To enable parallel execution, the user defined source should
implement
- * ParallelSourceFunction or extend RichParallelSourceFunction.
- * In these cases the resulting source will have the parallelism of the
environment.
- * To change this afterwards call DataStreamSource.setParallelism(int)
- *
- */
- def addSource[T: ClassTag: TypeInformation](function:
SourceFunction[T]): DataStream[T] = {
- require(function != null, "Function must not be null.")
- val cleanFun = StreamExecutionEnvironment.clean(function)
- val typeInfo = implicitly[TypeInformation[T]]
- javaEnv.addSource(cleanFun).returns(typeInfo)
- }
-
- /**
- * Create a DataStream using a user defined source function for arbitrary
- * source functionality.
- *
- */
- def addSource[T: ClassTag: TypeInformation](function: Collector[T] =>
Unit): DataStream[T] = {
- require(function != null, "Function must not be null.")
- val sourceFunction = new SourceFunction[T] {
- val cleanFun = StreamExecutionEnvironment.clean(function)
- override def run(out: Collector[T]) {
- cleanFun(out)
- }
- override def cancel() = {}
- }
- addSource(sourceFunction)
- }
-
- /**
- * Triggers the program execution. The environment will execute all
parts of
- * the program that have resulted in a "sink" operation. Sink operations
are
- * for example printing results or forwarding them to a message queue.
- * <p>
- * The program execution will be logged and displayed with a generated
- * default name.
- *
- */
- def execute() = javaEnv.execute()
-
- /**
- * Triggers the program execution. The environment will execute all
parts of
- * the program that have resulted in a "sink" operation. Sink operations
are
- * for example printing results or forwarding them to a message queue.
- * <p>
- * The program execution will be logged and displayed with the provided
name
- *
- */
- def execute(jobName: String) = javaEnv.execute(jobName)
-
- /**
- * Creates the plan with which the system will execute the program, and
- * returns it as a String using a JSON representation of the execution
data
- * flow graph. Note that this needs to be called, before the plan is
- * executed.
- *
- */
- def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
+ /**
+ * Sets the parallelism for operations executed through this
environment.
+ * Setting a parallelism of x here will cause all operators (such as
join, map, reduce) to run
+ * with x parallel instances. This value can be overridden by specific
operations using
+ * [[DataStream.setParallelism]].
+ * @deprecated Please use [[setParallelism]]
+ */
+ @deprecated
+ def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
+ javaEnv.setParallelism(degreeOfParallelism)
+ }
+
+ /**
+ * Sets the parallelism for operations executed through this
environment.
+ * Setting a parallelism of x here will cause all operators (such as
join, map, reduce) to run
+ * with x parallel instances. This value can be overridden by specific
operations using
+ * [[DataStream.setParallelism]].
+ */
+ def setParallelism(parallelism: Int): Unit = {
+ javaEnv.setParallelism(parallelism)
+ }
+
+ /**
+ * Returns the default parallelism for this execution environment. Note
that this
+ * value can be overridden by individual operations using
[[DataStream.setParallelism]]
+ * @deprecated Please use [[getParallelism]]
+ */
+ @deprecated
+ def getDegreeOfParallelism = javaEnv.getParallelism
+
+ /**
+ * Returns the default parallelism for this execution environment. Note
that this
+ * value can be overridden by individual operations using
[[DataStream.setParallelism]]
+ */
+ def getParallelism = javaEnv.getParallelism
+
+ /**
+ * Sets the maximum time frequency (milliseconds) for the flushing of
the
+ * output buffers. By default the output buffers flush frequently to
provide
+ * low latency and to aid smooth developer experience. Setting the
parameter
+ * can result in three logical modes:
+ *
+ * <ul>
+ * <li>
+ * A positive integer triggers flushing periodically by that
integer</li>
+ * <li>
+ * 0 triggers flushing after every record thus minimizing latency</li>
+ * <li>
+ * -1 triggers flushing only when the output buffer is full thus
maximizing
+ * throughput</li>
+ * </ul>
+ *
+ */
+ def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment =
{
+ javaEnv.setBufferTimeout(timeoutMillis)
+ this
+ }
+
+ /**
+ * Gets the default buffer timeout set for this environment
+ */
+ def getBufferTimout: Long = javaEnv.getBufferTimeout()
+
+ /**
+ * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
+ * operator states. The time interval between state checkpoints is specified in milliseconds.
+ *
+ * Setting this option assumes that the job is used in production. Unless stated otherwise
+ * by calling the {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method,
+ * the job will be resubmitted to the cluster indefinitely in case of failure.
+ */
+ def enableCheckpointing(interval: Long): StreamExecutionEnvironment = {
+ javaEnv.enableCheckpointing(interval)
+ this
+ }
+
+ /**
+ * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
+ * operator states. The time interval between state checkpoints is specified in milliseconds.
+ *
+ * Setting this option assumes that the job is used in production. Unless stated otherwise
+ * by calling the {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method,
+ * the job will be resubmitted to the cluster indefinitely in case of failure.
+ */
+ def enableCheckpointing(): StreamExecutionEnvironment = {
+ javaEnv.enableCheckpointing()
+ this
+ }
+
+ /**
+ * Disables operator chaining for streaming operators. Operator chaining
+ * allows non-shuffle operations to be co-located in the same thread
fully
+ * avoiding serialization and de-serialization.
+ *
+ */
+ def disableOperatorChaning(): StreamExecutionEnvironment = {
+ javaEnv.disableOperatorChaning()
+ this
+ }
+
+ /**
+ * Sets the number of times that failed tasks are re-executed. A value
of zero
+ * effectively disables fault tolerance. A value of "-1" indicates that
the system
+ * default value (as defined in the configuration) should be used.
+ */
+ def setNumberOfExecutionRetries(numRetries: Int): Unit = {
+ javaEnv.setNumberOfExecutionRetries(numRetries)
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed
tasks. A value
+ * of "-1" indicates that the system default value (as defined in the
configuration)
+ * should be used.
+ */
+ def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
+
+
+ /**
+ * Registers the given type with the serializer at the
[[KryoSerializer]].
+ *
+ * Note that the serializer instance must be serializable (as defined
by java.io.Serializable),
+ * because it may be distributed to the worker nodes by java
serialization.
+ */
+ def registerTypeWithKryoSerializer(clazz: Class[_], serializer:
Serializer[_]): Unit = {
+ javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
+ }
+
+ /**
+ * Registers the given type with the serializer at the
[[KryoSerializer]].
+ */
+ def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_
<: Serializer[_]]) {
+ javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
+ }
+
+
+ /**
+ * Registers a default serializer for the given class and its
sub-classes at Kryo.
+ */
+ def registerDefaultKryoSerializer(clazz: Class[_], serializer: Class[_
<: Serializer[_]]) {
+ javaEnv.addDefaultKryoSerializer(clazz, serializer)
+ }
+
+ /**
+ * Registers a default serializer for the given class and its
sub-classes at Kryo.
+ *
+ * Note that the serializer instance must be serializable (as defined
by java.io.Serializable),
+ * because it may be distributed to the worker nodes by java
serialization.
+ */
+ def registerDefaultKryoSerializer(clazz: Class[_], serializer:
Serializer[_]): Unit = {
+ javaEnv.addDefaultKryoSerializer(clazz, serializer)
+ }
+
+ /**
+ * Registers the given type with the serialization stack. If the type
is eventually
+ * serialized as a POJO, then the type is registered with the POJO
serializer. If the
+ * type ends up being serialized with Kryo, then it will be registered
at Kryo to make
+ * sure that only tags are written.
+ *
+ */
+ def registerType(typeClass: Class[_]) {
+ javaEnv.registerType(typeClass)
+ }
+
+ /**
+ * Creates a data stream that represents the Strings produced by
reading the given file line wise. The file will be
+ * read with the system's default character set.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path").
+ * @return The data stream that represents the data read from the given
file as text lines
+ */
+ def readTextFile(filePath: String): DataStream[String] =
+ javaEnv.readTextFile(filePath)
+
+ /**
+ * Creates a data stream that represents the Strings produced by
reading the given file line wise. The {@link
+ * java.nio.charset.Charset} with the given name will be used to read
the files.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @param charsetName
+ * The name of the character set used to read the file
+ * @return The data stream that represents the data read from the given
file as text lines
+ */
+ def readTextFile(filePath: String, charsetName: String):
DataStream[String] =
+ javaEnv.readTextFile(filePath, charsetName)
+
+ /**
+ * Creates a data stream that contains the contents of the files created while the system
+ * watches the given path. The files will be read with the system's default character set.
+ *
+ * @param streamPath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path/")
+ * @param intervalMillis
+ * The interval of file watching in milliseconds
+ * @param watchType
+ * The watch type of file stream. When watchType is
+ * {@link
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#ONLY_NEW_FILES},
+ * the system processes only new files.
+ * {@link
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#REPROCESS_WITH_APPENDED}
+ * means that the system re-processes all contents of the appended file.
+ * {@link
org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#PROCESS_ONLY_APPENDED}
+ * means that the system processes only the appended contents of files.
+ * @return The DataStream containing the given directory.
+ */
+ def readFileStream(streamPath: String, intervalMillis: Long = 100,
watchType: WatchType =
+ WatchType.ONLY_NEW_FILES): DataStream[String] =
+ javaEnv.readFileStream(streamPath, intervalMillis, watchType)
+
+ /**
+ * Creates a data stream that represents the strings produced by
reading the given file line wise. This method is
+ * similar to {@link #readTextFile(String)}, but it produces a data
stream with mutable {@link StringValue}
+ * objects,
+ * rather than Java Strings. StringValues can be used to tune
implementations to be less object and garbage
+ * collection heavy.
+ * <p/>
+ * The file will be read with the system's default character set.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @return A data stream that represents the data read from the given
file as text lines
+ */
+ def readTextFileWithValue(filePath: String): DataStream[StringValue] =
+ javaEnv.readTextFileWithValue(filePath)
+
+ /**
+ * Creates a data stream that represents the Strings produced by
reading the given file line wise. This method is
+ * similar to {@link #readTextFile(String, String)}, but it produces a
data stream with mutable {@link StringValue}
+ * objects, rather than Java Strings. StringValues can be used to tune
implementations to be less object and
+ * garbage
+ * collection heavy.
+ * <p/>
+ * The {@link java.nio.charset.Charset} with the given name will be
used to read the files.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @param charsetName
+ * The name of the character set used to read the file
+ * @param skipInvalidLines
+ * A flag to indicate whether to skip lines that cannot be read with
the given character set
+ * @return A data stream that represents the data read from the given
file as text lines
+ */
+ def readTextFileWithValue(filePath: String, charsetName: String,
+ skipInvalidLines: Boolean): DataStream[StringValue] =
+ javaEnv.readTextFileWithValue(filePath, charsetName,
skipInvalidLines)
+
+ /**
+ * Reads the given file with the given input format.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @return The data stream that represents the data read from the given
file
+ */
+ def readFile[T: ClassTag : TypeInformation](inputFormat:
FileInputFormat[T],
+ filePath: String): DataStream[T] = {
+ javaEnv.readFile(inputFormat, filePath)
+ }
+
+ /**
+ * Creates a data stream that represents the primitive type produced by
reading the given file line wise.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @param typeClass
+ * The primitive type class to be read
+ * @return A data stream that represents the data read from the given
file as primitive type
+ */
+ def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath:
String, typeClass: Class[T]): DataStream[T] = {
+ javaEnv.readFileOfPrimitives(filePath, typeClass)
+ }
+
+ /**
+ * Creates a data stream that represents the primitive type produced by
reading the given file in a delimited way.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g., "file:///some/local/file" or
"hdfs://host:port/file/path")
+ * @param delimiter
+ * The delimiter of the given file
+ * @param typeClass
+ * The primitive type class to be read
+ * @return A data stream that represents the data read from the given
file as primitive type.
+ */
+ def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath:
String, delimiter: String, typeClass: Class[T]): DataStream[T] = {
+ javaEnv.readFileOfPrimitives(filePath, delimiter, typeClass)
+ }
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapred.FileInputFormat}. A {@link
+ * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+ */
+ def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat:
org.apache.hadoop.mapred.FileInputFormat[K, V],
+ key: Class[K], value: Class[V], inputPath: String):
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
+ javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
+ }
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapred.FileInputFormat}. A {@link
+ * org.apache.hadoop.mapred.JobConf} with the given inputPath is
created.
+ */
+ def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat:
org.apache.hadoop.mapred.FileInputFormat[K, V],
+ key: Class[K], value: Class[V], inputPath: String, job:
org.apache.hadoop.mapred.JobConf):
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
+ javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
+ }
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A {@link
+ * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+ */
+ def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat:
org.apache.hadoop.mapreduce.lib.input.FileInputFormat[K, V],
+ key: Class[K], value: Class[V], inputPath: String, job:
org.apache.hadoop.mapreduce.Job):
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
+ javaEnv.readHadoopFile(mapredInputFormat, key, value,
inputPath, job)
+ }
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapreduce.InputFormat}.
+ */
+ def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat:
org.apache.hadoop.mapreduce.lib.input.FileInputFormat[K, V],
+ key: Class[K], value: Class[V], inputPath: String):
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
+ javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
+ }
+
+ /**
+ * Creates a new data stream that contains the strings received
infinitely from a socket. Received strings are
+ * decoded by the system's default character set. When the socket server connection
+ * terminates, retries can be initiated.
+ * <p/>
+ * Note that the socket itself does not report an abort; as a consequence, retries are only
+ * initiated when the socket was gracefully terminated.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0
means that the port number is automatically
+ * allocated.
+ * @param delimiter
+ * A character which splits received strings into records
+ * @param maxRetry
+ * The maximal retry interval in seconds while the program waits for a
socket that is temporarily down.
+ * Reconnection is initiated every second. A number of 0 means that the
reader is immediately terminated,
+ * while
+ * a negative value ensures retrying forever.
+ * @return A data stream containing the strings received from the socket
+ */
+ def socketTextStream(hostname: String, port: Int, delimiter: Char,
maxRetry: Long): DataStream[String] =
+ javaEnv.socketTextStream(hostname, port, delimiter, maxRetry)
+
+ /**
+ * Creates a new data stream that contains the strings received
infinitely from a socket. Received strings are
+ * decoded by the system's default character set. The reader is
terminated immediately when the socket is down.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0
means that the port number is automatically
+ * allocated.
+ * @param delimiter
+ * A character which splits received strings into records
+ * @return A data stream containing the strings received from the socket
+ */
+ def socketTextStream(hostname: String, port: Int, delimiter: Char):
DataStream[String] =
+ javaEnv.socketTextStream(hostname, port, delimiter)
+
+ /**
+ * Creates a new data stream that contains the strings received
infinitely from a socket. Received strings are
+ * decoded by the system's default character set, using '\n' as
delimiter. The reader is terminated immediately when
+ * the socket is down.
+ *
+ * @param hostname
+ * The host name which a server socket binds
+ * @param port
+ * The port number which a server socket binds. A port number of 0
means that the port number is automatically
+ * allocated.
+ * @return A data stream containing the strings received from the socket
+ */
+ def socketTextStream(hostname: String, port: Int): DataStream[String] =
+ javaEnv.socketTextStream(hostname, port)
+
+ // <K, V > DataStreamSource[Tuple2[K, V]]
createHadoopInput(mapredInputFormat: InputFormat[K, V], key: Class[K], value:
Class[V], job: JobConf) {
+ // val hadoopInputFormat: HadoopInputFormat[K, V] = new
HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+ // return createInput(hadoopInputFormat,
TypeExtractor.getInputFormatTypes(hadoopInputFormat), "Hadoop " + "Input
source")
+ // }
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapred.InputFormat}.
+ */
+ def createHadoopInput[K, V: ClassTag :
TypeInformation](mapredInputFormat: org.apache.hadoop.mapred.InputFormat[K, V],
+ key: Class[K], value: Class[V],
+ job: org.apache.hadoop.mapred.JobConf):
DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] =
+ javaEnv.createHadoopInput(mapredInputFormat, key, value, job)
+
+ /**
+ * Creates a data stream from the given {@link
org.apache.hadoop.mapred.InputFormat}.
+ */
+ def createHadoopInput[K, V: ClassTag :
TypeInformation](mapredInputFormat: org.apache.hadoop.mapreduce.InputFormat[K,
V],
+ key: Class[K], value: Class[V], inputPath: String, job:
org.apache.hadoop.mapreduce.Job) =
+ javaEnv.createHadoopInput(mapredInputFormat, key, value, job)
+
+ /**
+ * Generic method to create an input data stream with {@link
org.apache.flink.api.common.io.InputFormat}. The data stream will not be
immediately
+ * created - instead, this method returns a data stream that will be
lazily created from the input format once the
+ * program is executed.
+ * <p/>
+ * Since all data streams need specific information about their types,
this method needs to determine the type of
+ * the data produced by the input format. It will attempt to determine
the data type by reflection, unless the
+ * input
+ * format implements the {@link org.apache.flink.api.java.typeutils
.ResultTypeQueryable} interface. In the latter
+ * case, this method will invoke the {@link
org.apache.flink.api.java.typeutils
+ * .ResultTypeQueryable#getProducedType()} method to determine data
type produced by the input format.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @return The data stream that represents the data created by the
input format
+ */
+ def createInput[T: ClassTag : TypeInformation](inputFormat:
InputFormat[T, _]): DataStream[T] =
+ javaEnv.createInput(inputFormat)
+
+ /**
+ * Generic method to create an input data stream with {@link
org.apache.flink.api.common.io.InputFormat}. The data stream will not be
immediately
+ * created - instead, this method returns a data stream that will be
lazily created from the input format once the
+ * program is executed.
+ * <p/>
+ * The data stream is typed to the given TypeInformation. This method
is intended for input formats where the
+ * return
+ * type cannot be determined by reflection analysis, and that do not
implement the {@link
+ * org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+ *
+ * @param inputFormat
+ * The input format used to create the data stream
+ * @return The data stream that represents the data created by the
input format
+ */
+ def createInput[T: ClassTag : TypeInformation](inputFormat:
InputFormat[T, _],
+ typeInfo: TypeInformation[T]): DataStream[T] =
+ javaEnv.createInput(inputFormat, typeInfo)
+
+ // private[flink] def createInput[T: ClassTag:
TypeInformation](inputFormat: InputFormat[T,_],
+ // typeInfo: TypeInformation[T], sourceName: String): DataStream[T]
= {
+ // val function = new FileSourceFunction[T](inputFormat, typeInfo)
+ // val returnStream = javaEnv.addSource(function,
sourceName).returns(typeInfo)
+ // javaEnv.getStreamGraph.setInputFormat(returnStream.getId,
inputFormat)
+ // returnStream
+ // }
+
+ /**
+ * Creates a new data stream that contains a sequence of numbers. The
data stream will be created in parallel, so
+ * there is no guarantee about the order of the elements.
+ *
+ * @param from
+ * The number to start at (inclusive)
+ * @param to
+ * The number to stop at (inclusive)
+ * @return A data stream containing all numbers in the [from, to] interval
+ */
+ def generateSequence(from: Long, to: Long): DataStream[Long] = {
+ new DataStream[java.lang.Long](javaEnv.generateSequence(from,
to)).
+ asInstanceOf[DataStream[Long]]
+ }
+
+ /**
+ * Creates a new data stream that contains the given elements. The
elements must all be of the same type, for
+ * example, all of type {@link String} or {@link Integer}. The sequence
of elements must not be empty. Furthermore,
+ * the elements must be serializable (as defined in {@link
java.io.Serializable}), because the execution
+ * environment
+ * may ship the elements into the cluster.
+ * <p/>
+ * The framework will try and determine the exact type from the
elements. In case of generic elements, it may be
+ * necessary to manually supply the type information via {@link
#fromCollection(java.util.Collection,
+ * org.apache.flink.api.common.typeinfo.TypeInformation)}.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream
source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The array of elements to create the data stream from.
+ * @return The data stream representing the given array of elements
+ */
+ def fromElements[T: ClassTag : TypeInformation](data: T*):
DataStream[T] = {
+ val typeInfo = implicitly[TypeInformation[T]]
+ fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+ }
+
+ /**
+ * Creates a data stream from the given non-empty collection. The type
of the data stream is that of the
+ * elements in
+ * the collection. The elements need to be serializable (as defined by
{@link java.io.Serializable}), because the
+ * framework may move the elements into the cluster if needed.
+ * <p/>
+ * The framework will try and determine the exact type from the
collection elements. In case of generic
+ * elements, it
+ * may be necessary to manually supply the type information via {@link
#fromCollection(java.util.Collection,
+ * org.apache.flink.api.common.typeinfo.TypeInformation)}.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream
source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The collection of elements to create the data stream from
+ * @return The data stream representing the given collection
+ */
+ def fromCollection[T: ClassTag : TypeInformation](
+ data: Seq[T]): DataStream[T] = {
+ require(data != null, "Data must not be null.")
+ val typeInfo = implicitly[TypeInformation[T]]
+
+ val sourceFunction = new
FromElementsFunction[T](scala.collection.JavaConversions
+ .asJavaCollection(data))
+
+ javaEnv.addSource(sourceFunction, "Collection
source").returns(typeInfo)
+ }
+
+ /**
+ * Creates a data stream from the given non-empty collection. The type
of the data stream is the type given by
+ * typeInfo. The elements need to be serializable (as defined by {@link
java.io.Serializable}), because the
+ * framework may move the elements into the cluster if needed.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream
source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The collection of elements to create the data stream from
+ * @param typeInfo
+ * The TypeInformation for the produced data stream
+ * @return The data stream representing the given collection
+ */
+ def fromCollection[T: ClassTag : TypeInformation](
+ data: Seq[T], typeInfo: TypeInformation[T]): DataStream[T] = {
+ require(data != null, "Data must not be null.")
+
+ val sourceFunction = new
FromElementsFunction[T](scala.collection.JavaConversions
+ .asJavaCollection(data))
+
+ javaEnv.addSource(sourceFunction, "Collection
source").returns(typeInfo)
+ }
+
+ /**
+ * Creates a data stream from the given iterator. Because the iterator
will remain unmodified until the actual
+ * execution happens, the type of data returned by the iterator must be
given explicitly in the form of the type
+ * class (this is due to the fact that the Java compiler erases the
generic type information).
+ * <p/>
+ * The iterator must be serializable (as defined in {@link
java.io.Serializable}), because the framework may
+ * move it
+ * to a remote environment, if needed.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream
source, i.e. a data stream source with a
+ * degree of parallelism of one.
+ *
+ * @param data
+ * The iterator of elements to create the data stream from
+ * @param typeClass
+ * The class of the data produced by the iterator. Must not be a
generic class.
+ * @return The data stream representing the elements in the iterator
+ * @see #fromCollection(java.util.Iterator,
org.apache.flink.api.common.typeinfo.TypeInformation)
+ */
+ def fromCollection[T: ClassTag : TypeInformation](
+ data: Iterator[T], typeClass: Class[T]): DataStream[T] = {
+ fromCollection(data, TypeExtractor.getForClass(typeClass))
+ }
+
+ /**
+ * Creates a data stream from the given iterator. Because the iterator
will remain unmodified until the actual
+ * execution happens, the type of data returned by the iterator must be
given explicitly in the form of the type
+ * information. This method is useful for cases where the type is
generic. In that case, the type class (as
+ * given in
+ * {@link #fromCollection(java.util.Iterator, Class)} does not supply
all type information.
+ * <p/>
+ * The iterator must be serializable (as defined in {@link
java.io.Serializable}), because the framework may
+ * move it
+ * to a remote environment, if needed.
+ * <p/>
+ * Note that this operation will result in a non-parallel data stream
source, i.e. a data stream source with a
+ * degree of parallelism one.
+ *
+ * @param data
+ * The iterator of elements to create the data stream from
+ * @param typeInfo
+ * The TypeInformation for the produced data stream
+ * @return The data stream representing the elements in the iterator
+ */
+ def fromCollection[T: ClassTag : TypeInformation](
+ data: Iterator[T], typeInfo: TypeInformation[T]): DataStream[T]
= {
+ require(data != null, "Data must not be null.")
+ if (!data.isInstanceOf[java.io.Serializable]) {
+ throw new IllegalArgumentException("The iterator must
be serializable.")
+ }
+
+ val sourceFunction = new
FromIteratorFunction[T](scala.collection.JavaConversions
+ .asJavaIterator(data))
+
+ javaEnv.addSource(sourceFunction, "Collection
source").returns(typeInfo)
+ }
+
+ /**
+ * Creates a new data stream that contains elements in the iterator.
The iterator is splittable, allowing the
+ * framework to create a parallel data stream source that returns the
elements in the iterator. The iterator
+ * must be
+ * serializable (as defined in {@link java.io.Serializable}, because
the execution environment may ship the
+ * elements
+ * into the cluster.
+ * <p/>
+ * Because the iterator will remain unmodified until the actual
execution happens, the type of data returned by the
+ * iterator must be given explicitly in the form of the type class
(this is due to the fact that the Java compiler
+ * erases the generic type information).
+ *
+ * @param iterator
+ * The iterator that produces the elements of the data stream
+ * @param typeClass
+ * The class of the data produced by the iterator. Must not be a
generic class.
+ * @return A data stream representing the elements in the iterator
+ */
+ def fromParallelCollection[T: ClassTag : TypeInformation](iterator:
SplittableIterator[T],
+ typeClass: Class[T]): DataStream[T] = {
+ javaEnv.fromParallelCollection(iterator, typeClass)
+ }
+
+ /**
+ * Creates a new data stream that contains elements in the iterator.
The iterator is splittable, allowing the
+ * framework to create a parallel data stream source that returns the
elements in the iterator. The iterator
+ * must be
+ * serializable (as defined in {@link java.io.Serializable}, because
the execution environment may ship the
+ * elements
+ * into the cluster.
+ * <p/>
+ * Because the iterator will remain unmodified until the actual
execution happens, the type of data returned by the
+ * iterator must be given explicitly in the form of the type
information. This method is useful for cases where the
+ * type is generic. In that case, the type class (as given in {@link
#fromParallelCollection(SplittableIterator,
+ * Class)} does not supply all type information.
+ *
+ * @param iterator
+ * The iterator that produces the elements of the data stream
+ * @param typeInfo
+ * The TypeInformation for the produced data stream.
+ * @return A data stream representing the elements in the iterator
+ */
+ def fromParallelCollection[T: ClassTag : TypeInformation](iterator:
SplittableIterator[T],
+ typeInfo: TypeInformation[T]): DataStream[T] = {
+ javaEnv.fromParallelCollection(iterator, typeInfo)
+ }
+
+ // private helper for passing different names
+ private[flink] def fromParallelCollection[T: ClassTag :
TypeInformation](iterator: SplittableIterator[T],
+ typeInfo: TypeInformation[T], operatorName: String):
DataStream[T] = {
+ javaEnv.addSource(new FromIteratorFunction[T](iterator),
operatorName).returns(typeInfo)
+ }
+
+
+ /**
+ * Adds a data source with custom type information, thus opening a
+ * {@link org.apache.flink.streaming.api.datastream.DataStream}. Only in very special cases
+ * does the user need to supply type information. Otherwise use
+ * {@link
#addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
+ * <p/>
+ * By default sources have a parallelism of 1. To enable parallel
execution, the user defined source should
+ * implement {@link
org.apache.flink.streaming.api.function.source.ParallelSourceFunction} or
extend {@link
+ *
org.apache.flink.streaming.api.function.source.RichParallelSourceFunction}. In
these cases the resulting source
+ * will have the parallelism of the environment. To change this
afterwards call {@link
+ *
org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
+ *
+ * @param function
+ * the user defined function
+ * @return the data stream constructed
+ */
+ def addSource[T: ClassTag : TypeInformation](function:
SourceFunction[T]): DataStream[T] = {
+ require(function != null, "Function must not be null.")
+ val cleanFun = StreamExecutionEnvironment.clean(function)
+ javaEnv.addSource(cleanFun)
+ }
+
+
+ /**
+ * Adds a data source with custom type information, thus opening a
+ * {@link org.apache.flink.streaming.api.datastream.DataStream}. Only in very special cases
+ * does the user need to supply type information. Otherwise use
+ * {@link
#addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
+ *
+ * @param function
+ * the user defined function
+ * @param sourceName
+ * Name of the data source
+ * @return the data stream constructed
+ */
+ def addSource[T: ClassTag : TypeInformation](function:
SourceFunction[T],
+ sourceName: String): DataStream[T] = {
+ require(function != null, "Function must not be null.")
+ val typeInfo: TypeInformation[T] =
+ if (function.isInstanceOf[ResultTypeQueryable[T]]) {
+
function.asInstanceOf[ResultTypeQueryable[T]].getProducedType
+ }
+ else {
+
TypeExtractor.createTypeInfo(classOf[SourceFunction[T]], function.getClass, 0,
null, null)
+ }
+
+ val isParallel =
function.isInstanceOf[ParallelSourceFunction[T]]
+ val cleanFun = StreamExecutionEnvironment.clean(function)
+ val sourceOperator = new StreamSource[T](cleanFun)
+ new DataStreamSource[T](this.javaEnv, sourceName, typeInfo,
sourceOperator,
+ isParallel, sourceName)
+ }
+
+ // @SuppressWarnings("unchecked")
+ // private <OUT> DataStreamSource<OUT>
addSource(SourceFunction<OUT> function,
+ // TypeInformation<OUT> typeInfo, String sourceName) {
+ //
+ // if (typeInfo == null) {
+ // if (function instanceof
GenericSourceFunction) {
+ // typeInfo =
((GenericSourceFunction<OUT>) function).getType();
+ // } else {
+ // typeInfo =
TypeExtractor.createTypeInfo(SourceFunction.class,
+ // function.getClass(), 0, null,
null);
+ // }
+ // }
+ //
+ // boolean isParallel = function instanceof
ParallelSourceFunction;
+ //
+ // ClosureCleaner.clean(function, true);
+ // StreamOperator<OUT, OUT> sourceOperator = new
StreamSource<OUT>(function);
+ //
+ // return new DataStreamSource<OUT>(this, sourceName, typeInfo,
sourceOperator,
+ // isParallel, sourceName);
+ // }
+
+ /**
+ * Create a DataStream using a user defined source function for
arbitrary
+ * source functionality.
+ *
+ */
+ def addSource[T: ClassTag : TypeInformation](function: Collector[T] =>
Unit): DataStream[T] = {
+ require(function != null, "Function must not be null.")
+ val sourceFunction = new SourceFunction[T] {
+ val cleanFun =
StreamExecutionEnvironment.clean(function)
+
+ override def run(out: Collector[T]) {
+ cleanFun(out)
+ }
+
+ override def cancel() = {}
+ }
+ addSource(sourceFunction)
+ }
+
+ /**
+ * Triggers the program execution. The environment will execute all
parts of
+ * the program that have resulted in a "sink" operation. Sink
operations are
+ * for example printing results or forwarding them to a message queue.
+ * <p>
+ * The program execution will be logged and displayed with a generated
+ * default name.
+ *
+ */
+ def execute() = javaEnv.execute()
+
+ /**
+ * Triggers the program execution. The environment will execute all
parts of
+ * the program that have resulted in a "sink" operation. Sink
operations are
+ * for example printing results or forwarding them to a message queue.
+ * <p>
+ * The program execution will be logged and displayed with the provided
name
+ *
+ */
+ def execute(jobName: String) = javaEnv.execute(jobName)
+
+ /**
+ * Creates the plan with which the system will execute the program, and
+ * returns it as a String using a JSON representation of the execution
data
+ * flow graph. Note that this needs to be called, before the plan is
+ * executed.
+ *
+ */
+ def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
--- End diff --
This change is altering the indentation from spaces to tabs. In Scala we are using spaces ;)
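For context, here is a minimal usage sketch of the new Scala source methods shown in the diff above. The file path, host, port, and object name are placeholders, and it assumes the implicit TypeInformation support from org.apache.flink.streaming.api.scala and the getExecutionEnvironment factory are available:

import org.apache.flink.streaming.api.scala._

object SourceApiSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // File source: reads the file line by line with the system's default charset.
    val lines: DataStream[String] = env.readTextFile("file:///tmp/input.txt")

    // Socket source: an unbounded stream of '\n'-delimited strings.
    val socketLines: DataStream[String] = env.socketTextStream("localhost", 9999)

    // Collection source: non-parallel, i.e. parallelism of one.
    val numbers: DataStream[Int] = env.fromCollection(Seq(1, 2, 3))

    lines.print()
    socketLines.print()
    numbers.print()

    env.execute("Source API sketch")
  }
}

This is only meant to illustrate how the sources added in this diff are called from Scala; it is not part of the change itself.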
> Streaming file source/sink API is not in sync with the batch API
> ----------------------------------------------------------------
>
> Key: FLINK-1687
> URL: https://issues.apache.org/jira/browse/FLINK-1687
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Reporter: Gábor Hermann
> Assignee: Péter Szabó
>
> The streaming environment is missing file inputs like readFile and readCsvFile, the more
> general createInput function, and outputs like writeAsCsv and write. The streaming and
> batch APIs should be consistent.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)