This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 29f94ea3c [hotfix] Update javadoc urls to 1.16 in the article "Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API" (#646)
29f94ea3c is described below
commit 29f94ea3ccd7962be62973add7ce3b5aa88cb51e
Author: Etienne Chauchot <[email protected]>
AuthorDate: Wed May 10 10:56:54 2023 +0200
[hotfix] Update javadoc urls to 1.16 in the article "Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API" (#646)
---
.../2023-05-09-howto-migrate-to-datastream.md | 28 +++++++++++-----------
1 file changed, 14 insertions(+), 14 deletions(-)
diff --git a/docs/content/posts/2023-05-09-howto-migrate-to-datastream.md
b/docs/content/posts/2023-05-09-howto-migrate-to-datastream.md
index 77f6f6cdc..b0f2c7d6b 100644
--- a/docs/content/posts/2023-05-09-howto-migrate-to-datastream.md
+++ b/docs/content/posts/2023-05-09-howto-migrate-to-datastream.md
@@ -69,8 +69,8 @@ operations.
### [Setting the execution
environment](https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L90-L96)
We start by moving
-from [ExecutionEnvironment](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/ExecutionEnvironment.html)
-to [StreamExecutionEnvironment](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html)
+from [ExecutionEnvironment](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/ExecutionEnvironment.html)
+to [StreamExecutionEnvironment](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html)
. Then, as the source in this pipeline is bounded, we can use either the
default
streaming [execution
mode](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode//)
or the batch mode. In batch mode the tasks of the job can be separated into
stages that can be
@@ -82,13 +82,13 @@ allow to run the same pipeline with no change on an unbounded source.
### Using the streaming sources and datasets
-**Sources**: [DataSource<T>](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/operators/DataSource.html)
-becomes [DataStreamSource<T>](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html)
+**Sources**: [DataSource<T>](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/operators/DataSource.html)
+becomes [DataStreamSource<T>](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html)
after the call to _env.createInput()_.
-**Datasets**: [DataSet<T>](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/DataSet.html)
+**Datasets**: [DataSet<T>](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/DataSet.html)
are
-now [DataStream<T>](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/datastream/DataStream.html)
+now [DataStream<T>](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStream.html)
and subclasses.
### [Migrating the join
operation](https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L129-L135)
@@ -96,11 +96,11 @@ and subclasses.
The DataStream join operator does not yet support aggregations in batch mode
(see [FLINK-22587](https://issues.apache.org/jira/browse/FLINK-22587) for
details). Basically, the problem is with the trigger of the
-default [GlobalWindow](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.html)
+default [GlobalWindow](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.html)
which never fires so the records are never output. We will workaround this
problem by applying a
custom
[EndOfStream](https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L246-L280)
window. It is a window assigner that assigns all the records to a
-single [TimeWindow](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.html)
+single [TimeWindow](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.html)
. So, like for the GlobalWindow, all the records are assigned to the same
window except that this
window's trigger is based on the end of the window (which is set to
_Long.MAX_VALUE_). As we are on
a bounded source, at some point the watermark will advance to +INFINITY
(Long.MAX_VALUE) and will
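The firing condition this hunk describes can be illustrated with a plain-Java sketch (hypothetical class, not the actual Flink types; `maxTimestamp` mirrors Flink's `TimeWindow#maxTimestamp()` convention of `end - 1`):

```java
// Plain-Java illustration of why a window ending at Long.MAX_VALUE fires
// on a bounded source: the final watermark advances to Long.MAX_VALUE.
public class EndOfStreamWindowSketch {
    static final long WINDOW_END = Long.MAX_VALUE;

    // Mirrors TimeWindow#maxTimestamp(): the window end is exclusive.
    static long maxTimestamp() {
        return WINDOW_END - 1;
    }

    // An event-time trigger fires once the watermark passes maxTimestamp.
    static boolean fires(long watermark) {
        return watermark >= maxTimestamp();
    }

    public static void main(String[] args) {
        System.out.println(fires(1_000L));         // false: stream still running
        System.out.println(fires(Long.MAX_VALUE)); // true: bounded input exhausted
    }
}
```

This is why the custom assigner works where the GlobalWindow's trigger never does: its end is a finite (if huge) timestamp the final watermark can actually reach.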
@@ -112,11 +112,11 @@ function.
### [Migrating the group by and reduce (sum)
operations](https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L145-L169)
DataStream API has no
-more [groupBy()](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/DataSet.html#groupBy-org.apache.flink.api.java.functions.KeySelector-)
+more [groupBy()](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/DataSet.html#groupBy-org.apache.flink.api.java.functions.KeySelector-)
method, we now use
-the [keyBy()](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-org.apache.flink.api.java.functions.KeySelector-)
+the [keyBy()](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-org.apache.flink.api.java.functions.KeySelector-)
method. An aggregation downstream will be applied on elements with the same
key exactly as
-a [GroupReduceFunction](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/functions/GroupReduceFunction.html)
+a [GroupReduceFunction](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/functions/GroupReduceFunction.html)
would have done on a DataSet except it will not need to materialize the
collection of data. Indeed, the following
operator is a reducer: the summing operation downstream is still done through
a
[ReduceFunction](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/functions/ReduceFunction.html)
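The keyBy-then-reduce semantics this hunk describes can be mimicked in plain Java (a hypothetical stand-in, not the Flink operators): a ReduceFunction combines the running result with each incoming element of the same key, so no per-key collection is ever materialized the way a GroupReduceFunction's input would be.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for keyBy(...).reduce(...): elements sharing a key are
// combined pairwise as they arrive, never collected into a group first.
public class KeyedReduceSketch {
    public static Map<String, Long> sumByKey(List<Map.Entry<String, Long>> events) {
        Map<String, Long> state = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : events) {
            // Equivalent of ReduceFunction#reduce(accumulated, incoming)
            state.merge(e.getKey(), e.getValue(), Long::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> events = List.of(
                Map.entry("a", 1L), Map.entry("b", 2L), Map.entry("a", 3L));
        System.out.println(sumByKey(events)); // {a=4, b=2}
    }
}
```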
@@ -129,7 +129,7 @@ Also, please note that, as in the join case, we need to specify windowing for th
### [Migrating the order by
operation](https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L171-L211)
The sort of the datastream is done by applying
-a [KeyedProcessFunction](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html)
+a [KeyedProcessFunction](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html)
.
But, as said above, the DataStream semantics are the ones of a streaming
pipeline. The arriving data
@@ -140,7 +140,7 @@ meaning that the timer will fire at the end of the batch.
To sort the data, we store the incoming rows inside
a
[ListState](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/state/ListState.html)
and sort them at output time, when the timer fires in
-the [onTimer()](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html#onTimer-long-org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext-org.apache.flink.util.Collector-)
+the [onTimer()](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html#onTimer-long-org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext-org.apache.flink.util.Collector-)
callback.
Another thing: to be able to use Flink state, we need to key the datastream
beforehand, even if
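The buffer-then-sort-on-timer pattern this hunk describes can be sketched in plain Java (hypothetical class and method names; in the article this logic lives in a `KeyedProcessFunction` backed by a `ListState`, with the sort happening in the `onTimer()` callback):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch: buffer rows as they arrive (the ListState role),
// then sort and emit only when the end-of-input timer fires (the onTimer role).
public class SortOnTimerSketch {
    private final List<long[]> buffer = new ArrayList<>(); // stand-in for ListState

    public void processElement(long[] row) {
        buffer.add(row); // emit nothing yet: later rows could sort earlier
    }

    // Called once the watermark reaches the timer, i.e. the bounded input is done.
    public List<long[]> onTimer(Comparator<long[]> order) {
        List<long[]> sorted = new ArrayList<>(buffer);
        sorted.sort(order);
        return sorted;
    }
}
```

The buffering is unavoidable under streaming semantics: a sort cannot emit anything until it knows no earlier-ordered element can still arrive, which on a bounded source is exactly when the final watermark fires the timer.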
@@ -164,7 +164,7 @@ an [Encoder](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java
. But the resulting code is very similar to the one using the DataSet API.
It's only
that
[Encoder#encode()](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/serialization/Encoder.html#encode-IN-java.io.OutputStream-)
method writes bytes
-when [TextOutputFormat.TextFormatter#format()](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/io/TextOutputFormat.TextFormatter.html#format-IN-)
+when [TextOutputFormat.TextFormatter#format()](https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/io/TextOutputFormat.TextFormatter.html#format-IN-)
wrote Strings.
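The sink-side difference this hunk describes can be sketched in plain Java (hypothetical methods, not the Flink interfaces themselves): the formatter style returns a `String` and lets the output format write it, while the encoder style writes the bytes, newline included, to the stream directly.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Plain-Java sketch of TextFormatter#format (returns a String) versus
// Encoder#encode (writes bytes to the sink's output stream itself).
public class EncodeVsFormatSketch {
    // DataSet-era style: produce a String, the format writes it for you.
    static String format(long id, String name) {
        return id + "," + name;
    }

    // DataStream-era style: write the bytes (and the record separator) yourself.
    static void encode(long id, String name, OutputStream out) throws IOException {
        out.write(format(id, name).getBytes(StandardCharsets.UTF_8));
        out.write('\n');
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encode(42L, "flink", out);
        System.out.print(out.toString(StandardCharsets.UTF_8)); // 42,flink
    }
}
```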
## Conclusion