viirya commented on code in PR #233:
URL: https://github.com/apache/arrow-datafusion-comet/pull/233#discussion_r1540221507


##########
spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala:
##########
@@ -208,6 +217,50 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
     dependency
   }
 
+  /**
+   * This is copied from Spark's `ShuffleExchangeExec.needToCopyObjectsBeforeShuffle`. The only
+   * difference is that we use `CometShuffleManager` instead of `SortShuffleManager`.
+   */
+  private def needToCopyObjectsBeforeShuffle(partitioner: Partitioner): Boolean = {
+    // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
+    // passed instead of directly passing the number of partitions in order to guard against
+    // corner-cases where a partitioner constructed with `numPartitions` partitions may output
+    // fewer partitions (like RangePartitioner, for example).
+    val conf = SparkEnv.get.conf
+    val shuffleManager = SparkEnv.get.shuffleManager
+    val sortBasedShuffleOn = shuffleManager.isInstanceOf[CometShuffleManager]
+    val bypassMergeThreshold = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
+    val numParts = partitioner.numPartitions
+    if (sortBasedShuffleOn) {
+      if (numParts <= bypassMergeThreshold) {
+        // If we're using the original SortShuffleManager and the number of output partitions is
+        // sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
+        // doesn't buffer deserialized records.
+        // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
+        false
+      } else if (numParts <= SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
+        // SPARK-4550 and SPARK-7081 extended sort-based shuffle to serialize individual records
+        // prior to sorting them. This optimization is only applied in cases where the shuffle
+        // dependency does not specify an aggregator or ordering and the record serializer has
+        // certain properties and the number of partitions doesn't exceed the limitation. If this
+        // optimization is enabled, we can safely avoid the copy.
+        //
+        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, and the
+        // serializer in Spark SQL always satisfies the properties, so we only need to check whether
+        // the number of partitions exceeds the limitation.
+        false
+      } else {
+        // This is different from Spark's `SortShuffleManager`.
+        // Comet doesn't use Spark's `ExternalSorter` to buffer records in memory, so we don't
+        // need to copy.
+        false

Review Comment:
   Basically, Comet shuffle doesn't go through the code paths that require additional copying of row objects the way Spark does, so all of these branches return `false`. But I keep them to make the reasoning behind each case clear.
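   
   For illustration, here is a minimal sketch of why the copy question matters at all, loosely modeled on the pattern in Spark's `ShuffleExchangeExec.prepareShuffleDependency`. The helper `keyAndMaybeCopy` and the `getPartitionKey` parameter are hypothetical names for this sketch, not code from this PR:
   
   ```scala
   import org.apache.spark.Partitioner
   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.catalyst.InternalRow
   
   // Hypothetical helper for illustration; `getPartitionKey` stands in for
   // the key extraction that Spark derives from the output partitioning.
   def keyAndMaybeCopy(
       rdd: RDD[InternalRow],
       part: Partitioner,
       getPartitionKey: InternalRow => Any,
       needCopy: Boolean): RDD[(Int, InternalRow)] = {
     if (needCopy) {
       // A buffering shuffle writer (e.g. Spark's ExternalSorter) keeps
       // references to incoming rows, but SQL operators reuse one mutable
       // UnsafeRow per partition, so each record is copied before buffering.
       rdd.mapPartitions { iter =>
         iter.map(row => (part.getPartition(getPartitionKey(row)), row.copy()))
       }
     } else {
       // Comet's shuffle serializes records up front instead of buffering
       // deserialized rows, so the defensive copy can be skipped.
       rdd.mapPartitions { iter =>
         iter.map(row => (part.getPartition(getPartitionKey(row)), row))
       }
     }
   }
   ```
   
   This is why every branch above can safely return `false`: copying is only needed when a buffering writer would hold references to reused mutable rows, and per the comments in the diff, Comet's shuffle never buffers deserialized records.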


