[ 
https://issues.apache.org/jira/browse/FLINK-32414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32414:
----------------------------
    Description: 
Watermark alignment causes Flink jobs to hang forever when any source subtask has no SourceSplit.
h1. Root cause:
 # [SourceOperator#emitLatestWatermark|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L504] reports the lastEmittedWatermark to the SourceCoordinator.
 # If a subtask has no SourceSplit, its lastEmittedWatermark stays at [Watermark.UNINITIALIZED.getTimestamp()|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L149] forever, which is Long.MIN_VALUE.
 # The SourceCoordinator combines the watermarks of all subtasks and uses the [minimum watermark|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644] as the aggregated watermark.
 # Long.MIN_VALUE is always the minimum, so maxAllowedWatermark = Long.MIN_VALUE + maxAllowedWatermarkDrift, and the [SourceCoordinator announces it to all subtasks|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L168].
 # This maxAllowedWatermark is far below any real event-time watermark, so every subtask that has splits is paused by alignment. The subtask without splits never advances its watermark, so the others never resume and the job hangs forever.
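
The arithmetic behind the steps above can be sketched as follows. This is only an illustration, not the Flink implementation: the class name, the method names, and the "watermark greater than maxAllowedWatermark means paused" check are assumptions made for the sketch, as are the example timestamp and the 20 second drift.

```java
// Illustrative sketch only; names are hypothetical and do not come from Flink.
final class WatermarkAlignmentSketch {
    // Per the description, Watermark.UNINITIALIZED.getTimestamp() is Long.MIN_VALUE.
    static final long UNINITIALIZED = Long.MIN_VALUE;

    /** Combines the reported watermarks by taking their minimum. */
    static long aggregate(long... reportedWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : reportedWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    /** maxAllowedWatermark = aggregated watermark + maxAllowedWatermarkDrift. */
    static long maxAllowedWatermark(long aggregated, long maxAllowedWatermarkDriftMs) {
        return aggregated + maxAllowedWatermarkDriftMs;
    }

    /** Assumed pause rule: a subtask waits while its watermark is ahead of the allowed maximum. */
    static boolean isPaused(long subtaskWatermark, long maxAllowed) {
        return subtaskWatermark > maxAllowed;
    }

    public static void main(String[] args) {
        long busySubtask = 1_687_000_000_000L; // hypothetical event-time watermark in ms
        long idleSubtask = UNINITIALIZED;      // subtask that never received a SourceSplit

        long aggregated = aggregate(busySubtask, idleSubtask);
        long maxAllowed = maxAllowedWatermark(aggregated, 20_000L);

        System.out.println("aggregated == Long.MIN_VALUE: " + (aggregated == Long.MIN_VALUE));
        System.out.println("busy subtask paused: " + isPaused(busySubtask, maxAllowed));
    }
}
```

Because the idle subtask keeps reporting Long.MIN_VALUE on every announcement, the aggregated value never moves, so under these assumptions the busy subtask stays paused.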

h1. How to reproduce?

Run a Kafka source with watermark alignment enabled and a parallelism greater than the number of Kafka partitions.

Here is a demo: [code link|https://github.com/1996fanrui/fanrui-learning/commit/24b707f7805b3a61a70df1c70c26f8e8a16b006b]
 * The Kafka topic has 1 partition
 * The parallelism is 2
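
To show why this setup leaves a subtask without any SourceSplit, here is a small sketch of distributing partitions over subtasks. It assumes a simplified round-robin rule (partition p goes to subtask p % parallelism); the real Kafka connector may pick owners differently, but with fewer partitions than subtasks at least one subtask is always left empty. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; the round-robin rule is an assumption, not the connector's code.
final class SplitAssignmentSketch {
    /** Returns, for each subtask index, the list of partitions it would own. */
    static List<List<Integer>> assign(int partitions, int parallelism) {
        List<List<Integer>> owners = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            owners.add(new ArrayList<>());
        }
        for (int partition = 0; partition < partitions; partition++) {
            owners.get(partition % parallelism).add(partition);
        }
        return owners;
    }

    /** Returns the indices of subtasks that received no partition at all. */
    static List<Integer> subtasksWithoutSplits(int partitions, int parallelism) {
        List<Integer> idle = new ArrayList<>();
        List<List<Integer>> owners = assign(partitions, parallelism);
        for (int subtask = 0; subtask < owners.size(); subtask++) {
            if (owners.get(subtask).isEmpty()) {
                idle.add(subtask);
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // The demo setup: 1 Kafka partition, parallelism 2.
        System.out.println("subtasks without splits: " + subtasksWithoutSplits(1, 2));
    }
}
```

With 1 partition and parallelism 2, one subtask owns the only partition and the other owns nothing, which is the condition that triggers the hang.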

 

!image-2023-06-22-22-43-59-671.png|width=1439,height=296!


> Watermark alignment will cause flink jobs to hang forever when any source 
> subtask has no SourceSplit
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32414
>                 URL: https://issues.apache.org/jira/browse/FLINK-32414
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.16.2, 1.17.1
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>         Attachments: image-2023-06-22-22-43-59-671.png
>


