[ 
https://issues.apache.org/jira/browse/FLINK-32496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17739274#comment-17739274
 ] 

Rui Fan commented on FLINK-32496:
---------------------------------

Add more background here.
h2. What is the purpose of the change

When one source is always active and the other sources are idle (either because there is no 
data or because alignment makes them idle), the idle sources never resume.

For example, sourceA is always active and sourceB is idle because there is no data. 
SourceB never resumes, even after it has more data to read.
h3. Root cause
 * Step1: When a source is idle, [WatermarkToDataOutput sets|https://github.com/apache/flink/blob/c1740861727d2614f9bbf154bcdd274d7990e133/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java#L95C12-L95C12]
[{{SourceOperator#lastEmittedWatermark}}|https://github.com/apache/flink/blob/c1740861727d2614f9bbf154bcdd274d7990e133/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L605]
 to {{Long.MAX_VALUE}} as the watermark.
 * Step2: 
[{{SourceOperator#shouldWaitForAlignment}}|https://github.com/apache/flink/blob/c1740861727d2614f9bbf154bcdd274d7990e133/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L678]
 returns {{currentMaxDesiredWatermark < lastEmittedWatermark}}. Since 
{{lastEmittedWatermark}} is now {{Long.MAX_VALUE}}, while 
{{currentMaxDesiredWatermark}} stays at an ordinary value because another source is still active, 
{{shouldWaitForAlignment}} is always true.
 * Step3: {{SourceOperator#updateCurrentEffectiveWatermark}} -> 
{{checkWatermarkAlignment()}} -> {{shouldWaitForAlignment()}}: because the check is true, the 
{{operatingMode}} is set to {{OperatingMode.WAITING_FOR_ALIGNMENT}}.

h3. Bug 👻👻👻
 * Bug1: The mode should not be set to {{WAITING_FOR_ALIGNMENT}} at all. It is set only 
because {{lastEmittedWatermark}} is not the real watermark: it is {{Long.MAX_VALUE}} 
because the source is idle.
 * Bug2: {{checkWatermarkAlignment}} can never switch the source's {{operatingMode}} from 
{{WAITING_FOR_ALIGNMENT}} back to {{READING}}, because {{shouldWaitForAlignment}} is 
always true.
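The root-cause chain and both bugs can be reproduced in a small, simplified Java model. This is a sketch under stated assumptions, not Flink's code: the class {{AlignmentBugSketch}}, its {{markIdle}} and {{updateMaxDesiredWatermark}} helpers, and the initial field values are hypothetical. Only the names that mirror {{SourceOperator}} ({{lastEmittedWatermark}}, {{currentMaxDesiredWatermark}}, {{shouldWaitForAlignment}}, {{checkWatermarkAlignment}}, {{operatingMode}}) come from the description above.

```java
// Hypothetical, simplified model of the idle/alignment interaction described above.
// Names mirror SourceOperator, but this is NOT Flink's actual implementation.
public class AlignmentBugSketch {

    enum OperatingMode { READING, WAITING_FOR_ALIGNMENT }

    // Stands in for SourceOperator#lastEmittedWatermark.
    long lastEmittedWatermark = Long.MIN_VALUE;
    // Stands in for the max desired watermark announced by the coordinator.
    // Assumption: no alignment constraint until the coordinator announces one.
    long currentMaxDesiredWatermark = Long.MAX_VALUE;
    OperatingMode operatingMode = OperatingMode.READING;

    // Step1: an idle source reports Long.MAX_VALUE as its watermark.
    void markIdle() {
        updateCurrentEffectiveWatermark(Long.MAX_VALUE);
    }

    // Step3: updating the watermark triggers the alignment check.
    void updateCurrentEffectiveWatermark(long watermark) {
        lastEmittedWatermark = watermark;
        checkWatermarkAlignment();
    }

    // Step2: the predicate compares the desired watermark with the last emitted one.
    boolean shouldWaitForAlignment() {
        return currentMaxDesiredWatermark < lastEmittedWatermark;
    }

    // Simplified: switch between the two modes based on the predicate.
    void checkWatermarkAlignment() {
        operatingMode = shouldWaitForAlignment()
                ? OperatingMode.WAITING_FOR_ALIGNMENT
                : OperatingMode.READING;
    }

    // Hypothetical helper: the coordinator announces a new max desired watermark.
    void updateMaxDesiredWatermark(long desired) {
        currentMaxDesiredWatermark = desired;
        checkWatermarkAlignment();
    }

    public static void main(String[] args) {
        AlignmentBugSketch sourceB = new AlignmentBugSketch();
        sourceB.updateMaxDesiredWatermark(1_000L);  // sourceA is active
        sourceB.markIdle();                          // sourceB has no data
        System.out.println(sourceB.operatingMode);   // WAITING_FOR_ALIGNMENT (Bug1)
        // Even as sourceA advances, sourceB never returns to READING (Bug2).
        for (long wm = 2_000L; wm <= 10_000L; wm += 2_000L) {
            sourceB.updateMaxDesiredWatermark(wm);
        }
        System.out.println(sourceB.operatingMode);   // still WAITING_FOR_ALIGNMENT
    }
}
```

Because {{Long.MAX_VALUE}} is larger than any real desired watermark, no announcement from the coordinator can make the predicate false again in this model.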

h2. Brief change log

Idleness should neither update {{SourceOperator#lastEmittedWatermark}} nor 
affect the {{shouldWaitForAlignment()}} logic.

I introduced {{SourceOperator#isIdle}}. When it is true, 
{{emitLatestWatermark}} sends {{Watermark.MAX_WATERMARK}} to the 
SourceCoordinator. This does not affect the alignment logic on the SourceOperator side.
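The idea of the change can be sketched in the same simplified style: idleness is kept in a separate flag, {{lastEmittedWatermark}} keeps the last real watermark, and only the value reported to the coordinator becomes the maximum ({{Watermark.MAX_WATERMARK}} wraps {{Long.MAX_VALUE}}). This is a hypothetical model, not the actual patch: the class {{AlignmentFixSketch}} and the methods {{emitWatermark}}, {{markIdle}}, {{watermarkReportedToCoordinator}} and {{updateMaxDesiredWatermark}} are illustrative names, and {{watermarkReportedToCoordinator}} only stands in for what {{emitLatestWatermark}} would send.

```java
// Hypothetical sketch of the fix idea: track idleness as a flag instead of storing
// Long.MAX_VALUE in lastEmittedWatermark. NOT Flink's actual implementation.
public class AlignmentFixSketch {

    enum OperatingMode { READING, WAITING_FOR_ALIGNMENT }

    long lastEmittedWatermark = Long.MIN_VALUE;
    // Assumption: no alignment constraint until the coordinator announces one.
    long currentMaxDesiredWatermark = Long.MAX_VALUE;
    boolean isIdle = false; // stands in for SourceOperator#isIdle
    OperatingMode operatingMode = OperatingMode.READING;

    // Idleness only sets the flag; lastEmittedWatermark keeps the real watermark.
    void markIdle() {
        isIdle = true;
        checkWatermarkAlignment();
    }

    // A real watermark clears idleness and updates lastEmittedWatermark.
    void emitWatermark(long watermark) {
        isIdle = false;
        lastEmittedWatermark = watermark;
        checkWatermarkAlignment();
    }

    // What would be reported to the coordinator (stands in for emitLatestWatermark).
    long watermarkReportedToCoordinator() {
        return isIdle ? Long.MAX_VALUE : lastEmittedWatermark;
    }

    // Unchanged predicate: it now only ever sees real watermarks.
    boolean shouldWaitForAlignment() {
        return currentMaxDesiredWatermark < lastEmittedWatermark;
    }

    void checkWatermarkAlignment() {
        operatingMode = shouldWaitForAlignment()
                ? OperatingMode.WAITING_FOR_ALIGNMENT
                : OperatingMode.READING;
    }

    void updateMaxDesiredWatermark(long desired) {
        currentMaxDesiredWatermark = desired;
        checkWatermarkAlignment();
    }

    public static void main(String[] args) {
        AlignmentFixSketch sourceB = new AlignmentFixSketch();
        sourceB.emitWatermark(500L);                 // sourceB emitted some data earlier
        sourceB.updateMaxDesiredWatermark(1_000L);   // sourceA is active
        sourceB.markIdle();                          // sourceB runs out of data
        System.out.println(sourceB.operatingMode);   // READING
        System.out.println(sourceB.watermarkReportedToCoordinator() == Long.MAX_VALUE); // true
        sourceB.emitWatermark(800L);                 // new data arrives
        System.out.println(sourceB.operatingMode);   // READING
        sourceB.emitWatermark(1_500L);               // genuinely ahead of sourceA
        System.out.println(sourceB.operatingMode);   // WAITING_FOR_ALIGNMENT
        sourceB.updateMaxDesiredWatermark(2_000L);   // sourceA catches up
        System.out.println(sourceB.operatingMode);   // READING
    }
}
```

In this model, alignment still pauses a source that is genuinely ahead, but an idle source no longer blocks itself, and the coordinator still sees it as "no constraint" through the maximum watermark.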

> Sources with idleness and alignment always wait for alignment when part of 
> multiple sources is idle
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32496
>                 URL: https://issues.apache.org/jira/browse/FLINK-32496
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.16.2, 1.17.1
>            Reporter: haishui
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>
> Sources with idleness and alignment always wait for alignment when part of 
> multiple sources is idle.
> *Root cause:*
> In 
> [SourceOperator|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java],
>  `lastEmittedWatermark` is Long.MAX_VALUE if a source is idle.
> When another source is active, the `currentMaxDesiredWatermark` is less than 
> Long.MAX_VALUE.
> So the `shouldWaitForAlignment` method is always true for idle sources.
>  
> What's more, a source will become idle if it waits for alignment for a 
> long time, which should also be considered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
