This is an automated email from the ASF dual-hosted git repository.
reswqa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6c756a084d4 [hotfix][docs] Fix code examples and minor issues in
DataStream documentation
6c756a084d4 is described below
commit 6c756a084d40f4716cdfdb4fdd71b0c919c448e6
Author: Vishal Kamlapure <[email protected]>
AuthorDate: Mon Jun 1 08:44:04 2026 +0530
[hotfix][docs] Fix code examples and minor issues in DataStream
documentation
---
.../docs/dev/datastream/application_parameters.md | 5 ++---
docs/content.zh/docs/dev/datastream/java_lambdas.md | 4 ++--
docs/content.zh/docs/dev/datastream/operators/asyncio.md | 2 +-
docs/content.zh/docs/dev/datastream/operators/overview.md | 8 ++++----
docs/content.zh/docs/dev/datastream/overview.md | 2 +-
docs/content.zh/docs/dev/datastream/testing.md | 4 ++--
docs/content/docs/dev/datastream/application_parameters.md | 9 +++------
docs/content/docs/dev/datastream/execution_mode.md | 6 +++---
docs/content/docs/dev/datastream/java_lambdas.md | 8 ++++----
docs/content/docs/dev/datastream/operators/asyncio.md | 6 +++---
docs/content/docs/dev/datastream/operators/overview.md | 8 ++++----
docs/content/docs/dev/datastream/overview.md | 2 +-
docs/content/docs/dev/datastream/testing.md | 14 +++++++-------
13 files changed, 37 insertions(+), 41 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/application_parameters.md
b/docs/content.zh/docs/dev/datastream/application_parameters.md
index 98b567429fd..c8ec94ed6ed 100644
--- a/docs/content.zh/docs/dev/datastream/application_parameters.md
+++ b/docs/content.zh/docs/dev/datastream/application_parameters.md
@@ -26,8 +26,6 @@ under the License.
# 应用程序参数处理
-应用程序参数处理
--------------------------------
几乎所有的批和流的 Flink
应用程序,都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源(如路径或地址)、系统参数(并行度,运行时配置)和特定的应用程序参数(通常使用在用户自定义函数)。
为解决以上问题,Flink 提供一个名为 `ParameterTool` 的简单公共类,其中包含了一些基本的工具。请注意,这里说的
`ParameterTool` 并不是必须使用的。[Commons
CLI](https://commons.apache.org/proper/commons-cli/) 和
[argparse4j](http://argparse4j.sourceforge.net/) 等其他框架也可以非常好地兼容 Flink。
@@ -48,7 +46,7 @@ ParameterTool parameter =
ParameterTool.fromPropertiesFile(propertiesFilePath);
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
-InputStream propertiesFileInputStream = new FileInputStream(file);
+InputStream propertiesFileInputStream = new FileInputStream(propertiesFile);
ParameterTool parameter =
ParameterTool.fromPropertiesFile(propertiesFileInputStream);
```
@@ -60,6 +58,7 @@ ParameterTool parameter =
ParameterTool.fromPropertiesFile(propertiesFileInputSt
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
+}
```
diff --git a/docs/content.zh/docs/dev/datastream/java_lambdas.md
b/docs/content.zh/docs/dev/datastream/java_lambdas.md
index de4e0b9d08f..511a4e7c1e2 100644
--- a/docs/content.zh/docs/dev/datastream/java_lambdas.md
+++ b/docs/content.zh/docs/dev/datastream/java_lambdas.md
@@ -107,7 +107,7 @@ env.fromElements(1, 2, 3)
.map(new MyTuple2Mapper())
.print();
-public static class MyTuple2Mapper extends MapFunction<Integer,
Tuple2<Integer, Integer>> {
+public static class MyTuple2Mapper implements MapFunction<Integer,
Tuple2<Integer, Integer>> {
@Override
public Tuple2<Integer, Integer> map(Integer i) {
return Tuple2.of(i, i);
@@ -116,7 +116,7 @@ public static class MyTuple2Mapper extends
MapFunction<Integer, Tuple2<Integer,
// 使用匿名类来替代
env.fromElements(1, 2, 3)
- .map(new MapFunction<Integer, Tuple2<Integer, Integer>> {
+ .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Integer i) {
return Tuple2.of(i, i);
diff --git a/docs/content.zh/docs/dev/datastream/operators/asyncio.md
b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
index e160a0d05e9..86034fb4480 100644
--- a/docs/content.zh/docs/dev/datastream/operators/asyncio.md
+++ b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
@@ -82,7 +82,7 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String,
Tuple2<String, Stri
@Override
public void open(OpenContext openContext) throws Exception {
- client = new DatabaseClient(host, post, credentials);
+ client = new DatabaseClient(host, port, credentials);
}
@Override
diff --git a/docs/content.zh/docs/dev/datastream/operators/overview.md
b/docs/content.zh/docs/dev/datastream/operators/overview.md
index c841a721dd5..bab82b1f6e5 100644
--- a/docs/content.zh/docs/dev/datastream/operators/overview.md
+++ b/docs/content.zh/docs/dev/datastream/operators/overview.md
@@ -224,7 +224,7 @@ windowedStream.apply(new
WindowFunction<Tuple2<String,Integer>, Integer, Tuple,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
- for (value t: values) {
+ for (Tuple2<String, Integer> t : values) {
sum += t.f1;
}
out.collect (new Integer(sum));
@@ -237,7 +237,7 @@ allWindowedStream.apply (new
AllWindowFunction<Tuple2<String,Integer>, Integer,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
- for (value t: values) {
+ for (Tuple2<String, Integer> t : values) {
sum += t.f1;
}
out.collect (new Integer(sum));
@@ -449,7 +449,7 @@ class MyCoMapFunction(CoMapFunction):
class MyCoFlatMapFunction(CoFlatMapFunction):
- def flat_map1(self, value)
+ def flat_map1(self, value):
for i in range(value[0]):
yield i
@@ -540,7 +540,7 @@ dataStream.partitionCustom(partitioner, 0);
{{< tab "Python" >}}
```python
data_stream = env.from_collection(collection=[(2, 'a'), (2, 'a'), (3, 'b')])
-data_stream.partition_custom(lambda key, num_partition: key % partition,
lambda x: x[0])
+data_stream.partition_custom(lambda key, num_partition: key % num_partition,
lambda x: x[0])
```
{{< /tab >}}
{{< /tabs>}}
diff --git a/docs/content.zh/docs/dev/datastream/overview.md
b/docs/content.zh/docs/dev/datastream/overview.md
index 6c4367436cf..fbca9cf0b89 100644
--- a/docs/content.zh/docs/dev/datastream/overview.md
+++ b/docs/content.zh/docs/dev/datastream/overview.md
@@ -330,7 +330,7 @@ Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系
LocalStreamEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
-env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
+env.fromSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
```
{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content.zh/docs/dev/datastream/testing.md
b/docs/content.zh/docs/dev/datastream/testing.md
index 5891769881b..949f05956f4 100644
--- a/docs/content.zh/docs/dev/datastream/testing.md
+++ b/docs/content.zh/docs/dev/datastream/testing.md
@@ -103,7 +103,7 @@ public class IncrementFlatMapFunctionTest {
```java
public class StatefulFlatMapTest {
- private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
+ private KeyedOneInputStreamOperatorTestHarness<Long, Long> testHarness;
private StatefulFlatMap statefulFlatMapFunction;
@Before
@@ -274,7 +274,7 @@ public class ExampleIntegrationTest {
env.execute();
// verify your results
- assertTrue(CollectSink.values.containsAll(2L, 22L, 23L));
+ assertTrue(CollectSink.values.containsAll(List.of(2L, 22L, 23L)));
}
// create a testing sink
diff --git a/docs/content/docs/dev/datastream/application_parameters.md
b/docs/content/docs/dev/datastream/application_parameters.md
index 6583c5c8a71..6e7f8dd11f2 100644
--- a/docs/content/docs/dev/datastream/application_parameters.md
+++ b/docs/content/docs/dev/datastream/application_parameters.md
@@ -26,10 +26,6 @@ under the License.
# Handling Application Parameters
-
-
-Handling Application Parameters
--------------------------------
Almost all Flink applications, both batch and streaming, rely on external
configuration parameters.
They are used to specify input and output sources (like paths or addresses),
system parameters (parallelism, runtime configuration), and application
specific parameters (typically used within user functions).
@@ -53,7 +49,7 @@ ParameterTool parameters =
ParameterTool.fromPropertiesFile(propertiesFilePath);
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameters = ParameterTool.fromPropertiesFile(propertiesFile);
-InputStream propertiesFileInputStream = new FileInputStream(file);
+InputStream propertiesFileInputStream = new FileInputStream(propertiesFile);
ParameterTool parameters =
ParameterTool.fromPropertiesFile(propertiesFileInputStream);
```
@@ -65,6 +61,7 @@ This allows getting arguments like `--input hdfs:///mydata
--elements 42` from t
public static void main(String[] args) {
ParameterTool parameters = ParameterTool.fromArgs(args);
// .. regular code ..
+}
```
@@ -94,7 +91,7 @@ parameters.getNumberOfParameters();
```
You can use the return values of these methods directly in the `main()` method
of the client submitting the application.
-For example, you could set the parallelism of a operator like this:
+For example, you could set the parallelism of an operator like this:
```java
ParameterTool parameters = ParameterTool.fromArgs(args);
diff --git a/docs/content/docs/dev/datastream/execution_mode.md
b/docs/content/docs/dev/datastream/execution_mode.md
index 342637a6b52..a0ebb2fa6fc 100644
--- a/docs/content/docs/dev/datastream/execution_mode.md
+++ b/docs/content/docs/dev/datastream/execution_mode.md
@@ -207,7 +207,7 @@ correspond to the three tasks that are separated by the
shuffle barriers.
Instead of sending records immediately to downstream tasks, as explained above
for `STREAMING` mode, processing in stages requires Flink to materialize
intermediate results of tasks to some non-ephemeral storage which allows
-downstream tasks to read them after upstream tasks have already gone off line.
+downstream tasks to read them after upstream tasks have already gone offline.
This will increase the latency of processing but comes with other interesting
properties. For one, this allows Flink to backtrack to the latest available
results when a failure happens instead of restarting the whole job. Another
@@ -228,8 +228,8 @@ checkpointing works.
In `BATCH` mode, the configured state backend is ignored. Instead, the input of
a keyed operation is grouped by key (using sorting) and then we process all
-records of a key in turn. This allows keeping only the state of only one key at
-the same time. State for a given key will be discarded when moving on to the
+records of a key in turn. This allows keeping only one key's state at
+a time. State for a given key will be discarded when moving on to the
next key.
See [FLIP-140](https://cwiki.apache.org/confluence/x/kDh4CQ) for background
diff --git a/docs/content/docs/dev/datastream/java_lambdas.md
b/docs/content/docs/dev/datastream/java_lambdas.md
index a0423ba3a25..f90bf1d8a8d 100644
--- a/docs/content/docs/dev/datastream/java_lambdas.md
+++ b/docs/content/docs/dev/datastream/java_lambdas.md
@@ -37,7 +37,7 @@ Flink supports the usage of lambda expressions for all
operators of the Java API
This document shows how to use lambda expressions and describes current
limitations. For a general introduction to the Flink API, please refer to the
-[DataStream API overview]({{< ref "docs/dev/datastream/overview" >}})
+[DataStream API overview]({{< ref "docs/dev/datastream/overview" >}}).
### Examples and Limitations
@@ -83,7 +83,7 @@ input.flatMap((Integer number, Collector<String> out) -> {
.print();
```
-Similar problems occur when using a `map()` function with a generic return
type. A method signature `Tuple2<Integer, Integer> map(Integer value)` is
erasured to `Tuple2 map(Integer value)` in the example below.
+Similar problems occur when using a `map()` function with a generic return
type. A method signature `Tuple2<Integer, Integer> map(Integer value)` is
erased to `Tuple2 map(Integer value)` in the example below.
```java
import org.apache.flink.api.common.functions.MapFunction;
@@ -111,7 +111,7 @@ env.fromElements(1, 2, 3)
.map(new MyTuple2Mapper())
.print();
-public static class MyTuple2Mapper extends MapFunction<Integer,
Tuple2<Integer, Integer>> {
+public static class MyTuple2Mapper implements MapFunction<Integer,
Tuple2<Integer, Integer>> {
@Override
public Tuple2<Integer, Integer> map(Integer i) {
return Tuple2.of(i, i);
@@ -120,7 +120,7 @@ public static class MyTuple2Mapper extends
MapFunction<Integer, Tuple2<Integer,
// use an anonymous class instead
env.fromElements(1, 2, 3)
- .map(new MapFunction<Integer, Tuple2<Integer, Integer>> {
+ .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Integer i) {
return Tuple2.of(i, i);
diff --git a/docs/content/docs/dev/datastream/operators/asyncio.md
b/docs/content/docs/dev/datastream/operators/asyncio.md
index f7757fb633b..63778356c74 100644
--- a/docs/content/docs/dev/datastream/operators/asyncio.md
+++ b/docs/content/docs/dev/datastream/operators/asyncio.md
@@ -97,7 +97,7 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String,
Tuple2<String, Stri
@Override
public void open(OpenContext openContext) throws Exception {
- client = new DatabaseClient(host, post, credentials);
+ client = new DatabaseClient(host, port, credentials);
}
@Override
@@ -237,7 +237,7 @@ The following three parameters control the asynchronous
operations:
### Timeout Handling
-When an async I/O request times out, by default an exception is thrown and job
is restarted.
+When an async I/O request times out, by default an exception is thrown and the
job is restarted.
If you want to handle timeouts, you can override the `AsyncFunction#timeout`
method.
In the Java API, make sure you call `ResultFuture.complete()` or
`ResultFuture.completeExceptionally()` when overriding
@@ -259,7 +259,7 @@ To control in which order the resulting records are
emitted, Flink offers two mo
Use `AsyncDataStream.unorderedWait(...)` or
`AsyncDataStream.unordered_wait(...)` for this mode.
- **Ordered**: In that case, the stream order is preserved. Result records
are emitted in the same order as the asynchronous
- requests are triggered (the order of the operators input records). To
achieve that, the operator buffers a result record
+ requests are triggered (the order of the operator's input records). To
achieve that, the operator buffers a result record
until all its preceding records are emitted (or timed out).
This usually introduces some amount of extra latency and some overhead in
checkpointing, because records or results are maintained
in the checkpointed state for a longer time, compared to the unordered
mode.
diff --git a/docs/content/docs/dev/datastream/operators/overview.md
b/docs/content/docs/dev/datastream/operators/overview.md
index 18e55e074d8..bca4b3aaca7 100644
--- a/docs/content/docs/dev/datastream/operators/overview.md
+++ b/docs/content/docs/dev/datastream/operators/overview.md
@@ -227,7 +227,7 @@ windowedStream.apply(new
WindowFunction<Tuple2<String,Integer>, Integer, Tuple,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
- for (value t: values) {
+ for (Tuple2<String, Integer> t : values) {
sum += t.f1;
}
out.collect (new Integer(sum));
@@ -240,7 +240,7 @@ allWindowedStream.apply (new
AllWindowFunction<Tuple2<String,Integer>, Integer,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
- for (value t: values) {
+ for (Tuple2<String, Integer> t : values) {
sum += t.f1;
}
out.collect (new Integer(sum));
@@ -452,7 +452,7 @@ class MyCoMapFunction(CoMapFunction):
class MyCoFlatMapFunction(CoFlatMapFunction):
- def flat_map1(self, value)
+ def flat_map1(self, value):
for i in range(value[0]):
yield i
@@ -547,7 +547,7 @@ dataStream.partitionCustom(partitioner, 0);
{{< tab "Python" >}}
```python
data_stream = env.from_collection(collection=[(2, 'a'), (2, 'a'), (3, 'b')])
-data_stream.partition_custom(lambda key, num_partition: key % partition,
lambda x: x[0])
+data_stream.partition_custom(lambda key, num_partition: key % num_partition,
lambda x: x[0])
```
{{< /tab >}}
{{< /tabs>}}
diff --git a/docs/content/docs/dev/datastream/overview.md
b/docs/content/docs/dev/datastream/overview.md
index 4ac9eceda22..8090804616e 100644
--- a/docs/content/docs/dev/datastream/overview.md
+++ b/docs/content/docs/dev/datastream/overview.md
@@ -395,7 +395,7 @@ Usage:
LocalStreamEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
-env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
+env.fromSequence(1, 10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
```
{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content/docs/dev/datastream/testing.md
b/docs/content/docs/dev/datastream/testing.md
index 34be49d0820..d850cab6aae 100644
--- a/docs/content/docs/dev/datastream/testing.md
+++ b/docs/content/docs/dev/datastream/testing.md
@@ -111,7 +111,7 @@ public class StatefulFlatMapTest {
//instantiate user-defined function
statefulFlatMapFunction = new StatefulFlatMapFunction();
- // wrap user defined function into a the corresponding operator
+ // wrap user-defined function into the corresponding operator
testHarness = new OneInputStreamOperatorTestHarness<>(new
StreamFlatMap<>(statefulFlatMapFunction));
// optionally configure the execution environment
@@ -124,7 +124,7 @@ public class StatefulFlatMapTest {
@Test
public void testingStatefulFlatMapFunction() throws Exception {
- //push (timestamped) elements into the operator (and hence user
defined function)
+ // push (timestamped) elements into the operator (and hence the
user-defined function)
testHarness.processElement(2L, 100L);
//trigger event time timers by advancing the event time of the
operator with a watermark
@@ -148,7 +148,7 @@ public class StatefulFlatMapTest {
```java
public class StatefulFlatMapFunctionTest {
- private OneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
+ private KeyedOneInputStreamOperatorTestHarness<String, Long, Long>
testHarness;
private StatefulFlatMap statefulFlatMapFunction;
@Before
@@ -157,7 +157,7 @@ public class StatefulFlatMapFunctionTest {
//instantiate user-defined function
statefulFlatMapFunction = new StatefulFlatMapFunction();
- // wrap user defined function into a the corresponding operator
+ // wrap user-defined function into the corresponding operator
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new
StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(),
Types.STRING);
// open the test harness (will also call open() on RichFunctions)
@@ -203,11 +203,11 @@ public class PassThroughProcessFunctionTest {
//instantiate user-defined function
PassThroughProcessFunction processFunction = new
PassThroughProcessFunction();
- // wrap user defined function into a the corresponding operator
+ // wrap user-defined function into the corresponding operator
OneInputStreamOperatorTestHarness<Integer, Integer> harness =
ProcessFunctionTestHarnesses
.forProcessFunction(processFunction);
- //push (timestamped) elements into the operator (and hence user
defined function)
+ // push (timestamped) elements into the operator (and hence the
user-defined function)
harness.processElement(1, 10);
//retrieve list of emitted records for assertions
@@ -273,7 +273,7 @@ public class ExampleIntegrationTest {
env.execute();
// verify your results
- assertTrue(CollectSink.values.containsAll(2L, 22L, 23L));
+ assertTrue(CollectSink.values.containsAll(List.of(2L, 22L, 23L)));
}
// create a testing sink