echauchot commented on code in PR #631:
URL: https://github.com/apache/flink-web/pull/631#discussion_r1172266323


##########
docs/content/posts/2023-04-12-howto-migrate-to-datastream.md:
##########
@@ -0,0 +1,190 @@
+---
+title: "Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API"
+date: "2023-04-12T08:00:00.000Z"
+authors:
+- echauchot:
+  name: "Etienne Chauchot"
+  twitter: "echauchot"
+aliases:
+- /2023/04/12/2023-04-12-howto-migrate-to-datastream.html
+---
+
+## Introduction
+
+The Flink community has been deprecating the DataSet API since version 1.12 as part of the work on
+[FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741).
+This blog article illustrates the migration of a real-life batch DataSet pipeline to a batch
+DataStream pipeline. All the code presented in this article is available in
+the [tpcds-benchmark-flink repo](https://github.com/echauchot/tpcds-benchmark-flink).
+The use case shown here is extracted from a broader work that compares the performance of Flink's
+different APIs by implementing [TPC-DS](https://www.tpc.org/tpcds/) queries with each of them.
+
+## What is TPC-DS?
+
+TPC-DS is a decision support benchmark that models several generally applicable aspects of a
+decision support system. The purpose of the TPC-DS benchmark is to provide relevant, objective
+performance data of Big Data engines to industry users.
+
+## Chosen TPC-DS query
+
+The chosen query for this article is **Query3** because it contains the most common analytics
+operators (filter, join, aggregation, group by, order by, limit). It represents an analytic query on
+store sales. Its SQL code is presented here:
+
+```sql
+SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand, SUM(ss_ext_sales_price) sum_agg
+FROM date_dim dt, store_sales, item
+WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
+  AND store_sales.ss_item_sk = item.i_item_sk
+  AND item.i_manufact_id = 128
+  AND dt.d_moy = 11
+GROUP BY dt.d_year, item.i_brand, item.i_brand_id
+ORDER BY dt.d_year, sum_agg DESC, brand_id
+LIMIT 100
+```
+
+## The initial DataSet pipeline
+
+The pipeline we are migrating
+is [this one](https://github.com/echauchot/tpcds-benchmark-flink/blob/master/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDataset.java).
+It is a batch pipeline that implements the above query using the DataSet API
+and [Row](https://javadoc.io/static/org.apache.flink/flink-core/1.16.0/org/apache/flink/types/Row.html)
+as the dataset element type.
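+
+As a reminder of the starting point, here is a condensed, hypothetical sketch of the DataSet shape
+of such a pipeline (variable names are illustrative placeholders; see the linked class for the
+actual code):
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+// storeSalesInputFormat / rowTypeInfo are placeholders for the CSV input format and row type
+DataSet<Row> storeSales = env.createInput(storeSalesInputFormat, rowTypeInfo);
+// the query is then expressed with filter / join / groupBy / reduceGroup /
+// sortPartition / first(100) operators before calling env.execute()
+```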
+
+## Migrating the DataSet pipeline to a DataStream pipeline
+
+There is no point in going through all of the code, which is
+available [here](https://github.com/echauchot/tpcds-benchmark-flink/blob/master/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java).
+We will rather focus on some key areas of the migration. The code is based on the latest release
+of Flink at the time this article was written: version 1.16.0.
+
+DataStream is a unified API that allows running pipelines in both batch and streaming modes. To
+execute a DataStream pipeline in batch mode, it is not enough to set the execution mode in the Flink
+execution environment; some operations also need to be migrated. Indeed, the DataStream API
+semantics are those of a streaming pipeline: the arriving data is considered infinite. So,
+compared to the DataSet API, which operates on finite data, adaptations need to be made to some
+operations.
+
+### [Setting the execution environment](https://github.com/echauchot/tpcds-benchmark-flink/blob/master/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L92-L98)
+
+We start by moving
+from [ExecutionEnvironment](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/ExecutionEnvironment.html)
+to [StreamExecutionEnvironment](https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.0/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html).
+Then, as the source in this pipeline is bounded, we can use either the default
+streaming [execution mode](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/)
+or the batch mode. In batch mode, the tasks of the job can be separated into stages that are
+executed one after another. In streaming mode, all tasks need to be running all the time and records
+are sent to downstream tasks as soon as they are available.
+
+Here we keep the default streaming mode, which gives good performance on this pipeline and would
+allow running the same pipeline with no change on an unbounded source.
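+
+For reference, a minimal sketch of this setup (we keep the default mode, so the batch-mode line is
+shown commented out):
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// the default is RuntimeExecutionMode.STREAMING; batch mode would be enabled with:
+// env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```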
+
+### Using the streaming sources and datasets
+
+**Sources**: [DataSource<T>](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/operators/DataSource.html)
+becomes [DataStreamSource<T>](https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.0/org/apache/flink/streaming/api/datastream/DataStreamSource.html)
+after the call to _env.createInput()_.
+
+**Datasets**: [DataSet<T>](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/DataSet.html)
+is now [DataStream<T>](https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.0/org/apache/flink/streaming/api/datastream/DataStream.html)
+and its subclasses.
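+
+Concretely, the source creation keeps the same _createInput()_ call, only its return type changes
+(the input format and type information variables below are placeholders):
+
+```java
+// before, with the DataSet API:
+// DataSource<Row> storeSales = env.createInput(storeSalesInputFormat, rowTypeInfo);
+
+// after, with the DataStream API on a StreamExecutionEnvironment:
+DataStreamSource<Row> storeSales = env.createInput(storeSalesInputFormat, rowTypeInfo);
+```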
+
+### [Migrating the join operation](https://github.com/echauchot/tpcds-benchmark-flink/blob/master/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L131-L137)
+
+The DataStream join operator does not yet support aggregations in batch mode
+(see [FLINK-22587](https://issues.apache.org/jira/browse/FLINK-22587) for details). Basically, the
+problem is with the trigger of the
+default [GlobalWindow](https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.0/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.html),
+which never fires, so the records are never output. We will work around this problem by applying a
+custom [EndOfStream](https://github.com/echauchot/tpcds-benchmark-flink/blob/master/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L254-L295)
+window. It is a window assigner that assigns all the records to a
+single [TimeWindow](https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.0/org/apache/flink/streaming/api/windowing/windows/TimeWindow.html).
+So, as with the GlobalWindow, all the records are assigned to the same window, except that this
+window's trigger is based on the end of the window (which is set to _Long.MAX_VALUE_). As we are on
+a bounded source, at some point the watermark will advance to +INFINITY (_Long.MAX_VALUE_), thus
+cross the end of the time window, and consequently fire the trigger and output the records.
+
+Now that we have working triggering, we just need to call a standard join with the _Row::join_
+function.
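+
+A condensed sketch of what such a window assigner looks like (the actual EndOfStream implementation
+is in the linked repo; the class name here is illustrative):
+
+```java
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
+
+  // a single window spanning all of event time: its end is Long.MAX_VALUE
+  private static final TimeWindow INTERVAL = new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);
+
+  @Override
+  public Collection<TimeWindow> assignWindows(
+      Object element, long timestamp, WindowAssignerContext context) {
+    // every record lands in the same window, as with GlobalWindow
+    return Collections.singletonList(INTERVAL);
+  }
+
+  @Override
+  public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+    // fires when the watermark passes the end of the window, i.e. when the
+    // bounded source completes and the watermark advances to +INFINITY
+    return EventTimeTrigger.create();
+  }
+
+  @Override
+  public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+    return new TimeWindow.Serializer();
+  }
+
+  @Override
+  public boolean isEventTime() {
+    return true;
+  }
+}
+```
+
+The join itself then stays a standard windowed join (the key selectors and field index constants
+are illustrative):
+
+```java
+DataStream<Row> joined = storeSales
+    .join(items)
+    .where(row -> row.getField(SS_ITEM_SK_INDEX))
+    .equalTo(row -> row.getField(I_ITEM_SK_INDEX))
+    .window(new EndOfStreamWindows())
+    .apply((JoinFunction<Row, Row, Row>) Row::join);
+```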
+
+### [Migrating the group by and reduce (sum) operations](https://github.com/echauchot/tpcds-benchmark-flink/blob/master/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L147-L170)
+
+The DataStream API no longer has
+a [groupBy()](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/DataSet.html#groupBy-org.apache.flink.api.java.functions.KeySelector-)
+method; we now use
+the [keyBy()](https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.0/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-org.apache.flink.api.java.functions.KeySelector-)
+method. A downstream aggregation will be applied to elements with the same key, exactly as
+a [GroupReduceFunction](https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/functions/GroupReduceFunction.html)
+would have done on a DataSet, except that it will not materialize the collection of
+data. The summing
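+
+A rough sketch of this keyBy + reduce shape (field index constants are illustrative):
+
+```java
+DataStream<Row> summed = joined
+    // key on (d_year, i_brand, i_brand_id); Row implements equals()/hashCode()
+    .keyBy(row -> Row.of(
+        row.getField(D_YEAR_INDEX), row.getField(I_BRAND_INDEX), row.getField(I_BRAND_ID_INDEX)))
+    .window(new EndOfStreamWindows())
+    // the downstream reduce sums ss_ext_sales_price per key without
+    // materializing the per-key collection of records
+    .reduce((row1, row2) -> {
+        Row result = Row.copy(row1);
+        result.setField(SUM_AGG_INDEX,
+            (double) row1.getField(SUM_AGG_INDEX) + (double) row2.getField(SUM_AGG_INDEX));
+        return result;
+    });
+```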

Review Comment:
   Makes sense. So in my case, as the following operator is a reduce, there will indeed be no
materialization. Thanks for the details.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
