WencongLiu commented on code in PR #23362: URL: https://github.com/apache/flink/pull/23362#discussion_r1378433008
########## docs/content/docs/dev/datastream/dataset_migration.md: ########## @@ -0,0 +1,778 @@ +--- +title: "How to Migrate from DataSet to DataStream" +weight: 302 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# How to Migrate from DataSet to DataStream + +The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in +Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, the Table API, or SQL for their +data processing requirements. + +When migrating an application, the DataSet APIs can be divided into four categories according to how they map to +DataStream APIs. + +Category 1: DataSet APIs that can be migrated to DataStream APIs with the same semantics and the same processing behavior. + +Category 2: DataSet APIs that can be migrated to DataStream APIs with different semantics but the same processing behavior. This +makes the job code more complex. + +Category 3: DataSet APIs that can be migrated to DataStream APIs with different semantics and different processing behavior. This +involves additional computation and I/O costs. + +Category 4: DataSet APIs that are not supported by the DataStream API. + +The subsequent sections first introduce how to set up the execution environment and the sources/sinks, then explain in detail how to migrate +each category of DataSet APIs to the DataStream API, highlighting the specific considerations and challenges associated with each +category. + + +## Setting the execution environment + +To execute a DataSet pipeline with the DataStream API, first replace `ExecutionEnvironment` with `StreamExecutionEnvironment`. + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left">DataSet</th> + <th class="text-left">DataStream</th> + </tr> + </thead> + <tbody> + <tr> + <td> + {{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... 
jarFiles); + {{< /highlight >}} + </td> + </tr> + </tbody> +</table> + +As DataSet sources are always bounded, the execution mode must be set to `RuntimeExecutionMode.BATCH` to make Flink execute in batch mode. + +```java +StreamExecutionEnvironment executionEnvironment = // [...]; +executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); +``` + +## Using the streaming sources and sinks + +### Sources + +The DataStream API uses `DataStreamSource` to read records from external systems, while the DataSet API uses `DataSource`. + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left">DataSet</th> + <th class="text-left">DataStream</th> + </tr> + </thead> + <tbody> + <tr> + <td> + {{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from input format +DataSource<> source = ExecutionEnvironment.createInput(inputFormat); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from input format +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat); + {{< /highlight >}} + </td> + </tr> + </tbody> +</table> + +### Sinks + +The DataStream API uses `DataStreamSink` to write records to external systems, while the +DataSet API uses `DataSink`. + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left">DataSet</th> + <th class="text-left">DataStream</th> + </tr> + </thead> + <tbody> + <tr> + <td> + {{< highlight "java" >}} +// Write to output format +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +// Write to sink function or sink +DataStreamSink<> sink = dataStream.addSink(sinkFunction); +DataStreamSink<> sink = dataStream.sinkTo(sink); +// Write to csv file +DataStreamSink<> sink = dataStream.writeAsCsv(path); +// Write to text file +DataStreamSink<> sink = dataStream.writeAsText(path); + {{< /highlight >}} + </td> + </tr> + </tbody> +</table> + +If you are looking for pre-defined DataStream source and sink connectors, please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" >}}). + +## Migrating DataSet APIs + +### Category 1 + +The DataSet APIs in Category 1 can be migrated to DataStream APIs with the same semantics and the same processing behavior. This means the migration is +relatively straightforward and does not require significant modifications or added complexity in the job code. 
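The snippets in the table below show only the operator calls themselves. As a point of reference, here is a minimal end-to-end sketch of one such migrated pipeline running in batch mode; the class name and sample data are hypothetical and only illustrate how the environment, source, transformation, and sink from the previous sections fit together.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataSetMigrationSketch {

    public static void main(String[] args) throws Exception {
        // Replace ExecutionEnvironment with StreamExecutionEnvironment and run in batch mode.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // A bounded source; fromElements plays the role of DataSet#fromCollection here.
        DataStream<Tuple2<String, Integer>> input =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

        // groupBy(...).aggregate(SUM, 1) on a DataSet becomes keyBy(...).sum(1) on a DataStream.
        input.keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute("DataSet to DataStream migration sketch");
    }
}
```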
+ +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left">Operations</th> + <th class="text-left">DataSet</th> + <th class="text-left">DataStream</th> + </tr> + </thead> + <tbody> + <tr> + <td>Map</td> + <td> + {{< highlight "java" >}} +dataSet.map(new MapFunction<>(){ +// implement user-defined map logic +}); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +dataStream.map(new MapFunction<>(){ +// implement user-defined map logic +}); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>FlatMap</td> + <td> + {{< highlight "java" >}} +dataSet.flatMap(new FlatMapFunction<>(){ +// implement user-defined flatmap logic +}); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +dataStream.flatMap(new FlatMapFunction<>(){ +// implement user-defined flatmap logic +}); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>Filter</td> + <td> + {{< highlight "java" >}} +dataSet.filter(new FilterFunction<>(){ +// implement user-defined filter logic +}); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +dataStream.filter(new FilterFunction<>(){ +// implement user-defined filter logic +}); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>Union</td> + <td> + {{< highlight "java" >}} +dataSet1.union(dataSet2); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +dataStream1.union(dataStream2); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>Rebalance</td> + <td> + {{< highlight "java" >}} +dataSet.rebalance(); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +dataStream.rebalance(); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>Project</td> + <td> + {{< highlight "java" >}} +DataSet<Tuple3<>> dataSet = // [...] +dataSet.project(2,0); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Tuple3<>> dataStream = // [...] +dataStream.project(2,0); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>Reduce on Grouped DataSet</td> + <td> + {{< highlight "java" >}} +DataSet<Tuple2<>> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Tuple2<>> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>Aggregate on Grouped DataSet</td> + <td> + {{< highlight "java" >}} +DataSet<Tuple2<>> dataSet = // [...] +// compute sum of the second field +dataSet.groupBy(value -> value.f0) + .aggregate(SUM, 1); +// compute min of the second field +dataSet.groupBy(value -> value.f0) + .aggregate(MIN, 1); +// compute max of the second field +dataSet.groupBy(value -> value.f0) + .aggregate(MAX, 1); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Tuple2<>> dataStream = // [...] +// compute sum of the second field +dataStream.keyBy(value -> value.f0) + .sum(1); +// compute min of the second field +dataStream.keyBy(value -> value.f0) + .min(1); +// compute max of the second field +dataStream.keyBy(value -> value.f0) + .max(1); + {{< /highlight >}} + </td> + </tr> + </tbody> +</table> + +### Category 2 + +The DataSet APIs in Category 2 can be migrated to DataStream APIs with different semantics but the same processing behavior. +The different semantics introduce additional complexity into the job code and require more migration effort from developers. 
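As an illustration of the kind of rewrite this category involves, here is a minimal, self-contained sketch of the `distinct()` migration shown in the first row of the table below; the class name and sample values are hypothetical. It relies on batch execution mode, in which a keyed reduce emits only its final result per key at the end of the bounded input instead of an update for every incoming record.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DistinctMigrationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In batch mode the keyed reduce below emits only its final result per key,
        // so the output contains each distinct value exactly once.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<Integer> input = env.fromElements(1, 2, 2, 3, 3, 3);

        // dataSet.distinct() becomes: key by the value itself and keep the first element per key.
        input.keyBy(value -> value)
                .reduce((value1, value2) -> value1)
                .print();

        env.execute("Distinct migration sketch");
    }
}
```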
+ +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left">Operations</th> + <th class="text-left">DataSet</th> + <th class="text-left">DataStream</th> + </tr> + </thead> + <tbody> + <tr> + <td>Distinct</td> + <td> + {{< highlight "java" >}} +DataSet<Integer> dataSet = // [...] +dataSet.distinct(); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Integer> dataStream = // [...] +dataStream.keyBy(value -> value) + .reduce((value1, value2) -> value1); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>Hash-Partition</td> + <td> + {{< highlight "java" >}} +DataSet<Tuple2<>> dataSet = // [...] +dataSet.partitionByHash(value -> value.f0); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Tuple2<>> dataStream = // [...] +// partition by the hashcode of key +dataStream.partitionCustom( + (key, numSubpartition) -> key.hashCode() % numSubpartition, + value -> value.f0); + {{< /highlight >}} + </td> + </tr> + </tbody> +</table> + +Operations on a full DataSet correspond to a global window aggregation in DataStream with a custom `EndOfStreamWindows` +WindowAssigner that triggers the window computation at the end of the input. Below is an example implementation of +`EndOfStreamWindows` that will be reused in the rest of this document. + +```java +public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> { + private static final long serialVersionUID = 1L; + + private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows(); + + private static final TimeWindow TIME_WINDOW_INSTANCE = + new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE); + + private EndOfStreamWindows() {} + + public static EndOfStreamWindows get() { + return INSTANCE; + } + + @Override + public Collection<TimeWindow> assignWindows( + Object element, long timestamp, WindowAssignerContext context) { + return Collections.singletonList(TIME_WINDOW_INSTANCE); + } + + @Override + public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { + return new EndOfStreamTrigger(); + } + + @Override + public String toString() { + return "EndOfStreamWindows()"; + } + + @Override + public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { + return new TimeWindow.Serializer(); + } + + @Override + public boolean isEventTime() { + return true; + } + + @Internal + public static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> { + @Override + public TriggerResult onElement( + Object element, long timestamp, TimeWindow window, TriggerContext ctx) + throws Exception { + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { + return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; + } + + @Override + public void clear(TimeWindow window, TriggerContext ctx) throws Exception {} + + @Override + public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { + return TriggerResult.CONTINUE; + } + } +} +``` + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left">Operations</th> + <th class="text-left">DataSet</th> + <th class="text-left">DataStream</th> + </tr> + </thead> + <tbody> + <tr> + <td>Reduce on Full DataSet</td> + <td> + {{< highlight "java" >}} +DataSet<String> dataSet = // [...] 
+dataSet.reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<String> dataStream = // [...] +dataStream.windowAll(EndOfStreamWindows.get()) + .reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>Aggregate on Full DataSet</td> + <td> + {{< highlight "java" >}} +DataSet<Tuple2<>> dataSet = // [...] +// compute sum of the second field +dataSet.aggregate(SUM, 1); +// compute min of the second field +dataSet.aggregate(MIN, 1); +// compute max of the second field +dataSet.aggregate(MAX, 1); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Tuple2<>> dataStream = // [...] +// compute sum of the second field +dataStream.windowAll(EndOfStreamWindows.get()) + .sum(1); +// compute min of the second field +dataStream.windowAll(EndOfStreamWindows.get()) + .min(1); +// compute max of the second field +dataStream.windowAll(EndOfStreamWindows.get()) + .max(1); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>GroupReduce on Full DataSet</td> + <td> + {{< highlight "java" >}} +DataSet<Integer> dataSet = // [...] +dataSet.reduceGroup(new GroupReduceFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Integer> dataStream = // [...] +dataStream.windowAll(EndOfStreamWindows.get()) + .apply(new AllWindowFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>GroupReduce on Grouped DataSet</td> + <td> + {{< highlight "java" >}} +DataSet<Tuple2<>> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduceGroup(new GroupReduceFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Tuple2<>> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .window(EndOfStreamWindows.get()) + .apply(new WindowFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>First-n</td> + <td> + {{< highlight "java" >}} +dataSet.first(n); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +dataStream.windowAll(EndOfStreamWindows.get()) + .apply(new AllWindowFunction<>(){ + // implement first-n logic + }); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>Join</td> + <td> + {{< highlight "java" >}} +DataSet<Tuple2<>> dataSet1 = // [...] +DataSet<Tuple2<>> dataSet2 = // [...] +dataSet1.join(dataSet2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .with(new JoinFunction<>(){ + // implement user-defined join logic + }); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Tuple2<>> dataStream1 = // [...] +DataStream<Tuple2<>> dataStream2 = // [...] +dataStream1.join(dataStream2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(EndOfStreamWindows.get()) + .apply(new JoinFunction<>(){ + // implement user-defined join logic + }); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>CoGroup</td> + <td> + {{< highlight "java" >}} +DataSet<Tuple2<>> dataSet1 = // [...] +DataSet<Tuple2<>> dataSet2 = // [...] +dataSet1.coGroup(dataSet2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .with(new CoGroupFunction<>(){ + // implement user-defined co-group logic + }); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Tuple2<>> dataStream1 = // [...] 
+DataStream<Tuple2<>> dataStream2 = // [...] +dataStream1.coGroup(dataStream2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(EndOfStreamWindows.get()) + .apply(new CoGroupFunction<>(){ + // implement user-defined co-group logic + }); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>OuterJoin</td> + <td> + {{< highlight "java" >}} +DataSet<Tuple2<>> dataSet1 = // [...] +DataSet<Tuple2<>> dataSet2 = // [...] +// left outer join +dataSet1.leftOuterJoin(dataSet2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .with(new JoinFunction<>(){ + // implement user-defined left outer join logic + }); +// right outer join +dataSet1.rightOuterJoin(dataSet2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .with(new JoinFunction<>(){ + // implement user-defined right outer join logic + }); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} + DataStream<Tuple2<>> dataStream1 = // [...] + DataStream<Tuple2<>> dataStream2 = // [...] + // left outer join + dataStream1.coGroup(dataStream2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(EndOfStreamWindows.get()) + .apply((leftIterable, rightIterable, collector) -> { + if (!rightIterable.iterator().hasNext()) { + // implement user-defined left outer join logic + } + }); + // right outer join + dataStream1.coGroup(dataStream2) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(EndOfStreamWindows.get()) + .apply((leftIterable, rightIterable, collector) -> { + if (!leftIterable.iterator().hasNext()) { + // implement user-defined right outer join logic + } + }); + {{< /highlight >}} + </td> + </tr> + </tbody> +</table> + +### Category 3 + +The DataSet APIs in Category 3 can be migrated to DataStream APIs with different semantics and different processing behavior, which leads to additional processing steps. + +To collect the records of each subtask, it is necessary to assign a unique subtask ID to each record and group the records by that ID within the window. The +following `AddSubtaskIDMapFunction` illustrates how to assign a subtask ID to each record and will be used in the subsequent DataStream examples. + +```java +public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> { + @Override + public Tuple2<String, T> map(T value) { + return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value); + } +} +``` + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left">Operations</th> + <th class="text-left">DataSet</th> + <th class="text-left">DataStream</th> + </tr> + </thead> + <tbody> + <tr> + <td>MapPartition/SortPartition</td> + <td> + {{< highlight "java" >}} +DataSet<Integer> dataSet = // [...] +// MapPartition +dataSet.mapPartition(new MapPartitionFunction<>(){ + // implement user-defined map partition logic + }); +// SortPartition +dataSet.sortPartition(0, Order.ASCENDING); +dataSet.sortPartition(0, Order.DESCENDING); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +DataStream<Integer> dataStream = // [...] +// assign subtask ID to all records +DataStream<Tuple2<String, Integer>> dataStream1 = dataStream.map(new AddSubtaskIDMapFunction<>()); +dataStream1.keyBy(value -> value.f0) + .window(EndOfStreamWindows.get()) + .apply(new WindowFunction<>(){ + // implement user-defined map partition or sort partition logic + }); + {{< /highlight >}} + </td> + </tr> + <tr> + <td>Cross</td> + <td> + {{< highlight "java" >}} +DataSet<Integer> dataSet1 = // [...] +DataSet<Integer> dataSet2 = // [...] 
+// Cross +dataSet1.cross(dataSet2) + .with(new CrossFunction<>(){ + // implement user-defined cross logic + }); + {{< /highlight >}} + </td> + <td> + {{< highlight "java" >}} +// the parallelism of dataStream1 and dataStream2 should be the same +DataStream<Integer> dataStream1 = // [...] +DataStream<Integer> dataStream2 = // [...] +DataStream<Tuple2<String, Integer>> dataStream3 = dataStream1.broadcast().map(new AddSubtaskIDMapFunction<>()); +DataStream<Tuple2<String, Integer>> dataStream4 = dataStream2.map(new AddSubtaskIDMapFunction<>()); +// join the two streams according to the subtask ID +dataStream3.join(dataStream4) + .where(value -> value.f0) + .equalTo(value -> value.f0) + .window(EndOfStreamWindows.get()) + .apply(new JoinFunction<>(){ + // implement user-defined cross logic + }); + {{< /highlight >}} + </td> + </tr> + </tbody> +</table> + +### Category 4 + +The following DataSet APIs are not directly supported by DataStream because they rely on modifications to Flink's infrastructure. DataStream +provides low-level APIs such as ProcessFunction that expose more basic building blocks of Flink, and users may use these APIs to +implement custom operators that achieve the same functionality as the following DataSet APIs. Review Comment: It's hard for users to achieve the same behaviors with DataStream. Simply informing users that these APIs are not supported is a better approach. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
