Marc Arndt created SPARK-27853:
----------------------------------
Summary: Allow for custom Partitioning implementations
Key: SPARK-27853
URL: https://issues.apache.org/jira/browse/SPARK-27853
Project: Spark
Issue Type: Improvement
Components: Optimizer, SQL
Affects Versions: 2.4.3
Reporter: Marc Arndt
When partitioning a Dataset Spark uses the physical plan element
ShuffleExchangeExec together with a Partitioning instance.
I find myself in situation where I need to provide my own partitioning
criteria, that decides to which partition each InternalRow should belong.
According to the Spark API I would expect to be able to provide my custom
partitioning criteria as a custom implementation of the Partitioning interface.
Sadly after implementing a custom Partitioning implementation you will receive
a "Exchange not implemented for $newPartitioning" error message, because of the
following code inside the ShuffleExchangeExec#prepareShuffleDependency method:
{code:scala}
val part: Partitioner = newPartitioning match {
case RoundRobinPartitioning(numPartitions) => new
HashPartitioner(numPartitions)
case HashPartitioning(_, n) =>
new Partitioner {
override def numPartitions: Int = n
// For HashPartitioning, the partitioning key is already a valid
partition ID, as we use
// `HashPartitioning.partitionIdExpression` to produce partitioning key.
override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}
case RangePartitioning(sortingExpressions, numPartitions) =>
// Internally, RangePartitioner runs a job on the RDD that samples keys to
compute
// partition bounds. To get accurate samples, we need to copy the mutable
keys.
val rddForSampling = rdd.mapPartitionsInternal { iter =>
val mutablePair = new MutablePair[InternalRow, Null]()
iter.map(row => mutablePair.update(row.copy(), null))
}
implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions,
outputAttributes)
new RangePartitioner(
numPartitions,
rddForSampling,
ascending = true,
samplePointsPerPartitionHint =
SQLConf.get.rangeExchangeSampleSizePerPartition)
case SinglePartition =>
new Partitioner {
override def numPartitions: Int = 1
override def getPartition(key: Any): Int = 0
}
case _ => sys.error(s"Exchange not implemented for $newPartitioning")
// TODO: Handle BroadcastPartitioning.
}
def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
case RoundRobinPartitioning(numPartitions) =>
// Distributes elements evenly across output partitions, starting from a
random partition.
var position = new
Random(TaskContext.get().partitionId()).nextInt(numPartitions)
(row: InternalRow) => {
// The HashPartitioner will handle the `mod` by the number of partitions
position += 1
position
}
case h: HashPartitioning =>
val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil,
outputAttributes)
row => projection(row).getInt(0)
case RangePartitioning(_, _) | SinglePartition => identity
case _ => sys.error(s"Exchange not implemented for $newPartitioning")
}
{code}
The code in the above code snippet matches the given Partitioning instance
"newPartitioning" against a set of given hardcoded Partitioning types. When
adding a new Partitioning implementation the pattern matching won't be able to
find a pattern for it and therefore will use the fallback case:
{code:java}
case _ => sys.error(s"Exchange not implemented for $newPartitioning")
{code}
and throw an exception.
To be able to provide custom partition behaviour I would suggest to change the
implementation in ShuffleExchangeExec to be able to work with an arbitrary
Partitioning implementation. For the Partition creation I would imagine that
this can be done in a nice way inside the Partitioning classes via a
Partitioning#createPartitioner method.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]