[
https://issues.apache.org/jira/browse/FLINK-37161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Junrui Lee closed FLINK-37161.
------------------------------
Resolution: Done
> Cross-team verification for "Adaptive skewed join optimization for batch jobs"
> ------------------------------------------------------------------------------
>
> Key: FLINK-37161
> URL: https://issues.apache.org/jira/browse/FLINK-37161
> Project: Flink
> Issue Type: Sub-task
> Reporter: Junrui Lee
> Assignee: xingbe
> Priority: Blocker
> Fix For: 2.0.0
>
>
> Flink 2.0 supports adaptive skewed join optimization for batch jobs. It
> allows the Join operator to dynamically split skewed and splittable
> partitions based on runtime input statistics, thereby mitigating the
> long-tail problem caused by skewed data.
> We may need the following tests:
> # Test the case where {{table.optimizer.skewed-join-optimization.strategy}}
> is set to {{auto}}. Construct a simple join case with data skewed on a
> single key (e.g., make the data of a specified join key N times larger than
> that of the other join keys, where N is defined by
> {{table.optimizer.skewed-join-optimization.skewed-factor}}), and ensure that
> the data volume for the skewed join key exceeds the skewed threshold
> (defined by {{table.optimizer.skewed-join-optimization.skewed-threshold}}).
> Finally, observe whether the ratio of the maximum data volume to the median
> data volume processed by concurrent join tasks is less than the skewed
> factor.
> # Test the case where {{table.optimizer.skewed-join-optimization.strategy}}
> is set to {{forced}}. Construct a skewed join instance similar to Test 1,
> with one difference: the join should be connected to a downstream operator
> that hashes on the same field (e.g., hash aggregation or group by). It is
> recommended to set different parallelisms for the join operator and the
> downstream operator to prevent the out edge from being optimized to a
> forward edge. Finally, observe whether the ratio of the maximum data volume
> to the median data volume processed by concurrent join tasks is less than
> the skewed factor.
> # Test the case where {{table.optimizer.skewed-join-optimization.strategy}}
> is set to {{none}}, and verify that the join operator is not optimized into
> an adaptive join operator under any circumstances.
> # Test the case with a customized
> {{table.optimizer.skewed-join-optimization.skewed-factor}}. Construct a
> skewed join instance similar to Test 1, set different skewed factors, and
> observe whether the ratio of the maximum data volume to the median data
> volume processed by concurrent join tasks is less than the skewed factor.
> Note that Flink can currently only reduce the ratio to 2.0, so please ensure
> that the skewed-factor is greater than 2.0 during testing.
> # Test the case with a customized
> {{table.optimizer.skewed-join-optimization.skewed-threshold}}. Construct a
> skewed join instance similar to Test 1, set different skewed thresholds, and
> observe whether the optimization takes effect only when the data volume
> processed by the skewed join instance is greater than the skewed threshold.
>
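The pass criterion shared by Tests 1, 2, and 4 (maximum-to-median ratio below the skewed factor) could be checked with a small helper like the sketch below. It is not part of Flink: the function names and the sample byte counts are hypothetical, and it assumes the per-subtask input volumes of the join have already been collected by hand (for example, from the job's subtask metrics).

```python
from statistics import median


def max_to_median_ratio(bytes_per_subtask):
    """Ratio of the largest to the median input volume across join subtasks."""
    if not bytes_per_subtask:
        raise ValueError("need at least one subtask volume")
    mid = median(bytes_per_subtask)
    if mid == 0:
        raise ValueError("median volume is zero; ratio is undefined")
    return max(bytes_per_subtask) / mid


def skew_mitigated(bytes_per_subtask, skewed_factor):
    """True if the max/median ratio is below the configured skewed factor."""
    return max_to_median_ratio(bytes_per_subtask) < skewed_factor


# Hypothetical volumes (in bytes) for five concurrent join subtasks.
without_optimization = [100, 110, 95, 105, 900]
with_optimization = [200, 210, 190, 220, 230]

print(skew_mitigated(without_optimization, 4.0))
print(skew_mitigated(with_optimization, 4.0))
```

Comparing the result for a run with the strategy set to {{none}} against a run with {{auto}} or {{forced}} on the same data gives a direct before/after view of the optimization.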
--
This message was sent by Atlassian Jira
(v8.20.10#820010)