Thank you, Yang, for your reply! The updated options in the FLIP addressed my 1st question.
And for the 2nd question, I understand the mechanism of your implementation, but for a public option it's important that users have a specific metric to base its value on when they need to adjust it. To put it another way: under what circumstances do users need to adjust these options, and to what values?

For the 3rd question, I now understand that the "key group" you mentioned actually refers to all the data corresponding to a specific join key, rather than the key-group concept used by Flink state. I'm not sure whether others might have the same misunderstanding, but using a different term to distinguish it from the well-known key group concept in Flink streaming could help avoid such confusion. WDYT?

Best,
Lincoln Lee


Lei Yang <leya5...@gmail.com> wrote on Fri, Aug 30, 2024 at 14:37:

> Hi Lincoln Lee,
>
> Thanks for your feedback!
>
> For the 1st question, thank you for the reminder. This optimization is only
> available for Table jobs in batch mode, and I have put these new options
> into the table module. I also replaced the "enable" and "force" configurations
> with a new enum-type configuration to avoid confusion. The new enum
> configuration has three values: "AUTO" means Flink will automatically apply
> this optimization, "FORCED" means Flink will enforce this optimization even
> if it introduces an extra hash shuffle, and "NONE" means this optimization
> will not be executed.
>
> For the 2nd question, the key group size (or median size) is calculated based
> on statistical information from the upstream output and is used to
> determine data skewness, so users do not need to know the specific values.
> I specifically mentioned "the median key group size" just because I chose
> to use it to represent the central tendency of data processing volumes
> across all parallel instances.
>
> For the 3rd question, I have confirmed that this limitation also exists in
> batch mode.
> Therefore, IntraInputKeyGroupCorrelation and
> InterInputsKeyGroupCorrelation are necessary. I need them to determine
> whether and how to split the skewed key group to ensure data correctness.
> Additionally, adding these two correlations has other benefits: other
> optimization strategies can also modify them to flexibly choose the data
> distribution algorithm based on the operator's specific situation.
>
> Best,
> Lei Yang
>
> Lincoln Lee <lincoln.8...@gmail.com> wrote on Thu, Aug 29, 2024 at 23:13:
>
> > Thanks for bringing this up! It would be a useful feature for batch users.
> >
> > For the FLIP, I have some questions:
> >
> > 1st, the implementation plan is to rewrite the optimization based on the
> > ExecNode of the table planner, but the config options for the optimization
> > are under the flink-core module. Does that mean this optimization is also
> > available for DataStream jobs? (I didn't see the details in the FLIP.)
> > If not, my suggestion is to put these new options into the table module.
> >
> > 2nd, the FLIP performs parameter control and optimization based on the
> > size of the key group. How can users perceive the specific key group size
> > (or the median key group size) from the job information provided by Flink?
> >
> > 3rd, IIUC, the following limitation in the FLIP exists only for streaming
> > executions. So, are the new IntraInputKeyGroupCorrelation /
> > InterInputsKeyGroupCorrelation mentioned in the FLIP still necessary?
> >
> > “The existing data distribution algorithms in Flink impose strict
> > limitations on joins, requiring that data within the same key group
> > must be sent to the same downstream for processing.
> > This restricts the adaptability of data distribution.”
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Zhu Zhu <reed...@gmail.com> wrote on Mon, Aug 19, 2024 at 16:50:
> >
> > > +1 for the FLIP
> > >
> > > Long-tail tasks caused by skewed data usually pose significant
> > > challenges for users. It's great that Flink can mitigate such
> > > issues automatically.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Lei Yang <leya5...@gmail.com> wrote on Fri, Aug 16, 2024 at 11:18:
> > >
> > > > Hi devs,
> > > >
> > > > Junrui Lee, Xia Sun and I would like to initiate a discussion about
> > > > FLIP-475: Support Adaptive Skewed Join Optimization [1].
> > > >
> > > > In a join query, when certain keys occur frequently, data can be
> > > > distributed unevenly across partitions. This may affect the execution
> > > > performance of Flink jobs, as a single partition with skewed data can
> > > > severely degrade the performance of the entire job. To ensure data is
> > > > evenly distributed to downstream tasks, we can use the statistics of the
> > > > input to split (and duplicate if needed) skewed and splittable partitions
> > > > into balanced partitions at runtime. However, Flink is currently unable to
> > > > accurately determine which partitions are skewed and eligible for splitting
> > > > at runtime, and it also lacks the capability to split data within the same
> > > > key group.
> > > >
> > > > To address this issue, we plan to introduce an Adaptive Skewed Join
> > > > Optimization capability. This will allow the Join operator to dynamically
> > > > split partitions that are skewed and splittable based on the statistics of
> > > > the input at runtime, reducing the long-tail problem caused by skewed data.
> > > > This FLIP is based on FLIP-469 [2] and also leverages capabilities
> > > > introduced in FLIP-470 [3].
> > > >
> > > > For more details, please refer to FLIP-475 [1].
> > > > We look forward to your feedback.
> > > >
> > > > Best,
> > > > Junrui Lee, Xia Sun and Lei Yang
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-475%3A+Support+Adaptive+Skewed+Join+Optimization
> > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
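To make the 1st and 2nd questions in this thread more concrete, here is a minimal Python sketch of how a median-based skew check, the AUTO / FORCED / NONE switch, and a correctness-preserving split could fit together. It is an illustration under stated assumptions, not the FLIP's implementation: every name in it (`SkewedJoinStrategy`, `should_optimize`, `find_skewed_key_groups`, `split_skewed_join`, `skewed_factor`, `skewed_threshold`) is hypothetical, the default factor of 4.0 and threshold of 256 MiB are made-up values, the rule "larger than a factor times the median and larger than an absolute threshold" is borrowed from the one documented for Spark's adaptive skew join rather than taken from the FLIP, and AUTO is assumed to mean "apply only if no extra hash shuffle is needed".

```python
from enum import Enum
from statistics import median


class SkewedJoinStrategy(Enum):
    """Hypothetical stand-in for the AUTO / FORCED / NONE option."""
    AUTO = "AUTO"
    FORCED = "FORCED"
    NONE = "NONE"


def should_optimize(strategy, needs_extra_shuffle):
    """Decide whether to apply the skewed-join optimization.

    Assumption: AUTO applies it only when no extra hash shuffle is needed.
    """
    if strategy is SkewedJoinStrategy.NONE:
        return False
    if strategy is SkewedJoinStrategy.FORCED:
        return True  # accept an extra hash shuffle if necessary
    return not needs_extra_shuffle  # AUTO


def find_skewed_key_groups(sizes, skewed_factor=4.0,
                           skewed_threshold=256 * 1024 * 1024):
    """Return the indices of key groups considered skewed.

    A key group (all data for one join key) is flagged when its size
    exceeds both skewed_factor times the median size and the absolute
    skewed_threshold. Both knobs are hypothetical; sizes must be non-empty.
    """
    mid = median(sizes)
    return [i for i, size in enumerate(sizes)
            if size > skewed_factor * mid and size > skewed_threshold]


def split_skewed_join(left_rows, right_rows, num_splits):
    """Split one side's rows for a single skewed join key into num_splits
    chunks (round-robin) and duplicate the other side's rows into every
    chunk, so the union of the per-chunk joins equals the original join."""
    return [(left_rows[i::num_splits], list(right_rows))
            for i in range(num_splits)]


if __name__ == "__main__":
    sizes = [10, 12, 11, 500, 9]  # made-up bytes per key group
    print(find_skewed_key_groups(sizes, skewed_threshold=100))  # [3]
    for left, right in split_skewed_join([1, 2, 3, 4, 5], ["a", "b"], 2):
        print(left, right)
```

The split step also touches on the 3rd question: splitting one side of a skewed key is only correct because every matching row from the other side is copied into each split, which is roughly the kind of constraint the proposed IntraInputKeyGroupCorrelation / InterInputsKeyGroupCorrelation are meant to describe. Explicit factor and threshold knobs like these would also give users something concrete to tune, which is what the 2nd question asks about.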