This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2cac88404cda5f6c8e00d74a94ae0881e8b2e3b8 Author: codenohup <[email protected]> AuthorDate: Tue Oct 1 19:46:41 2024 +0800 [FLINK-36336][docs] Remove dataset and scala example in documents --- docs/content.zh/docs/concepts/overview.md | 4 +- docs/content.zh/docs/dev/configuration/overview.md | 3 +- .../docs/dev/datastream/dataset_migration.md | 769 -------------------- docs/content.zh/docs/dev/datastream/overview.md | 181 ----- .../docs/dev/datastream/scala_api_extensions.md | 226 ------ docs/content/docs/concepts/overview.md | 11 +- docs/content/docs/deployment/elastic_scaling.md | 6 +- .../docs/deployment/speculative_execution.md | 5 - docs/content/docs/dev/configuration/overview.md | 3 +- .../docs/dev/datastream/dataset_migration.md | 774 --------------------- .../docs/dev/datastream/execution/parallel.md | 48 -- docs/content/docs/dev/datastream/overview.md | 218 ------ .../docs/dev/datastream/scala_api_extensions.md | 235 ------- docs/static/fig/levels_of_abstraction.png | Bin 0 -> 351948 bytes docs/static/fig/levels_of_abstraction.svg | 193 ----- 15 files changed, 9 insertions(+), 2667 deletions(-) diff --git a/docs/content.zh/docs/concepts/overview.md b/docs/content.zh/docs/concepts/overview.md index 5ab63019fbd..7280a7732e1 100644 --- a/docs/content.zh/docs/concepts/overview.md +++ b/docs/content.zh/docs/concepts/overview.md @@ -41,10 +41,10 @@ Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象 - Flink API 第二层抽象是 **Core APIs**。实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 **Core APIs** 进行编程:其中包含 [DataStream API]({{< ref "docs/dev/datastream/overview" >}})(应用于有界/无界数据流场景)。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。 - *Process Function* 这类底层抽象和 *DataStream API* 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。*DataSet API* 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。 + *Process Function* 这类底层抽象和 *DataStream API* 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。 - Flink API 第三层抽象是 **Table API**。**Table API** 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。[Table API]({{< ref "docs/dev/table/overview" >}}) 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义*应执行的逻辑操作*,而不是确切地指定程序*应该执行的代码*。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。 - 表和 *DataStream*/*DataSet* 可以进行无缝切换,Flink 允许用户在编写应用程序时将 *Table API* 与 *DataStream*/*DataSet* API 混合使用。 + 表和 *DataStream* 可以进行无缝切换,Flink 允许用户在编写应用程序时将 *Table API* 与 *DataStream API* 混合使用。 - Flink API 最顶层抽象是 **SQL**。这层抽象在语义和程序表达式上都类似于 *Table API*,但是其程序实现都是 SQL 查询表达式。[SQL]({{< ref "docs/dev/table/overview" >}}#sql) 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 *Table API* 中定义的表上执行。 diff --git a/docs/content.zh/docs/dev/configuration/overview.md b/docs/content.zh/docs/dev/configuration/overview.md index fea5a49539f..42241e5cc80 100644 --- a/docs/content.zh/docs/dev/configuration/overview.md +++ b/docs/content.zh/docs/dev/configuration/overview.md @@ -216,8 +216,7 @@ Flink提供了两大 API:[Datastream API]({{< ref "docs/dev/datastream/overvie | 你要使用的 API | 你需要添加的依赖项 | |-----------------------------------------------------------------------------------|-----------------------------------------------------| -| [DataStream]({{< ref "docs/dev/datastream/overview" >}}) | `flink-streaming-java` | -| [DataStream Scala 版]({{< ref "docs/dev/datastream/scala_api_extensions" >}}) | `flink-streaming-scala{{< scala_version >}}` | +| [DataStream]({{< ref 
"docs/dev/datastream/overview" >}}) | `flink-streaming-java` | | [Table API]({{< ref "docs/dev/table/common" >}}) | `flink-table-api-java` | | [Table API Scala 版]({{< ref "docs/dev/table/common" >}}) | `flink-table-api-scala{{< scala_version >}}` | | [Table API + DataStream]({{< ref "docs/dev/table/data_stream_api" >}}) | `flink-table-api-java-bridge` | diff --git a/docs/content.zh/docs/dev/datastream/dataset_migration.md b/docs/content.zh/docs/dev/datastream/dataset_migration.md deleted file mode 100644 index 1de2818d5ff..00000000000 --- a/docs/content.zh/docs/dev/datastream/dataset_migration.md +++ /dev/null @@ -1,769 +0,0 @@ ---- -title: "如何迁移 DataSet 到 DataStream" -weight: 302 -type: docs ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -# 如何迁移 DataSet 到 DataStream - -DataSet API 已被正式弃用,并且将不再获得主动的维护和支持,它将在 Flink 2.0 版本被删除。 -建议 Flink 用户从 DataSet API 迁移到 DataStream API、Table API 和 SQL 来满足数据处理需求。 - -请注意,DataStream 中的 API 并不总是与 DataSet 完全匹配。 -本文档的目的是帮助用户理解如何使用 DataStream API 实现与使用 DataSet API 相同的数据处理行为。 - -根据迁移过程中开发和执行效率的变化程度,我们将 DataSet API 分为四类: - -- 第一类:在 DataStream 中具有完全相同的 API,几乎不需要任何更改即可迁移; - -- 第二类:其行为可以通过 DataStream 中具有不同语义的其他 API 来实现,这可能需要更改一些代码,但仍保持相同的执行效率; - -- 第三类:其行为可以通过 DataStream 中具有不同语义的其他 API 来实现,但可能会增加额外的执行效率成本; - -- 第四类:其行为不被 DataStream API 支持。 - -后续章节将首先介绍如何设置执行环境和 source/sink ,然后详细解释每种类别的 DataSet API 如何迁移到 DataStream API,强调与每个类别迁移过程中相关的考虑因素和面临的挑战。 - - -## 设置执行环境 - -将应用程序从 DataSet API 迁移到 DataStream API 的第一步是将 `ExecutionEnvironment` 替换为 `StreamExecutionEnvironment`。 - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td> - {{< highlight "java" >}} -// 创建执行环境 -ExecutionEnvironment.getExecutionEnvironment(); -// 创建本地执行环境 -ExecutionEnvironment.createLocalEnvironment(); -// 创建 collection 环境 -new CollectionEnvironment(); -// 创建远程执行环境 -ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -// 创建执行环境 -StreamExecutionEnvironment.getExecutionEnvironment(); -// 创建本地执行环境 -StreamExecutionEnvironment.createLocalEnvironment(); -// 不支持 collection 环境 -// 创建远程执行环境 -StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... 
jarFiles); - {{< /highlight >}} - </td> - </tr> - </tbody> -</table> - -与 DataSet 不同,DataStream 支持对有界和无界数据流进行处理。 - -如果需要的话,用户可以显式地将执行模式设置为 `RuntimeExecutionMode.BATCH`。 - -```java -StreamExecutionEnvironment executionEnvironment = // [...]; -executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); -``` - -## 设置 streaming 类型的 Source 和 Sink - -### Sources - -DataStream API 使用 `DataStreamSource` 从外部系统读取记录,而 DataSet API 使用 `DataSource`。 - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td> - {{< highlight "java" >}} -// Read data from file -DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); -// Read data from collection -DataSource<> source = ExecutionEnvironment.fromCollection(data); -// Read data from inputformat -DataSource<> source = ExecutionEnvironment.createInput(inputFormat) - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -// Read data from file -DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); -// Read data from collection -DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); -// Read data from inputformat -DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) - {{< /highlight >}} - </td> - </tr> - </tbody> -</table> - -### Sinks - -DataStream API 使用 `DataStreamSink` 将记录写入外部系统,而 DataSet API 使用 `DataSink`。 - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td> - {{< highlight "java" >}} -// Write to outputformat -DataSink<> sink = dataSet.output(outputFormat); -// Write to csv file -DataSink<> sink = dataSet.writeAsCsv(filePath); -// Write to text file -DataSink<> sink = dataSet.writeAsText(filePath); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -// Write to sink -DataStreamSink<> sink = dataStream.sinkTo(sink) -// Write to csv file -DataStreamSink<> sink = dataStream.writeAsCsv(path); -// Write to text file -DataStreamSink<> sink = dataStream.writeAsText(path); - {{< /highlight >}} - </td> - </tr> - </tbody> -</table> - -如果您正在寻找 DataStream 预定义的连接器,请查看[连接器]({{< ref "docs/connectors/datastream/overview" >}})。 - -## 迁移 DataSet APIs - -### 第一类 - -对于第一类,这些 DataSet API 在 DataStream 中具有完全相同的功能,几乎不需要任何更改即可迁移。 - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">Operations</th> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td>Map</td> - <td> - {{< highlight "java" >}} -dataSet.map(new MapFunction<>(){ -// implement user-defined map logic -}); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream.map(new MapFunction<>(){ -// implement user-defined map logic -}); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>FlatMap</td> - <td> - {{< highlight "java" >}} -dataSet.flatMap(new FlatMapFunction<>(){ -// implement user-defined flatmap logic -}); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream.flatMap(new FlatMapFunction<>(){ -// implement user-defined flatmap logic -}); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Filter</td> - <td> - {{< highlight "java" >}} -dataSet.filter(new FilterFunction<>(){ -// implement user-defined filter logic -}); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream.filter(new FilterFunction<>(){ -// 
implement user-defined filter logic -}); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Union</td> - <td> - {{< highlight "java" >}} -dataSet1.union(dataSet2); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream1.union(dataStream2); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Rebalance</td> - <td> - {{< highlight "java" >}} -dataSet.rebalance(); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream.rebalance(); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Project</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple3<>> dataSet = // [...] -dataSet.project(2,0); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple3<>> dataStream = // [...] -dataStream.project(2,0); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Reduce on Grouped DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet = // [...] -dataSet.groupBy(value -> value.f0) - .reduce(new ReduceFunction<>(){ - // implement user-defined reduce logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream = // [...] -dataStream.keyBy(value -> value.f0) - .reduce(new ReduceFunction<>(){ - // implement user-defined reduce logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Aggregate on Grouped DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet = // [...] -// compute sum of the second field -dataSet.groupBy(value -> value.f0) - .aggregate(SUM, 1); -// compute min of the second field -dataSet.groupBy(value -> value.f0) - .aggregate(MIN, 1); -// compute max of the second field -dataSet.groupBy(value -> value.f0) - .aggregate(MAX, 1); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream = // [...] -// compute sum of the second field -dataStream.keyBy(value -> value.f0) - .sum(1); -// compute min of the second field -dataStream.keyBy(value -> value.f0) - .min(1); -// compute max of the second field -dataStream.keyBy(value -> value.f0) - .max(1); - {{< /highlight >}} - </td> - </tr> - </tbody> -</table> - -### 第二类 - -对于第二类,这些 DataSet API 的行为可以通过 DataStream 中具有不同语义的其他 API 来实现,这可能需要更改一些代码来进行迁移,但仍保持相同的执行效率。 - -DataSet 中存在对整个 DataSet 进行操作的 API。这些 API 在 DataStream 中可以用一个全局窗口来实现,该全局窗口只会在输入数据结束时触发窗口内数据的计算。 -[附录]({{< ref "docs/dev/datastream/dataset_migration#endofstreamwindows" >}})中的 `EndOfStreamWindows` 显示了如何实现这样的窗口,我们将在本文档的其余部分重复使用它。 - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">Operations</th> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td>Distinct</td> - <td> - {{< highlight "java" >}} -DataSet<Integer> dataSet = // [...] -dataSet.distinct(); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Integer> dataStream = // [...] -dataStream.keyBy(value -> value) - .reduce((value1, value2) -> value1); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Hash-Partition</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet = // [...] -dataSet.partitionByHash(value -> value.f0); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream = // [...] -// partition by the hashcode of key -dataStream.partitionCustom( - (key, numSubpartition) -> key.hashCode() % numSubpartition, - value -> value.f0); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Reduce on Full DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<String> dataSet = // [...] 
-dataSet.reduce(new ReduceFunction<>(){ - // implement user-defined reduce logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<String> dataStream = // [...] -dataStream.windowAll(EndOfStreamWindows.get()) - .reduce(new ReduceFunction<>(){ - // implement user-defined reduce logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Aggregate on Full DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet = // [...] -// compute sum of the second field -dataSet.aggregate(SUM, 1); -// compute min of the second field -dataSet.aggregate(MIN, 1); -// compute max of the second field -dataSet.aggregate(MAX, 1); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream = // [...] -// compute sum of the second field -dataStream.windowAll(EndOfStreamWindows.get()) - .sum(1); -// compute min of the second field -dataStream.windowAll(EndOfStreamWindows.get()) - .min(1); -// compute max of the second field -dataStream.windowAll(EndOfStreamWindows.get()) - .max(1); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>GroupReduce on Full DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<Integer> dataSet = // [...] -dataSet.reduceGroup(new GroupReduceFunction<>(){ - // implement user-defined group reduce logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Integer> dataStream = // [...] -dataStream.windowAll(EndOfStreamWindows.get()) - .apply(new WindowFunction<>(){ - // implement user-defined group reduce logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>GroupReduce on Grouped DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet = // [...] -dataSet.groupBy(value -> value.f0) - .reduceGroup(new GroupReduceFunction<>(){ - // implement user-defined group reduce logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream = // [...] -dataStream.keyBy(value -> value.f0) - .window(EndOfStreamWindows.get()) - .apply(new WindowFunction<>(){ - // implement user-defined group reduce logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>First-n</td> - <td> - {{< highlight "java" >}} -dataSet.first(n) - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream.windowAll(EndOfStreamWindows.get()) - .apply(new AllWindowFunction<>(){ - // implement first-n logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Join</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet1 = // [...] -DataSet<Tuple2<>> dataSet2 = // [...] -dataSet1.join(dataSet2) - .where(value -> value.f0) - .equalTo(value -> value.f0) - .with(new JoinFunction<>(){ - // implement user-defined join logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream1 = // [...] -DataStream<Tuple2<>> dataStream2 = // [...] -dataStream1.join(dataStream2) - .where(value -> value.f0) - .equalTo(value -> value.f0) - .window(EndOfStreamWindows.get())) - .apply(new JoinFunction<>(){ - // implement user-defined join logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>CoGroup</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet1 = // [...] -DataSet<Tuple2<>> dataSet2 = // [...] -dataSet1.coGroup(dataSet2) - .where(value -> value.f0) - .equalTo(value -> value.f0) - .with(new CoGroupFunction<>(){ - // implement user-defined co group logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream1 = // [...] 
-DataStream<Tuple2<>> dataStream2 = // [...] -dataStream1.coGroup(dataStream2) - .where(value -> value.f0) - .equalTo(value -> value.f0) - .window(EndOfStreamWindows.get())) - .apply(new CoGroupFunction<>(){ - // implement user-defined co group logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>OuterJoin</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet1 = // [...] -DataSet<Tuple2<>> dataSet2 = // [...] -// left outer join -dataSet1.leftOuterJoin(dataSet2) - .where(dataSet1.f0) - .equalTo(dataSet2.f0) - .with(new JoinFunction<>(){ - // implement user-defined left outer join logic - }); -// right outer join -dataSet1.rightOuterJoin(dataSet2) - .where(dataSet1.f0) - .equalTo(dataSet2.f0) - .with(new JoinFunction<>(){ - // implement user-defined right outer join logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} - DataStream<Tuple2<>> dataStream1 = // [...] - DataStream<Tuple2<>> dataStream2 = // [...] - // left outer join - dataStream1.coGroup(dataStream2) - .where(value -> value.f0) - .equalTo(value -> value.f0) - .window(EndOfStreamWindows.get()) - .apply((leftIterable, rightInterable, collector) -> { - if(!rightInterable.iterator().hasNext()){ - // implement user-defined left outer join logic - } - }); - // right outer join - dataStream1.coGroup(dataStream2) - .where(value -> value.f0) - .equalTo(value -> value.f0) - .window(EndOfStreamWindows.get()) - .apply((leftIterable, rightInterable, collector) -> { - if(!leftIterable.iterator().hasNext()){ - // implement user-defined right outer join logic - } - }); - {{< /highlight >}} - </td> - </tr> - </tbody> -</table> - -### 第三类 - -对于第三类,这些 DataSet API 的行为可以通过 DataStream 中具有不同语义的其他 API 来实现,但可能会增加额外的执行效率成本。 - -目前,DataStream API 不直接支持 non-keyed 流上的聚合(对 subtask 内的数据进行聚合)。为此,我们需要首先将 subtask ID 分配给记录,然后将流转换为 keyed 流。 -[附录]({{< ref "docs/dev/datastream/dataset_migration#addsubtaskidmapfunction" >}})中的 `AddSubtaskIdMapFunction` 显示了如何执行此操作,我们将在本文档的其余部分中重复使用它。 - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">Operations</th> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td>MapPartition/SortPartition</td> - <td> - {{< highlight "java" >}} -DataSet<Integer> dataSet = // [...] -// MapPartition -dataSet.mapPartition(new MapPartitionFunction<>(){ - // implement user-defined map partition logic - }); -// SortPartition -dataSet.sortPartition(0, Order.ASCENDING); -dataSet.sortPartition(0, Order.DESCENDING); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Integer> dataStream = // [...] -// assign subtask ID to all records -DataStream<Tuple2<String, Integer>> dataStream1 = dataStream.map(new AddSubtaskIDMapFunction()); -dataStream1.keyBy(value -> value.f0) - .window(EndOfStreamWindows.get()) - .apply(new WindowFunction<>(){ - // implement user-defined map partition or sort partition logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Cross</td> - <td> - {{< highlight "java" >}} -DataSet<Integer> dataSet1 = // [...] -DataSet<Integer> dataSet2 = // [...] -// Cross -dataSet1.cross(dataSet2) - .with(new CrossFunction<>(){ - // implement user-defined cross logic - }) - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -// the parallelism of dataStream1 and dataStream2 should be same -DataStream<Integer> dataStream1 = // [...] -DataStream<Integer> dataStream2 = // [...] 
-DataStream<Tuple2<String, Integer>> datastream3 = dataStream1.broadcast().map(new AddSubtaskIDMapFunction()); -DataStream<Tuple2<String, Integer>> datastream4 = dataStream2.map(new AddSubtaskIDMapFunction()); -// join the two streams according to the subtask ID -dataStream3.join(dataStream4) - .where(value -> value.f0) - .equalTo(value -> value.f0) - .window(EndOfStreamWindows.get()) - .apply(new JoinFunction<>(){ - // implement user-defined cross logic - }) - {{< /highlight >}} - </td> - </tr> - </tbody> -</table> - -### 第四类 - -以下 DataSet API 的行为不被 DataStream 支持。 - -* RangePartition -* GroupCombine - - -## 附录 - -#### EndOfStreamWindows - -以下代码展示了 `EndOfStreamWindows` 示例实现。 - -```java -public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> { - private static final long serialVersionUID = 1L; - - private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows(); - - private static final TimeWindow TIME_WINDOW_INSTANCE = - new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE); - - private EndOfStreamWindows() {} - - public static EndOfStreamWindows get() { - return INSTANCE; - } - - @Override - public Collection<TimeWindow> assignWindows( - Object element, long timestamp, WindowAssignerContext context) { - return Collections.singletonList(TIME_WINDOW_INSTANCE); - } - - @Override - public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { - return new EndOfStreamTrigger(); - } - - @Override - public String toString() { - return "EndOfStreamWindows()"; - } - - @Override - public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { - return new TimeWindow.Serializer(); - } - - @Override - public boolean isEventTime() { - return true; - } - - @Internal - public static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> { - @Override - public TriggerResult onElement( - Object element, long timestamp, TimeWindow window, TriggerContext ctx) - throws Exception { - return TriggerResult.CONTINUE; - } - - @Override - public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { - return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; - } - - @Override - public void clear(TimeWindow window, TriggerContext ctx) throws Exception {} - - @Override - public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { - return TriggerResult.CONTINUE; - } - } -} -``` - -#### AddSubtaskIDMapFunction - -以下代码展示了 `AddSubtaskIDMapFunction` 示例实现。 - -```java -public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> { - @Override - public Tuple2<String, T> map(T value) { - return Tuple2.of(String.valueOf(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()), value); - } -} -``` - -{{< top >}} diff --git a/docs/content.zh/docs/dev/datastream/overview.md b/docs/content.zh/docs/dev/datastream/overview.md index 90409ce8caf..5e599ee3a79 100644 --- a/docs/content.zh/docs/dev/datastream/overview.md +++ b/docs/content.zh/docs/dev/datastream/overview.md @@ -61,12 +61,6 @@ Flink 程序看起来像一个转换 `DataStream` 的常规程序。每个程序 4. 指定计算结果的存储位置; 5. 触发程序执行。 -{{< hint warning >}} -All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of either the DataStream and/or Table API. 
- -See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a> -{{< /hint >}} - {{< tabs "fa68701c-59e8-4509-858e-3e8a123eeacf" >}} {{< tab "Java" >}} @@ -117,51 +111,6 @@ writeAsText(String path); print(); ``` -{{< /tab >}} -{{< tab "Scala" >}} - -现在我们将对这些步骤逐一进行概述,更多细节请参考相关章节。请注意,Java DataStream API 的所有核心类都可以在 {{< gh_link file="/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala" name="org.apache.flink.streaming.api.scala" >}} 中找到。 - -`StreamExecutionEnvironment` 是所有 Flink 程序的基础。你可以使用 `StreamExecutionEnvironment` 的如下静态方法获取 `StreamExecutionEnvironment`: - -```scala -getExecutionEnvironment() - -createLocalEnvironment() - -createRemoteEnvironment(host: String, port: Int, jarFiles: String*) -``` - -通常,你只需要使用 `getExecutionEnvironment()` 即可,因为该方法会根据上下文做正确的处理:如果在 IDE 中执行你的程序或作为常规 Java 程序,它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。如果你基于程序创建了一个 JAR 文件,并通过[命令行]({{< ref "docs/deployment/cli" >}})调用它,Flink 集群管理器将执行程序的 main 方法,同时 `getExecutionEnvironment()` 方法会返回一个执行环境以在集群上执行你的程序。 - -为了指定 data sources,执行环境提供了一些方法,支持使用各种方法从文件中读取数据:你可以直接逐行读取数据,像读 CSV 文件一样,或使用任何第三方提供的 source。如果只是将一个文本文件作为一个行的序列来读,你可以使用: - -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment() - -val text: DataStream[String] = env.readTextFile("file:///path/to/file") -``` - -这将为你生成一个 DataStream,然后你可以在上面应用转换来创建新的派生 DataStream。 - -你可以调用 DataStream 上具有转换功能的方法来应用转换。例如,一个 map 的转换如下所示: - -```scala -val input: DataSet[String] = ... - -val mapped = input.map { x => x.toInt } -``` - -这将通过把原始集合中的每一个字符串转换为一个整数来创建一个新的 DataStream。 - -一旦你有了包含最终结果的 DataStream,你就可以通过创建 sink 把它写到外部系统。下面是一些用于创建 sink 的示例方法: - -```scala -writeAsText(path: String) - -print() -``` - {{< /tab >}} {{< /tabs >}} @@ -231,31 +180,6 @@ public class WindowWordCount { } ``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala - -import org.apache.flink.streaming.api.scala._ -import java.time.Duration - -object WindowWordCount { - def main(args: Array[String]) { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val text = env.socketTextStream("localhost", 9999) - - val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } - .map { (_, 1) } - .keyBy(_._1) - .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5))) - .sum(1) - - counts.print() - - env.execute("Window Stream WordCount") - } -} -``` {{< /tab >}} {{< /tabs >}} @@ -319,51 +243,6 @@ Source 是你的程序从中读取其输入的地方。你可以用 `StreamExecu - `addSource` - 关联一个新的 source function。例如,你可以使用 `addSource(new FlinkKafkaConsumer<>(...))` 来从 Apache Kafka 获取数据。更多详细信息见[连接器]({{< ref "docs/connectors/datastream/overview" >}})。 -{{< /tab >}} -{{< tab "Scala" >}} - -Source 是你的程序从中读取其输入的地方。你可以用 `StreamExecutionEnvironment.addSource(sourceFunction)` 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 `SourceFunction` 接口编写自定义的非并行 source,也可以通过实现 `ParallelSourceFunction` 接口或者继承 `RichParallelSourceFunction` 类编写自定义的并行 sources。 -通过 `StreamExecutionEnvironment` 可以访问多种预定义的 stream source: - -基于文件: - -- `readTextFile(path)` - 读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。 - -- `readFile(fileInputFormat, path)` - 按照指定的文件输入格式读取(一次)文件。 - -- `readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)` - 这是前两个方法内部调用的方法。它基于给定的 `fileInputFormat` 读取路径 `path` 上的文件。根据提供的 `watchType` 的不同,source 可能定期(每 `interval` 毫秒)监控路径上的新数据(watchType 为 `FileProcessingMode.PROCESS_CONTINUOUSLY`),或者处理一次当前路径中的数据然后退出(watchType 为 `FileProcessingMode.PROCESS_ONCE`)。使用 `pathFilter`,用户可以进一步排除正在处理的文件。 - 
- *实现:* - - 在底层,Flink 将文件读取过程拆分为两个子任务,即 *目录监控* 和 *数据读取*。每个子任务都由一个单独的实体实现。监控由单个**非并行**(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 `watchType`),找到要处理的文件,将它们划分为 *分片*,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。 - - *重要提示:* - - 1. 如果 `watchType` 设置为 `FileProcessingMode.PROCESS_CONTINUOUSLY`,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的**所有**内容。 - - 2. 如果 `watchType` 设置为 `FileProcessingMode.PROCESS_ONCE`,source 扫描**一次**路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。 - -基于套接字: - -- `socketTextStream` - 从套接字读取。元素可以由分隔符分隔。 - -基于集合: - -- `fromCollection(Collection)` - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。 - -- `fromCollection(Iterator, Class)` - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。 - -- `fromElements(T ...)` - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。 - -- `fromParallelCollection(SplittableIterator, Class)` - 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。 - -- `fromSequence(from, to)` - 基于给定间隔内的数字序列并行生成数据流。 - -自定义: - -- `addSource` - 关联一个新的 source function。例如,你可以使用 `addSource(new FlinkKafkaConsumer<>(...))` 来从 Apache Kafka 获取数据。更多详细信息见[连接器]({{< ref "docs/connectors/datastream/overview" >}})。 - - {{< /tab >}} {{< /tabs >}} @@ -401,24 +280,6 @@ Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系 - `addSink` - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。 -{{< /tab >}} -{{< tab "Scala" >}} - -Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里: - -- `writeAsText()` / `TextOutputFormat` - 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。 - -- `writeAsCsv(...)` / `CsvOutputFormat` - 将元组写成逗号分隔值文件。行和字段的分隔符是可配置的。每个字段的值来自对象的 *toString()* 方法。 - -- `print()` / `printToErr()` - 在标准输出/标准错误流上打印每个元素的 *toString()* 值。 - 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分不同的 *print* 调用。如果并行度大于1,输出结果将附带输出任务标识符的前缀。 - -- `writeUsingOutputFormat()` / `FileOutputFormat` - 自定义文件输出的方法和基类。支持自定义 object 到 byte 的转换。 - -- `writeToSocket` - 根据 `SerializationSchema` 将元素写入套接字。 - -- `addSink` - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。 - {{< /tab >}} {{< /tabs >}} @@ -464,14 +325,6 @@ env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment -env.setBufferTimeout(timeoutMillis) - -env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis) -``` -{{< /tab >}} {{< /tabs >}} 为了最大限度地提高吞吐量,设置 `setBufferTimeout(-1)` 来删除超时,这样缓冲区仅在它们已满时才会被刷新。要最小化延迟,请将超时设置为接近 0 的值(例如 5 或 10 毫秒)。应避免超时为 0 的缓冲区,因为它会导致严重的性能下降。 @@ -506,17 +359,6 @@ DataStream<String> lines = env.addSource(/* some source */); env.execute(); ``` {{< /tab >}} -{{< tab "Scala" >}} - -```scala -val env = StreamExecutionEnvironment.createLocalEnvironment() - -val lines = env.addSource(/* some source */) -// 构建你的程序 - -env.execute() -``` -{{< /tab >}} {{< /tabs >}} <a name="collection-data-sources"></a> @@ -544,22 +386,6 @@ Iterator<Long> longIt = ... DataStream<Long> myLongs = env.fromCollection(longIt, Long.class); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.createLocalEnvironment() - -// 从元素列表创建一个 DataStream -val myInts = env.fromElements(1, 2, 3, 4, 5) - -// 从任何 Java 集合创建一个 DataStream -val data: Seq[(String, Int)] = ... -val myTuples = env.fromCollection(data) - -// 从迭代器创建一个 DataStream -val longIt: Iterator[Long] = ... 
-val myLongs = env.fromCollection(longIt) -``` -{{< /tab >}} {{< /tabs >}} **注意:** 目前,集合 data source 要求数据类型和迭代器实现 `Serializable`。此外,集合 data sources 不能并行执行(parallelism = 1)。 @@ -578,13 +404,6 @@ DataStream<Tuple2<String, Integer>> myResult = ... Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync(); ``` -{{< /tab >}} -{{< tab "Scala" >}} - -```scala -val myResult: DataStream[(String, Int)] = ... -val myOutput: Iterator[(String, Int)] = myResult.collectAsync() -``` {{< /tab >}} {{< /tabs >}} diff --git a/docs/content.zh/docs/dev/datastream/scala_api_extensions.md b/docs/content.zh/docs/dev/datastream/scala_api_extensions.md deleted file mode 100644 index be1274dbc2d..00000000000 --- a/docs/content.zh/docs/dev/datastream/scala_api_extensions.md +++ /dev/null @@ -1,226 +0,0 @@ ---- -title: "Scala API 扩展" -weight: 201 -type: docs -aliases: - - /zh/dev/scala_api_extensions.html - - /zh/apis/scala_api_extensions.html ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -{{< hint warning >}} -All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of either the DataStream and/or Table API. - -See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a> -{{< /hint >}} - -# Scala API 扩展 - -为了在 Scala 和 Java API 之间保持大致相同的使用体验,在批处理和流处理的标准 API 中省略了一些允许 Scala 高级表达的特性。 - -_如果你想拥有完整的 Scala 体验,可以选择通过隐式转换增强 Scala API 的扩展。_ - -要使用所有可用的扩展,你只需为 DataStream API 添加一个简单的引入 - -{{< highlight scala >}} -import org.apache.flink.streaming.api.scala.extensions._ -{{< /highlight >}} - -或者,您可以引入单个扩展 _a-là-carte_ 来使用您喜欢的扩展。 - -## Accept partial functions - -通常,DataStream API 不接受匿名模式匹配函数来解构元组、case 类或集合,如下所示: - -{{< highlight scala >}} -val data: DataStream[(Int, String, Double)] = // [...] -data.map { - case (id, name, temperature) => // [...] - // The previous line causes the following compilation error: - // "The argument types of an anonymous function must be fully known. 
(SLS 8.5)" -} -{{< /highlight >}} - -这个扩展在 DataStream Scala API 中引入了新的方法,这些方法在扩展 API 中具有一对一的对应关系。这些委托方法支持匿名模式匹配函数。 - -#### DataStream API - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left" style="width: 20%">Method</th> - <th class="text-left" style="width: 20%">Original</th> - <th class="text-center">Example</th> - </tr> - </thead> - - <tbody> - <tr> - <td><strong>mapWith</strong></td> - <td><strong>map (DataStream)</strong></td> - <td> -{{< highlight scala >}} -data.mapWith { - case (_, value) => value.toString -} -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>flatMapWith</strong></td> - <td><strong>flatMap (DataStream)</strong></td> - <td> -{{< highlight scala >}} -data.flatMapWith { - case (_, name, visits) => visits.map(name -> _) -} -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>filterWith</strong></td> - <td><strong>filter (DataStream)</strong></td> - <td> -{{< highlight scala >}} -data.filterWith { - case Train(_, isOnTime) => isOnTime -} -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>keyingBy</strong></td> - <td><strong>keyBy (DataStream)</strong></td> - <td> -{{< highlight scala >}} -data.keyingBy { - case (id, _, _) => id -} -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>mapWith</strong></td> - <td><strong>map (ConnectedDataStream)</strong></td> - <td> -{{< highlight scala >}} -data.mapWith( - map1 = case (_, value) => value.toString, - map2 = case (_, _, value, _) => value + 1 -) -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>flatMapWith</strong></td> - <td><strong>flatMap (ConnectedDataStream)</strong></td> - <td> -{{< highlight scala >}} -data.flatMapWith( - flatMap1 = case (_, json) => parse(json), - flatMap2 = case (_, _, json, _) => parse(json) -) -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>keyingBy</strong></td> - <td><strong>keyBy (ConnectedDataStream)</strong></td> - <td> -{{< highlight scala >}} -data.keyingBy( - key1 = case (_, timestamp) => timestamp, - key2 = case (id, _, _) => id -) -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>reduceWith</strong></td> - <td><strong>reduce (KeyedStream, WindowedStream)</strong></td> - <td> -{{< highlight scala >}} -data.reduceWith { - case ((_, sum1), (_, sum2) => sum1 + sum2 -} -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>projecting</strong></td> - <td><strong>apply (JoinedStream)</strong></td> - <td> -{{< highlight scala >}} -data1.join(data2). - whereClause(case (pk, _) => pk). - isEqualTo(case (_, fk) => fk). 
- projecting { - case ((pk, tx), (products, fk)) => tx -> products - } -{{< /highlight >}} - </td> - </tr> - </tbody> -</table> - - - -有关每个方法语义的更多信息, 请参考 [DataStream]({{< ref "docs/dev/datastream/overview" >}}) API 文档。 - -要单独使用此扩展,你可以添加以下引入: - -{{< highlight scala >}} -import org.apache.flink.api.scala.extensions.acceptPartialFunctions -{{< /highlight >}} - -用于 DataSet 扩展 - -{{< highlight scala >}} -import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions -{{< /highlight >}} - -下面的代码片段展示了如何一起使用这些扩展方法 (以及 DataSet API) 的最小示例: - -{{< highlight scala >}} -object Main { - import org.apache.flink.streaming.api.scala.extensions._ - - case class Point(x: Double, y: Double) - - def main(args: Array[String]): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6)) - - ds.filterWith { - case Point(x, _) => x > 1 - }.reduceWith { - case (Point(x1, y1), (Point(x2, y2))) => Point(x1 + y1, x2 + y2) - }.mapWith { - case Point(x, y) => (x, y) - }.flatMapWith { - case (x, y) => Seq("x" -> x, "y" -> y) - }.keyingBy { - case (id, value) => id - } - } -} -{{< /highlight >}} - -{{< top >}} diff --git a/docs/content/docs/concepts/overview.md b/docs/content/docs/concepts/overview.md index 50c7e4e4971..821b82128c2 100644 --- a/docs/content/docs/concepts/overview.md +++ b/docs/content/docs/concepts/overview.md @@ -39,7 +39,7 @@ This _Concepts in Depth_ section provides a deeper understanding of how Flink's Flink offers different levels of abstraction for developing streaming/batch applications. -{{< img src="/fig/levels_of_abstraction.svg" alt="Programming levels of abstraction" width="70%" >}} +{{< img src="/fig/levels_of_abstraction.png" alt="Programming levels of abstraction" width="70%" >}} - The lowest level abstraction simply offers **stateful and timely stream processing**. It is embedded into the [DataStream API]({{< ref "docs/dev/datastream/overview" >}}) via the [Process @@ -58,9 +58,7 @@ Flink offers different levels of abstraction for developing streaming/batch appl respective programming languages. The low level *Process Function* integrates with the *DataStream API*, - making it possible to use the lower-level abstraction on an as-needed basis. - The *DataSet API* offers additional primitives on bounded data sets, - like loops/iterations. + making it possible to use the lower-level abstraction on an as-needed basis. - The **Table API** is a declarative DSL centered around *tables*, which may be dynamically changing tables (when representing streams). The [Table @@ -75,9 +73,8 @@ Flink offers different levels of abstraction for developing streaming/batch appl use (less code to write). In addition, Table API programs also go through an optimizer that applies optimization rules before execution. - One can seamlessly convert between tables and *DataStream*/*DataSet*, - allowing programs to mix the *Table API* with the *DataStream* and - *DataSet* APIs. + One can seamlessly convert between tables and *DataStream*, + allowing programs to mix the *Table API* with the *DataStream API*. - The highest level abstraction offered by Flink is **SQL**. 
This abstraction is similar to the *Table API* both in semantics and expressiveness, but diff --git a/docs/content/docs/deployment/elastic_scaling.md b/docs/content/docs/deployment/elastic_scaling.md index eeca7726d70..dbdb4b57b85 100644 --- a/docs/content/docs/deployment/elastic_scaling.md +++ b/docs/content/docs/deployment/elastic_scaling.md @@ -226,10 +226,6 @@ In addition, there are several related configuration options that may need adjus The Adaptive Batch Scheduler only decides the parallelism for operators which do not have a parallelism set. So if you want the parallelism of an operator to be automatically decided, you need to avoid setting the parallelism for the operator through the 'setParallelism()' method. - In addition, the following configurations are required for DataSet jobs: - - Set `parallelism.default: -1`. - - Don't call `setParallelism()` on `ExecutionEnvironment`. - #### Enable dynamic parallelism inference support for Sources New {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java" name="Source" >}} can implement the interface {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java" name="DynamicParallelismInference" >}} to enable dynamic parallelism inference. @@ -254,7 +250,7 @@ Note that the dynamic source parallelism inference only decides the parallelism ### Limitations - **Batch jobs only**: Adaptive Batch Scheduler only supports batch jobs. Exception will be thrown if a streaming job is submitted. -- **BLOCKING or HYBRID jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL_EXCHANGES_BLOCKING / ALL_EXCHANGES_HYBRID_FULL / ALL_EXCHANGES_HYBRID_SELECTIVE`. Note that for DataSet jobs which do not recognize the aforementioned shuffle mode, the ExecutionMode needs to be BATCH_FORCED to force BLOCKING shuffle. +- **BLOCKING or HYBRID jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL_EXCHANGES_BLOCKING / ALL_EXCHANGES_HYBRID_FULL / ALL_EXCHANGES_HYBRID_SELECTIVE`. - **FileInputFormat sources are not supported**: FileInputFormat sources are not supported, including `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)` and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. Users should use the new sources([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) to read files when using th [...] - **Inconsistent broadcast results metrics on WebUI**: When use Adaptive Batch Scheduler to automatically decide parallelisms for operators, for broadcast results, the number of bytes/records sent by the upstream task counted by metric is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details. 
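Putting the points above together, a minimal sketch of a batch job that leaves operator parallelism unset so the Adaptive Batch Scheduler can decide it. The source and map logic are purely illustrative, and the scheduler is assumed to be enabled and configured as described above:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AdaptiveBatchSchedulerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Adaptive Batch Scheduler only supports batch jobs.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromSequence(1, 1_000_000)
                // No setParallelism() call here, so the scheduler is free to decide
                // this operator's parallelism based on the data it consumes.
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value * 2;
                    }
                })
                .print();

        env.execute("Adaptive Batch Scheduler sketch");
    }
}
```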
diff --git a/docs/content/docs/deployment/speculative_execution.md b/docs/content/docs/deployment/speculative_execution.md index e3fbfce22dd..883ee477b11 100644 --- a/docs/content/docs/deployment/speculative_execution.md +++ b/docs/content/docs/deployment/speculative_execution.md @@ -46,11 +46,6 @@ will create new attempts for the slow tasks and deploy them on nodes that are no This section describes how to use speculative execution, including how to enable it, how to tune it, and how to develop/improve custom sources to work with speculative execution. -{{< hint warning >}} -Note: Flink does not support speculative execution of DataSet jobs because DataSet will be deprecated -in near future. DataStream API is now the recommended low level API to develop Flink batch jobs. -{{< /hint >}} - ### Enable Speculative Execution You can enable speculative execution through the following configuration items: - `execution.batch.speculative.enabled: true` diff --git a/docs/content/docs/dev/configuration/overview.md b/docs/content/docs/dev/configuration/overview.md index bedd5d565f4..88b1b49c83c 100644 --- a/docs/content/docs/dev/configuration/overview.md +++ b/docs/content/docs/dev/configuration/overview.md @@ -213,8 +213,7 @@ They can be used separately, or they can be mixed, depending on your use cases: | APIs you want to use | Dependency you need to add | |-----------------------------------------------------------------------------------|-----------------------------------------------------| -| [DataStream]({{< ref "docs/dev/datastream/overview" >}}) | `flink-streaming-java` | -| [DataStream with Scala]({{< ref "docs/dev/datastream/scala_api_extensions" >}}) | `flink-streaming-scala{{< scala_version >}}` | +| [DataStream]({{< ref "docs/dev/datastream/overview" >}}) | `flink-streaming-java` | | [Table API]({{< ref "docs/dev/table/common" >}}) | `flink-table-api-java` | | [Table API with Scala]({{< ref "docs/dev/table/common" >}}) | `flink-table-api-scala{{< scala_version >}}` | | [Table API + DataStream]({{< ref "docs/dev/table/data_stream_api" >}}) | `flink-table-api-java-bridge` | diff --git a/docs/content/docs/dev/datastream/dataset_migration.md b/docs/content/docs/dev/datastream/dataset_migration.md deleted file mode 100644 index 98e653d8189..00000000000 --- a/docs/content/docs/dev/datastream/dataset_migration.md +++ /dev/null @@ -1,774 +0,0 @@ ---- -title: "How to Migrate from DataSet to DataStream" -weight: 302 -type: docs ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -# How to Migrate from DataSet to DataStream - -The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in the -Flink 2.0 version. 
Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their -data processing requirements. - -Noticed that APIs in DataStream do not always match those in DataSet exactly. The purpose of this document is to help users understand -how to achieve the same data processing behaviors with DataStream APIs as using DataSet APIs. - -According to the changes in coding and execution efficiency that are required for migration, we categorized DataSet APIs into 4 categories: - -- Category 1: APIs that have exact equivalent in DataStream, which requires barely any changes to migrate. - -- Category 2: APIs whose behavior can be achieved by other APIs with different semantics in DataStream, which might require some code changes for -migration but will result in the same execution efficiency. - -- Category 3: APIs whose behavior can be achieved by other APIs with different semantics in DataStream, with potentially additional cost in execution efficiency. - -- Category 4: APIs whose behaviors are not supported by DataStream API. - -The subsequent sections will first introduce how to set the execution environment and source/sink, then provide detailed explanations on how to migrate -each category of DataSet APIs to the DataStream APIs, highlighting the specific considerations and challenges associated with each -category. - - -## Setting the execution environment - -The first step of migrating an application from DataSet API to DataStream API is to replace `ExecutionEnvironment` with `StreamExecutionEnvironment`. - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td> - {{< highlight "java" >}} -// Create the execution environment -ExecutionEnvironment.getExecutionEnvironment(); -// Create the local execution environment -ExecutionEnvironment.createLocalEnvironment(); -// Create the collection environment -new CollectionEnvironment(); -// Create the remote environment -ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -// Create the execution environment -StreamExecutionEnvironment.getExecutionEnvironment(); -// Create the local execution environment -StreamExecutionEnvironment.createLocalEnvironment(); -// The collection environment is not supported. -// Create the remote environment -StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); - {{< /highlight >}} - </td> - </tr> - </tbody> -</table> - -Unlike DataSet, DataStream supports processing on both bounded and unbounded data streams. Thus, user needs to explicitly set the execution mode -to `RuntimeExecutionMode.BATCH` if that is expected. - -```java -StreamExecutionEnvironment executionEnvironment = // [...]; -executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); -``` - -## Using the streaming sources and sinks - -### Sources - -The DataStream API uses `DataStreamSource` to read records from external system, while the DataSet API uses the `DataSource`. 
- -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td> - {{< highlight "java" >}} -// Read data from file -DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); -// Read data from collection -DataSource<> source = ExecutionEnvironment.fromCollection(data); -// Read data from inputformat -DataSource<> source = ExecutionEnvironment.createInput(inputFormat) - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -// Read data from file -DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); -// Read data from collection -DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); -// Read data from inputformat -DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) - {{< /highlight >}} - </td> - </tr> - </tbody> -</table> - -### Sinks - -The DataStream API uses `DataStreamSink` to write records to external system, while the -DataSet API uses the `DataSink`. - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td> - {{< highlight "java" >}} -// Write to outputformat -DataSink<> sink = dataSet.output(outputFormat); -// Write to csv file -DataSink<> sink = dataSet.writeAsCsv(filePath); -// Write to text file -DataSink<> sink = dataSet.writeAsText(filePath); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -// Write to sink -DataStreamSink<> sink = dataStream.sinkTo(sink) -// Write to csv file -DataStreamSink<> sink = dataStream.writeAsCsv(path); -// Write to text file -DataStreamSink<> sink = dataStream.writeAsText(path); - {{< /highlight >}} - </td> - </tr> - </tbody> -</table> - -If you are looking for pre-defined source and sink connectors of DataStream, please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" >}}) - -## Migrating DataSet APIs - -### Category 1 - -For Category 1, these DataSet APIs have exact equivalent in DataStream, which requires barely any changes to migrate. 
- -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">Operations</th> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td>Map</td> - <td> - {{< highlight "java" >}} -dataSet.map(new MapFunction<>(){ -// implement user-defined map logic -}); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream.map(new MapFunction<>(){ -// implement user-defined map logic -}); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>FlatMap</td> - <td> - {{< highlight "java" >}} -dataSet.flatMap(new FlatMapFunction<>(){ -// implement user-defined flatmap logic -}); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream.flatMap(new FlatMapFunction<>(){ -// implement user-defined flatmap logic -}); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Filter</td> - <td> - {{< highlight "java" >}} -dataSet.filter(new FilterFunction<>(){ -// implement user-defined filter logic -}); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream.filter(new FilterFunction<>(){ -// implement user-defined filter logic -}); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Union</td> - <td> - {{< highlight "java" >}} -dataSet1.union(dataSet2); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream1.union(dataStream2); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Rebalance</td> - <td> - {{< highlight "java" >}} -dataSet.rebalance(); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -dataStream.rebalance(); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Project</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple3<>> dataSet = // [...] -dataSet.project(2,0); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple3<>> dataStream = // [...] -dataStream.project(2,0); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Reduce on Grouped DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet = // [...] -dataSet.groupBy(value -> value.f0) - .reduce(new ReduceFunction<>(){ - // implement user-defined reduce logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream = // [...] -dataStream.keyBy(value -> value.f0) - .reduce(new ReduceFunction<>(){ - // implement user-defined reduce logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Aggregate on Grouped DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet = // [...] -// compute sum of the second field -dataSet.groupBy(value -> value.f0) - .aggregate(SUM, 1); -// compute min of the second field -dataSet.groupBy(value -> value.f0) - .aggregate(MIN, 1); -// compute max of the second field -dataSet.groupBy(value -> value.f0) - .aggregate(MAX, 1); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream = // [...] -// compute sum of the second field -dataStream.keyBy(value -> value.f0) - .sum(1); -// compute min of the second field -dataStream.keyBy(value -> value.f0) - .min(1); -// compute max of the second field -dataStream.keyBy(value -> value.f0) - .max(1); - {{< /highlight >}} - </td> - </tr> - </tbody> -</table> - -### Category 2 - -For category 2, the behavior of these DataSet APIs can be achieved by other APIs with different semantics in DataStream, which might require some code changes for -migration but will result in the same execution efficiency. 
- -Operations on a full DataSet correspond to the global window aggregation in DataStream with a custom window that is triggered at the end of the inputs. The `EndOfStreamWindows` -in the [Appendix]({{< ref "docs/dev/datastream/dataset_migration#endofstreamwindows" >}}) shows how such a window can be implemented. We will reuse it in the rest of this document. - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left">Operations</th> - <th class="text-left">DataSet</th> - <th class="text-left">DataStream</th> - </tr> - </thead> - <tbody> - <tr> - <td>Distinct</td> - <td> - {{< highlight "java" >}} -DataSet<Integer> dataSet = // [...] -dataSet.distinct(); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Integer> dataStream = // [...] -dataStream.keyBy(value -> value) - .reduce((value1, value2) -> value1); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Hash-Partition</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet = // [...] -dataSet.partitionByHash(value -> value.f0); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream = // [...] -// partition by the hashcode of key -dataStream.partitionCustom( - (key, numSubpartition) -> key.hashCode() % numSubpartition, - value -> value.f0); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Reduce on Full DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<String> dataSet = // [...] -dataSet.reduce(new ReduceFunction<>(){ - // implement user-defined reduce logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<String> dataStream = // [...] -dataStream.windowAll(EndOfStreamWindows.get()) - .reduce(new ReduceFunction<>(){ - // implement user-defined reduce logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>Aggregate on Full DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet = // [...] -// compute sum of the second field -dataSet.aggregate(SUM, 1); -// compute min of the second field -dataSet.aggregate(MIN, 1); -// compute max of the second field -dataSet.aggregate(MAX, 1); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream = // [...] -// compute sum of the second field -dataStream.windowAll(EndOfStreamWindows.get()) - .sum(1); -// compute min of the second field -dataStream.windowAll(EndOfStreamWindows.get()) - .min(1); -// compute max of the second field -dataStream.windowAll(EndOfStreamWindows.get()) - .max(1); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>GroupReduce on Full DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<Integer> dataSet = // [...] -dataSet.reduceGroup(new GroupReduceFunction<>(){ - // implement user-defined group reduce logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Integer> dataStream = // [...] -dataStream.windowAll(EndOfStreamWindows.get()) - .apply(new WindowFunction<>(){ - // implement user-defined group reduce logic - }); - {{< /highlight >}} - </td> - </tr> - <tr> - <td>GroupReduce on Grouped DataSet</td> - <td> - {{< highlight "java" >}} -DataSet<Tuple2<>> dataSet = // [...] -dataSet.groupBy(value -> value.f0) - .reduceGroup(new GroupReduceFunction<>(){ - // implement user-defined group reduce logic - }); - {{< /highlight >}} - </td> - <td> - {{< highlight "java" >}} -DataStream<Tuple2<>> dataStream = // [...] 
-dataStream.keyBy(value -> value.f0)
- .window(EndOfStreamWindows.get())
- .apply(new WindowFunction<>(){
- // implement user-defined group reduce logic
- });
- {{< /highlight >}}
- </td>
- </tr>
- <tr>
- <td>First-n</td>
- <td>
- {{< highlight "java" >}}
-dataSet.first(n)
- {{< /highlight >}}
- </td>
- <td>
- {{< highlight "java" >}}
-dataStream.windowAll(EndOfStreamWindows.get())
- .apply(new AllWindowFunction<>(){
- // implement first-n logic
- });
- {{< /highlight >}}
- </td>
- </tr>
- <tr>
- <td>Join</td>
- <td>
- {{< highlight "java" >}}
-DataSet<Tuple2<>> dataSet1 = // [...]
-DataSet<Tuple2<>> dataSet2 = // [...]
-dataSet1.join(dataSet2)
- .where(value -> value.f0)
- .equalTo(value -> value.f0)
- .with(new JoinFunction<>(){
- // implement user-defined join logic
- });
- {{< /highlight >}}
- </td>
- <td>
- {{< highlight "java" >}}
-DataStream<Tuple2<>> dataStream1 = // [...]
-DataStream<Tuple2<>> dataStream2 = // [...]
-dataStream1.join(dataStream2)
- .where(value -> value.f0)
- .equalTo(value -> value.f0)
- .window(EndOfStreamWindows.get())
- .apply(new JoinFunction<>(){
- // implement user-defined join logic
- });
- {{< /highlight >}}
- </td>
- </tr>
- <tr>
- <td>CoGroup</td>
- <td>
- {{< highlight "java" >}}
-DataSet<Tuple2<>> dataSet1 = // [...]
-DataSet<Tuple2<>> dataSet2 = // [...]
-dataSet1.coGroup(dataSet2)
- .where(value -> value.f0)
- .equalTo(value -> value.f0)
- .with(new CoGroupFunction<>(){
- // implement user-defined co group logic
- });
- {{< /highlight >}}
- </td>
- <td>
- {{< highlight "java" >}}
-DataStream<Tuple2<>> dataStream1 = // [...]
-DataStream<Tuple2<>> dataStream2 = // [...]
-dataStream1.coGroup(dataStream2)
- .where(value -> value.f0)
- .equalTo(value -> value.f0)
- .window(EndOfStreamWindows.get())
- .apply(new CoGroupFunction<>(){
- // implement user-defined co group logic
- });
- {{< /highlight >}}
- </td>
- </tr>
- <tr>
- <td>OuterJoin</td>
- <td>
- {{< highlight "java" >}}
-DataSet<Tuple2<>> dataSet1 = // [...]
-DataSet<Tuple2<>> dataSet2 = // [...]
-// left outer join
-dataSet1.leftOuterJoin(dataSet2)
- .where(value -> value.f0)
- .equalTo(value -> value.f0)
- .with(new JoinFunction<>(){
- // implement user-defined left outer join logic
- });
-// right outer join
-dataSet1.rightOuterJoin(dataSet2)
- .where(value -> value.f0)
- .equalTo(value -> value.f0)
- .with(new JoinFunction<>(){
- // implement user-defined right outer join logic
- });
- {{< /highlight >}}
- </td>
- <td>
- {{< highlight "java" >}}
- DataStream<Tuple2<>> dataStream1 = // [...]
- DataStream<Tuple2<>> dataStream2 = // [...]
- // left outer join
- dataStream1.coGroup(dataStream2)
- .where(value -> value.f0)
- .equalTo(value -> value.f0)
- .window(EndOfStreamWindows.get())
- .apply((leftIterable, rightIterable, collector) -> {
- if(!rightIterable.iterator().hasNext()){
- // implement user-defined left outer join logic
- }
- });
- // right outer join
- dataStream1.coGroup(dataStream2)
- .where(value -> value.f0)
- .equalTo(value -> value.f0)
- .window(EndOfStreamWindows.get())
- .apply((leftIterable, rightIterable, collector) -> {
- if(!leftIterable.iterator().hasNext()){
- // implement user-defined right outer join logic
- }
- });
- {{< /highlight >}}
- </td>
- </tr>
- </tbody>
-</table>
-
-### Category 3
-
-For category 3, the behavior of these DataSet APIs can be achieved by other APIs with different semantics in DataStream, though potentially at an additional cost in execution efficiency.
-
-Currently, the DataStream API does not directly support aggregations on non-keyed streams (subtask-scope aggregations).
To achieve such aggregations, we first assign the subtask ID
-to the records and then turn the stream into a keyed stream. The `AddSubtaskIDMapFunction` in the [Appendix]({{< ref "docs/dev/datastream/dataset_migration#addsubtaskidmapfunction" >}}) shows how
-to do that, and we will reuse it in the rest of this document.
-
-<table class="table table-bordered">
- <thead>
- <tr>
- <th class="text-left">Operations</th>
- <th class="text-left">DataSet</th>
- <th class="text-left">DataStream</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>MapPartition/SortPartition</td>
- <td>
- {{< highlight "java" >}}
-DataSet<Integer> dataSet = // [...]
-// MapPartition
-dataSet.mapPartition(new MapPartitionFunction<>(){
- // implement user-defined map partition logic
- });
-// SortPartition
-dataSet.sortPartition(0, Order.ASCENDING);
-dataSet.sortPartition(0, Order.DESCENDING);
- {{< /highlight >}}
- </td>
- <td>
- {{< highlight "java" >}}
-DataStream<Integer> dataStream = // [...]
-// assign subtask ID to all records
-DataStream<Tuple2<String, Integer>> dataStream1 = dataStream.map(new AddSubtaskIDMapFunction());
-dataStream1.keyBy(value -> value.f0)
- .window(EndOfStreamWindows.get())
- .apply(new WindowFunction<>(){
- // implement user-defined map partition or sort partition logic
- });
- {{< /highlight >}}
- </td>
- </tr>
- <tr>
- <td>Cross</td>
- <td>
- {{< highlight "java" >}}
-DataSet<Integer> dataSet1 = // [...]
-DataSet<Integer> dataSet2 = // [...]
-// Cross
-dataSet1.cross(dataSet2)
- .with(new CrossFunction<>(){
- // implement user-defined cross logic
- });
- {{< /highlight >}}
- </td>
- <td>
- {{< highlight "java" >}}
-// the parallelism of dataStream1 and dataStream2 should be the same
-DataStream<Integer> dataStream1 = // [...]
-DataStream<Integer> dataStream2 = // [...]
-DataStream<Tuple2<String, Integer>> dataStream3 = dataStream1.broadcast().map(new AddSubtaskIDMapFunction());
-DataStream<Tuple2<String, Integer>> dataStream4 = dataStream2.map(new AddSubtaskIDMapFunction());
-// join the two streams according to the subtask ID
-dataStream3.join(dataStream4)
- .where(value -> value.f0)
- .equalTo(value -> value.f0)
- .window(EndOfStreamWindows.get())
- .apply(new JoinFunction<>(){
- // implement user-defined cross logic
- });
- {{< /highlight >}}
- </td>
- </tr>
- </tbody>
-</table>
-
-### Category 4
-
-The behaviors of the following DataSet APIs are not supported by the DataStream API.
-
-* RangePartition
-* GroupCombine
-
-
-## Appendix
-
-#### EndOfStreamWindows
-
-The following code shows the example of `EndOfStreamWindows`.
- -```java -public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> { - private static final long serialVersionUID = 1L; - - private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows(); - - private static final TimeWindow TIME_WINDOW_INSTANCE = - new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE); - - private EndOfStreamWindows() {} - - public static EndOfStreamWindows get() { - return INSTANCE; - } - - @Override - public Collection<TimeWindow> assignWindows( - Object element, long timestamp, WindowAssignerContext context) { - return Collections.singletonList(TIME_WINDOW_INSTANCE); - } - - @Override - public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { - return new EndOfStreamTrigger(); - } - - @Override - public String toString() { - return "EndOfStreamWindows()"; - } - - @Override - public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { - return new TimeWindow.Serializer(); - } - - @Override - public boolean isEventTime() { - return true; - } - - @Internal - public static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> { - @Override - public TriggerResult onElement( - Object element, long timestamp, TimeWindow window, TriggerContext ctx) - throws Exception { - return TriggerResult.CONTINUE; - } - - @Override - public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { - return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; - } - - @Override - public void clear(TimeWindow window, TriggerContext ctx) throws Exception {} - - @Override - public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { - return TriggerResult.CONTINUE; - } - } -} -``` - -#### AddSubtaskIDMapFunction - -The following code shows the example of `AddSubtaskIDMapFunction`. -```java -public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> { - @Override - public Tuple2<String, T> map(T value) { - return Tuple2.of(String.valueOf(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()), value); - } -} -``` - -{{< top >}} diff --git a/docs/content/docs/dev/datastream/execution/parallel.md b/docs/content/docs/dev/datastream/execution/parallel.md index 026f73e5519..5df067001cb 100644 --- a/docs/content/docs/dev/datastream/execution/parallel.md +++ b/docs/content/docs/dev/datastream/execution/parallel.md @@ -64,21 +64,6 @@ wordCounts.print(); env.execute("Word Count Example"); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment - -val text = [...] -val wordCounts = text - .flatMap{ _.split(" ") map { (_, 1) } } - .keyBy(_._1) - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) - .sum(1).setParallelism(5) -wordCounts.print() - -env.execute("Word Count Example") -``` -{{< /tab >}} {{< tab "Python" >}} ```python env = StreamExecutionEnvironment.get_execution_environment() @@ -124,22 +109,6 @@ wordCounts.print(); env.execute("Word Count Example"); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment -env.setParallelism(3) - -val text = [...] 
-val wordCounts = text - .flatMap{ _.split(" ") map { (_, 1) } } - .keyBy(_._1) - .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))) - .sum(1) -wordCounts.print() - -env.execute("Word Count Example") -``` -{{< /tab >}} {{< tab "Python" >}} ```python env = StreamExecutionEnvironment.get_execution_environment() @@ -192,23 +161,6 @@ try { e.printStackTrace(); } -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -try { - PackagedProgram program = new PackagedProgram(file, args) - InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123") - Configuration config = new Configuration() - - Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader()) - - // set the parallelism to 10 here - client.run(program, 10, true) - -} catch { - case e: Exception => e.printStackTrace -} ``` {{< /tab >}} {{< tab "Python" >}} diff --git a/docs/content/docs/dev/datastream/overview.md b/docs/content/docs/dev/datastream/overview.md index 9481ccb49c8..ff6d43f3248 100644 --- a/docs/content/docs/dev/datastream/overview.md +++ b/docs/content/docs/dev/datastream/overview.md @@ -72,12 +72,6 @@ program consists of the same basic parts: 4. Specify where to put the results of your computations, 5. Trigger the program execution -{{< hint warning >}} -All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of either the DataStream and/or Table API. - -See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a> -{{< /hint >}} - {{< tabs "fa68701c-59e8-4509-858e-3e8a123eeacf" >}} {{< tab "Java" >}} @@ -147,70 +141,6 @@ writeAsText(String path); print(); ``` -{{< /tab >}} -{{< tab "Scala" >}} - -We will now give an overview of each of those steps, please refer to the -respective sections for more details. Note that all core classes of the Scala -DataStream API can be found in {{< gh_link -file="/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala" -name="org.apache.flink.streaming.api.scala" >}}. - -The `StreamExecutionEnvironment` is the basis for all Flink programs. You can -obtain one using these static methods on `StreamExecutionEnvironment`: - -```scala -getExecutionEnvironment() - -createLocalEnvironment() - -createRemoteEnvironment(host: String, port: Int, jarFiles: String*) -``` - -Typically, you only need to use `getExecutionEnvironment()`, since this will do -the right thing depending on the context: if you are executing your program -inside an IDE or as a regular Java program it will create a local environment -that will execute your program on your local machine. If you created a JAR file -from your program, and invoke it through the [command line]({{< ref "docs/deployment/cli" >}}), the Flink cluster manager will execute your main method and -`getExecutionEnvironment()` will return an execution environment for executing -your program on a cluster. - -For specifying data sources the execution environment has several methods to -read from files using various methods: you can just read them line by line, as -CSV files, or using any of the other provided sources. 
To just read a text file -as a sequence of lines, you can use: - -```scala -val env = StreamExecutionEnvironment.getExecutionEnvironment() - -val text: DataStream[String] = env.readTextFile("file:///path/to/file") -``` - -This will give you a DataStream on which you can then apply transformations to -create new derived DataStreams. - -You apply transformations by calling methods on DataStream with a -transformation functions. For example, a map transformation looks like this: - -```scala -val input: DataSet[String] = ... - -val mapped = input.map { x => x.toInt } -``` - -This will create a new DataStream by converting every String in the original -collection to an Integer. - -Once you have a DataStream containing your final results, you can write it to -an outside system by creating a sink. These are just some example methods for -creating a sink: - -```scala -writeAsText(path: String) - -print() -``` - {{< /tab >}} {{< /tabs >}} @@ -296,31 +226,6 @@ public class WindowWordCount { } ``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala - -import org.apache.flink.streaming.api.scala._ -import java.time.Duration - -object WindowWordCount { - def main(args: Array[String]) { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val text = env.socketTextStream("localhost", 9999) - - val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } - .map { (_, 1) } - .keyBy(_._1) - .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5))) - .sum(1) - - counts.print() - - env.execute("Window Stream WordCount") - } -} -``` {{< /tab >}} {{< /tabs >}} @@ -391,61 +296,6 @@ Collection-based: Custom: -- `addSource` - Attach a new source function. For example, to read from Apache Kafka you can use - `addSource(new FlinkKafkaConsumer<>(...))`. See [connectors]({{< ref "docs/connectors/datastream/overview" >}}) for more details. - -{{< /tab >}} -{{< tab "Scala" >}} - -Sources are where your program reads its input from. You can attach a source to your program by -using `StreamExecutionEnvironment.addSource(sourceFunction)`. Flink comes with a number of pre-implemented -source functions, but you can always write your own custom sources by implementing the `SourceFunction` -for non-parallel sources, or by implementing the `ParallelSourceFunction` interface or extending the -`RichParallelSourceFunction` for parallel sources. - -There are several predefined stream sources accessible from the `StreamExecutionEnvironment`: - -File-based: - -- `readTextFile(path)` - Reads text files, i.e. files that respect the `TextInputFormat` specification, line-by-line and returns them as Strings. - -- `readFile(fileInputFormat, path)` - Reads (once) files as dictated by the specified file input format. - -- `readFile(fileInputFormat, path, watchType, interval, pathFilter)` - This is the method called internally by the two previous ones. It reads files in the `path` based on the given `fileInputFormat`. Depending on the provided `watchType`, this source may periodically monitor (every `interval` ms) the path for new data (`FileProcessingMode.PROCESS_CONTINUOUSLY`), or process once the data currently in the path and exit (`FileProcessingMode.PROCESS_ONCE`). Using the `pathFilter`, the user [...] - - *IMPLEMENTATION:* - - Under the hood, Flink splits the file reading process into two sub-tasks, namely *directory monitoring* and *data reading*. Each of these sub-tasks is implemented by a separate entity. 
Monitoring is implemented by a single, **non-parallel** (parallelism = 1) task, while reading is performed by multiple tasks running in parallel. The parallelism of the latter is equal to the job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once dep [...] - - *IMPORTANT NOTES:* - - 1. If the `watchType` is set to `FileProcessingMode.PROCESS_CONTINUOUSLY`, when a file is modified, its contents are re-processed entirely. This can break the "exactly-once" semantics, as appending data at the end of a file will lead to **all** its contents being re-processed. - - 2. If the `watchType` is set to `FileProcessingMode.PROCESS_ONCE`, the source scans the path **once** and exits, without waiting for the readers to finish reading the file contents. Of course the readers will continue reading until all file contents are read. Closing the source leads to no more checkpoints after that point. This may lead to slower recovery after a node failure, as the job will resume reading from the last checkpoint. - -Socket-based: - -- `socketTextStream` - Reads from a socket. Elements can be separated by a delimiter. - -Collection-based: - -- `fromCollection(Seq)` - Creates a data stream from the Java Java.util.Collection. All elements - in the collection must be of the same type. - -- `fromCollection(Iterator)` - Creates a data stream from an iterator. The class specifies the - data type of the elements returned by the iterator. - -- `fromElements(elements: _*)` - Creates a data stream from the given sequence of objects. All objects must be - of the same type. - -- `fromParallelCollection(SplittableIterator)` - Creates a data stream from an iterator, in - parallel. The class specifies the data type of the elements returned by the iterator. - -- `fromSequence(from, to)` - Generates the sequence of numbers in the given interval, in - parallel. - -Custom: - - `addSource` - Attach a new source function. For example, to read from Apache Kafka you can use `addSource(new FlinkKafkaConsumer<>(...))`. See [connectors]({{< ref "docs/connectors/datastream/overview" >}}) for more details. @@ -487,32 +337,6 @@ greater than 1, the output will also be prepended with the identifier of the tas - `writeToSocket` - Writes elements to a socket according to a `SerializationSchema` -- `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as - Apache Kafka) that are implemented as sink functions. - -{{< /tab >}} -{{< tab "Scala" >}} - -Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. -Flink comes with a variety of built-in output formats that are encapsulated behind operations on the -DataStreams: - -- `writeAsText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are - obtained by calling the *toString()* method of each element. - -- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field - delimiters are configurable. The value for each field comes from the *toString()* method of the objects. - -- `print()` / `printToErr()` - Prints the *toString()* value -of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is -prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is -greater than 1, the output will also be prepended with the identifier of the task which produced the output. 
- -- `writeUsingOutputFormat()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports - custom object-to-bytes conversion. - -- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema` - - `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions. @@ -568,14 +392,6 @@ env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment -env.setBufferTimeout(timeoutMillis) - -env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis) -``` -{{< /tab >}} {{< /tabs >}} To maximize throughput, set `setBufferTimeout(-1)` which will remove the timeout and buffers will only be @@ -614,17 +430,6 @@ DataStream<String> lines = env.addSource(/* some source */); env.execute(); ``` {{< /tab >}} -{{< tab "Scala" >}} - -```scala -val env = StreamExecutionEnvironment.createLocalEnvironment() - -val lines = env.addSource(/* some source */) -// build your program - -env.execute() -``` -{{< /tab >}} {{< /tabs >}} ### Collection Data Sources @@ -652,22 +457,6 @@ Iterator<Long> longIt = ...; DataStream<Long> myLongs = env.fromCollection(longIt, Long.class); ``` {{< /tab >}} -{{< tab "Scala" >}} -```scala -val env = StreamExecutionEnvironment.createLocalEnvironment() - -// Create a DataStream from a list of elements -val myInts = env.fromElements(1, 2, 3, 4, 5) - -// Create a DataStream from any Collection -val data: Seq[(String, Int)] = ... -val myTuples = env.fromCollection(data) - -// Create a DataStream from an Iterator -val longIt: Iterator[Long] = ... -val myLongs = env.fromCollection(longIt) -``` -{{< /tab >}} {{< /tabs >}} **Note:** Currently, the collection data source requires that data types and iterators implement @@ -685,13 +474,6 @@ DataStream<Tuple2<String, Integer>> myResult = ...; Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync(); ``` -{{< /tab >}} -{{< tab "Scala" >}} - -```scala -val myResult: DataStream[(String, Int)] = ... -val myOutput: Iterator[(String, Int)] = myResult.collectAsync() -``` {{< /tab >}} {{< /tabs >}} diff --git a/docs/content/docs/dev/datastream/scala_api_extensions.md b/docs/content/docs/dev/datastream/scala_api_extensions.md deleted file mode 100644 index d69468afb2b..00000000000 --- a/docs/content/docs/dev/datastream/scala_api_extensions.md +++ /dev/null @@ -1,235 +0,0 @@ ---- -title: "Scala API Extensions" -weight: 201 -type: docs -aliases: - - /dev/scala_api_extensions.html - - /apis/scala_api_extensions.html ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
--->
-
-{{< hint warning >}}
-All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of either the DataStream and/or Table API.
-
-See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
-{{< /hint >}}
-
-# Scala API Extensions
-
-In order to keep a fair amount of consistency between the Scala and Java APIs, some
-of the features that allow a high level of expressiveness in Scala have been left
-out of the standard APIs for both batch and streaming.
-
-If you want to _enjoy the full Scala experience_, you can choose to opt in to
-extensions that enhance the Scala API via implicit conversions.
-
-To use all the available extensions, you can just add a simple `import` for the DataStream API:
-
-{{< highlight scala >}}
-import org.apache.flink.streaming.api.scala.extensions._
-{{< /highlight >}}
-
-Alternatively, you can import individual extensions _à la carte_ to only use those
-you prefer.
-
-## Accept partial functions
-
-Normally, the DataStream API does not accept anonymous pattern
-matching functions to deconstruct tuples, case classes or collections, like the
-following:
-
-{{< highlight scala >}}
-val data: DataStream[(Int, String, Double)] = // [...]
-data.map {
- case (id, name, temperature) => // [...]
- // The previous line causes the following compilation error:
- // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
-}
-{{< /highlight >}}
-
-This extension introduces new methods in the DataStream Scala API
-that correspond one-to-one to methods in the standard API. These delegating methods
-do support anonymous pattern matching functions.
- -#### DataStream API - -<table class="table table-bordered"> - <thead> - <tr> - <th class="text-left" style="width: 20%">Method</th> - <th class="text-left" style="width: 20%">Original</th> - <th class="text-center">Example</th> - </tr> - </thead> - - <tbody> - <tr> - <td><strong>mapWith</strong></td> - <td><strong>map (DataStream)</strong></td> - <td> -{{< highlight scala >}} -data.mapWith { - case (_, value) => value.toString -} -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>flatMapWith</strong></td> - <td><strong>flatMap (DataStream)</strong></td> - <td> -{{< highlight scala >}} -data.flatMapWith { - case (_, name, visits) => visits.map(name -> _) -} -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>filterWith</strong></td> - <td><strong>filter (DataStream)</strong></td> - <td> -{{< highlight scala >}} -data.filterWith { - case Train(_, isOnTime) => isOnTime -} -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>keyingBy</strong></td> - <td><strong>keyBy (DataStream)</strong></td> - <td> -{{< highlight scala >}} -data.keyingBy { - case (id, _, _) => id -} -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>mapWith</strong></td> - <td><strong>map (ConnectedDataStream)</strong></td> - <td> -{{< highlight scala >}} -data.mapWith( - map1 = case (_, value) => value.toString, - map2 = case (_, _, value, _) => value + 1 -) -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>flatMapWith</strong></td> - <td><strong>flatMap (ConnectedDataStream)</strong></td> - <td> -{{< highlight scala >}} -data.flatMapWith( - flatMap1 = case (_, json) => parse(json), - flatMap2 = case (_, _, json, _) => parse(json) -) -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>keyingBy</strong></td> - <td><strong>keyBy (ConnectedDataStream)</strong></td> - <td> -{{< highlight scala >}} -data.keyingBy( - key1 = case (_, timestamp) => timestamp, - key2 = case (id, _, _) => id -) -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>reduceWith</strong></td> - <td><strong>reduce (KeyedStream, WindowedStream)</strong></td> - <td> -{{< highlight scala >}} -data.reduceWith { - case ((_, sum1), (_, sum2) => sum1 + sum2 -} -{{< /highlight >}} - </td> - </tr> - <tr> - <td><strong>projecting</strong></td> - <td><strong>apply (JoinedStream)</strong></td> - <td> -{{< highlight scala >}} -data1.join(data2). - whereClause(case (pk, _) => pk). - isEqualTo(case (_, fk) => fk). - projecting { - case ((pk, tx), (products, fk)) => tx -> products - } -{{< /highlight >}} - </td> - </tr> - </tbody> -</table> - - - -For more information on the semantics of each method, please refer to the [DataStream]({{< ref "docs/dev/datastream/overview" >}}) API documentation. 
-
-To use this extension exclusively, you can add the following `import`:
-
-{{< highlight scala >}}
-import org.apache.flink.api.scala.extensions.acceptPartialFunctions
-{{< /highlight >}}
-
-for the DataSet extensions, or the following for the DataStream extensions:
-
-{{< highlight scala >}}
-import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
-{{< /highlight >}}
-
-The following snippet shows a minimal example of how to use these extension
-methods together (with the DataStream API):
-
-{{< highlight scala >}}
-object Main {
- import org.apache.flink.streaming.api.scala.extensions._
-
- case class Point(x: Double, y: Double)
-
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements(Point(1, 2), Point(3, 4), Point(5, 6))
-
- ds.filterWith {
- case Point(x, _) => x > 1
- }.reduceWith {
- case (Point(x1, y1), Point(x2, y2)) => Point(x1 + x2, y1 + y2)
- }.mapWith {
- case Point(x, y) => (x, y)
- }.flatMapWith {
- case (x, y) => Seq("x" -> x, "y" -> y)
- }.keyingBy {
- case (id, _) => id
- }
- }
-}
-{{< /highlight >}}
-
-{{< top >}}
diff --git a/docs/static/fig/levels_of_abstraction.png b/docs/static/fig/levels_of_abstraction.png
new file mode 100644
index 00000000000..74090592976
Binary files /dev/null and b/docs/static/fig/levels_of_abstraction.png differ
diff --git a/docs/static/fig/levels_of_abstraction.svg b/docs/static/fig/levels_of_abstraction.svg
deleted file mode 100644
index 8f04a313419..00000000000
--- a/docs/static/fig/levels_of_abstraction.svg
+++ /dev/null
@@ -1,193 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
---> - -<svg - xmlns:dc="http://purl.org/dc/elements/1.1/" - xmlns:cc="http://creativecommons.org/ns#" - xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" - xmlns:svg="http://www.w3.org/2000/svg" - xmlns="http://www.w3.org/2000/svg" - version="1.1" - width="974.0144" - height="409.9375" - id="svg2"> - <defs - id="defs4" /> - <metadata - id="metadata7"> - <rdf:RDF> - <cc:Work - rdf:about=""> - <dc:format>image/svg+xml</dc:format> - <dc:type - rdf:resource="http://purl.org/dc/dcmitype/StillImage" /> - <dc:title></dc:title> - </cc:Work> - </rdf:RDF> - </metadata> - <g - transform="translate(258.42828,-167.38041)" - id="layer1"> - <g - transform="translate(-323.70953,144.47416)" - id="g2989"> - <path - d="m 66.203993,358.32677 0,73.59333 621.867427,0 0,-73.59333 -621.867427,0 z" - id="path2991" - style="fill:#e4eaf4;fill-opacity:1;fill-rule:evenodd;stroke:none" /> - <path - d="m 66.203993,358.32677 621.867427,0 0,73.59333 -621.867427,0 z" - id="path2993" - style="fill:none;stroke:#898c92;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" /> - <text - x="164.98396" - y="408.29218" - id="text2995" - xml:space="preserve" - style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Stateful</text> - <text - x="293.41599" - y="408.29218" - id="text2997" - xml:space="preserve" - style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Stream Processing</text> - <path - d="m 181.69526,246.88651 0,73.59333 506.37616,0 0,-73.59333 -506.37616,0 z" - id="path2999" - style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:none" /> - <path - d="m 181.69526,246.88651 506.37616,0 0,73.59333 -506.37616,0 z" - id="path3001" - style="fill:none;stroke:#935f1c;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" /> - <text - x="231.63388" - y="296.79422" - id="text3003" - xml:space="preserve" - style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">DataStream </text> - <text - x="428.33289" - y="296.79422" - id="text3005" - xml:space="preserve" - style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">/ </text> - <text - x="447.83777" - y="296.79422" - id="text3007" - xml:space="preserve" - style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">DataSet</text> - <text - x="582.12122" - y="296.79422" - id="text3009" - xml:space="preserve" - style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">API</text> - <path - d="m 288.93448,135.44624 0,73.4433 399.13694,0 0,-73.4433 -399.13694,0 z" - id="path3011" - style="fill:#be73f1;fill-opacity:1;fill-rule:evenodd;stroke:none" /> - <path - d="m 288.93448,135.44624 399.13694,0 0,73.4433 -399.13694,0 z" - id="path3013" - style="fill:none;stroke:#724591;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" /> - <text - x="414.60895" - y="185.29616" - id="text3015" - xml:space="preserve" - 
style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Table API</text> - <path - d="m 415.0409,23.855943 0,73.593334 273.03052,0 0,-73.593334 -273.03052,0 z" - id="path3017" - style="fill:#e6526e;fill-opacity:1;fill-rule:evenodd;stroke:none" /> - <path - d="m 415.0409,23.855943 273.03052,0 0,73.593334 -273.03052,0 z" - id="path3019" - style="fill:none;stroke:#8a3142;stroke-width:1.87546718px;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" /> - <text - x="516.66846" - y="73.79821" - id="text3021" - xml:space="preserve" - style="font-size:34.95870972px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">SQL</text> - <text - x="722.66699" - y="292.85269" - id="text3023" - xml:space="preserve" - style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Core </text> - <text - x="782.38184" - y="292.85269" - id="text3025" - xml:space="preserve" - style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">APIs</text> - <text - x="722.66699" - y="181.35474" - id="text3027" - xml:space="preserve" - style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Declarative DSL</text> - <text - x="722.66699" - y="69.856773" - id="text3029" - xml:space="preserve" - style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">High</text> - <text - x="774.27985" - y="69.856773" - id="text3031" - xml:space="preserve" - style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">-</text> - <text - x="782.68195" - y="69.856773" - id="text3033" - xml:space="preserve" - style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">level Language</text> - <text - x="722.66699" - y="389.2005" - id="text3035" - xml:space="preserve" - style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">Low</text> - <text - x="768.72845" - y="389.2005" - id="text3037" - xml:space="preserve" - style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">-</text> - <text - x="777.13055" - y="389.2005" - id="text3039" - xml:space="preserve" - style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">level building block</text> - <text - x="722.66699" - y="419.20798" - id="text3041" - xml:space="preserve" - style="font-size:25.05624199px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Arial">(streams, state, [event] time)</text> - </g> - </g> -</svg>
