alibahadirzeybek commented on a change in pull request #17760:
URL: https://github.com/apache/flink/pull/17760#discussion_r748402094



##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -50,51 +70,92 @@
     // *************************************************************************
 
     public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
 
-        // Checking input parameters
-        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
-        // set up the execution environment
+        // Create the execution environment. This is the main entrypoint
+        // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        // make parameters available in the web interface
+        // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+        // application executed over bounded input will produce the same final results regardless
+        // of the configured execution mode. It is important to note what final means here: a job
+        // executing in STREAMING mode might produce incremental updates (think upserts in
+        // a database) while a BATCH job would only produce one final result at the end. The final
+        // result will be the same if interpreted correctly, but getting there can be different.
+        //
+        // The “classic” execution behavior of the DataStream API is called STREAMING execution
+        // mode. Applications should use streaming execution for unbounded jobs that require
+        // continuous incremental processing and are expected to stay online indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+        // can only do when we know that our input is bounded. For example, different
+        // join/aggregation strategies can be used, in addition to a different shuffle
+        // implementation that allows more efficient task scheduling and failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH  if all sources
+        // are bounded and otherwise STREAMING.
+        env.setRuntimeMode(params.getExecutionMode());
+
+        // This optional step makes the input parameters
+        // available in the Flink UI.
         env.getConfig().setGlobalJobParameters(params);
 
-        // get input data
-        DataStream<String> text = null;
-        if (params.has("input")) {
-            // union all the inputs from text files
-            for (String input : params.getMultiParameterRequired("input")) {
-                if (text == null) {
-                    text = env.readTextFile(input);
-                } else {
-                    text = text.union(env.readTextFile(input));
-                }
-            }
-            Preconditions.checkNotNull(text, "Input DataStream should not be null.");
+        DataStream<String> text;
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set of directories.
+            // Each file will be processed as plain text and split based on newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineFormat(), params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
         } else {
-            System.out.println("Executing WordCount example with default input data set.");
-            System.out.println("Use --input to specify file input.");
-            // get default test text data
-            text = env.fromElements(WordCountData.WORDS);
+            text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
        }
 
         DataStream<Tuple2<String, Integer>> counts =
-                // split up the lines in pairs (2-tuples) containing: (word,1)
+                // The text lines read from the source are split into words
+                // using a user-defined function. The tokenizer, implemented below,
+                // will output each words as a (2-tuple) containing (word, 1)

Review comment:
```suggestion
                // will output each word as a (2-tuple) containing (word, 1)
```
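
For readers following along: the `Tokenizer` referenced by `text.flatMap(new Tokenizer())` is not quoted in this hunk. A minimal sketch of what such a user-defined function looks like, modeled on the classic WordCount example (the exact implementation in the PR may differ):

```java
// Sketch of a nested Tokenizer class inside WordCount (assumes the imports
// for FlatMapFunction, Tuple2, and Collector shown elsewhere in this file).
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Normalize the line and split it on any non-word character.
        for (String token : value.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
```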

##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -50,51 +70,92 @@
     // *************************************************************************
 
     public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
 
-        // Checking input parameters
-        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
-        // set up the execution environment
+        // Create the execution environment. This is the main entrypoint
+        // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        // make parameters available in the web interface
+        // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+        // application executed over bounded input will produce the same final results regardless
+        // of the configured execution mode. It is important to note what final means here: a job
+        // executing in STREAMING mode might produce incremental updates (think upserts in
+        // a database) while a BATCH job would only produce one final result at the end. The final
+        // result will be the same if interpreted correctly, but getting there can be different.
+        //
+        // The “classic” execution behavior of the DataStream API is called STREAMING execution
+        // mode. Applications should use streaming execution for unbounded jobs that require
+        // continuous incremental processing and are expected to stay online indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+        // can only do when we know that our input is bounded. For example, different
+        // join/aggregation strategies can be used, in addition to a different shuffle
+        // implementation that allows more efficient task scheduling and failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH  if all sources

Review comment:
```suggestion
        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
```
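
Aside from the stray double space, it may help to see the setting this comment documents in isolation. A minimal sketch (assuming `CLI#getExecutionMode` simply maps the `--execution-mode` flag onto the `RuntimeExecutionMode` enum):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;

// AUTOMATIC defers the decision: Flink picks BATCH when every source is
// bounded and STREAMING otherwise. BATCH and STREAMING force the mode.
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
```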

##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
 
 package org.apache.flink.streaming.scala.examples.wordcount
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
 
 /**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
  * The input is a plain text file with lines separated by newline characters.
  *
  * Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
  *
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided,  the program is run with default data from [[WordCountData]].
  *
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`
  *
- *  - write a simple Flink Streaming program,
- *  - use tuple data types,
- *  - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
  *
+ * This example shows how to:
+ *
+ *  - Write a simple Flink DataStream program
+ *  - Use tuple data types
+ *  - Write and use a user-defined function
  */
 object WordCount {
 
-  def main(args: Array[String]) {
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
 
-    // Checking input parameters
-    val params = ParameterTool.fromArgs(args)
+  def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
-    // set up the execution environment
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    // make parameters available in the web interface
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while a BATCH job would only produce one final result at the end. The final
+    // result will be the same if interpreted correctly, but getting there can be different.
+    //
+    // The “classic” execution behavior of the DataStream API is called STREAMING execution
+    // mode. Applications should use streaming execution for unbounded jobs that require
+    // continuous incremental processing and are expected to stay online indefinitely.
+    //
+    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+    // can only do when we know that our input is bounded. For example, different
+    // join/aggregation strategies can be used, in addition to a different shuffle
+    // implementation that allows more efficient task scheduling and failure recovery behavior.
+    //
+    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH  if all sources
+    // are bounded and otherwise STREAMING.
+    env.setRuntimeMode(params.executionMode)
+
+    // This optional step makes the input parameters
+    // available in the Flink UI.
     env.getConfig.setGlobalJobParameters(params)
 
     // get input data
-    val text =
-    // read the text file from given input path
-    if (params.has("input")) {
-      env.readTextFile(params.get("input"))
-    } else {
-      println("Executing WordCount example with default inputs data set.")
-      println("Use --input to specify file input.")
-      // get default test text data
-      env.fromElements(WordCountData.WORDS: _*)
+    val text = params.input match {
+      case Some(input) =>
+        // Create a new file source that will read files from a given set of directories.
+        // Each file will be processed as plain text and split based on newlines.
+        val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+        params.discoveryInterval.foreach { duration =>
+          // If a discovery interval is provided, the source will
+          // continuously watch the given directories for new files.
+          builder.monitorContinuously(duration)
+        }
+        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+      case None =>
+        env.fromElements(WordCountData.WORDS:_*).name("in-memory-input")
     }
 
-    val counts: DataStream[(String, Int)] = text
-      // split up the lines in pairs (2-tuples) containing: (word,1)
-      .flatMap(_.toLowerCase.split("\\W+"))
-      .filter(_.nonEmpty)
-      .map((_, 1))
-      // group by the tuple field "0" and sum up tuple field "1"
-      .keyBy(_._1)
-      .sum(1)
-
-    // emit result
-    if (params.has("output")) {
-      counts.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      counts.print()
+    val counts =
+      // The text lines read from the source are split into words
+      // using a user-defined function. The tokenizer, implemented below,
+      // will output each words as a (2-tuple) containing (word, 1)
+      text.flatMap(new Tokenizer)
+        .name("tokenizer")
+        // keyBy groups tuples based on the "_1" field, the word.
+        // Using a keyBy allows performing aggregations and other
+        // stateful transformations over data on a per-key basis.
+        // This is similar to a GROUP BY clause in a SQL query.
+        .keyBy(_._1)
+        // For each key, we perform a simple sum of the "1" field, the count.
+        // If the input data set is bounded, sum will output a final count for
+        // each word. If it is unbounded, it will continuously output updates
+        // each time it sees a new instance of each word in the stream.
+        .sum(1)
+        .name("counter")
+
+    params.output match {
+      case Some(output) =>
+        // Given an output directory, Flink will write the results to a file
+        // using a simple string encoding. In a production environment, this might
+        // be something more structured like CSV, Avro, JSON, or Parquet.
+        counts.sinkTo(FileSink.forRowFormat[(String, Int)](output, new SimpleStringEncoder())
+          .withRollingPolicy(DefaultRollingPolicy.builder()
+              .withMaxPartSize(MemorySize.ofMebiBytes(1))
+              .withRolloverInterval(Duration.ofSeconds(10))
+            .build())
+          .build())
+          .name("file-sink")
+
+      case None => counts.print().name("print-sink")
     }
 
-    // execute program
-    env.execute("Streaming WordCount")
+    // Apache Flink applications are composed lazily. Calling execute
+    // submits the Job and begins processing.
+    env.execute("WordCount")
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Implements the string tokenizer that splits sentences into words as a user-defined

Review comment:
```suggestion
   * Implements the string tokenizer that splits a sentence into words as a user-defined
```

Same for the Java implementation
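
Beyond the wording, the row-format sink configured in this hunk has a direct Java counterpart, which may be useful when comparing the two implementations. A sketch mirroring the thresholds above (1 MiB part files, 10-second rollover); the `output` and `counts` variables are assumed from the surrounding Java example:

```java
FileSink<Tuple2<String, Integer>> sink =
        FileSink.forRowFormat(output, new SimpleStringEncoder<Tuple2<String, Integer>>())
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .withRolloverInterval(Duration.ofSeconds(10))
                                .build())
                .build();

counts.sinkTo(sink).name("file-sink");
```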

##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -17,30 +17,50 @@
 
 package org.apache.flink.streaming.examples.wordcount;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence histogram over text
- * files in a streaming fashion.
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
  * <p>The input is a plain text file with lines separated by newline characters.

Review comment:
```suggestion
 * <p>The input is a [list of] plain text file[s] with lines separated by a newline character.
```

##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
 
 package org.apache.flink.streaming.scala.examples.wordcount
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
 
 /**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
  * The input is a plain text file with lines separated by newline characters.
  *
  * Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
  *
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided,  the program is run with default data from [[WordCountData]].

Review comment:
```suggestion
 * If no input is provided, the program is run with default data from [[WordCountData]].
```

##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
 
 package org.apache.flink.streaming.scala.examples.wordcount
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
 
 /**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
  * The input is a plain text file with lines separated by newline characters.
  *
  * Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
  *
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided,  the program is run with default data from [[WordCountData]].
  *
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`
  *
- *  - write a simple Flink Streaming program,
- *  - use tuple data types,
- *  - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
  *
+ * This example shows how to:
+ *
+ *  - Write a simple Flink DataStream program
+ *  - Use tuple data types
+ *  - Write and use a user-defined function
 */
 object WordCount {
 
-  def main(args: Array[String]) {
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
 
-    // Checking input parameters
-    val params = ParameterTool.fromArgs(args)
+  def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
-    // set up the execution environment
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    // make parameters available in the web interface
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while a BATCH job would only produce one final result at the end. The final

Review comment:
```suggestion
    // a database) while in BATCH mode, it would only produce one final result at the end. The final
```

##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
 
 package org.apache.flink.streaming.scala.examples.wordcount
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
 
 /**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
  * The input is a plain text file with lines separated by newline characters.
  *
  * Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
  *
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided,  the program is run with default data from [[WordCountData]].
  *
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`
  *
- *  - write a simple Flink Streaming program,
- *  - use tuple data types,
- *  - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
  *
+ * This example shows how to:
+ *
+ *  - Write a simple Flink DataStream program
+ *  - Use tuple data types
+ *  - Write and use a user-defined function
 */
 object WordCount {
 
-  def main(args: Array[String]) {
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
 
-    // Checking input parameters
-    val params = ParameterTool.fromArgs(args)
+  def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
-    // set up the execution environment
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    // make parameters available in the web interface
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while a BATCH job would only produce one final result at the end. The final
+    // result will be the same if interpreted correctly, but getting there can be different.
+    //
+    // The “classic” execution behavior of the DataStream API is called STREAMING execution
+    // mode. Applications should use streaming execution for unbounded jobs that require
+    // continuous incremental processing and are expected to stay online indefinitely.
+    //
+    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+    // can only do when we know that our input is bounded. For example, different
+    // join/aggregation strategies can be used, in addition to a different shuffle
+    // implementation that allows more efficient task scheduling and failure recovery behavior.
+    //
+    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH  if all sources
+    // are bounded and otherwise STREAMING.
+    env.setRuntimeMode(params.executionMode)
+
+    // This optional step makes the input parameters
+    // available in the Flink UI.
     env.getConfig.setGlobalJobParameters(params)
 
     // get input data
-    val text =
-    // read the text file from given input path
-    if (params.has("input")) {
-      env.readTextFile(params.get("input"))
-    } else {
-      println("Executing WordCount example with default inputs data set.")
-      println("Use --input to specify file input.")
-      // get default test text data
-      env.fromElements(WordCountData.WORDS: _*)
+    val text = params.input match {
+      case Some(input) =>
+        // Create a new file source that will read files from a given set of directories.
+        // Each file will be processed as plain text and split based on newlines.
+        val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+        params.discoveryInterval.foreach { duration =>
+          // If a discovery interval is provided, the source will
+          // continuously watch the given directories for new files.
+          builder.monitorContinuously(duration)
+        }
+        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+      case None =>
+        env.fromElements(WordCountData.WORDS:_*).name("in-memory-input")
     }
 
-    val counts: DataStream[(String, Int)] = text
-      // split up the lines in pairs (2-tuples) containing: (word,1)
-      .flatMap(_.toLowerCase.split("\\W+"))
-      .filter(_.nonEmpty)
-      .map((_, 1))
-      // group by the tuple field "0" and sum up tuple field "1"
-      .keyBy(_._1)
-      .sum(1)
-
-    // emit result
-    if (params.has("output")) {
-      counts.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      counts.print()
+    val counts =
+      // The text lines read from the source are split into words
+      // using a user-defined function. The tokenizer, implemented below,
+      // will output each words as a (2-tuple) containing (word, 1)
+      text.flatMap(new Tokenizer)
+        .name("tokenizer")
+        // keyBy groups tuples based on the "_1" field, the word.
+        // Using a keyBy allows performing aggregations and other
+        // stateful transformations over data on a per-key basis.
+        // This is similar to a GROUP BY clause in a SQL query.
+        .keyBy(_._1)
+        // For each key, we perform a simple sum of the "1" field, the count.
+        // If the input data set is bounded, sum will output a final count for

Review comment:
```suggestion
        // If the input data stream is bounded, sum will output a final count for
```
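
To make the incremental-versus-final distinction in this comment concrete, here is a hypothetical trace (not taken from the PR) of what the summing operator emits for a small bounded input in each mode:

```java
// Input: "to be or not to be"
//
// STREAMING mode emits an update every time a key's count changes:
//   (to,1) (be,1) (or,1) (not,1) (to,2) (be,2)
//
// BATCH mode emits only the final count per key at the end of the job:
//   (to,2) (be,2) (or,1) (not,1)
```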

##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TimeUtils;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * A simple CLI parser for the {@link org.apache.flink.streaming.examples.wordcount.WordCount}
+ * example application.
+ */
+public class CLI extends ExecutionConfig.GlobalJobParameters {
+
+    public static final String INPUT_KEY = "input";
+    public static final String OUTPUT_KEY = "output";
+    public static final String DISCOVERY_INTERVAL = "discovery-interval";
+    public static final String EXECUTION_MODE = "execution-mode";
+
+    public static CLI fromArgs(String[] args) throws Exception {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+        Path[] inputs = null;
+        if (params.has(INPUT_KEY)) {
+            inputs =
+                    params.getMultiParameterRequired(INPUT_KEY).stream()
+                            .map(Path::new)
+                            .toArray(Path[]::new);
+        } else {
+            System.out.println("Executing example with default input data set.");

Review comment:
```suggestion
            System.out.println("Executing example with default input data.");
```
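
The hunk is cut off before the remaining flags are parsed. Purely as illustration, the discovery interval might be read with the `TimeUtils` helper already imported above; this is a guess at the shape, not the PR's actual code:

```java
// Hypothetical continuation: parse --discovery-interval if present.
Duration discoveryInterval = null;
if (params.has(DISCOVERY_INTERVAL)) {
    // TimeUtils.parseDuration accepts values such as "10 s" or "1 min".
    discoveryInterval = TimeUtils.parseDuration(params.get(DISCOVERY_INTERVAL));
}
```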

##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -50,51 +70,92 @@
     // *************************************************************************
 
     public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
 
-        // Checking input parameters
-        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
-        // set up the execution environment
+        // Create the execution environment. This is the main entrypoint
+        // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        // make parameters available in the web interface
+        // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+        // application executed over bounded input will produce the same final results regardless
+        // of the configured execution mode. It is important to note what final means here: a job
+        // executing in STREAMING mode might produce incremental updates (think upserts in
+        // a database) while a BATCH job would only produce one final result at the end. The final
+        // result will be the same if interpreted correctly, but getting there can be different.
+        //
+        // The “classic” execution behavior of the DataStream API is called STREAMING execution
+        // mode. Applications should use streaming execution for unbounded jobs that require
+        // continuous incremental processing and are expected to stay online indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+        // can only do when we know that our input is bounded. For example, different
+        // join/aggregation strategies can be used, in addition to a different shuffle
+        // implementation that allows more efficient task scheduling and failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH  if all sources
+        // are bounded and otherwise STREAMING.
+        env.setRuntimeMode(params.getExecutionMode());
+
+        // This optional step makes the input parameters
+        // available in the Flink UI.
         env.getConfig().setGlobalJobParameters(params);
 
-        // get input data
-        DataStream<String> text = null;
-        if (params.has("input")) {
-            // union all the inputs from text files
-            for (String input : params.getMultiParameterRequired("input")) {
-                if (text == null) {
-                    text = env.readTextFile(input);
-                } else {
-                    text = text.union(env.readTextFile(input));
-                }
-            }
-            Preconditions.checkNotNull(text, "Input DataStream should not be null.");
+        DataStream<String> text;
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set of directories.
+            // Each file will be processed as plain text and split based on newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineFormat(), params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
         } else {
-            System.out.println("Executing WordCount example with default input data set.");
-            System.out.println("Use --input to specify file input.");
-            // get default test text data
-            text = env.fromElements(WordCountData.WORDS);
+            text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
        }
 
         DataStream<Tuple2<String, Integer>> counts =
-                // split up the lines in pairs (2-tuples) containing: (word,1)
+                // The text lines read from the source are split into words
+                // using a user-defined function. The tokenizer, implemented below,
+                // will output each words as a (2-tuple) containing (word, 1)
                 text.flatMap(new Tokenizer())
-                        // group by the tuple field "0" and sum up tuple field "1"
+                        .name("tokenizer")
+                        // keyBy groups tuples based on the "0" field, the word.
+                        // Using a keyBy allows performing aggregations and other
+                        // stateful transformations over data on a per-key basis.
+                        // This is similar to a GROUP BY clause in a SQL query.
                         .keyBy(value -> value.f0)
-                        .sum(1);
+                        // For each key, we perform a simple sum of the "1" field, the count.
+                        // If the input data set is bounded, sum will output a final count for

Review comment:
```suggestion
                        // If the input data stream is bounded, sum will output a final count for
```
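
One detail worth noting about the source setup quoted above: whether the job is bounded (and hence whether AUTOMATIC resolves to BATCH) hinges entirely on `monitorContinuously`. A brief sketch with made-up paths and interval:

```java
FileSource.FileSourceBuilder<String> builder =
        FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("/tmp/words"));

// Without this call the source reads the files once and finishes (bounded).
// With it, the source keeps scanning the input paths for new files and
// becomes unbounded, so AUTOMATIC resolves to STREAMING.
builder.monitorContinuously(Duration.ofSeconds(30));

DataStream<String> text =
        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
```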

##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -50,51 +70,92 @@
     // *************************************************************************
 
     public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
 
-        // Checking input parameters
-        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
-        // set up the execution environment
+        // Create the execution environment. This is the main entrypoint
+        // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        // make parameters available in the web interface
+        // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+        // application executed over bounded input will produce the same final results regardless
+        // of the configured execution mode. It is important to note what final means here: a job
+        // executing in STREAMING mode might produce incremental updates (think upserts in
+        // a database) while a BATCH job would only produce one final result at the end. The final

Review comment:
```suggestion
        // a database) while in BATCH mode, it would only produce one final result at the end. The final
```

##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
 
 package org.apache.flink.streaming.scala.examples.wordcount
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
 
 /**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
  * The input is a plain text file with lines separated by newline characters.

Review comment:
```suggestion
 * The input is a [list of] plain text file[s] with lines separated by a newline character.
```

##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -17,30 +17,50 @@
 
 package org.apache.flink.streaming.examples.wordcount;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence histogram over text
- * files in a streaming fashion.
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
  * <p>The input is a plain text file with lines separated by newline characters.
  *
- * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li><code>--input &lt;path&gt;</code>A list of input files and / or directories to read. If no
+ *       inputs are provided, the program is run with default data from {@link WordCountData}.

Review comment:
```suggestion
 *       input is provided, the program is run with default data from {@link WordCountData}.
```

##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
 
 package org.apache.flink.streaming.scala.examples.wordcount
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
 
 /**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
  *
  * The input is a plain text file with lines separated by newline characters.
  *
  * Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
  *
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided,  the program is run with default data from [[WordCountData]].
  *
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`
  *
- *  - write a simple Flink Streaming program,
- *  - use tuple data types,
- *  - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
  *
+ * This example shows how to:
+ *
+ *  - Write a simple Flink DataStream program
+ *  - Use tuple data types
+ *  - Write and use a user-defined function
 */
 object WordCount {
 
-  def main(args: Array[String]) {
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
 
-    // Checking input parameters
-    val params = ParameterTool.fromArgs(args)
+  def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
-    // set up the execution environment
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    // make parameters available in the web interface
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while a BATCH job would only produce one final result at the end. The final
+    // result will be the same if interpreted correctly, but getting there can be different.
+    //
+    // The “classic” execution behavior of the DataStream API is called STREAMING execution
+    // mode. Applications should use streaming execution for unbounded jobs that require
+    // continuous incremental processing and are expected to stay online indefinitely.
+    //
+    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+    // can only do when we know that our input is bounded. For example, different
+    // join/aggregation strategies can be used, in addition to a different shuffle
+    // implementation that allows more efficient task scheduling and failure recovery behavior.
+    //
+    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH  if all sources

Review comment:
```suggestion
    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
