[ 
https://issues.apache.org/jira/browse/FLINK-32414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32414:
----------------------------
        Parent: FLINK-32548
    Issue Type: Sub-task  (was: Bug)

> Watermark alignment will cause flink jobs to hang forever when any source 
> subtask has no SourceSplit
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32414
>                 URL: https://issues.apache.org/jira/browse/FLINK-32414
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Common
>    Affects Versions: 1.16.2, 1.17.1
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.18.0, 1.16.3, 1.17.2
>
>         Attachments: image-2023-06-22-22-43-59-671.png
>
>
> Watermark alignment causes Flink jobs to hang forever when any source 
> subtask has no SourceSplit.
> h1. Root cause:
>  # 
> [SourceOperator#emitLatestWatermark|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L504]
>  reports the lastEmittedWatermark to SourceCoordinator
>  # If a subtask has no SourceSplit, its lastEmittedWatermark stays at 
> [Watermark.UNINITIALIZED.getTimestamp()|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L149]
>  forever, which is Long.MIN_VALUE.
>  # SourceCoordinator combines the watermarks of all subtasks and uses the 
> [minimum 
> watermark|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L644]
>  as the aggregated watermark.
>  # Long.MIN_VALUE is necessarily the minimum watermark, so 
> maxAllowedWatermark = Long.MIN_VALUE + maxAllowedWatermarkDrift, and 
> [SourceCoordinator announces it to all 
> subtasks|https://github.com/apache/flink/blob/274aa0debffaa57926c474f11e36be753b49cbc5/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L168].
>  # This maxAllowedWatermark is far below any real event-time watermark, so 
> every source subtask that has splits is already ahead of it, waits for 
> alignment, and hangs forever.
> h1. How to reproduce?
> The problem occurs when the number of Kafka partitions is less than the 
> parallelism of the Kafka source.
> Here is a demo: [code 
> link|https://github.com/1996fanrui/fanrui-learning/commit/24b707f7805b3a61a70df1c70c26f8e8a16b006b]
>  * The Kafka topic has 1 partition
>  * The parallelism is 2
>  
> !image-2023-06-22-22-43-59-671.png|width=1439,height=296!
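
The root-cause steps above can be sketched with plain arithmetic. This is a minimal standalone illustration, not Flink's code: the class WatermarkAlignmentSketch and its methods aggregate, maxAllowedWatermark and isPaused are hypothetical names, and the "pause when the subtask's watermark exceeds maxAllowedWatermark" rule is an assumption about how alignment decides to stop reading.

```java
public class WatermarkAlignmentSketch {
    // Assumption: mirrors the Long.MIN_VALUE timestamp that the issue says an
    // uninitialized lastEmittedWatermark holds.
    static final long UNINITIALIZED = Long.MIN_VALUE;

    // Step 3: the coordinator takes the minimum over all reported watermarks.
    static long aggregate(long[] reportedWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : reportedWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // Step 4: maxAllowedWatermark = aggregated watermark + allowed drift.
    static long maxAllowedWatermark(long aggregated, long maxDriftMillis) {
        return aggregated + maxDriftMillis;
    }

    // Step 5 (assumed rule): a subtask stops reading while its own watermark
    // is ahead of the announced maxAllowedWatermark.
    static boolean isPaused(long subtaskWatermark, long maxAllowed) {
        return subtaskWatermark > maxAllowed;
    }

    public static void main(String[] args) {
        // Subtask 0 owns the only Kafka partition; subtask 1 has no split.
        long[] reported = {1_687_442_639_000L, UNINITIALIZED};
        long aggregated = aggregate(reported);
        long maxAllowed = maxAllowedWatermark(aggregated, 1_000L);

        System.out.println("aggregated watermark = " + aggregated);
        System.out.println("maxAllowedWatermark  = " + maxAllowed);
        System.out.println("subtask 0 paused     = " + isPaused(reported[0], maxAllowed));
    }
}
```

Because subtask 1 never emits a watermark, the aggregated value never moves, so subtask 0 stays paused no matter how long the job runs.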



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
