je-ik commented on code in PR #22889:
URL: https://github.com/apache/beam/pull/22889#discussion_r1015595567
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:
##########
@@ -326,6 +324,17 @@ public DoFnOperator(
this.finishBundleBeforeCheckpointing =
flinkOptions.getFinishBundleBeforeCheckpointing();
}
+ private boolean isRequiresStableInput(DoFn<InputT, OutputT> doFn) {
+ // WindowDoFnOperator does not use a DoFn
+ return doFn != null
Review Comment:
Good point. I guess, that the current state of Flink runner is that the
annotation is not supported on timers. This might be bug, but should be
probably dealt with in a different PR.
##########
runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java:
##########
@@ -38,10 +38,15 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;
Review Comment:
Hm, we might be. But looking into the test, it does not fail consistently
when the annotation is not correctly supported, right? Looks like it turns into
flake in that case. This test fails consistently and tests a flink-specific
behavior (checkpoint), so we might want to keep that as well.
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java:
##########
@@ -85,6 +86,8 @@ public static <InputT, OutputT> BufferingDoFnRunner<InputT,
OutputT> create(
int currentStateIndex;
/** The current handler used for buffering. */
private BufferingElementsHandler currentBufferingElementsHandler;
+ /** Minimum timestamp of all buffered elements. */
+ private volatile long currentOutputWatermarkHold;
Review Comment:
There should be other holds, so releasing single hold should not advance the
watermark. It will advance iff there is nothing else holding it, which seems to
me to be semantically correct. Is it not?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]