gene-bordegaray opened a new issue, #19269:
URL: https://github.com/apache/datafusion/issues/19269

   ### Is your feature request related to a problem or challenge?
   
   When DataFusion checks whether our partitioning is satisfied or a repartition is 
needed, it only checks whether the expressions the partitions are hashed on are equal 
to the required expressions: hash_required = hash_current.
   
   Hash partitioning is also satisfied if the current hash expressions are a subset 
of the required ones: hash_current "is subset of" hash_required (e.g. Hash(a) 
satisfies a Hash(a, b) requirement, since equal (a, b) values imply equal a values). 
Thus, in certain situations it would be ideal not to insert a repartition when the 
current partitioning already implies the required one.
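   The check above could be sketched roughly as follows. This is a minimal illustration, not DataFusion's actual API: the function name `hash_satisfied` is hypothetical, and string slices stand in for physical expressions.

   ```rust
   use std::collections::HashSet;

   /// Hypothetical check: an existing Hash partitioning on `current`
   /// satisfies a required Hash partitioning on `required` when every
   /// current expression also appears in the required set -- equal
   /// values of the required keys then imply equal values of the
   /// current keys, so matching rows are already colocated.
   fn hash_satisfied(current: &[&str], required: &[&str]) -> bool {
       let required: HashSet<&str> = required.iter().copied().collect();
       !current.is_empty() && current.iter().all(|e| required.contains(e))
   }
   ```

   Under this sketch, `hash_satisfied(&["a"], &["a", "b"])` holds, while the reverse direction does not: rows partitioned on Hash(a, b) can scatter equal values of `a` across partitions.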
   
   This comes with the caveat of skewed data. Say we are hashed via Hash(a) and 
Hash(a, b) is required. We implicitly satisfy Hash(a, b) via this property, but we 
may still want to insert a repartition if our data is heavily skewed when 
partitioned via Hash(a), as the repartitioning would help distribute this skew. 
Here is an example: say we are partitioned via Hash(a) and have 3 total CPUs 
(target_partitions):
   
   ```text
   Partition 1:
   a    b
   A1   B1
   A1   B1
   A1   B1
   A1   B1
   A1   B2
   A1   B2
   A1   B2
   A1   B2
   A1   B3
   A1   B3
   A1   B3
   
   Partition 2:
   A2   B1
   A2   B1
   A2   B1
   
   Partition 3:
   A3   B1
   A3   B1
   A3   B1
   ```
   
   Then in our plan we hit an operator that requires partitioning via Hash(a, 
b). Although our existing Hash(a) implicitly satisfies the required Hash(a, b), 
since all occurrences of an (a, b) combination are contained within a single 
partition, we would get a much better data distribution if we repartitioned on 
Hash(a, b):
   
   ```text
   Partition 1:
   a    b
   A1   B1
   A1   B1
   A1   B1
   A1   B1
   
   Partition 2:
   A1   B2
   A1   B2
   A1   B2
   A1   B2
   A2   B1
   A2   B1
   A2   B1
   
   Partition 3:
   A3   B1
   A3   B1
   A3   B1
   A1   B3
   A1   B3
   A1   B3
   ```
   
   Thus, this needs to be considered in the implementation.
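   For intuition on why the redistribution above is possible at all, here is an illustrative partition assignment: a row's target partition is its key hash modulo target_partitions. This is only a sketch; DataFusion's actual hash function and repartitioning differ, and `partition_of` is a made-up name.

   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::hash::{Hash, Hasher};

   /// Illustrative only: assign a row to a partition by hashing its
   /// key tuple and taking the result modulo the partition count.
   /// Rows with equal keys always land in the same partition.
   fn partition_of<T: Hash>(key: &T, target_partitions: u64) -> u64 {
       let mut h = DefaultHasher::new();
       key.hash(&mut h);
       h.finish() % target_partitions
   }
   ```

   Hashing on the wider key (a, b) spreads the many distinct (A1, b) rows across partitions, while hashing on `a` alone pins every A1 row to one partition, which is exactly the skew shown above.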
   
   ### Describe the solution you'd like
   
   There are two approaches I have thought of and would like to discuss.
   
   1. A boolean option that turns this behavior on or off. This is simple to 
implement and low risk.
   2. Read the row counts from Parquet metadata to build a heuristic for when 
this behavior should kick in. If the data is already well distributed we won't 
repartition; otherwise we insert the repartition. This comes with the complexity 
of designing a good heuristic and more code added.
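   As a starting point for discussing approach 2, one possible skew heuristic: repartition only if the largest partition exceeds the average row count by some factor. The function name and the threshold are purely hypothetical.

   ```rust
   /// Hypothetical skew heuristic: repartition when the largest
   /// partition holds more than `factor` times the average number
   /// of rows across partitions.
   fn should_repartition(row_counts: &[u64], factor: f64) -> bool {
       if row_counts.is_empty() {
           return false;
       }
       let total: u64 = row_counts.iter().sum();
       let avg = total as f64 / row_counts.len() as f64;
       let max = *row_counts.iter().max().unwrap() as f64;
       max > factor * avg
   }
   ```

   With the example above (row counts 11, 3, 3) and a factor of 1.5, this would trigger a repartition, while an already balanced distribution would leave the plan untouched.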
   
   ### Describe alternatives you've considered
   
   I listed the two approaches above and am weighing them equally. Would 
love more thoughts.
   
   ### Additional context
   
   Eliminating these shuffles would have a huge impact for distributed 
DataFusion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]