[ 
https://issues.apache.org/jira/browse/FLINK-38817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18046479#comment-18046479
 ] 

Zhu Zhu commented on FLINK-38817:
---------------------------------

I think the core problem is not the partitioner conversion. It is that the 
parallelism of downstream operators of the SortLimit should never exceed 1, and 
therefore they should be assigned a maxParallelism of 1.

Otherwise, Flink will assume that the parallelism of downstream operators can 
be greater than 1, which can lead to the issue mentioned in your Google Doc: 
"The problem occurs even when parallelism matches (e.g., both upstream and 
downstream have parallelism = 1)."

With maxParallelism set to 1, the result will be correct even if a rescale 
partitioner is used. In such cases (i.e. SortLimit (maxParallelism=1) → Sink 
(maxParallelism=1)) a rescale partitioner behaves exactly the same as a forward 
partitioner.

One more thing to mention is that Flink does not convert a ForwardPartitioner 
into a RescalePartitioner. A ForwardForUnspecifiedPartitioner is different from 
a ForwardPartitioner: it means the partitioner can be converted into a forward 
partitioner, but is not required to. Therefore, theoretically, it should be 
safe to convert it into other partitioners.

And let’s see under what conditions this issue can be reproduced.

> Out of order data seen while running tpc-ds queries
> ---------------------------------------------------
>
>                 Key: FLINK-38817
>                 URL: https://issues.apache.org/jira/browse/FLINK-38817
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 2.2.0
>            Reporter: Bonnie Varghese
>            Priority: Major
>         Attachments: screenshot-1.png
>
>
> All unspecified edges are converted to Rescale edges by default for dynamic 
> graphs. Related Jira - https://issues.apache.org/jira/browse/FLINK-25046
> While testing tpc-ds queries I observed that after a global operation the 
> order of the global operation is not preserved due to Rescale edges.
> For SQL batch to work correctly, we should keep Forward edges after a global 
> operation such as `SortLimit` or `Sort `to obtain data correctness and 
> avoiding out of order data.
> I have put my observations and experiments in this doc here:
> [https://docs.google.com/document/d/1TTj2ddlQTfDgtGb0ISmiKWt6R9U4RxJ59o6bULC1YtI/edit?usp=sharing]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to