yunfengzhou-hub commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1355943797
########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply Review Comment: Could you please confirm whether it is optional or compulsory to set `RuntimeMode.BATCH`? My concern is that some DataStream operations like `reduce()` displays different behavior in different runtime mode[1], and the behavior is only the same as that in DataSet API when `RuntimeMode.BATCH` is configured. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#important-considerations ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. Review Comment: In my understanding, this document should guarantee that so long as dataset job developers replace the dataset api with the datastream api described in this job, the resulting job's behavior and correctness is unaffected. I'm not sure how to further divide this correctness into "semantics" and "behavior". Given this assumption, I think we should only divide transformations into two types: supported and not supported. What do you think? Overheads like shuffling costs might exist generally in all operators given the different shuffle strategies in DataStream and DataSet, thus categorities 1 and 2 cannot achieve the same performance as well. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply +additional optimizations for batch processing. +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Implement the DataSet API by DataStream Review Comment: `DataSet.groupBy().combineGroup()`[1] seems not mentioned in this document. Let's compare this document with the legacy DataSet API's document to make sure all DataSet methods have a corresponding migration guideline. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/dataset/transformations/#groupcombine-on-a-grouped-dataset ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply +additional optimizations for batch processing. +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Implement the DataSet API by DataStream + +### Category 1 Review Comment: As the division of categories might be removed according to previous comments, it might be better to remove these subtitles, and order the following sections according to the order in the legacy DataSet API's document[1]. This can also make this document more friendly to current DataSet users who has already been familiar with DataSet's documents. Another way of organization is to add subtitles like "reduce & aggregation", "join", "partitioning". [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/dataset/transformations/ ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html Review Comment: To my understanding, aliases are used for compatibility with previous document structures. Newly added document need not add this property at the beginning. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply +additional optimizations for batch processing. +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Implement the DataSet API by DataStream + +### Category 1 + +For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these +DataSet APIs by the DataStream API is relatively straightforward and does not require significant modifications or complexity +in the job code. + +#### Map + +{{< tabs mapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### FlatMap + +{{< tabs flatmapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Filter + +{{< tabs filterfunc >}} +{{< tab "DataSet">}} +```java +dataSet.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Union + +{{< tabs unionfunc >}} +{{< tab "DataSet">}} +```java +dataSet1.union(dataSet2); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream1.union(dataStream2); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### Rebalance + +{{< tabs rebalancefunc >}} +{{< tab "DataSet">}} +```java +dataSet.rebalance(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.rebalance(); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Reduce on Grouped DataSet + +{{< tabs reducegroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduce(new ReduceFunction(){ Review Comment: nit: `ReduceFunction` has a generic type. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false Review Comment: It might be better to keep bookToc for convenient lookup. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Review Comment: It might be better to also provide alternatives for `ExecutionEnvironment#getLocalEnvironment` and `new CollectionEnvironment()`. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply +additional optimizations for batch processing. +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Implement the DataSet API by DataStream + +### Category 1 + +For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these +DataSet APIs by the DataStream API is relatively straightforward and does not require significant modifications or complexity +in the job code. + +#### Map + +{{< tabs mapfunc >}} Review Comment: Tabs are usually used to present the Java/Scala/Python/SQL APIs to achieve the same functionality. A user only needs to refer to one of all tabs instead of switching tabs frequently. Given this existing practice, I'm not sure it is proper to use tabs to switch between DataSet and DataStream API. It might be better to display both DataSet and DataStream API simutaneously on the page, in the format of two adjacent paragraphs or two columns in a table. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply +additional optimizations for batch processing. +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Implement the DataSet API by DataStream + +### Category 1 + +For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these +DataSet APIs by the DataStream API is relatively straightforward and does not require significant modifications or complexity +in the job code. + +#### Map + +{{< tabs mapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### FlatMap + +{{< tabs flatmapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Filter + +{{< tabs filterfunc >}} +{{< tab "DataSet">}} +```java +dataSet.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Union + +{{< tabs unionfunc >}} +{{< tab "DataSet">}} +```java +dataSet1.union(dataSet2); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream1.union(dataStream2); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### Rebalance + +{{< tabs rebalancefunc >}} +{{< tab "DataSet">}} +```java +dataSet.rebalance(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.rebalance(); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Reduce on Grouped DataSet + +{{< tabs reducegroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Aggregate on Grouped DataSet + +{{< tabs aggregategroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +// compute sum of the second field +dataSet.groupBy(value -> value.f0).aggregate(SUM, 1); +// compute min of the second field +dataSet.groupBy(value -> value.f0).aggregate(MIN, 1); +// compute max of the second field +dataSet.groupBy(value -> value.f0).aggregate(MAX, 1); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// compute sum of the second field +dataStream.keyBy(value -> value.f0).sum(1); +// compute min of the second field +dataStream.keyBy(value -> value.f0).min(1); +// compute max of the second field +dataStream.keyBy(value -> value.f0).max(1); +``` +{{< /tab >}} +{{< /tabs>}} + + +### Category 2 + +For category 2, these DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. +The developers need to adapt their code to accommodate these variations, which introduces additional complexity. + +#### Project + +{{< tabs projectfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple3<Integer, Double, String>> dataSet = // [...] +dataSet.project(2,0); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple3<Integer, Double, String>> dataStream = // [...] +dataStream.map(value -> Tuple2.of(value.f2, value.f0)); Review Comment: DataStream also has a `project` method. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply +additional optimizations for batch processing. +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Implement the DataSet API by DataStream + +### Category 1 + +For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these +DataSet APIs by the DataStream API is relatively straightforward and does not require significant modifications or complexity +in the job code. + +#### Map + +{{< tabs mapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### FlatMap + +{{< tabs flatmapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Filter + +{{< tabs filterfunc >}} +{{< tab "DataSet">}} +```java +dataSet.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Union + +{{< tabs unionfunc >}} +{{< tab "DataSet">}} +```java +dataSet1.union(dataSet2); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream1.union(dataStream2); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### Rebalance + +{{< tabs rebalancefunc >}} +{{< tab "DataSet">}} +```java +dataSet.rebalance(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.rebalance(); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Reduce on Grouped DataSet + +{{< tabs reducegroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Aggregate on Grouped DataSet + +{{< tabs aggregategroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +// compute sum of the second field +dataSet.groupBy(value -> value.f0).aggregate(SUM, 1); +// compute min of the second field +dataSet.groupBy(value -> value.f0).aggregate(MIN, 1); +// compute max of the second field +dataSet.groupBy(value -> value.f0).aggregate(MAX, 1); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// compute sum of the second field +dataStream.keyBy(value -> value.f0).sum(1); +// compute min of the second field +dataStream.keyBy(value -> value.f0).min(1); +// compute max of the second field +dataStream.keyBy(value -> value.f0).max(1); +``` +{{< /tab >}} +{{< /tabs>}} + + +### Category 2 + +For category 2, these DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. +The developers need to adapt their code to accommodate these variations, which introduces additional complexity. + +#### Project + +{{< tabs projectfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple3<Integer, Double, String>> dataSet = // [...] +dataSet.project(2,0); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple3<Integer, Double, String>> dataStream = // [...] +dataStream.map(value -> Tuple2.of(value.f2, value.f0)); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Distinct + +{{< tabs distinctfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet = // [...] +dataSet.distinct(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream = // [...] +DataStream<Integer> output = dataStream + .keyBy(value -> value) + .reduce((value1, value2) -> value1); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Hash-Partition + +{{< tabs hashpartitionfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +dataSet.partitionByHash(value -> value.f0); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// partition by the hashcode of key +dataStream.partitionCustom((key, numSubpartition) -> key.hashCode() % numSubpartition, value -> value.f0); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Reduce on Full DataSet + +If developers want to compute data of full datastream, GlobalWindow could be used to collect all records of datastream. +However, a special trigger is also required to trigger the computation of GlobalWindow at the end of its inputs. Here is an example +code snippet of the trigger. + +```java +public class EOFTrigger extends Trigger<Object, GlobalWindow> { + + private boolean hasRegistered; + + @Override + public TriggerResult onElement( + Object element, long timestamp, GlobalWindow window, TriggerContext ctx) { + if (!hasRegistered) { + ctx.registerEventTimeTimer(Long.MAX_VALUE); + hasRegistered = true; + } + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) { + return TriggerResult.FIRE; Review Comment: `TriggerResult.FIRE_AND_PURGE` might be better. If a Flink job has ever failed over, the trigger might have multiple `Long.MAX_VALUE` timers registered. Given that `TriggerResult.FIRE` does not clear input records after emitting the window aggregation result, multiple results with the same value might be emitted in this case. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply +additional optimizations for batch processing. +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Implement the DataSet API by DataStream Review Comment: It might be better to also add guidelines for adding source and sinks. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply +additional optimizations for batch processing. +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Implement the DataSet API by DataStream + +### Category 1 + +For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these +DataSet APIs by the DataStream API is relatively straightforward and does not require significant modifications or complexity +in the job code. + +#### Map + +{{< tabs mapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### FlatMap + +{{< tabs flatmapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Filter + +{{< tabs filterfunc >}} +{{< tab "DataSet">}} +```java +dataSet.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Union + +{{< tabs unionfunc >}} +{{< tab "DataSet">}} +```java +dataSet1.union(dataSet2); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream1.union(dataStream2); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### Rebalance + +{{< tabs rebalancefunc >}} +{{< tab "DataSet">}} +```java +dataSet.rebalance(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.rebalance(); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Reduce on Grouped DataSet + +{{< tabs reducegroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Aggregate on Grouped DataSet + +{{< tabs aggregategroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +// compute sum of the second field +dataSet.groupBy(value -> value.f0).aggregate(SUM, 1); +// compute min of the second field +dataSet.groupBy(value -> value.f0).aggregate(MIN, 1); +// compute max of the second field +dataSet.groupBy(value -> value.f0).aggregate(MAX, 1); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// compute sum of the second field +dataStream.keyBy(value -> value.f0).sum(1); +// compute min of the second field +dataStream.keyBy(value -> value.f0).min(1); +// compute max of the second field +dataStream.keyBy(value -> value.f0).max(1); +``` +{{< /tab >}} +{{< /tabs>}} + + +### Category 2 + +For category 2, these DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. +The developers need to adapt their code to accommodate these variations, which introduces additional complexity. + +#### Project + +{{< tabs projectfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple3<Integer, Double, String>> dataSet = // [...] +dataSet.project(2,0); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple3<Integer, Double, String>> dataStream = // [...] +dataStream.map(value -> Tuple2.of(value.f2, value.f0)); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Distinct + +{{< tabs distinctfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet = // [...] +dataSet.distinct(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream = // [...] +DataStream<Integer> output = dataStream + .keyBy(value -> value) + .reduce((value1, value2) -> value1); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Hash-Partition + +{{< tabs hashpartitionfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +dataSet.partitionByHash(value -> value.f0); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// partition by the hashcode of key +dataStream.partitionCustom((key, numSubpartition) -> key.hashCode() % numSubpartition, value -> value.f0); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Reduce on Full DataSet + +If developers want to compute data of full datastream, GlobalWindow could be used to collect all records of datastream. +However, a special trigger is also required to trigger the computation of GlobalWindow at the end of its inputs. Here is an example +code snippet of the trigger. + +```java +public class EOFTrigger extends Trigger<Object, GlobalWindow> { + + private boolean hasRegistered; + + @Override + public TriggerResult onElement( + Object element, long timestamp, GlobalWindow window, TriggerContext ctx) { + if (!hasRegistered) { + ctx.registerEventTimeTimer(Long.MAX_VALUE); + hasRegistered = true; + } + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) { + return TriggerResult.FIRE; + } + + @Override + public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) { + return TriggerResult.CONTINUE; + } + + @Override + public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {} +} +``` +Then the reduce operation on full datastream could be performed by EOFTrigger. + +{{< tabs reducefullfunc>}} +{{< tab "DataSet">}} +```java +DataSet<String> dataSet = // [...] +dataSet.reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<String> dataStream = // [...] +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### Aggregate on Full DataSet + +The aggregate on full datastream could also be performed by EOFTrigger. + +{{< tabs aggregatefullfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<Integer, Integer>> dataSet = // [...] +// compute sum of the second field +dataSet.aggregate(SUM, 1); +// compute min of the second field +dataSet.aggregate(MIN, 1); +// compute max of the second field +dataSet.aggregate(MAX, 1); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<Integer, Integer>> dataStream = // [...] +// compute sum of the second field +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).sum(1); +// compute min of the second field +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).min(1); +// compute max of the second field +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).max(1); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### GroupReduce on Full DataSet + +The grpup reduce on full datastream could also be performed by EOFTrigger. + +{{< tabs groupreducefullfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet = // [...] +dataSet.reduceGroup(new GroupReduceFunction(){ + // implement user-defined group reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream = // [...] +// assign a same timestamp to all records +assignSameTimestamp(dataStream); +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()) + .apply(new WindowFunction(){ + // implement user-defined group reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +### Category 3 + +For category 3, these DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. Additional +calculation steps will be added. + +To collect records from each key, the DataStream assigns a same timestamp to all records and uses a fixed-length time window to gather them. +However, this introduces additional computational costs due to timestamp processing. + +Here is an example code snippet. The function assignSameTimestamp is used to explain the detailed behavior and will be utilized in the +subsequent sections: +```java +// assign a same timestamp to all records +void assignSameTimestamp(DataStream<Tuple2<String, Integer>> dataStream) { + dataStream.assignTimestampsAndWatermarks( + WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> 0)); +} +``` + +To collect records from each subtask, every record needs to be assigned a unique subtask ID and grouped accordingly within the window. +This additional step of assigning the subtask ID and performing a groupby operation introduces shuffle costs. Here is an example code +snippet showing how to assign a subtask ID to each record. The function assignSubtaskID is used to explain the detailed behavior and will +be utilized in the subsequent sections: +```java +// assign subtask ID to all records +DataStream<Tuple2<String, Integer>> assignSubtaskID(DataStream<Integer> dataStream) { + return dataStream.map(new RichMapFunction<Integer, Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> map(Integer value) { + return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value); + } + }); +} +``` + +#### MapPartition/SortPartition + +{{< tabs mapsortpartitionfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet = // [...] +// MapPartition +dataSet.mapPartition(new MapPartitionFunction(){ + // implement user-defined map partition logic + }); +// SortPartition +dataSet.sortPartition(0, Order.ASCENDING); +dataSet.sortPartition(0, Order.DESCENDING) +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream = // [...] +// assign subtask ID to all records +DataStream<Tuple2<String, Integer>> dataStream1 = assignSubtaskID(dataStream); +// assign a same timestamp to all records +assignSameTimestamp(dataStream1); +dataStream1.keyBy(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply(new WindowFunction(){ + // implement user-defined map partition or sort partition logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +#### GroupReduce on Grouped DataSet + +{{< tabs groupreducefunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<Integer, String>> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduceGroup(new GroupReduceFunction(){ + // implement user-defined group reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// assign a same timestamp to all records +assignSameTimestamp(dataStream); +dataStream.keyBy(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply(new WindowFunction(){ + // implement user-defined group reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Join + +{{< tabs joinfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet1 = // [...] +DataSet<Tuple2<String, Integer>> dataSet2 = // [...] +dataSet1.join(dataSet2) + .where(data -> data.f0) + .equalTo(data -> data.f0) + .with(new JoinFunction(){ + // implement user-defined join logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream1 = // [...] +DataStream<Tuple2<String, Integer>> dataStream2 = // [...] +// assign a same timestamp to all records +assignSameTimestamp(dataStream1); +assignSameTimestamp(dataStream2); +dataStream1.join(dataStream2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply(new JoinFunction(){ + // implement user-defined join logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +The Join operator can be efficiently implemented using the Table object in the Table API. The Table object allows seamless +conversion from a DataStream. Once the join computation on the Table is completed, the Table object can be converted back to +a DataStream. Here is an example code snippet: +```java +DataStream<Tuple2<String, Integer>> dataStream1 = // [...] +DataStream<Tuple2<String, Integer>> dataStream2 = // [...] +Table inputTable1 = tableEnv.fromDataStream(dataStream1, $("t1.f0"), $("t1.f1")); +Table inputTable2 = tableEnv.fromDataStream(dataStream2, $("t2.f0"), $("t2.f1")); +Table result = inputTable1.join(inputTable2) + .where($("t1.f0").isEqual($("t2.f0"))) + .select($("*")); +// Convert Table to DataStream +DataStream<Row> dataStream = tableEnv.toDataStream(result.select($("*"))); +``` + +#### Cross + +To implement the Cross operation between two DataStreams, datastream1 and datastream2, the parallism of two datastreams should be +same and then follow these steps: + +1. Broadcast datastream1 to a map operator. This ensures that each subtask in the map operator contains all the records of datastream1. +Each record will also be assigned a subtask ID in map operator. + +2. Assign a subtask ID to each record in datastream2. + +3. Finally, assign the same timestamp to the datastreams above and join them based on the subtask ID within a fixed-length time window. + +{{< tabs crossfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet1 = // [...] +DataSet<Integer> dataSet2 = // [...] +// Cross +dataSet1.cross(dataSet2) + .with(new CrossFunction(){ + // implement user-defined cross logic + }) +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream1 = // [...] +DataStream<Integer> dataStream2 = // [...] +DataStream<Tuple2<String, Integer>> datastream3 = assignSubtaskID(dataStream1.broadcast()); +DataStream<Tuple2<String, Integer>> datastream4 = assignSubtaskID(dataStream2); +// assign the same timestamp +assignSameTimestamp(datastream3); +assignSameTimestamp(datastream4); +dataStream3.join(dataStream4) + // join the two streams according to the subtask ID + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply(new JoinFunction(){ + // implement user-defined cross logic + }) +``` +{{< /tab >}} +{{< /tabs>}} + +#### CoGroup + +{{< tabs cogroupfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet1 = // [...] +DataSet<Tuple2<String, Integer>> dataSet2 = // [...] +dataSet1.coGroup(dataSet2).where(value -> value.f0) + .equalTo(value -> value.f0) + .with(new CoGroupFunction(){ + // implement user-defined co group logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream1 = // [...] +DataStream<Tuple2<String, Integer>> dataStream2 = // [...] +// assign a same timestamp to all records +assignSameTimestamp(dataStream1); +assignSameTimestamp(dataStream2); +dataStream1.coGroup(dataStream2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply(new CoGroupFunction(){ + // implement user-defined co group logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### OuterJoin + +To implement the OuterJoin operation between two DataStreams, datastream1 and datastream2, follow these steps: + +1. Assign the same timestamp to the two input datastreams + +2. Co-group the two idatastreams in a fixed length event time window to collect elements with the specific key of each datastream. + +3. Finally, implement left/right outer join in the user-defined CoGroupFunction. + +{{< tabs outerjoinfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet1 = // [...] +DataSet<Tuple2<String, Integer>> dataSet2 = // [...] +dataSet1.leftOuterJoin(dataSet2).where(dataSet1.f0) + .equalTo(dataSet2.f0) + .with(new JoinFunction(){ + // implement user-defined left outer join logic + }); +dataSet1.leftOuterJoin(dataSet2).where(input1.f0) + .equalTo(input2.f0) + .with(new JoinFunction(){ + // implement user-defined right outer join logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream1 = // [...] +DataStream<Tuple2<String, Integer>> dataStream2 = // [...] +assignSameTimestamp(dataStream1); +assignSameTimestamp(dataStream2); +// left outer join +dataStream1.coGroup(dataStream2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply((leftIterable, rightInterable, collector) -> { + if(!rightInterable.iterator().hasNext()){ + // implement user-defined left outer join logic + } + }); +// right outer join +dataStream1.coGroup(dataStream2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply((leftIterable, rightInterable, collector) -> { + if(!leftIterable.iterator().hasNext()){ + // implement user-defined right outer join logic + } + }); +``` +{{< /tab >}} +{{< /tabs>}} + +The OuterJoin operator can be efficiently implemented using the Table object in the Table API. The Table object allows seamless +conversion from a DataStream. Once the outer join computation on the Table is completed, the Table object can be converted back +to a DataStream. Here is an example code snippet: +```java +DataStream<Tuple2<String, Integer>> dataStream1 = // [...] +DataStream<Tuple2<String, Integer>> dataStream2 = // [...] +Table inputTable1 = tableEnv.fromDataStream(dataStream1, $("t1.f0"), $("t1.f1")); +Table inputTable2 = tableEnv.fromDataStream(dataStream2, $("t2.f0"), $("t2.f1")); +Table result1 = inputTable1.leftOuterJoin(inputTable2) + .where($("t1.f0").isEqual($("t2.f0"))) + .select($("*")); +Table result2 = inputTable1.rightOuterJoin(inputTable2) + .where($("t1.f0").isEqual($("t2.f0"))) + .select($("*")); +// implement user-defined left outer join logic on the datastream +DataStream<Row> dataStream1 = tableEnv.toDataStream(result1.select($("*"))); +// implement user-defined right outer join logic on the datastream +DataStream<Row> dataStream2 = tableEnv.toDataStream(result2.select($("*"))); +``` + +### Category 4 + +Currently, Range Partition API is not supported by DataStream. In the implementation of Range Partition, the sample operator Review Comment: Theoretically all batch operations can be achieved by implementing a custom StreamOperator that collects all input records and performs the corresponding operation in its `endInput()` method. So this range partition operation could be supported to my understanding, but it would need some workload for operator development. I'm not familiar with the background of this PR. Please confirm whether it is OK not to provide migration guideline for some operations depending on workload, or we should provide guideline for all operations regardless of workload. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply +additional optimizations for batch processing. +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Implement the DataSet API by DataStream + +### Category 1 + +For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these +DataSet APIs by the DataStream API is relatively straightforward and does not require significant modifications or complexity +in the job code. + +#### Map + +{{< tabs mapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### FlatMap + +{{< tabs flatmapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Filter + +{{< tabs filterfunc >}} +{{< tab "DataSet">}} +```java +dataSet.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Union + +{{< tabs unionfunc >}} +{{< tab "DataSet">}} +```java +dataSet1.union(dataSet2); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream1.union(dataStream2); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### Rebalance + +{{< tabs rebalancefunc >}} +{{< tab "DataSet">}} +```java +dataSet.rebalance(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.rebalance(); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Reduce on Grouped DataSet + +{{< tabs reducegroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Aggregate on Grouped DataSet + +{{< tabs aggregategroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +// compute sum of the second field +dataSet.groupBy(value -> value.f0).aggregate(SUM, 1); +// compute min of the second field +dataSet.groupBy(value -> value.f0).aggregate(MIN, 1); +// compute max of the second field +dataSet.groupBy(value -> value.f0).aggregate(MAX, 1); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// compute sum of the second field +dataStream.keyBy(value -> value.f0).sum(1); +// compute min of the second field +dataStream.keyBy(value -> value.f0).min(1); +// compute max of the second field +dataStream.keyBy(value -> value.f0).max(1); +``` +{{< /tab >}} +{{< /tabs>}} + + +### Category 2 + +For category 2, these DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. +The developers need to adapt their code to accommodate these variations, which introduces additional complexity. + +#### Project + +{{< tabs projectfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple3<Integer, Double, String>> dataSet = // [...] +dataSet.project(2,0); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple3<Integer, Double, String>> dataStream = // [...] +dataStream.map(value -> Tuple2.of(value.f2, value.f0)); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Distinct + +{{< tabs distinctfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet = // [...] +dataSet.distinct(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream = // [...] +DataStream<Integer> output = dataStream + .keyBy(value -> value) + .reduce((value1, value2) -> value1); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Hash-Partition + +{{< tabs hashpartitionfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +dataSet.partitionByHash(value -> value.f0); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// partition by the hashcode of key +dataStream.partitionCustom((key, numSubpartition) -> key.hashCode() % numSubpartition, value -> value.f0); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Reduce on Full DataSet + +If developers want to compute data of full datastream, GlobalWindow could be used to collect all records of datastream. +However, a special trigger is also required to trigger the computation of GlobalWindow at the end of its inputs. Here is an example +code snippet of the trigger. + +```java +public class EOFTrigger extends Trigger<Object, GlobalWindow> { + + private boolean hasRegistered; + + @Override + public TriggerResult onElement( + Object element, long timestamp, GlobalWindow window, TriggerContext ctx) { + if (!hasRegistered) { + ctx.registerEventTimeTimer(Long.MAX_VALUE); + hasRegistered = true; + } + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) { + return TriggerResult.FIRE; + } + + @Override + public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) { + return TriggerResult.CONTINUE; + } + + @Override + public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {} +} +``` +Then the reduce operation on full datastream could be performed by EOFTrigger. + +{{< tabs reducefullfunc>}} +{{< tab "DataSet">}} +```java +DataSet<String> dataSet = // [...] +dataSet.reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<String> dataStream = // [...] +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### Aggregate on Full DataSet + +The aggregate on full datastream could also be performed by EOFTrigger. + +{{< tabs aggregatefullfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<Integer, Integer>> dataSet = // [...] +// compute sum of the second field +dataSet.aggregate(SUM, 1); +// compute min of the second field +dataSet.aggregate(MIN, 1); +// compute max of the second field +dataSet.aggregate(MAX, 1); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<Integer, Integer>> dataStream = // [...] +// compute sum of the second field +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).sum(1); +// compute min of the second field +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).min(1); +// compute max of the second field +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).max(1); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### GroupReduce on Full DataSet + +The grpup reduce on full datastream could also be performed by EOFTrigger. + +{{< tabs groupreducefullfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet = // [...] +dataSet.reduceGroup(new GroupReduceFunction(){ + // implement user-defined group reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream = // [...] +// assign a same timestamp to all records +assignSameTimestamp(dataStream); +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()) + .apply(new WindowFunction(){ + // implement user-defined group reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +### Category 3 + +For category 3, these DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. Additional +calculation steps will be added. + +To collect records from each key, the DataStream assigns a same timestamp to all records and uses a fixed-length time window to gather them. +However, this introduces additional computational costs due to timestamp processing. + +Here is an example code snippet. The function assignSameTimestamp is used to explain the detailed behavior and will be utilized in the +subsequent sections: +```java +// assign a same timestamp to all records +void assignSameTimestamp(DataStream<Tuple2<String, Integer>> dataStream) { + dataStream.assignTimestampsAndWatermarks( + WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> 0)); +} +``` + +To collect records from each subtask, every record needs to be assigned a unique subtask ID and grouped accordingly within the window. +This additional step of assigning the subtask ID and performing a groupby operation introduces shuffle costs. Here is an example code +snippet showing how to assign a subtask ID to each record. The function assignSubtaskID is used to explain the detailed behavior and will +be utilized in the subsequent sections: +```java +// assign subtask ID to all records +DataStream<Tuple2<String, Integer>> assignSubtaskID(DataStream<Integer> dataStream) { + return dataStream.map(new RichMapFunction<Integer, Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> map(Integer value) { + return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value); + } + }); +} +``` + +#### MapPartition/SortPartition + +{{< tabs mapsortpartitionfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet = // [...] +// MapPartition +dataSet.mapPartition(new MapPartitionFunction(){ + // implement user-defined map partition logic + }); +// SortPartition +dataSet.sortPartition(0, Order.ASCENDING); +dataSet.sortPartition(0, Order.DESCENDING) +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream = // [...] +// assign subtask ID to all records +DataStream<Tuple2<String, Integer>> dataStream1 = assignSubtaskID(dataStream); +// assign a same timestamp to all records +assignSameTimestamp(dataStream1); +dataStream1.keyBy(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) Review Comment: Would it be better to reuse the `EOFTrigger` above here? In this way the timestamp of stream records need not be changed. ########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,699 @@ +--- +title: "How To Migrate From DataSet to DataStream" +weight: 302 +type: docs +bookToc: false +aliases: + - /dev/dataset_migration.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the +Flink 2.0 version. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their +data processing requirements. + +For the most of DataSet APIs, the users can utilize the DataStream API to get the same calculation result in the batch jobs. However, +different DataSet API can be implemented by DataStream API with various difference on semantic and behavior. All DataSet APIs can be +categorized into four types: + +Category 1: These DataSet APIs can be implemented by DataStream APIs with same semantic and same calculation behavior. + +Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. This will +make the job code more complex. + +Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. This +will involve additional computation and shuffle costs. + +Category 4: These DataSet APIs are not supported by DataStream APIs. + +The subsequent sections will first introduce how to set the execution environment and provide detailed explanations on how to implement +each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline by DataStream API, we should first start by replacing ExecutionEnvironment with StreamExecutionEnvironment. +{{< tabs executionenv >}} +{{< tab "DataSet">}} +```java +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +``` +{{< /tab >}} +{{< /tabs>}} + +As the source of DataSet is always bounded, the execution mode is suggested to be set to RuntimeMode.BATCH to allow Flink to apply +additional optimizations for batch processing. +```java +StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Implement the DataSet API by DataStream + +### Category 1 + +For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these +DataSet APIs by the DataStream API is relatively straightforward and does not require significant modifications or complexity +in the job code. + +#### Map + +{{< tabs mapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.map(new MapFunction(){ + // implement user-defined map logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### FlatMap + +{{< tabs flatmapfunc >}} +{{< tab "DataSet">}} +```java +dataSet.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.flatMap(new FlatMapFunction(){ + // implement user-defined flatmap logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Filter + +{{< tabs filterfunc >}} +{{< tab "DataSet">}} +```java +dataSet.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.filter(new FilterFunction(){ + // implement user-defined filter logic +}); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Union + +{{< tabs unionfunc >}} +{{< tab "DataSet">}} +```java +dataSet1.union(dataSet2); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream1.union(dataStream2); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### Rebalance + +{{< tabs rebalancefunc >}} +{{< tab "DataSet">}} +```java +dataSet.rebalance(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +dataStream.rebalance(); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Reduce on Grouped DataSet + +{{< tabs reducegroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Aggregate on Grouped DataSet + +{{< tabs aggregategroupfunc >}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +// compute sum of the second field +dataSet.groupBy(value -> value.f0).aggregate(SUM, 1); +// compute min of the second field +dataSet.groupBy(value -> value.f0).aggregate(MIN, 1); +// compute max of the second field +dataSet.groupBy(value -> value.f0).aggregate(MAX, 1); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// compute sum of the second field +dataStream.keyBy(value -> value.f0).sum(1); +// compute min of the second field +dataStream.keyBy(value -> value.f0).min(1); +// compute max of the second field +dataStream.keyBy(value -> value.f0).max(1); +``` +{{< /tab >}} +{{< /tabs>}} + + +### Category 2 + +For category 2, these DataSet APIs can be implemented by DataStream APIs with different semantic but same calculation behavior. +The developers need to adapt their code to accommodate these variations, which introduces additional complexity. + +#### Project + +{{< tabs projectfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple3<Integer, Double, String>> dataSet = // [...] +dataSet.project(2,0); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple3<Integer, Double, String>> dataStream = // [...] +dataStream.map(value -> Tuple2.of(value.f2, value.f0)); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Distinct + +{{< tabs distinctfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet = // [...] +dataSet.distinct(); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream = // [...] +DataStream<Integer> output = dataStream + .keyBy(value -> value) + .reduce((value1, value2) -> value1); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Hash-Partition + +{{< tabs hashpartitionfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet = // [...] +dataSet.partitionByHash(value -> value.f0); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// partition by the hashcode of key +dataStream.partitionCustom((key, numSubpartition) -> key.hashCode() % numSubpartition, value -> value.f0); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Reduce on Full DataSet + +If developers want to compute data of full datastream, GlobalWindow could be used to collect all records of datastream. +However, a special trigger is also required to trigger the computation of GlobalWindow at the end of its inputs. Here is an example +code snippet of the trigger. + +```java +public class EOFTrigger extends Trigger<Object, GlobalWindow> { + + private boolean hasRegistered; + + @Override + public TriggerResult onElement( + Object element, long timestamp, GlobalWindow window, TriggerContext ctx) { + if (!hasRegistered) { + ctx.registerEventTimeTimer(Long.MAX_VALUE); + hasRegistered = true; + } + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) { + return TriggerResult.FIRE; + } + + @Override + public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) { + return TriggerResult.CONTINUE; + } + + @Override + public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {} +} +``` +Then the reduce operation on full datastream could be performed by EOFTrigger. + +{{< tabs reducefullfunc>}} +{{< tab "DataSet">}} +```java +DataSet<String> dataSet = // [...] +dataSet.reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<String> dataStream = // [...] +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()) + .reduce(new ReduceFunction(){ + // implement user-defined reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### Aggregate on Full DataSet + +The aggregate on full datastream could also be performed by EOFTrigger. + +{{< tabs aggregatefullfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<Integer, Integer>> dataSet = // [...] +// compute sum of the second field +dataSet.aggregate(SUM, 1); +// compute min of the second field +dataSet.aggregate(MIN, 1); +// compute max of the second field +dataSet.aggregate(MAX, 1); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<Integer, Integer>> dataStream = // [...] +// compute sum of the second field +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).sum(1); +// compute min of the second field +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).min(1); +// compute max of the second field +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()).max(1); +``` +{{< /tab >}} +{{< /tabs>}} + + +#### GroupReduce on Full DataSet + +The grpup reduce on full datastream could also be performed by EOFTrigger. + +{{< tabs groupreducefullfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet = // [...] +dataSet.reduceGroup(new GroupReduceFunction(){ + // implement user-defined group reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream = // [...] +// assign a same timestamp to all records +assignSameTimestamp(dataStream); +dataStream.windowAll(GlobalWindows.create()).trigger(new EOFTrigger()) + .apply(new WindowFunction(){ + // implement user-defined group reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +### Category 3 + +For category 3, these DataSet APIs can be implemented by DataStream APIs with different semantic and different calculation behavior. Additional +calculation steps will be added. + +To collect records from each key, the DataStream assigns a same timestamp to all records and uses a fixed-length time window to gather them. +However, this introduces additional computational costs due to timestamp processing. + +Here is an example code snippet. The function assignSameTimestamp is used to explain the detailed behavior and will be utilized in the +subsequent sections: +```java +// assign a same timestamp to all records +void assignSameTimestamp(DataStream<Tuple2<String, Integer>> dataStream) { + dataStream.assignTimestampsAndWatermarks( + WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> 0)); +} +``` + +To collect records from each subtask, every record needs to be assigned a unique subtask ID and grouped accordingly within the window. +This additional step of assigning the subtask ID and performing a groupby operation introduces shuffle costs. Here is an example code +snippet showing how to assign a subtask ID to each record. The function assignSubtaskID is used to explain the detailed behavior and will +be utilized in the subsequent sections: +```java +// assign subtask ID to all records +DataStream<Tuple2<String, Integer>> assignSubtaskID(DataStream<Integer> dataStream) { + return dataStream.map(new RichMapFunction<Integer, Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> map(Integer value) { + return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value); + } + }); +} +``` + +#### MapPartition/SortPartition + +{{< tabs mapsortpartitionfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Integer> dataSet = // [...] +// MapPartition +dataSet.mapPartition(new MapPartitionFunction(){ + // implement user-defined map partition logic + }); +// SortPartition +dataSet.sortPartition(0, Order.ASCENDING); +dataSet.sortPartition(0, Order.DESCENDING) +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Integer> dataStream = // [...] +// assign subtask ID to all records +DataStream<Tuple2<String, Integer>> dataStream1 = assignSubtaskID(dataStream); +// assign a same timestamp to all records +assignSameTimestamp(dataStream1); +dataStream1.keyBy(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply(new WindowFunction(){ + // implement user-defined map partition or sort partition logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +#### GroupReduce on Grouped DataSet + +{{< tabs groupreducefunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<Integer, String>> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduceGroup(new GroupReduceFunction(){ + // implement user-defined group reduce logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream = // [...] +// assign a same timestamp to all records +assignSameTimestamp(dataStream); +dataStream.keyBy(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply(new WindowFunction(){ + // implement user-defined group reduce logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +#### Join + +{{< tabs joinfunc>}} +{{< tab "DataSet">}} +```java +DataSet<Tuple2<String, Integer>> dataSet1 = // [...] +DataSet<Tuple2<String, Integer>> dataSet2 = // [...] +dataSet1.join(dataSet2) + .where(data -> data.f0) + .equalTo(data -> data.f0) + .with(new JoinFunction(){ + // implement user-defined join logic + }); +``` +{{< /tab >}} +{{< tab "DataStream">}} +```java +DataStream<Tuple2<String, Integer>> dataStream1 = // [...] +DataStream<Tuple2<String, Integer>> dataStream2 = // [...] +// assign a same timestamp to all records +assignSameTimestamp(dataStream1); +assignSameTimestamp(dataStream2); +dataStream1.join(dataStream2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .apply(new JoinFunction(){ + // implement user-defined join logic + }); +``` +{{< /tab >}} +{{< /tabs>}} + +The Join operator can be efficiently implemented using the Table object in the Table API. The Table object allows seamless +conversion from a DataStream. Once the join computation on the Table is completed, the Table object can be converted back to +a DataStream. Here is an example code snippet: Review Comment: Table & DataStream conversion might bring loss in data precision and time semantics. For example, refer to the following document for how watermark is (partially) preserved during table/datastream conversion. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
