Rui Fan created FLINK-32414:
-------------------------------
Summary: Watermark alignment will cause flink jobs to hang forever
when any source subtask has no SourceSplit
Key: FLINK-32414
URL: https://issues.apache.org/jira/browse/FLINK-32414
Project: Flink
Issue Type: Bug
Affects Versions: 1.17.1, 1.16.2
Reporter: Rui Fan
Assignee: Rui Fan
Attachments: image-2023-06-22-22-43-59-671.png
Watermark alignment will cause flink jobs to hang forever when any source
subtask has no SourceSplit.
h1. Root cause:
# [SourceOperator#emitLatestWatermark|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L504] reports the lastEmittedWatermark to the SourceCoordinator.
# If a subtask has no SourceSplit, its lastEmittedWatermark stays at [Watermark.UNINITIALIZED.getTimestamp()|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L149], which is Long.MIN_VALUE, forever.
# SourceCoordinator combines the watermarks of all subtasks and uses the [minimum watermark|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644] as the aggregated watermark.
# Long.MIN_VALUE is necessarily the minimum watermark, so maxAllowedWatermark = Long.MIN_VALUE + maxAllowedWatermarkDrift, and [SourceCoordinator announces it to all subtasks|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L168].
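# This maxAllowedWatermark is far below any real event-time watermark, so every source subtask that is reading data is considered ahead of the limit and stops consuming. All source subtasks hang forever.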
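The arithmetic in the steps above can be sketched in plain Java. This is only an illustration of the reasoning, not Flink's actual code: the class AlignmentStall, its method names, and the sample timestamp and drift values are assumptions made up for this example.

```java
import java.util.List;

// Hypothetical illustration; none of these names exist in Flink.
final class AlignmentStall {
    // Value a subtask reports when it has never emitted a watermark (per the issue text).
    static final long UNINITIALIZED = Long.MIN_VALUE;

    /** Takes the minimum of the reported watermarks and adds the allowed drift. */
    static long maxAllowedWatermark(List<Long> reported, long maxAllowedDriftMs) {
        long min = reported.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(UNINITIALIZED);
        return min + maxAllowedDriftMs;
    }

    /** A subtask keeps reading only while its own watermark does not exceed the limit. */
    static boolean mayRead(long ownWatermark, long maxAllowed) {
        return ownWatermark <= maxAllowed;
    }

    public static void main(String[] args) {
        long busySubtask = 1_687_000_000_000L; // assumed event-time watermark in epoch millis
        long idleSubtask = UNINITIALIZED;      // subtask without any SourceSplit
        long driftMs = 20_000L;                // assumed maxAllowedWatermarkDrift

        long limit = maxAllowedWatermark(List.of(busySubtask, idleSubtask), driftMs);
        System.out.println("limit is MIN_VALUE + drift: " + (limit == Long.MIN_VALUE + driftMs));
        System.out.println("busy subtask may read: " + mayRead(busySubtask, limit));
    }
}
```

Because the idle subtask never advances, the limit never moves either, so the busy subtask stays blocked no matter how long the job runs.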
h1. How to reproduce?
The problem occurs when the number of Kafka partitions is less than the parallelism of the Kafka source.
Here is a demo: [code
link|https://github.com/1996fanrui/fanrui-learning/commit/24b707f7805b3a61a70df1c70c26f8e8a16b006b]
* The Kafka topic has 1 partition
* The parallelism is 2
!image-2023-06-22-22-43-59-671.png!
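To see why this setup leaves a subtask without a split, here is a small sketch. It assumes a simple modulo assignment of partitions to subtasks, which is a stand-in chosen for illustration and not necessarily how the Kafka connector picks split owners; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; the modulo assignment is an assumption.
final class SplitAssignmentSketch {
    /** Returns the indices of subtasks that receive no partition. */
    static List<Integer> idleSubtasks(int partitions, int parallelism) {
        boolean[] hasSplit = new boolean[parallelism];
        for (int p = 0; p < partitions; p++) {
            hasSplit[p % parallelism] = true; // assumed owner of partition p
        }
        List<Integer> idle = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            if (!hasSplit[s]) {
                idle.add(s);
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // 1 partition, parallelism 2, as in the demo above
        System.out.println(idleSubtasks(1, 2)); // prints [1]
    }
}
```

Whatever the real assignment strategy is, fewer partitions than subtasks means at least one subtask ends up with no split, and that subtask is the one that keeps reporting Long.MIN_VALUE.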