[ 
https://issues.apache.org/jira/browse/SPARK-27853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-27853:
-----------------------------------
    Labels: pull-request-available  (was: )

> Allow for custom Partitioning implementations
> ---------------------------------------------
>
>                 Key: SPARK-27853
>                 URL: https://issues.apache.org/jira/browse/SPARK-27853
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 3.1.0
>            Reporter: Marc Arndt
>            Priority: Major
>              Labels: pull-request-available
>
> When partitioning a Dataset, Spark uses the physical plan element 
> ShuffleExchangeExec together with a Partitioning instance.
> I find myself in a situation where I need to provide my own partitioning 
> criterion, which decides to which partition each InternalRow belongs. 
> According to the Spark API, I would expect to be able to provide this 
> criterion as a custom implementation of the Partitioning interface.
> Sadly, after implementing a custom Partitioning you will receive an 
> "Exchange not implemented for $newPartitioning" error, because of the 
> following code inside the ShuffleExchangeExec#prepareShuffleDependency method:
> {code:scala}
> val part: Partitioner = newPartitioning match {
>   case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
>   case HashPartitioning(_, n) =>
>     new Partitioner {
>       override def numPartitions: Int = n
>       // For HashPartitioning, the partitioning key is already a valid partition ID,
>       // as we use `HashPartitioning.partitionIdExpression` to produce partitioning key.
>       override def getPartition(key: Any): Int = key.asInstanceOf[Int]
>     }
>   case RangePartitioning(sortingExpressions, numPartitions) =>
>     // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
>     // partition bounds. To get accurate samples, we need to copy the mutable keys.
>     val rddForSampling = rdd.mapPartitionsInternal { iter =>
>       val mutablePair = new MutablePair[InternalRow, Null]()
>       iter.map(row => mutablePair.update(row.copy(), null))
>     }
>     implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
>     new RangePartitioner(
>       numPartitions,
>       rddForSampling,
>       ascending = true,
>       samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
>   case SinglePartition =>
>     new Partitioner {
>       override def numPartitions: Int = 1
>       override def getPartition(key: Any): Int = 0
>     }
>   case _ => sys.error(s"Exchange not implemented for $newPartitioning")
>   // TODO: Handle BroadcastPartitioning.
> }
> 
> def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
>   case RoundRobinPartitioning(numPartitions) =>
>     // Distributes elements evenly across output partitions, starting from a random partition.
>     var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
>     (row: InternalRow) => {
>       // The HashPartitioner will handle the `mod` by the number of partitions
>       position += 1
>       position
>     }
>   case h: HashPartitioning =>
>     val projection =
>       UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
>     row => projection(row).getInt(0)
>   case RangePartitioning(_, _) | SinglePartition => identity
>   case _ => sys.error(s"Exchange not implemented for $newPartitioning")
> }
> {code}
> The code above matches the given Partitioning instance "newPartitioning" 
> against a hardcoded set of Partitioning types. When a new Partitioning 
> implementation is added, the pattern match cannot find a case for it and 
> therefore falls through to the fallback:
> {code:scala}
>     case _ => sys.error(s"Exchange not implemented for $newPartitioning")
> {code}
> which throws an exception.
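> For illustration (a hypothetical example, not part of this issue or of Spark): even a 
> minimal custom Partitioning like the one below hits that fallback, because it matches 
> none of the built-in types. The class name is made up; in recent Spark versions the 
> remaining Partitioning members have default implementations, so only numPartitions 
> needs to be provided.
> {code:scala}
> import org.apache.spark.sql.catalyst.plans.physical.Partitioning
> 
> // Hypothetical marker for a domain-specific partitioning scheme. As soon as the
> // planner inserts a shuffle for it, ShuffleExchangeExec#prepareShuffleDependency
> // falls into the fallback case and fails with
> // "Exchange not implemented for TenantPartitioning(...)".
> case class TenantPartitioning(numPartitions: Int) extends Partitioning
> {code}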
> To make custom partitioning behaviour possible, I suggest changing the 
> implementation in ShuffleExchangeExec so that it works with an arbitrary 
> Partitioning implementation. For the Partitioner creation, I imagine this 
> could be done cleanly inside the Partitioning classes via a 
> Partitioning#createPartitioner method.
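> A rough sketch of what that extension point could look like (all names and signatures 
> below are hypothetical and not part of the current Spark API; the parameters simply 
> mirror what prepareShuffleDependency already has in scope):
> {code:scala}
> import org.apache.spark.Partitioner
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.expressions.Attribute
> 
> trait Partitioning {
>   val numPartitions: Int
> 
>   // Hypothetical: build the Partitioner used for the shuffle dependency.
>   def createPartitioner(rdd: RDD[InternalRow], outputAttributes: Seq[Attribute]): Partitioner
> 
>   // Hypothetical: extract the key handed to Partitioner#getPartition for each row.
>   def createPartitionKeyExtractor(outputAttributes: Seq[Attribute]): InternalRow => Any
> }
> {code}
> ShuffleExchangeExec#prepareShuffleDependency could then delegate to these methods 
> instead of pattern matching on concrete types, e.g. 
> val part = newPartitioning.createPartitioner(rdd, outputAttributes), and custom 
> Partitioning implementations would work without touching the exchange code.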



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
