1996fanrui commented on code in PR #22925:
URL: https://github.com/apache/flink/pull/22925#discussion_r1255220217
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java:
##########
@@ -46,11 +45,14 @@ public interface TimestampsAndWatermarks<T> {
/** Lets the owner/creator of the output know about latest emitted
watermark. */
@Internal
interface WatermarkUpdateListener {
+
+ /** It should be called once the idle is changed. */
+ void updateIdle(boolean isIdle);
Review Comment:
Actually, I have the same question during developing.
I add the method based on 2 reasons:
- The idle is related to watermark, so it's reasonable to add it.
- We can check if it's idle based on `MAX_WATERMARK`, however it's too
complex.
- We send `MAX_WATERMARK` as the idle at the caller side
- And check `MAX_WATERMARK` as the idle at the callee side
So I think adding the method is more clear.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -146,7 +146,9 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
private DataOutput<OUT> lastInvokedOutput;
- private long lastEmittedWatermark = Watermark.UNINITIALIZED.getTimestamp();
+ private long latestWatermark = Watermark.UNINITIALIZED.getTimestamp();
+
+ private boolean isIdle = false;
Review Comment:
Sounds make sense, updated.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -181,9 +181,10 @@ void announceCombinedWatermark() {
Set<Integer> subTaskIds = combinedWatermark.keySet();
LOG.info(
- "Distributing maxAllowedWatermark={} to subTaskIds={}",
+ "Distributing maxAllowedWatermark={} to subTaskIds={} for
source {}.",
maxAllowedWatermark,
- subTaskIds);
+ subTaskIds,
+ operatorName);
Review Comment:
Good suggestion, updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]