This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cdf43aecafa [FLINK-35933] Skip distributing maxAllowedWatermark if there are no subtasks
cdf43aecafa is described below
commit cdf43aecafa33a50a4b4b6841c4cf505b195994a
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Jul 30 17:21:49 2024 +0000
[FLINK-35933] Skip distributing maxAllowedWatermark if there are no subtasks
---
.../flink/runtime/source/coordinator/SourceCoordinator.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 3133bbe7ce7..982310bc540 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -166,6 +166,13 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
@VisibleForTesting
void announceCombinedWatermark() {
+ Set<Integer> subTaskIds = combinedWatermark.keySet();
+ if (subTaskIds.isEmpty()) {
+ LOG.debug(
+ "Skip distributing maxAllowedWatermark of group={} for source {} - no subtasks.",
+ watermarkAlignmentParams.getWatermarkGroup(),
+ operatorName);
+ }
checkState(
watermarkAlignmentParams !=
WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED);
@@ -190,7 +197,6 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
maxAllowedWatermark = Watermark.MAX_WATERMARK.getTimestamp();
}
- Set<Integer> subTaskIds = combinedWatermark.keySet();
LOG.info(
"Distributing maxAllowedWatermark={} of group={} to subTaskIds={} for source {}.",
maxAllowedWatermark,