This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5525d85e70b [FLINK-33144][datastream]Deprecate Iteration API in DataStream
5525d85e70b is described below
commit 5525d85e70bfd53fb47a459b2e637e3d8a2bff54
Author: Wencong Liu <[email protected]>
AuthorDate: Fri Sep 1 10:20:49 2023 +0800
[FLINK-33144][datastream]Deprecate Iteration API in DataStream
This closes #23456
---
.../docs/dev/datastream/execution_mode.md | 1 -
.../docs/dev/datastream/operators/overview.md | 40 --------
docs/content.zh/docs/dev/datastream/overview.md | 90 ------------------
docs/content/docs/dev/datastream/execution_mode.md | 1 -
.../docs/dev/datastream/operators/overview.md | 40 --------
docs/content/docs/dev/datastream/overview.md | 102 ---------------------
flink-examples/flink-examples-streaming/pom.xml | 6 ++
.../flink/streaming/api/datastream/DataStream.java | 22 ++++-
.../streaming/api/datastream/IterativeStream.java | 22 ++++-
9 files changed, 46 insertions(+), 278 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/execution_mode.md b/docs/content.zh/docs/dev/datastream/execution_mode.md
index a69024b5706..df45f432b74 100644
--- a/docs/content.zh/docs/dev/datastream/execution_mode.md
+++ b/docs/content.zh/docs/dev/datastream/execution_mode.md
@@ -206,7 +206,6 @@ Checkpointing 用于故障恢复的特点之一是,在发生故障时,Flink
`批`模式下不支持的:
* [Checkpointing]({{< ref "docs/concepts/stateful-stream-processing" >}}#checkpointing) 和任何依赖于 checkpointing 的操作都不支持。
-* [迭代(Iterations)]({{< ref "docs/dev/datastream/operators/overview" >}}#iterate)
自定义算子应谨慎执行,否则可能会有不恰当的行为。更多细节请参见下面的补充说明。
diff --git a/docs/content.zh/docs/dev/datastream/operators/overview.md b/docs/content.zh/docs/dev/datastream/operators/overview.md
index a8c1b901d46..d5f52aa1f7c 100644
--- a/docs/content.zh/docs/dev/datastream/operators/overview.md
+++ b/docs/content.zh/docs/dev/datastream/operators/overview.md
@@ -567,46 +567,6 @@ connectedStreams.flat_map(MyCoFlatMapFunction())
{{< /tab >}}
{{< /tabs>}}
-### Iterate
-#### DataStream → IterativeStream → ConnectedStream
-
-通过将一个算子的输出重定向到某个之前的算子来在流中创建“反馈”循环。这对于定义持续更新模型的算法特别有用。下面的代码从一个流开始,并不断地应用迭代自身。大于 0 的元素被发送回反馈通道,其余元素被转发到下游。
-
-{{< tabs iterate >}}
-{{< tab "Java" >}}
-```java
-IterativeStream<Long> iteration = initialStream.iterate();
-DataStream<Long> iterationBody = iteration.map (/*do something*/);
-DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
- @Override
- public boolean filter(Long value) throws Exception {
- return value > 0;
- }
-});
-iteration.closeWith(feedback);
-DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
- @Override
- public boolean filter(Long value) throws Exception {
- return value <= 0;
- }
-});
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-initialStream.iterate {
- iteration => {
- val iterationBody = iteration.map {/*do something*/}
- (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
- }
-}
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-Python 中尚不支持此特性。
-{{< /tab >}}
-{{< /tabs>}}
-
### Cache
#### DataStream → CachedDataStream
diff --git a/docs/content.zh/docs/dev/datastream/overview.md b/docs/content.zh/docs/dev/datastream/overview.md
index e281ccf0b8e..9544803692e 100644
--- a/docs/content.zh/docs/dev/datastream/overview.md
+++ b/docs/content.zh/docs/dev/datastream/overview.md
@@ -428,96 +428,6 @@ Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系
{{< top >}}
-<a name="iterations"></a>
-
-Iterations
-----------
-
-{{< tabs "c4cc97af-7ce1-4333-a010-3072b34d5540" >}}
-{{< tab "Java" >}}
-
-Iterative streaming 程序实现了 step function 并将其嵌入到 `IterativeStream` 。由于 DataStream 程序可能永远不会完成,因此没有最大迭代次数。相反,你需要指定流的哪一部分反馈给迭代,哪一部分使用[旁路输出]({{< ref "docs/dev/datastream/side_output" >}})或`过滤器`转发到下游。这里,我们展示了一个使用过滤器的示例。首先,我们定义一个 IterativeStream
-
-```java
-IterativeStream<Integer> iteration = input.iterate();
-```
-
-然后,我们使用一系列转换(这里是一个简单的 `map` 转换)指定将在循环内执行的逻辑
-
-```java
-DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
-```
-
-要关闭迭代并定义迭代尾部,请调用 `IterativeStream` 的 `closeWith(feedbackStream)` 方法。提供给 `closeWith` 函数的 DataStream 将反馈给迭代头。一种常见的模式是使用过滤器将反馈的流部分和向前传播的流部分分开。例如,这些过滤器可以定义“终止”逻辑,其中允许元素向下游传播而不是被反馈。
-
-```java
-iteration.closeWith(iterationBody.filter(/* one part of the stream */));
-DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
-```
-
-例如,下面的程序从一系列整数中连续减去 1,直到它们达到零:
-
-```java
-DataStream<Long> someIntegers = env.generateSequence(0, 1000);
-
-IterativeStream<Long> iteration = someIntegers.iterate();
-
-DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
- @Override
- public Long map(Long value) throws Exception {
- return value - 1 ;
- }
-});
-
-DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
- @Override
- public boolean filter(Long value) throws Exception {
- return (value > 0);
- }
-});
-
-iteration.closeWith(stillGreaterThanZero);
-
-DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
- @Override
- public boolean filter(Long value) throws Exception {
- return (value <= 0);
- }
-});
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-
-Iterative streaming 程序实现了 step function 并将其嵌入到 `IterativeStream` 。由于 DataStream 程序可能永远不会完成,因此没有最大迭代次数。相反,你需要指定流的哪一部分反馈给迭代,哪一部分使用[旁路输出]({{< ref "docs/dev/datastream/side_output" >}})或`过滤器`转发到下游。这里,我们展示了一个迭代示例,其中主体(重复计算的部分)是一个简单的映射转换,使用过滤器将反馈的元素和向下游转发的元素进行分离。
-
-```scala
-val iteratedStream = someDataStream.iterate(
- iteration => {
- val iterationBody = iteration.map(/* this is executed many times */)
- (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
-})
-```
-
-例如,下面的程序从一系列整数中连续减去 1,直到它们达到零:
-
-```scala
-val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
-
-val iteratedStream = someIntegers.iterate(
- iteration => {
- val minusOne = iteration.map( v => v - 1)
- val stillGreaterThanZero = minusOne.filter (_ > 0)
- val lessThanZero = minusOne.filter(_ <= 0)
- (stillGreaterThanZero, lessThanZero)
- }
-)
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
-{{< top >}}
-
<a name="execution-parameters"></a>
执行参数
diff --git a/docs/content/docs/dev/datastream/execution_mode.md b/docs/content/docs/dev/datastream/execution_mode.md
index 3f903907cdc..925b03984ca 100644
--- a/docs/content/docs/dev/datastream/execution_mode.md
+++ b/docs/content/docs/dev/datastream/execution_mode.md
@@ -352,7 +352,6 @@ Unsupported in BATCH mode:
* [Checkpointing]({{< ref "docs/concepts/stateful-stream-processing" >}}#checkpointing)
and any operations that depend on checkpointing do not work.
-* [Iterations]({{< ref "docs/dev/datastream/operators/overview" >}}#iterate)
Custom operators should be implemented with care, otherwise they might behave
improperly. See also additional explanations below for more details.
diff --git a/docs/content/docs/dev/datastream/operators/overview.md b/docs/content/docs/dev/datastream/operators/overview.md
index 5e04da86aea..e9834b7c8bd 100644
--- a/docs/content/docs/dev/datastream/operators/overview.md
+++ b/docs/content/docs/dev/datastream/operators/overview.md
@@ -570,46 +570,6 @@ connectedStreams.flat_map(MyCoFlatMapFunction())
{{< /tab >}}
{{< /tabs>}}
-### Iterate
-#### DataStream → IterativeStream → ConnectedStream
-
-Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream.
-
-{{< tabs iterate >}}
-{{< tab "Java" >}}
-```java
-IterativeStream<Long> iteration = initialStream.iterate();
-DataStream<Long> iterationBody = iteration.map (/*do something*/);
-DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
- @Override
- public boolean filter(Long value) throws Exception {
- return value > 0;
- }
-});
-iteration.closeWith(feedback);
-DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
- @Override
- public boolean filter(Long value) throws Exception {
- return value <= 0;
- }
-});
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-initialStream.iterate {
- iteration => {
- val iterationBody = iteration.map {/*do something*/}
- (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
- }
-}
-```
-{{< /tab >}}
-{{< tab "Python" >}}
-This feature is not yet supported in Python
-{{< /tab >}}
-{{< /tabs>}}
-
### Cache
#### DataStream → CachedDataStream
diff --git a/docs/content/docs/dev/datastream/overview.md b/docs/content/docs/dev/datastream/overview.md
index cbe856d7c12..b72fcf7c871 100644
--- a/docs/content/docs/dev/datastream/overview.md
+++ b/docs/content/docs/dev/datastream/overview.md
@@ -531,108 +531,6 @@ for exactly-once semantics.
{{< top >}}
-Iterations
-----------
-
-{{< tabs "c4cc97af-7ce1-4333-a010-3072b34d5540" >}}
-{{< tab "Java" >}}
-
-Iterative streaming programs implement a step function and embed it into an `IterativeStream`. As a DataStream
-program may never finish, there is no maximum number of iterations. Instead, you need to specify which part
-of the stream is fed back to the iteration and which part is forwarded downstream using a [side output]({{< ref "docs/dev/datastream/side_output" >}})
-or a `filter`. Here, we show an example using filters. First, we define an `IterativeStream`
-
-```java
-IterativeStream<Integer> iteration = input.iterate();
-```
-
-Then, we specify the logic that will be executed inside the loop using a series of transformations (here
-a simple `map` transformation)
-
-```java
-DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
-```
-
-To close an iteration and define the iteration tail, call the `closeWith(feedbackStream)` method of the `IterativeStream`.
-The DataStream given to the `closeWith` function will be fed back to the iteration head.
-A common pattern is to use a filter to separate the part of the stream that is fed back,
-and the part of the stream which is propagated forward. These filters can, e.g., define
-the "termination" logic, where an element is allowed to propagate downstream rather
-than being fed back.
-
-```java
-iteration.closeWith(iterationBody.filter(/* one part of the stream */));
-DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
-```
-
-For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:
-
-```java
-DataStream<Long> someIntegers = env.generateSequence(0, 1000);
-
-IterativeStream<Long> iteration = someIntegers.iterate();
-
-DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
- @Override
- public Long map(Long value) throws Exception {
- return value - 1 ;
- }
-});
-
-DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
- @Override
- public boolean filter(Long value) throws Exception {
- return (value > 0);
- }
-});
-
-iteration.closeWith(stillGreaterThanZero);
-
-DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
- @Override
- public boolean filter(Long value) throws Exception {
- return (value <= 0);
- }
-});
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-
-Iterative streaming programs implement a step function and embed it into an `IterativeStream`. As a DataStream
-program may never finish, there is no maximum number of iterations. Instead, you need to specify which part
-of the stream is fed back to the iteration and which part is forwarded downstream using a [side output]({{< ref "docs/dev/datastream/side_output" >}})
-or a `filter`. Here, we show an example iteration where the body (the part of the computation that is repeated)
-is a simple map transformation, and the elements that are fed back are distinguished by the elements that
-are forwarded downstream using filters.
-
-```scala
-val iteratedStream = someDataStream.iterate(
- iteration => {
- val iterationBody = iteration.map(/* this is executed many times */)
- (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
-})
-```
-
-For example, here is program that continuously subtracts 1 from a series of integers until they reach zero:
-
-```scala
-val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
-
-val iteratedStream = someIntegers.iterate(
- iteration => {
- val minusOne = iteration.map( v => v - 1)
- val stillGreaterThanZero = minusOne.filter (_ > 0)
- val lessThanZero = minusOne.filter(_ <= 0)
- (stillGreaterThanZero, lessThanZero)
- }
-)
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
-{{< top >}}
-
Execution Parameters
--------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index 4ff5a0a1cc8..582fb28ee29 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -137,6 +137,12 @@ under the License.
<configuration>
<compilerArgument>-Xlint:deprecation</compilerArgument>
<failOnWarning>true</failOnWarning>
+ <!-- This example is temporarily preserved only for testing purpose. -->
+ <excludes>
+ <exclude>
+ org/apache/flink/streaming/examples/iteration/IterateExample.java
+ </exclude>
+ </excludes>
</configuration>
</execution>
</executions>
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d0195ae1c15..d6843097401 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -526,8 +526,17 @@ public class DataStream<T> {
* in the set time, the stream terminates.
*
* @return The iterative data stream created.
+ * @deprecated This method is deprecated since Flink 1.19. The only known use case of this
+ * Iteration API comes from Flink ML, which already has its own implementation of iteration
+ * and no longer uses this API. If there's any use cases other than Flink ML that needs
+ * iteration support, please reach out to [email protected] and we can consider making
+ * the Flink ML iteration implementation a separate common library.
+ * @see <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream">
+ * FLIP-357: Deprecate Iteration API of DataStream </a>
+ * @see <a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">Flink ML </a>
*/
- @PublicEvolving
+ @Deprecated
public IterativeStream<T> iterate() {
return new IterativeStream<>(this, 0);
}
@@ -554,8 +563,17 @@ public class DataStream<T> {
*
* @param maxWaitTimeMillis Number of milliseconds to wait between inputs before shutting down
* @return The iterative data stream created.
+ * @deprecated This method is deprecated since Flink 1.19. The only known use case of this
+ * Iteration API comes from Flink ML, which already has its own implementation of iteration
+ * and no longer uses this API. If there's any use cases other than Flink ML that needs
+ * iteration support, please reach out to [email protected] and we can consider making
+ * the Flink ML iteration implementation a separate common library.
+ * @see <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream">
+ * FLIP-357: Deprecate Iteration API of DataStream </a>
+ * @see <a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">Flink ML </a>
*/
- @PublicEvolving
+ @Deprecated
public IterativeStream<T> iterate(long maxWaitTimeMillis) {
return new IterativeStream<>(this, maxWaitTimeMillis);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
index 21031858b1f..6b53493cb2e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
@@ -34,8 +33,17 @@ import java.util.Collection;
* The iterative data stream represents the start of an iteration in a {@link DataStream}.
*
* @param <T> Type of the elements in this Stream
+ * @deprecated This method is deprecated since Flink 1.19. The only known use case of this Iteration
+ * API comes from Flink ML, which already has its own implementation of iteration and no longer
+ * uses this API. If there's any use cases other than Flink ML that needs iteration support,
+ * please reach out to [email protected] and we can consider making the Flink ML iteration
+ * implementation a separate common library.
+ * @see <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream">
+ * FLIP-357: Deprecate Iteration API of DataStream </a>
+ * @see <a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">Flink ML </a>
*/
-@PublicEvolving
+@Deprecated
public class IterativeStream<T> extends SingleOutputStreamOperator<T> {
// We store these so that we can create a co-iteration if we need to
@@ -128,7 +136,17 @@ public class IterativeStream<T> extends SingleOutputStreamOperator<T> {
*
* @param <I> Type of the input of the iteration
* @param <F> Type of the feedback of the iteration
+ * @deprecated This method is deprecated since Flink 1.19. The only known use case of this
+ * Iteration API comes from Flink ML, which already has its own implementation of iteration
+ * and no longer uses this API. If there's any use cases other than Flink ML that needs
+ * iteration support, please reach out to [email protected] and we can consider making
+ * the Flink ML iteration implementation a separate common library.
+ * @see <a
+ * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream">
+ * FLIP-357: Deprecate Iteration API of DataStream </a>
+ * @see <a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">Flink ML </a>
*/
+ @Deprecated
@Public
public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {