marin-ma commented on code in PR #10991:
URL: 
https://github.com/apache/incubator-gluten/pull/10991#discussion_r2486018463


##########
backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala:
##########
@@ -30,30 +31,50 @@ import 
org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQuery
  */
 case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] 
{
   override def apply(plan: SparkPlan): SparkPlan = {
+    val resizeBatchesShuffleInputEnabled = 
VeloxConfig.get.veloxResizeBatchesShuffleInput
+    val resizeBatchesShuffleOutputEnabled = 
VeloxConfig.get.veloxResizeBatchesShuffleOutput
+    if (!resizeBatchesShuffleInputEnabled && 
!resizeBatchesShuffleOutputEnabled) {
+      return plan
+    }
+
     val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
     plan.transformUp {
-      case shuffle: ColumnarShuffleExchangeExec
-          if shuffle.shuffleWriterType == HashShuffleWriterType &&
-            VeloxConfig.get.veloxResizeBatchesShuffleInput =>
+      case ColumnarResizeableShuffleExchangeExec(shuffle) if 
resizeBatchesShuffleInputEnabled =>

Review Comment:
   It seems this change assumes that a given shuffle writer type requires 
resizing of both the shuffle input and the shuffle output at the same time, but 
I don't think that's necessarily true. A shuffle writer type may need input 
resizing but not output resizing, or vice versa (although for the existing 
types the two requirements happen to coincide).
   
   Maybe we could generalize this by adding these flags to the 
`ShuffleWriterType` trait:
   ```
   trait ShuffleWriterType {
     val name: String
     val requiresResizingShuffleInput: Boolean
     val requiresResizingShuffleOutput: Boolean
   }
   ```
   
   What do you think?



##########
backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala:
##########
@@ -30,30 +31,50 @@ import 
org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQuery
  */
 case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] 
{
   override def apply(plan: SparkPlan): SparkPlan = {
+    val resizeBatchesShuffleInputEnabled = 
VeloxConfig.get.veloxResizeBatchesShuffleInput
+    val resizeBatchesShuffleOutputEnabled = 
VeloxConfig.get.veloxResizeBatchesShuffleOutput
+    if (!resizeBatchesShuffleInputEnabled && 
!resizeBatchesShuffleOutputEnabled) {
+      return plan
+    }
+
     val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
     plan.transformUp {
-      case shuffle: ColumnarShuffleExchangeExec
-          if shuffle.shuffleWriterType == HashShuffleWriterType &&
-            VeloxConfig.get.veloxResizeBatchesShuffleInput =>
+      case ColumnarResizeableShuffleExchangeExec(shuffle) if 
resizeBatchesShuffleInputEnabled =>
         val appendBatches =
           VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
         shuffle.withNewChildren(Seq(appendBatches))
-      case a @ AQEShuffleReadExec(ShuffleQueryStageExec(_, _: 
ColumnarShuffleExchangeExec, _), _)
-          if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+      case a @ AQEShuffleReadExec(
+            ShuffleQueryStageExec(_, ColumnarResizeableShuffleExchangeExec(_), 
_),
+            _) if resizeBatchesShuffleOutputEnabled =>
         VeloxResizeBatchesExec(a, range.min, range.max)
       // Since it's transformed in a bottom to up order, so we may first 
encountered
       // ShuffeQueryStageExec, which is transformed to 
VeloxResizeBatchesExec(ShuffeQueryStageExec),
       // then we see AQEShuffleReadExec
       case a @ AQEShuffleReadExec(
             VeloxResizeBatchesExec(
-              s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _),
+              s @ ShuffleQueryStageExec(_, 
ColumnarResizeableShuffleExchangeExec(_), _),
               _,
               _),
-            _) if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+            _) if resizeBatchesShuffleOutputEnabled =>
         VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max)
-      case s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExec, _)
-          if VeloxConfig.get.veloxResizeBatchesShuffleOutput =>
+      case s @ ShuffleQueryStageExec(_, 
ColumnarResizeableShuffleExchangeExec(_), _)
+          if resizeBatchesShuffleOutputEnabled =>
         VeloxResizeBatchesExec(s, range.min, range.max)
     }
   }
+
+  private object ColumnarResizeableShuffleExchangeExec {
+    def unapply(plan: SparkPlan): Option[ColumnarShuffleExchangeExec] = {
+      plan match {
+        // sort-based/rss-sort shuffle has already resized the batch size in 
shuffle read,
+        // so no need to resize again
+        case shuffle: ColumnarShuffleExchangeExec

Review Comment:
   nit: Perhaps a more readable way to express this logic:
   
   ```
   case shuffle: ColumnarShuffleExchangeExec
               if shuffle.shuffleWriterType != SortShuffleWriterType
                 && shuffle.shuffleWriterType != RssSortShuffleWriterType =>
     Some(shuffle)
   case _ => None
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to