This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new cb376d7e [hotfix] Discard watermarks when feed a datastream into
iteration body
cb376d7e is described below
commit cb376d7e45074a6052c5331e511cfc5aca224ddf
Author: Zhipeng Zhang <[email protected]>
AuthorDate: Fri Apr 14 15:09:03 2023 +0800
[hotfix] Discard watermarks when feed a datastream into iteration body
This closes #223.
---
.../src/main/java/org/apache/flink/iteration/Iterations.java | 3 +++
.../java/org/apache/flink/iteration/operator/InputOperator.java | 6 ++++++
2 files changed, 9 insertions(+)
diff --git
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
index 6e6ff9d3..8f716770 100644
---
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
+++
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
@@ -85,6 +85,9 @@ import static org.apache.flink.util.Preconditions.checkState;
* <p>The limitation of constructing the subgraph inside the iteration body
could be refer in {@link
* IterationBody}.
*
+ * <p>Note that the iteration framework cannot deal with watermarks correctly
for now. It should be
+ * resolved by FLINK-31373.
+ *
* <p>An example of the iteration is like:
*
* <pre>{@code
diff --git
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
index b6908ff8..604715f1 100644
---
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
+++
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/** Input operator that wraps the user record into {@link IterationRecord}. */
@@ -46,4 +47,9 @@ public class InputOperator<T> extends
AbstractStreamOperator<IterationRecord<T>>
reusable.getValue().setValue(streamRecord.getValue());
output.collect(reusable);
}
+
+ @Override
+ public void processWatermark(Watermark mark) {
+ // TODO: FLINK-31373 Support processing watermarks in iterations.
+ }
}