Thanks for bringing this up! It would be a useful feature for batch users.

For the FLIP, I have some questions:

1st, the implementation plan is to rewrite the optimization based on the
ExecNode of the table planner, but the config option for the optimization
is under the flink-core module. Does this mean the optimization is
available for DataStream jobs as well? (I didn't see the details in the
FLIP.) If it isn't, my suggestion is to put these new options into the
table module.

2nd, the FLIP performs parameter control and optimization based on the
size of the key group. How can users find out the specific key group size
(or the median key group size) from the job information provided by Flink?

3rd, IIUC, the following limitation in the FLIP exists only for streaming
executions. So, are the new IntraInputKeyGroupCorrelation /
InterInputsKeyGroupCorrelation mentioned in the FLIP still necessary?
> “The existing data distribution algorithms in Flink impose strict
> limitations on joins, requiring that data within the same key group
> must be sent to the same downstream for processing. This restricts
> the adaptability of data distribution.”


Best,
Lincoln Lee


Zhu Zhu <reed...@gmail.com> wrote on Mon, Aug 19, 2024 at 16:50:

> +1 for the FLIP
>
> Long-tail tasks caused by skewed data usually pose significant
> challenges for users. It's great that Flink can mitigate such
> issues automatically.
>
> Thanks,
> Zhu
>
> Lei Yang <leya5...@gmail.com> wrote on Fri, Aug 16, 2024 at 11:18:
>
> > Hi devs,
> >
> >
> > Junrui Lee, Xia Sun and I would like to initiate a discussion about
> > FLIP-475: Support Adaptive Skewed Join Optimization [1].
> >
> >
> > In a Join query, when certain keys occur frequently, it can lead to an
> > uneven distribution of data across partitions. This may affect the
> > execution performance of Flink jobs, as a single partition with skewed
> > data can severely downgrade the performance of the entire job. To ensure
> > data is evenly distributed to downstream tasks, we can use the statistics
> > of the input to split (and duplicate if needed) skewed and splittable
> > partitions into balanced partitions at runtime. However, currently, Flink
> > is unable to accurately determine which partitions are skewed and
> > eligible for splitting at runtime, and it also lacks the capability to
> > split data within the same key group.
> >
> >
> > To address this issue, we plan to introduce Adaptive Skewed Join
> > Optimization capability. This will allow the Join operator to dynamically
> > split partitions that are skewed and splittable based on the statistics
> > of the input at runtime, reducing the long-tail problem caused by skewed
> > data. This FLIP is based on FLIP-469 [2] and also leverages capabilities
> > introduced in FLIP-470 [3].
> >
> >
> > For more details, please refer to FLIP-475 [1]. We look forward to your
> > feedback.
> >
> >
> > Best,
> >
> >
> > Junrui Lee, Xia Sun and Lei Yang
> >
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-475%3A+Support+Adaptive+Skewed+Join+Optimization
> >
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> >
> > [3]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> >
>
