[ 
https://issues.apache.org/jira/browse/SPARK-38124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-38124:
---------------------------------
    Description: 
SPARK-35703 removed HashClusteredDistribution and replaced its usages with 
ClusteredDistribution.

While this works great for non stateful operators, we still need to have a 
separate requirement of distribution for stateful operator, because the 
requirement of ClusteredDistribution is too relaxed while the requirement of 
physical partitioning on stateful operator is quite strict.

In most cases, stateful operators must require child distribution as 
HashClusteredDistribution, with below major assumptions:
 # HashClusteredDistribution creates HashPartitioning and we will never ever 
change it for the future.
 # We will never ever change the implementation of {{partitionIdExpression}} in 
HashPartitioning for the future, so that Partitioner will behave consistently 
across Spark versions.
 # No partitioning except HashPartitioning can satisfy 
HashClusteredDistribution.

 

We should revive HashClusteredDistribution (with probably renaming specifically 
with stateful operator) and apply the distribution to the all stateful 
operators.

SPARK-35703 only touched stream-stream join, which means stream-stream join 
hasn't been broken in actual releases. Let's aim the partial revert of 
SPARK-35703 in this ticket, and have another ticket to deal with other stateful 
operators, which have been broken for their introduction (2.2+).

  was:
SPARK-35703 removed HashClusteredDistribution and replaced its usages with 
ClusteredDistribution.

While this works great for non stateful operators, we still need to have a 
separate requirement of distribution for stateful operator, because the 
requirement of ClusteredDistribution is too relaxed while the requirement of 
physical partitioning on stateful operator is quite strict.

In most cases, stateful operators must require child distribution as 
HashClusteredDistribution, with below major assumptions:
 # HashClusteredDistribution creates HashPartitioning and we will never ever 
change it for the future.
 # We will never ever change the implementation of {{partitionIdExpression}} in 
HashPartitioning for the future, so that Partitioner will behave consistently 
across Spark versions.
 # No partitioning except HashPartitioning can satisfy 
HashClusteredDistribution.

 

We should revive HashClusteredDistribution (with probably renaming specifically 
with stateful operator) and apply the distribution to the all stateful 
operators.

SPARK-35703 only touched stream-stream join, which means other stateful 
operators already used ClusteredDistribution, hence have been broken for a long 
time.


> Revive HashClusteredDistribution and apply to stream-stream join
> ----------------------------------------------------------------
>
>                 Key: SPARK-38124
>                 URL: https://issues.apache.org/jira/browse/SPARK-38124
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Jungtaek Lim
>            Priority: Blocker
>              Labels: correctness
>
> SPARK-35703 removed HashClusteredDistribution and replaced its usages with 
> ClusteredDistribution.
> While this works great for non stateful operators, we still need to have a 
> separate requirement of distribution for stateful operator, because the 
> requirement of ClusteredDistribution is too relaxed while the requirement of 
> physical partitioning on stateful operator is quite strict.
> In most cases, stateful operators must require child distribution as 
> HashClusteredDistribution, with below major assumptions:
>  # HashClusteredDistribution creates HashPartitioning and we will never ever 
> change it for the future.
>  # We will never ever change the implementation of {{partitionIdExpression}} 
> in HashPartitioning for the future, so that Partitioner will behave 
> consistently across Spark versions.
>  # No partitioning except HashPartitioning can satisfy 
> HashClusteredDistribution.
>  
> We should revive HashClusteredDistribution (with probably renaming 
> specifically with stateful operator) and apply the distribution to the all 
> stateful operators.
> SPARK-35703 only touched stream-stream join, which means stream-stream join 
> hasn't been broken in actual releases. Let's aim the partial revert of 
> SPARK-35703 in this ticket, and have another ticket to deal with other 
> stateful operators, which have been broken for their introduction (2.2+).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to