sjwiesman commented on a change in pull request #17760:
URL: https://github.com/apache/flink/pull/17760#discussion_r747613078
##########
File path:
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,170 @@
package org.apache.flink.streaming.scala.examples.wordcount
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.RuntimeExecutionMode
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence
histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
*
* The input is a plain text file with lines separated by newline characters.
*
* Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
*
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and/or directories to read. If no inputs are
+ * provided, the program is run with default data from [[WordCountData]].
*
- * This example shows how to:
+ * {{{ --discovery-interval <duration> }}} Turns the file reader into a continuous source that
+ * will monitor the provided input directories every interval and read any new files.
*
- * - write a simple Flink Streaming program,
- * - use tuple data types,
- * - write and use transformation functions.
+ * {{{ --output <path> }}} The output directory where the Job will write the results. If no
+ * output path is provided, the Job will print the results to `stdout`.
*
+ * This example shows how to:
+ *
+ * - Write a simple Flink DataStream program
+ * - Use tuple data types
+ * - Write and use a user-defined function
*/
object WordCount {
- def main(args: Array[String]) {
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
- // Checking input parameters
- val params = ParameterTool.fromArgs(args)
+ def main(args: Array[String]): Unit = {
+ val params = CLI.fromArgs(args)
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
val env = StreamExecutionEnvironment.getExecutionEnvironment
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
+ // result will be the same if interpreted correctly, but getting there can be different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
+ // result will be the same if interpreted correctly, but getting there can be different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
Review comment:
whoops 😅
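
For readers skimming the diff: the imports and the comment block above boil down to a small
pipeline skeleton. The following is a minimal sketch, not the PR's actual code; the input path,
source name, object name, and tokenization are illustrative placeholders. It shows the AUTOMATIC
runtime mode being set and the FileSource that --discovery-interval would switch from a bounded
read into continuous directory monitoring:

    import java.time.Duration

    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.api.common.eventtime.WatermarkStrategy
    import org.apache.flink.connector.file.src.FileSource
    import org.apache.flink.connector.file.src.reader.TextLineFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.scala._

    object WordCountSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // AUTOMATIC: Flink chooses BATCH if all sources are bounded, STREAMING otherwise.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

        // A bounded source over text files; "/tmp/input" is a placeholder path.
        val builder = FileSource.forRecordStreamFormat(new TextLineFormat, new Path("/tmp/input"))
        // With --discovery-interval set, the example would instead make the source unbounded:
        //   builder.monitorContinuously(Duration.ofSeconds(10))
        val source = builder.build()

        env
          .fromSource(source, WatermarkStrategy.noWatermarks[String](), "file-input")
          .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
          .map((_, 1))
          .keyBy(_._1)
          .sum(1)
          .print()

        env.execute("WordCount")
      }
    }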
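On the sink side, the imports of FileSink, SimpleStringEncoder, MemorySize, and
DefaultRollingPolicy suggest --output is wired to a row-format file sink with a rolling policy.
Again a hedged sketch under those assumptions; the output path and the rolling thresholds are
placeholders, not values taken from the PR:

    import java.time.Duration

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.configuration.MemorySize
    import org.apache.flink.connector.file.sink.FileSink
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

    object OutputSinkSketch {
      // "/tmp/output" and the thresholds below are illustrative placeholders.
      val sink: FileSink[String] = FileSink
        .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder[String]())
        .withRollingPolicy(
          DefaultRollingPolicy.builder()
            .withMaxPartSize(MemorySize.ofMebiBytes(1))   // roll after ~1 MiB
            .withRolloverInterval(Duration.ofSeconds(10)) // or after 10 seconds
            .build())
        .build()

      // A stream of counts would then be attached with, e.g.:
      //   counts.map(_.toString).sinkTo(sink)
    }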