alibahadirzeybek commented on a change in pull request #17760:
URL: https://github.com/apache/flink/pull/17760#discussion_r748402094
##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -50,51 +70,92 @@
// *************************************************************************
public static void main(String[] args) throws Exception {
+ final CLI params = CLI.fromArgs(args);
- // Checking input parameters
- final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
+ // result will be the same if interpreted correctly, but getting there can be different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ env.setRuntimeMode(params.getExecutionMode());
+
+ // This optional step makes the input parameters
+ // available in the Flink UI.
env.getConfig().setGlobalJobParameters(params);
- // get input data
- DataStream<String> text = null;
- if (params.has("input")) {
- // union all the inputs from text files
- for (String input : params.getMultiParameterRequired("input")) {
- if (text == null) {
- text = env.readTextFile(input);
- } else {
- text = text.union(env.readTextFile(input));
- }
- }
- Preconditions.checkNotNull(text, "Input DataStream should not be null.");
+ DataStream<String> text;
+ if (params.getInputs().isPresent()) {
+ // Create a new file source that will read files from a given set of directories.
+ // Each file will be processed as plain text and split based on newlines.
+ FileSource.FileSourceBuilder<String> builder =
+ FileSource.forRecordStreamFormat(
+ new TextLineFormat(), params.getInputs().get());
+
+ // If a discovery interval is provided, the source will
+ // continuously watch the given directories for new files.
+ params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+ text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
} else {
- System.out.println("Executing WordCount example with default input data set.");
- System.out.println("Use --input to specify file input.");
- // get default test text data
- text = env.fromElements(WordCountData.WORDS);
+ text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
}
DataStream<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
+ // The text lines read from the source are split into words
+ // using a user-defined function. The tokenizer, implemented below,
+ // will output each words as a (2-tuple) containing (word, 1)
Review comment:
```suggestion
// will output each word as a (2-tuple) containing (word, 1)
```
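For readers skimming the thread, a minimal sketch of the `Tokenizer` this comment refers to, written as a static inner class of `WordCount`. The body is reconstructed from the old pipeline's `toLowerCase`/`\W+` logic in the diff, not copied from the PR:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/** Splits a line into lowercase words and emits each word as a (word, 1) tuple. */
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Normalize the line and split it on runs of non-word characters.
        for (String word : value.toLowerCase().split("\\W+")) {
            if (word.length() > 0) {
                out.collect(Tuple2.of(word, 1));
            }
        }
    }
}
```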
##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -50,51 +70,92 @@
// *************************************************************************
public static void main(String[] args) throws Exception {
+ final CLI params = CLI.fromArgs(args);
- // Checking input parameters
- final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
+ // result will be the same if interpreted correctly, but getting there can be different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
Review comment:
```suggestion
// By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
```
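For context on what this comment block describes, a minimal sketch of choosing the runtime mode directly via Flink's `RuntimeExecutionMode` enum. The PR itself derives the mode from the CLI; hard-coding it here is purely illustrative:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuntimeModeSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC defers the decision: Flink picks BATCH if every source in
        // the job is bounded, and STREAMING otherwise.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    }
}
```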
##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
package org.apache.flink.streaming.scala.examples.wordcount
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
*
* The input is a plain text file with lines separated by newline characters.
*
* Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
*
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided, the program is run with default data from [[WordCountData]].
*
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`
*
- * - write a simple Flink Streaming program,
- * - use tuple data types,
- * - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
*
+ * This example shows how to:
+ *
+ * - Write a simple Flink DataStream program
+ * - Use tuple data types
+ * - Write and use a user-defined function
*/
object WordCount {
- def main(args: Array[String]) {
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
- // Checking input parameters
- val params = ParameterTool.fromArgs(args)
+ def main(args: Array[String]): Unit = {
+ val params = CLI.fromArgs(args)
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
val env = StreamExecutionEnvironment.getExecutionEnvironment
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
+ // result will be the same if interpreted correctly, but getting there can be different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ env.setRuntimeMode(params.executionMode)
+
+ // This optional step makes the input parameters
+ // available in the Flink UI.
env.getConfig.setGlobalJobParameters(params)
// get input data
- val text =
- // read the text file from given input path
- if (params.has("input")) {
- env.readTextFile(params.get("input"))
- } else {
- println("Executing WordCount example with default inputs data set.")
- println("Use --input to specify file input.")
- // get default test text data
- env.fromElements(WordCountData.WORDS: _*)
+ val text = params.input match {
+ case Some(input) =>
+ // Create a new file source that will read files from a given set of directories.
+ // Each file will be processed as plain text and split based on newlines.
+ val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+ params.discoveryInterval.foreach { duration =>
+ // If a discovery interval is provided, the source will
+ // continuously watch the given directories for new files.
+ builder.monitorContinuously(duration)
+ }
+ env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+ case None =>
+ env.fromElements(WordCountData.WORDS:_*).name("in-memory-input")
}
- val counts: DataStream[(String, Int)] = text
- // split up the lines in pairs (2-tuples) containing: (word,1)
- .flatMap(_.toLowerCase.split("\\W+"))
- .filter(_.nonEmpty)
- .map((_, 1))
- // group by the tuple field "0" and sum up tuple field "1"
- .keyBy(_._1)
- .sum(1)
-
- // emit result
- if (params.has("output")) {
- counts.writeAsText(params.get("output"))
- } else {
- println("Printing result to stdout. Use --output to specify output path.")
- counts.print()
+ val counts =
+ // The text lines read from the source are split into words
+ // using a user-defined function. The tokenizer, implemented below,
+ // will output each words as a (2-tuple) containing (word, 1)
+ text.flatMap(new Tokenizer)
+ .name("tokenizer")
+ // keyBy groups tuples based on the "_1" field, the word.
+ // Using a keyBy allows performing aggregations and other
+ // stateful transformations over data on a per-key basis.
+ // This is similar to a GROUP BY clause in a SQL query.
+ .keyBy(_._1)
+ // For each key, we perform a simple sum of the "1" field, the count.
+ // If the input data set is bounded, sum will output a final count for
+ // each word. If it is unbounded, it will continuously output updates
+ // each time it sees a new instance of each word in the stream.
+ .sum(1)
+ .name("counter")
+
+ params.output match {
+ case Some(output) =>
+ // Given an output directory, Flink will write the results to a file
+ // using a simple string encoding. In a production environment, this might
+ // be something more structured like CSV, Avro, JSON, or Parquet.
+ counts.sinkTo(FileSink.forRowFormat[(String, Int)](output, new SimpleStringEncoder())
+ .withRollingPolicy(DefaultRollingPolicy.builder()
+ .withMaxPartSize(MemorySize.ofMebiBytes(1))
+ .withRolloverInterval(Duration.ofSeconds(10))
+ .build())
+ .build())
+ .name("file-sink")
+
+ case None => counts.print().name("print-sink")
}
- // execute program
- env.execute("Streaming WordCount")
+ // Apache Flink applications are composed lazily. Calling execute
+ // submits the Job and begins processing.
+ env.execute("WordCount")
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a user-defined
Review comment:
```suggestion
* Implements the string tokenizer that splits a sentence into words as a user-defined
```
Same for the Java implementation
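Since the Java file gets the same sink, here is a sketch of the Java equivalent of the Scala `FileSink` block quoted above. The imports are the ones this PR adds; the output path and class name are placeholders:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class FileSinkSketch {
    public static void main(String[] args) {
        // Roll the current part file once it exceeds 1 MiB or every
        // 10 seconds, mirroring the Scala example above.
        FileSink<Tuple2<String, Integer>> sink =
                FileSink.forRowFormat(
                                new Path("/tmp/wordcount-output"), // placeholder path
                                new SimpleStringEncoder<Tuple2<String, Integer>>())
                        .withRollingPolicy(
                                DefaultRollingPolicy.builder()
                                        .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                        .withRolloverInterval(Duration.ofSeconds(10))
                                        .build())
                        .build();
        // Attached to the pipeline with counts.sinkTo(sink).name("file-sink").
    }
}
```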
##########
File path:
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -17,30 +17,50 @@
package org.apache.flink.streaming.examples.wordcount;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
/**
* Implements the "WordCount" program that computes a simple word occurrence
histogram over text
- * files in a streaming fashion.
+ * files. This Job can be executed in both streaming and batch execution modes.
*
* <p>The input is a plain text file with lines separated by newline
characters.
Review comment:
```suggestion
* <p>The input is a [list of] plain text file[s] with lines separated by a
newline character.
```
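To make the "[list of]" phrasing concrete, a sketch of reading several files or directories with the new source; the paths and the 30-second interval are placeholders:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class MultiFileInputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // forRecordStreamFormat is variadic: any number of files or
        // directories can be passed in one call.
        FileSource.FileSourceBuilder<String> builder =
                FileSource.forRecordStreamFormat(
                        new TextLineFormat(),
                        new Path("/data/book1.txt"), // placeholder paths
                        new Path("/data/more-books/"));

        // Optional: keep scanning the directories for new files, which turns
        // the bounded source into an unbounded one.
        builder.monitorContinuously(Duration.ofSeconds(30));

        DataStream<String> text =
                env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
        text.print();
        env.execute("multi-file-input");
    }
}
```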
##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
package org.apache.flink.streaming.scala.examples.wordcount
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
*
* The input is a plain text file with lines separated by newline characters.
*
* Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
*
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided, the program is run with default data from [[WordCountData]].
Review comment:
```suggestion
* If no input is provided, the program is run with default data from [[WordCountData]].
```
##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
package org.apache.flink.streaming.scala.examples.wordcount
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
*
* The input is a plain text file with lines separated by newline characters.
*
* Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
*
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided, the program is run with default data from [[WordCountData]].
*
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`
*
- * - write a simple Flink Streaming program,
- * - use tuple data types,
- * - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
*
+ * This example shows how to:
+ *
+ * - Write a simple Flink DataStream program
+ * - Use tuple data types
+ * - Write and use a user-defined function
*/
object WordCount {
- def main(args: Array[String]) {
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
- // Checking input parameters
- val params = ParameterTool.fromArgs(args)
+ def main(args: Array[String]): Unit = {
+ val params = CLI.fromArgs(args)
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
val env = StreamExecutionEnvironment.getExecutionEnvironment
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
Review comment:
```suggestion
// a database) while in BATCH mode, it would only produce one final result at the end. The final
```
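The distinction this suggestion is polishing can be seen concretely. A sketch of how the same bounded job surfaces results in each mode; the word list is illustrative, and the exact output interleaving depends on parallelism:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ModeSemanticsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // try BATCH too

        env.fromElements("to", "be", "or", "not", "to", "be")
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambdas erase tuple types
                .keyBy(t -> t.f0)
                .sum(1)
                .print();

        // STREAMING: emits an update per input record, e.g. (to,1) ... (to,2), (be,2).
        // BATCH: emits only the final counts, e.g. (to,2), (be,2), (or,1), (not,1).
        env.execute("mode-semantics");
    }
}
```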
##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
package org.apache.flink.streaming.scala.examples.wordcount
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
*
* The input is a plain text file with lines separated by newline characters.
*
* Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
*
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided, the program is run with default data from [[WordCountData]].
*
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`
*
- * - write a simple Flink Streaming program,
- * - use tuple data types,
- * - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
*
+ * This example shows how to:
+ *
+ * - Write a simple Flink DataStream program
+ * - Use tuple data types
+ * - Write and use a user-defined function
*/
object WordCount {
- def main(args: Array[String]) {
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
- // Checking input parameters
- val params = ParameterTool.fromArgs(args)
+ def main(args: Array[String]): Unit = {
+ val params = CLI.fromArgs(args)
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
val env = StreamExecutionEnvironment.getExecutionEnvironment
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
+ // result will be the same if interpreted correctly, but getting there can be different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ env.setRuntimeMode(params.executionMode)
+
+ // This optional step makes the input parameters
+ // available in the Flink UI.
env.getConfig.setGlobalJobParameters(params)
// get input data
- val text =
- // read the text file from given input path
- if (params.has("input")) {
- env.readTextFile(params.get("input"))
- } else {
- println("Executing WordCount example with default inputs data set.")
- println("Use --input to specify file input.")
- // get default test text data
- env.fromElements(WordCountData.WORDS: _*)
+ val text = params.input match {
+ case Some(input) =>
+ // Create a new file source that will read files from a given set of directories.
+ // Each file will be processed as plain text and split based on newlines.
+ val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input:_*)
+ params.discoveryInterval.foreach { duration =>
+ // If a discovery interval is provided, the source will
+ // continuously watch the given directories for new files.
+ builder.monitorContinuously(duration)
+ }
+ env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+ case None =>
+ env.fromElements(WordCountData.WORDS:_*).name("in-memory-input")
}
- val counts: DataStream[(String, Int)] = text
- // split up the lines in pairs (2-tuples) containing: (word,1)
- .flatMap(_.toLowerCase.split("\\W+"))
- .filter(_.nonEmpty)
- .map((_, 1))
- // group by the tuple field "0" and sum up tuple field "1"
- .keyBy(_._1)
- .sum(1)
-
- // emit result
- if (params.has("output")) {
- counts.writeAsText(params.get("output"))
- } else {
- println("Printing result to stdout. Use --output to specify output path.")
- counts.print()
+ val counts =
+ // The text lines read from the source are split into words
+ // using a user-defined function. The tokenizer, implemented below,
+ // will output each words as a (2-tuple) containing (word, 1)
+ text.flatMap(new Tokenizer)
+ .name("tokenizer")
+ // keyBy groups tuples based on the "_1" field, the word.
+ // Using a keyBy allows performing aggregations and other
+ // stateful transformations over data on a per-key basis.
+ // This is similar to a GROUP BY clause in a SQL query.
+ .keyBy(_._1)
+ // For each key, we perform a simple sum of the "1" field, the count.
+ // If the input data set is bounded, sum will output a final count for
Review comment:
```suggestion
// If the input data stream is bounded, sum will output a final count for
```
##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TimeUtils;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * A simple CLI parser for the {@link org.apache.flink.streaming.examples.wordcount.WordCount}
+ * example application.
+ */
+public class CLI extends ExecutionConfig.GlobalJobParameters {
+
+ public static final String INPUT_KEY = "input";
+ public static final String OUTPUT_KEY = "output";
+ public static final String DISCOVERY_INTERVAL = "discovery-interval";
+ public static final String EXECUTION_MODE = "execution-mode";
+
+ public static CLI fromArgs(String[] args) throws Exception {
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+ Path[] inputs = null;
+ if (params.has(INPUT_KEY)) {
+ inputs =
+ params.getMultiParameterRequired(INPUT_KEY).stream()
+ .map(Path::new)
+ .toArray(Path[]::new);
+ } else {
+ System.out.println("Executing example with default input data set.");
Review comment:
```suggestion
System.out.println("Executing example with default input data.");
```
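For reference, a minimal sketch of the parsing pattern the new `CLI` class wraps. The key names match the constants in the diff above; the class name and printed output are illustrative:

```java
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.TimeUtils;

import java.time.Duration;
import java.util.Optional;

public class CliSketch {
    public static void main(String[] args) {
        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // --input may be repeated; collect every occurrence into a Path array.
        Path[] inputs =
                params.has("input")
                        ? params.getMultiParameterRequired("input").stream()
                                .map(Path::new)
                                .toArray(Path[]::new)
                        : null;

        // --discovery-interval uses Flink's duration syntax, e.g. "10 s" or "1 min".
        Optional<Duration> discoveryInterval =
                params.has("discovery-interval")
                        ? Optional.of(TimeUtils.parseDuration(params.get("discovery-interval")))
                        : Optional.empty();

        System.out.println("inputs=" + (inputs == null ? "default data" : inputs.length));
        System.out.println("discoveryInterval=" + discoveryInterval);
    }
}
```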
##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -50,51 +70,92 @@
// *************************************************************************
public static void main(String[] args) throws Exception {
+ final CLI params = CLI.fromArgs(args);
- // Checking input parameters
- final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
+ // result will be the same if interpreted correctly, but getting there can be different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+ // are bounded and otherwise STREAMING.
+ env.setRuntimeMode(params.getExecutionMode());
+
+ // This optional step makes the input parameters
+ // available in the Flink UI.
env.getConfig().setGlobalJobParameters(params);
- // get input data
- DataStream<String> text = null;
- if (params.has("input")) {
- // union all the inputs from text files
- for (String input : params.getMultiParameterRequired("input")) {
- if (text == null) {
- text = env.readTextFile(input);
- } else {
- text = text.union(env.readTextFile(input));
- }
- }
- Preconditions.checkNotNull(text, "Input DataStream should not be null.");
+ DataStream<String> text;
+ if (params.getInputs().isPresent()) {
+ // Create a new file source that will read files from a given set of directories.
+ // Each file will be processed as plain text and split based on newlines.
+ FileSource.FileSourceBuilder<String> builder =
+ FileSource.forRecordStreamFormat(
+ new TextLineFormat(), params.getInputs().get());
+
+ // If a discovery interval is provided, the source will
+ // continuously watch the given directories for new files.
+ params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+ text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
} else {
- System.out.println("Executing WordCount example with default input data set.");
- System.out.println("Use --input to specify file input.");
- // get default test text data
- text = env.fromElements(WordCountData.WORDS);
+ text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
}
DataStream<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
+ // The text lines read from the source are split into words
+ // using a user-defined function. The tokenizer, implemented below,
+ // will output each words as a (2-tuple) containing (word, 1)
text.flatMap(new Tokenizer())
- // group by the tuple field "0" and sum up tuple field "1"
+ .name("tokenizer")
+ // keyBy groups tuples based on the "0" field, the word.
+ // Using a keyBy allows performing aggregations and other
+ // stateful transformations over data on a per-key basis.
+ // This is similar to a GROUP BY clause in a SQL query.
.keyBy(value -> value.f0)
- .sum(1);
+ // For each key, we perform a simple sum of the "1" field, the count.
+ // If the input data set is bounded, sum will output a final count for
Review comment:
```suggestion
// If the input data stream is bounded, sum will output a final count for
```
##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -50,51 +70,92 @@
// *************************************************************************
public static void main(String[] args) throws Exception {
+ final CLI params = CLI.fromArgs(args);
- // Checking input parameters
- final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
-
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
Review comment:
```suggestion
// a database) while in BATCH mode, it would only produce one final result at the end. The final
```
##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
package org.apache.flink.streaming.scala.examples.wordcount
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
*
* The input is a plain text file with lines separated by newline characters.
Review comment:
```suggestion
* The input is a [list of] plain text file[s] with lines separated by a newline character.
```
##########
File path: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
##########
@@ -17,30 +17,50 @@
package org.apache.flink.streaming.examples.wordcount;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text
- * files in a streaming fashion.
+ * files. This Job can be executed in both streaming and batch execution modes.
*
* <p>The input is a plain text file with lines separated by newline characters.
*
- * <p>Usage: <code>WordCount --input <path> --output <path></code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * <p>Usage:
+ *
+ * <ul>
+ * <li><code>--input <path></code>A list of input files and / or directories to read. If no
+ * inputs are provided, the program is run with default data from {@link WordCountData}.
Review comment:
```suggestion
* input is provided, the program is run with default data from {@link WordCountData}.
```
##########
File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
##########
@@ -18,74 +18,154 @@
package org.apache.flink.streaming.scala.examples.wordcount
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+import org.apache.flink.util.Collector
+
+import java.time.Duration
/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over text files in a streaming fashion.
+ * Implements the "WordCount" program that computes a simple word occurrence histogram over text
+ * files. This Job can be executed in both streaming and batch execution modes.
*
* The input is a plain text file with lines separated by newline characters.
*
* Usage:
- * {{{
- * WordCount --input <path> --output <path>
- * }}}
*
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
+ * {{{ --input <path> }}} A list of input files and / or directories to read.
+ * If no inputs are provided, the program is run with default data from [[WordCountData]].
*
- * This example shows how to:
+ * {{{--discovery-interval <duration> }}} Turns the file reader
+ * into a continuous source that will monitor the provided input directories
+ * every interval and read any new files.
+ *
+ * {{{--output <path> }}} The output directory where the Job will
+ * write the results. If no output path is provided, the Job will print the results
+ * to `stdout`
*
- * - write a simple Flink Streaming program,
- * - use tuple data types,
- * - write and use transformation functions.
+ * {{{--execution-mode <mode> }}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
+ * pipeline.
*
+ * This example shows how to:
+ *
+ * - Write a simple Flink DataStream program
+ * - Use tuple data types
+ * - Write and use a user-defined function
*/
object WordCount {
- def main(args: Array[String]) {
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
- // Checking input parameters
- val params = ParameterTool.fromArgs(args)
+ def main(args: Array[String]): Unit = {
+ val params = CLI.fromArgs(args)
- // set up the execution environment
+ // Create the execution environment. This is the main entrypoint
+ // to building a Flink application.
val env = StreamExecutionEnvironment.getExecutionEnvironment
- // make parameters available in the web interface
+ // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+ // application executed over bounded input will produce the same final results regardless
+ // of the configured execution mode. It is important to note what final means here: a job
+ // executing in STREAMING mode might produce incremental updates (think upserts in
+ // a database) while a BATCH job would only produce one final result at the end. The final
+ // result will be the same if interpreted correctly, but getting there can be different.
+ //
+ // The “classic” execution behavior of the DataStream API is called STREAMING execution
+ // mode. Applications should use streaming execution for unbounded jobs that require
+ // continuous incremental processing and are expected to stay online indefinitely.
+ //
+ // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+ // can only do when we know that our input is bounded. For example, different
+ // join/aggregation strategies can be used, in addition to a different shuffle
+ // implementation that allows more efficient task scheduling and failure recovery behavior.
+ //
+ // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
Review comment:
```suggestion
// By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]