viirya commented on code in PR #233:
URL: https://github.com/apache/arrow-datafusion-comet/pull/233#discussion_r1540221507
##########
spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala:
##########
@@ -208,6 +217,50 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
     dependency
   }
+  /**
+   * This is copied from Spark `ShuffleExchangeExec.needToCopyObjectsBeforeShuffle`. The only
+   * difference is that we use `CometShuffleManager` instead of `SortShuffleManager`.
+   */
+  private def needToCopyObjectsBeforeShuffle(partitioner: Partitioner): Boolean = {
+    // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
+    // passed instead of directly passing the number of partitions in order to guard against
+    // corner-cases where a partitioner constructed with `numPartitions` partitions may output
+    // fewer partitions (like RangePartitioner, for example).
+    val conf = SparkEnv.get.conf
+    val shuffleManager = SparkEnv.get.shuffleManager
+    val sortBasedShuffleOn = shuffleManager.isInstanceOf[CometShuffleManager]
+    val bypassMergeThreshold = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
+    val numParts = partitioner.numPartitions
+    if (sortBasedShuffleOn) {
+      if (numParts <= bypassMergeThreshold) {
+        // If we're using the original SortShuffleManager and the number of output partitions is
+        // sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
+        // doesn't buffer deserialized records.
+        // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
+        false
+      } else if (numParts <= SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
+        // SPARK-4550 and SPARK-7081 extended sort-based shuffle to serialize individual records
+        // prior to sorting them. This optimization is only applied in cases where shuffle
+        // dependency does not specify an aggregator or ordering and the record serializer has
+        // certain properties and the number of partitions doesn't exceed the limitation. If this
+        // optimization is enabled, we can safely avoid the copy.
+        //
+        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, and the
+        // serializer in Spark SQL always satisfy the properties, so we only need to check whether
+        // the number of partitions exceeds the limitation.
+        false
+      } else {
+        // This is different from Spark `SortShuffleManager`.
+        // Comet doesn't use Spark `ExternalSorter` to buffer records in memory, so we don't need
+        // to copy.
+        false
Review Comment:
Basically Comet shuffle doesn't go through the code paths that require additional
copying of row objects the way Spark does, so all of these branches return `false`.
But I keep the separate branches to make the reasoning behind each case clear.
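
For context, here is a minimal sketch of how the result of a `needToCopyObjectsBeforeShuffle`-style
predicate is typically consumed when building the shuffle input RDD. The method name `shuffleInput`
and the `needToCopy` parameter are illustrative, not the actual Comet/Spark call site: when the
predicate returns `true`, mutable rows are defensively copied before being handed to the shuffle
writer; when it returns `false`, as in every branch above, rows pass through without the extra copy.

```scala
// Illustrative sketch only: `shuffleInput` and `needToCopy` are made-up names,
// not the real Comet/Spark call site.
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

def shuffleInput(
    rdd: RDD[InternalRow],
    partitioner: Partitioner,
    needToCopy: Partitioner => Boolean): RDD[InternalRow] = {
  if (needToCopy(partitioner)) {
    // The shuffle writer may buffer the (mutable) rows, so take a defensive copy of each one.
    rdd.map(_.copy())
  } else {
    // Comet's shuffle never buffers deserialized rows, so rows can be passed through as-is.
    rdd
  }
}
```

Since every branch above returns `false`, Comet always takes the no-copy path here; the branches
are kept only to document why each case is safe.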
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]