mas-chen commented on code in PR #22925:
URL: https://github.com/apache/flink/pull/22925#discussion_r1255025231
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java:
##########
@@ -46,11 +45,14 @@ public interface TimestampsAndWatermarks<T> {
/** Lets the owner/creator of the output know about latest emitted
watermark. */
@Internal
interface WatermarkUpdateListener {
+
+ /** It should be called once the idle is changed. */
+ void updateIdle(boolean isIdle);
Review Comment:
I wonder if we need this API. Can't we tell by
#updateCurrentEffectiveWatermark() already if the source is idle, e.g. checking
if it is a MAX_WATERMARK?
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -146,7 +146,9 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
private DataOutput<OUT> lastInvokedOutput;
- private long lastEmittedWatermark = Watermark.UNINITIALIZED.getTimestamp();
+ private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp();
+
+ private boolean isIdle = false;
Review Comment:
In terms of convention, I agree with @RocMarshal . I think (2) by IDEA is
intended it is `is` because it is a boolean. The thinking is "is it?" or "is it
not?"--because boolean is binary
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -181,9 +181,10 @@ void announceCombinedWatermark() {
Set<Integer> subTaskIds = combinedWatermark.keySet();
LOG.info(
- "Distributing maxAllowedWatermark={} to subTaskIds={}",
+ "Distributing maxAllowedWatermark={} to subTaskIds={} for
source {}.",
maxAllowedWatermark,
- subTaskIds);
+ subTaskIds,
+ operatorName);
Review Comment:
Is possible to also to include the alignment group names? Would be useful
for debugging in the case of multiple groups.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]