Hi Lincoln Lee,

Thanks for your feedback!
For the 2nd question, I have added descriptions of the configuration items to guide users on when and how to adjust them. Typically, users can refer to the "Read Bytes" metric of the subtasks to tune "skewed-factor" and "skewed-threshold". I've also added some suggested values for typical scenarios.

For the 3rd question, I agree with your suggestion. This optimization focuses on the data corresponding to a specific join key rather than on a KeyGroup. Therefore, I followed your advice and updated the FLIP, replacing the KeyGroup-related concepts with your proposed description to avoid confusing users.

Best,
Lei Yang

On Sun, Sep 1, 2024 at 23:12, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> Thank you Yang for your reply!
>
> The updated options in the FLIP addressed my 1st question.
>
> As for the 2nd question, I understand the mechanism of your
> implementation, but for a public option it's important that users
> have a specific metric to base the option's value on when they
> need to adjust it. To put it another way: under what circumstances
> do users need to adjust these options, and to what values?
>
> For the 3rd question, I now understand that the "key group" you
> mentioned actually refers to all the data corresponding to a specific
> join key, rather than the key group concept related to Flink state.
> I'm not sure whether others might have the same misunderstanding, but
> using a different description to distinguish it from the well-known key
> group concept in Flink streaming can help avoid such confusion. WDYT?
>
>
> Best,
> Lincoln Lee
>
>
> On Fri, Aug 30, 2024 at 14:37, Lei Yang <leya5...@gmail.com> wrote:
>
> > Hi Lincoln Lee,
> > Thanks for your feedback!
> > For the 1st question, thank you for the reminder. This optimization is
> > only available for Table jobs in batch mode, and I have put these new
> > options into the table module.
> > I also replaced the "enable" and "force" configurations with a new
> > enum-type configuration to avoid confusion. The new enum has three values:
> > "AUTO" means Flink will automatically apply this optimization, "FORCED"
> > means Flink will enforce this optimization even if it introduces an extra
> > hash shuffle, and "NONE" means this optimization will not be executed.
> > For the 2nd question, the key group size (or median size) is calculated
> > from statistical information about the upstream output and is used to
> > determine data skewness, so users do not need to know the specific values.
> > I specifically mentioned "the median key group size" only because I chose
> > it to represent the central tendency of the data processing volumes
> > across all parallel instances.
> > For the 3rd question, I have confirmed that this limitation also exists in
> > batch mode. Therefore, IntraInputKeyGroupCorrelation and
> > InterInputsKeyGroupCorrelation are necessary: I need them to determine
> > whether and how to split a skewed key group while ensuring data
> > correctness. Adding these two correlations has another benefit as well:
> > other optimization strategies can also modify them to flexibly choose the
> > data distribution algorithm based on the operator's specific situation.
> >
> > Best, Lei Yang
> >
> > On Thu, Aug 29, 2024 at 23:13, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >
> > > Thanks for bringing this up! It would be a useful feature for batch
> > > users.
> > >
> > > For the FLIP, I have some questions:
> > >
> > > 1st, the implementation plan is to rewrite the optimization based on the
> > > ExecNode of the table planner, but the config option for the optimization
> > > is in the flink-core module. Does it mean this optimization is available
> > > for DataStream jobs as well?
> > > (I didn't see the details in the FLIP.)
> > > If not, my suggestion is to put these new options into the table module.
> > >
> > > 2nd, the FLIP performs parameter control and optimization based on the
> > > size of the key group. How can users perceive the specific key group size
> > > (or the median key group size) from the job information provided by
> > > Flink?
> > >
> > > 3rd, IIUC, the following limitation in the FLIP exists only for streaming
> > > executions. So, are the new IntraInputKeyGroupCorrelation /
> > > InterInputsKeyGroupCorrelation mentioned in the FLIP still necessary?
> > > > “The existing data distribution algorithms in Flink impose strict
> > > > limitations on joins, requiring that data within the same key group
> > > > must be sent to the same downstream for processing. This restricts
> > > > the adaptability of data distribution.”
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > On Mon, Aug 19, 2024 at 16:50, Zhu Zhu <reed...@gmail.com> wrote:
> > >
> > > > +1 for the FLIP
> > > >
> > > > Long-tail tasks caused by skewed data usually pose significant
> > > > challenges for users. It's great that Flink can mitigate such
> > > > issues automatically.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > On Fri, Aug 16, 2024 at 11:18, Lei Yang <leya5...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > Junrui Lee, Xia Sun and I would like to initiate a discussion about
> > > > > FLIP-475: Support Adaptive Skewed Join Optimization [1].
> > > > >
> > > > > In a Join query, when certain keys occur frequently, data can be
> > > > > distributed unevenly across partitions. This may hurt the execution
> > > > > performance of Flink jobs, as a single partition with skewed data can
> > > > > severely degrade the performance of the entire job.
To ensure > > > data > > > > is > > > > > evenly distributed to downstream tasks, we can use the statistics > of > > > the > > > > > input to split (and duplicate if needed) skewed and splittable > > > partitions > > > > > into balanced partitions at runtime. However, currently, Flink is > > > unable > > > > to > > > > > accurately determine which partitions are skewed and eligible for > > > > splitting > > > > > at runtime, and it also lacks the capability to split data within > the > > > > same > > > > > key group. > > > > > > > > > > > > > > > To address this issue, we plan to introduce Adaptive Skewed Join > > > > > Optimization capability. This will allow the Join operator to > > > dynamically > > > > > split partitions that are skewed and splittable based on the > > statistics > > > > of > > > > > the input at runtime, reducing the long-tail problem caused by > skewed > > > > data. > > > > > This FLIP is based on FLIP-469 [2] and also leverages capabilities > > > > > introduced in FLIP-470 [3]. > > > > > > > > > > > > > > > For more details, please refer to FLIP-475 [1]. We look forward to > > your > > > > > feedback. 
> > > > >
> > > > > Best,
> > > > > Junrui Lee, Xia Sun and Lei Yang
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-475%3A+Support+Adaptive+Skewed+Join+Optimization
> > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> > > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
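
To make the 2nd question more concrete, the skew check discussed in this thread can be sketched as a median-based rule over per-partition input sizes, for example the "Read Bytes" of each subtask. This is a minimal, hypothetical Java sketch under stated assumptions, not the FLIP-475 implementation. The following are all illustrative assumptions rather than documented Flink behavior or defaults:

- the class `SkewCheck` and its methods;
- the rule that a partition counts as skewed only if it exceeds both `skewedFactor * median` and the absolute `skewedThreshold`;
- the sample values (4.0 and 256 MB).

```java
import java.util.Arrays;
import java.util.stream.IntStream;

/**
 * Hypothetical median-based skew check (illustration only; class name,
 * method names and the exact rule are assumptions, not Flink's code).
 */
public final class SkewCheck {

    /** Median of the sizes; averages the two middle values for even counts. */
    static double median(long[] sizes) {
        if (sizes.length == 0) {
            throw new IllegalArgumentException("no sizes given");
        }
        long[] sorted = sizes.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    /**
     * Assumed rule: skewed only if larger than skewedFactor * median
     * AND larger than the absolute skewedThreshold (both in bytes).
     */
    static boolean isSkewed(long size, double median, double skewedFactor, long skewedThreshold) {
        return size > skewedFactor * median && size > skewedThreshold;
    }

    /** Indices of partitions considered skewed under the assumed rule. */
    static int[] skewedPartitions(long[] sizes, double skewedFactor, long skewedThreshold) {
        double median = median(sizes);
        return IntStream.range(0, sizes.length)
                .filter(i -> isSkewed(sizes[i], median, skewedFactor, skewedThreshold))
                .toArray();
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Hypothetical per-partition input sizes, e.g. taken from "Read Bytes".
        long[] sizes = {100 * mb, 120 * mb, 90 * mb, 110 * mb, 2048 * mb};
        // Illustrative factor and threshold, not Flink defaults.
        int[] skewed = skewedPartitions(sizes, 4.0, 256 * mb);
        System.out.println(Arrays.toString(skewed)); // prints [4]
    }
}
```

Requiring both conditions means a partition that is several times larger than a tiny median is not treated as skewed unless it is also large in absolute terms. Under this assumed rule, raising either the factor or the threshold makes the check more conservative.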