WencongLiu commented on code in PR #23362:
URL: https://github.com/apache/flink/pull/23362#discussion_r1357720771


##########
docs/content/docs/dev/datastream/dataset_migration.md:
##########
@@ -0,0 +1,699 @@
+---
+title: "How To Migrate From DataSet to DataStream"
+weight: 302
+type: docs
+bookToc: false
+aliases:
+  - /dev/dataset_migration.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# How to Migrate from DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in
+Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API, or SQL for their
+data processing requirements.
+
+For most DataSet APIs, users can use the DataStream API to get the same calculation results in batch jobs. However,
+each DataSet API can be implemented by the DataStream API with varying differences in semantics and behavior. All DataSet APIs can be
+categorized into four types:
+
+Category 1: These DataSet APIs can be implemented by DataStream APIs with the same semantics and the same calculation behavior.
+
+Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantics but the same calculation behavior. This will
+make the job code more complex.
+
+Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantics and different calculation behavior. This
+will involve additional computation and shuffle costs.
+
+Category 4: These DataSet APIs are not supported by DataStream APIs.
+
+The subsequent sections first introduce how to set the execution environment and then explain in detail how to implement
+each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each
+category.
+
+
+## Setting the execution environment
+
+To execute a DataSet pipeline with the DataStream API, first replace ExecutionEnvironment with StreamExecutionEnvironment.
+{{< tabs executionenv >}}
+{{< tab "DataSet">}}
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+Because DataSet sources are always bounded, it is recommended to set the execution mode to RuntimeExecutionMode.BATCH so that Flink can apply
+additional optimizations for batch processing.
+```java
+StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Implement the DataSet API by DataStream
+
+### Category 1
+
+For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these
+DataSet APIs with the DataStream API is relatively straightforward and does not require significant modifications or added complexity
+in the job code.
+
+#### Map
+
+{{< tabs mapfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.map(new MapFunction(){
+    // implement user-defined map logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.map(new MapFunction(){
+    // implement user-defined map logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+#### FlatMap
+
+{{< tabs flatmapfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.flatMap(new FlatMapFunction(){
+    // implement user-defined flatmap logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.flatMap(new FlatMapFunction(){
+    // implement user-defined flatmap logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Filter
+
+{{< tabs filterfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.filter(new FilterFunction(){
+    // implement user-defined filter logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.filter(new FilterFunction(){
+    // implement user-defined filter logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Union
+
+{{< tabs unionfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet1.union(dataSet2);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream1.union(dataStream2);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+#### Rebalance
+
+{{< tabs rebalancefunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.rebalance();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.rebalance();
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Reduce on Grouped DataSet
+
+{{< tabs reducegroupfunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> dataSet = // [...]
+dataSet.groupBy(value -> value.f0)
+        .reduce(new ReduceFunction(){

Review Comment:
   I'd prefer to omit the argument symbol of all anonymous classes that 
implement the operator function. This will make the example cleaner. 🤔



##########
docs/content/docs/dev/datastream/dataset_migration.md:
##########
@@ -0,0 +1,699 @@
+---
+title: "How To Migrate From DataSet to DataStream"
+weight: 302
+type: docs
+bookToc: false
+aliases:
+  - /dev/dataset_migration.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# How to Migrate from DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in
+Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API, or SQL for their
+data processing requirements.
+
+For most DataSet APIs, users can use the DataStream API to get the same calculation results in batch jobs. However,
+each DataSet API can be implemented by the DataStream API with varying differences in semantics and behavior. All DataSet APIs can be
+categorized into four types:
+
+Category 1: These DataSet APIs can be implemented by DataStream APIs with the same semantics and the same calculation behavior.
+
+Category 2: These DataSet APIs can be implemented by DataStream APIs with different semantics but the same calculation behavior. This will
+make the job code more complex.
+
+Category 3: These DataSet APIs can be implemented by DataStream APIs with different semantics and different calculation behavior. This
+will involve additional computation and shuffle costs.
+
+Category 4: These DataSet APIs are not supported by DataStream APIs.
+
+The subsequent sections first introduce how to set the execution environment and then explain in detail how to implement
+each category of DataSet APIs using the DataStream APIs, highlighting the specific considerations and challenges associated with each
+category.
+
+
+## Setting the execution environment
+
+To execute a DataSet pipeline with the DataStream API, first replace ExecutionEnvironment with StreamExecutionEnvironment.
+{{< tabs executionenv >}}
+{{< tab "DataSet">}}
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+Because DataSet sources are always bounded, it is recommended to set the execution mode to RuntimeExecutionMode.BATCH so that Flink can apply
+additional optimizations for batch processing.
+```java
+StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Implement the DataSet API by DataStream
+
+### Category 1
+
+For Category 1, the usage of the API in DataStream is almost identical to that in DataSet. This means that implementing these
+DataSet APIs with the DataStream API is relatively straightforward and does not require significant modifications or added complexity
+in the job code.
+
+#### Map
+
+{{< tabs mapfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.map(new MapFunction(){
+    // implement user-defined map logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.map(new MapFunction(){
+    // implement user-defined map logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+#### FlatMap
+
+{{< tabs flatmapfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.flatMap(new FlatMapFunction(){
+    // implement user-defined flatmap logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.flatMap(new FlatMapFunction(){
+    // implement user-defined flatmap logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Filter
+
+{{< tabs filterfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.filter(new FilterFunction(){
+    // implement user-defined filter logic
+});
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.filter(new FilterFunction(){
+    // implement user-defined filter logic
+});
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Union
+
+{{< tabs unionfunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet1.union(dataSet2);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream1.union(dataStream2);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+#### Rebalance
+
+{{< tabs rebalancefunc >}}
+{{< tab "DataSet">}}
+```java
+dataSet.rebalance();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+dataStream.rebalance();
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Reduce on Grouped DataSet
+
+{{< tabs reducegroupfunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> dataSet = // [...]
+dataSet.groupBy(value -> value.f0)
+        .reduce(new ReduceFunction(){
+            // implement user-defined reduce logic
+        });
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> dataStream = // [...]
+dataStream.keyBy(value -> value.f0)
+        .reduce(new ReduceFunction(){
+            // implement user-defined reduce logic
+        });
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Aggregate on Grouped DataSet
+
+{{< tabs aggregategroupfunc >}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> dataSet = // [...]
+// compute sum of the second field
+dataSet.groupBy(value -> value.f0).aggregate(SUM, 1);
+// compute min of the second field
+dataSet.groupBy(value -> value.f0).aggregate(MIN, 1);
+// compute max of the second field
+dataSet.groupBy(value -> value.f0).aggregate(MAX, 1);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> dataStream = // [...]
+// compute sum of the second field
+dataStream.keyBy(value -> value.f0).sum(1);
+// compute min of the second field
+dataStream.keyBy(value -> value.f0).min(1);
+// compute max of the second field
+dataStream.keyBy(value -> value.f0).max(1);
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+
+### Category 2
+
+For Category 2, these DataSet APIs can be implemented by DataStream APIs with different semantics but the same calculation behavior.
+Developers need to adapt their code to accommodate these differences, which introduces additional complexity.
+
+#### Project
+
+{{< tabs projectfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple3<Integer, Double, String>> dataSet = // [...]
+dataSet.project(2,0);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple3<Integer, Double, String>> dataStream = // [...]
+dataStream.map(value -> Tuple2.of(value.f2, value.f0));
+```
+{{< /tab >}}
+{{< /tabs>}}
+
+#### Distinct
+
+{{< tabs distinctfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Integer> dataSet = // [...]
+dataSet.distinct();
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Integer> dataStream = // [...]
+DataStream<Integer> output = dataStream
+        .keyBy(value -> value)
+        .reduce((value1, value2) -> value1);
+```
+{{< /tab >}}
+{{< /tabs>}}
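
The keyBy/reduce pattern in the DataStream tab above relies on batch execution mode, where a keyed reduce emits only the final value per key; in streaming mode the reduce is rolling and emits a result for every incoming element. As a rough illustration of the per-key semantics only, the following plain-Java sketch (no Flink dependency; the names `DistinctSketch` and `distinctByKey` are hypothetical and not part of the PR) uses each value as its own key and keeps the first value seen for that key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for keyBy(value -> value).reduce((value1, value2) -> value1).
public class DistinctSketch {

    // Groups each element under itself as the key and keeps the first value per key.
    static <T> List<T> distinctByKey(List<T> input) {
        Map<T, T> reduced = new LinkedHashMap<>();
        for (T value : input) {
            // Same reduce logic as in the DataStream tab: keep value1, drop value2.
            reduced.merge(value, value, (value1, value2) -> value1);
        }
        // One final record per key, as a batch-mode keyed reduce would produce.
        return new ArrayList<>(reduced.values());
    }

    public static void main(String[] args) {
        System.out.println(distinctByKey(List.of(3, 1, 3, 2, 1))); // prints [3, 1, 2]
    }
}
```

The sketch preserves first-seen order only because it uses a `LinkedHashMap`; it says nothing about the output order of a Flink job.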
+
+#### Hash-Partition
+
+{{< tabs hashpartitionfunc>}}
+{{< tab "DataSet">}}
+```java
+DataSet<Tuple2<String, Integer>> dataSet = // [...]
+dataSet.partitionByHash(value -> value.f0);
+```
+{{< /tab >}}
+{{< tab "DataStream">}}
+```java
+DataStream<Tuple2<String, Integer>> dataStream = // [...]
+// partition by the hashcode of key
+dataStream.partitionCustom((key, numSubpartition) -> key.hashCode() % numSubpartition, value -> value.f0);
+```
+{{< /tab >}}
+{{< /tabs>}}
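
One caveat about the partitioner expression in the DataStream tab above: in Java, `hashCode()` can be negative, and `%` then yields a negative remainder, which is presumably not usable as a partition index. The plain-Java sketch below (no Flink dependency; `HashPartitionSketch` and `partitionIndex` are hypothetical names, and this is not claimed to be how `partitionByHash` computes its target partition) shows one common way to keep the index in the range `[0, numPartitions)`.

```java
// Hypothetical helper illustrating a non-negative hash-to-partition mapping.
public class HashPartitionSketch {

    // Maps a key to an index in [0, numPartitions), even for negative hash codes.
    static int partitionIndex(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        Integer key = -7; // Integer.hashCode() returns the int value itself
        System.out.println(key.hashCode() % 4);     // prints -3
        System.out.println(partitionIndex(key, 4)); // prints 1
    }
}
```

If this concern applies, the lambda passed to `partitionCustom` could use `Math.floorMod(key.hashCode(), numSubpartition)` in place of the `%` expression.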
+
+#### Reduce on Full DataSet
+
+If developers want to compute over the full DataStream, GlobalWindow can be used to collect all records of the DataStream.
+However, a special trigger is also required to trigger the computation of the GlobalWindow at the end of its input. Here is an example
+code snippet of the trigger.
+
+```java
+public class EOFTrigger extends Trigger<Object, GlobalWindow> {
+
+    private boolean hasRegistered;
+
+    @Override
+    public TriggerResult onElement(
+            Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
+        if (!hasRegistered) {
+            ctx.registerEventTimeTimer(Long.MAX_VALUE);
+            hasRegistered = true;
+        }
+        return TriggerResult.CONTINUE;
+    }
+
+    @Override
+    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
+        return TriggerResult.FIRE;

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
