[ https://issues.apache.org/jira/browse/SPARK-27853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-27853:
-----------------------------------
Labels: pull-request-available (was: )
> Allow for custom Partitioning implementations
> ---------------------------------------------
>
> Key: SPARK-27853
> URL: https://issues.apache.org/jira/browse/SPARK-27853
> Project: Spark
> Issue Type: Improvement
> Components: Optimizer, SQL
> Affects Versions: 3.1.0
> Reporter: Marc Arndt
> Priority: Major
> Labels: pull-request-available
>
> When partitioning a Dataset, Spark uses the physical plan element
> ShuffleExchangeExec together with a Partitioning instance.
> I find myself in a situation where I need to provide my own partitioning
> criteria that decide which partition each InternalRow belongs to.
> According to the Spark API, I would expect to be able to provide these
> criteria as a custom implementation of the Partitioning interface.
> Sadly, a custom Partitioning implementation fails at runtime with an
> "Exchange not implemented for $newPartitioning" error message,
> because of the following code inside the
> ShuffleExchangeExec#prepareShuffleDependency method:
> {code:scala}
> val part: Partitioner = newPartitioning match {
>   case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
>   case HashPartitioning(_, n) =>
>     new Partitioner {
>       override def numPartitions: Int = n
>       // For HashPartitioning, the partitioning key is already a valid partition ID, as we use
>       // `HashPartitioning.partitionIdExpression` to produce partitioning key.
>       override def getPartition(key: Any): Int = key.asInstanceOf[Int]
>     }
>   case RangePartitioning(sortingExpressions, numPartitions) =>
>     // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
>     // partition bounds. To get accurate samples, we need to copy the mutable keys.
>     val rddForSampling = rdd.mapPartitionsInternal { iter =>
>       val mutablePair = new MutablePair[InternalRow, Null]()
>       iter.map(row => mutablePair.update(row.copy(), null))
>     }
>     implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
>     new RangePartitioner(
>       numPartitions,
>       rddForSampling,
>       ascending = true,
>       samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
>   case SinglePartition =>
>     new Partitioner {
>       override def numPartitions: Int = 1
>       override def getPartition(key: Any): Int = 0
>     }
>   case _ => sys.error(s"Exchange not implemented for $newPartitioning")
>   // TODO: Handle BroadcastPartitioning.
> }
> def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
>   case RoundRobinPartitioning(numPartitions) =>
>     // Distributes elements evenly across output partitions, starting from a random partition.
>     var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
>     (row: InternalRow) => {
>       // The HashPartitioner will handle the `mod` by the number of partitions
>       position += 1
>       position
>     }
>   case h: HashPartitioning =>
>     val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
>     row => projection(row).getInt(0)
>   case RangePartitioning(_, _) | SinglePartition => identity
>   case _ => sys.error(s"Exchange not implemented for $newPartitioning")
> }
> {code}
> The snippet above matches the given Partitioning instance
> "newPartitioning" against a fixed set of hardcoded Partitioning types. A
> new Partitioning implementation matches none of these patterns, so
> execution falls through to the fallback case:
> {code:scala}
> case _ => sys.error(s"Exchange not implemented for $newPartitioning")
> {code}
> which throws an exception.
> To support custom partitioning behaviour, I suggest changing
> the implementation in ShuffleExchangeExec so that it works with an
> arbitrary Partitioning implementation. The Partitioner could be created
> inside the Partitioning classes themselves, via a
> Partitioning#createPartitioner method.
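>
> A minimal sketch of this idea, using simplified stand-in types rather than
> Spark's real classes. The Partitioner and Partitioning traits below are
> reduced to the members needed here, and createPartitioner,
> ModuloPartitioning and Exchange.partitionerFor are hypothetical names
> chosen for illustration only. The point is that the exchange asks the
> Partitioning for its Partitioner instead of pattern matching on a closed
> set of types:
> {code:scala}
> // Simplified stand-in for org.apache.spark.Partitioner (assumption, not the real class).
> trait Partitioner {
>   def numPartitions: Int
>   def getPartition(key: Any): Int
> }
>
> // Simplified stand-in for Spark SQL's Partitioning, extended with the
> // hypothetical createPartitioner hook proposed in this issue.
> trait Partitioning {
>   def numPartitions: Int
>   def createPartitioner(): Partitioner
> }
>
> // A built-in partitioning keeps its current behaviour, but owns it.
> case object SinglePartition extends Partitioning {
>   val numPartitions: Int = 1
>   def createPartitioner(): Partitioner = new Partitioner {
>     def numPartitions: Int = 1
>     def getPartition(key: Any): Int = 0
>   }
> }
>
> // A user-defined partitioning (hypothetical example): no change to the
> // exchange code is needed to support it.
> final case class ModuloPartitioning(numPartitions: Int) extends Partitioning {
>   require(numPartitions > 0, "numPartitions must be positive")
>
>   def createPartitioner(): Partitioner = {
>     val n = numPartitions // capture to avoid shadowing inside the anonymous class
>     new Partitioner {
>       def numPartitions: Int = n
>       // floorMod keeps the result in [0, n) even for negative keys.
>       def getPartition(key: Any): Int = key match {
>         case i: Int => java.lang.Math.floorMod(i, n)
>         case other  => java.lang.Math.floorMod(other.hashCode, n)
>       }
>     }
>   }
> }
>
> // Stand-in for the relevant part of ShuffleExchangeExec: no match, no fallback case.
> object Exchange {
>   def partitionerFor(p: Partitioning): Partitioner = p.createPartitioner()
> }
>
> object Demo {
>   def main(args: Array[String]): Unit = {
>     val part = Exchange.partitionerFor(ModuloPartitioning(4))
>     println(part.getPartition(10))
>     println(Exchange.partitionerFor(SinglePartition).getPartition("any key"))
>   }
> }
> {code}
> The partition key extractor (getPartitionKeyExtractor in the snippet
> above) has the same closed match and would presumably need an analogous
> hook; that part is left out of this sketch.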