This is an automated email from the ASF dual-hosted git repository.
jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9a7d5fc6b1 [GLUTEN-9034][VL] Add VeloxResizeBatchesExec for Shuffle
(#9035)
9a7d5fc6b1 is described below
commit 9a7d5fc6b13aa41b7a171b71512a037474b9125d
Author: WangGuangxin <[email protected]>
AuthorDate: Tue Apr 22 15:24:00 2025 +0800
[GLUTEN-9034][VL] Add VeloxResizeBatchesExec for Shuffle (#9035)
---
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 +
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 15 +-----
.../org/apache/gluten/config/VeloxConfig.scala | 30 ++++++++++-
.../gluten/execution/VeloxResizeBatchesExec.scala | 1 +
...AppendBatchResizeForShuffleInputAndOutput.scala | 58 ++++++++++++++++++++++
.../gluten/execution/MiscOperatorSuite.scala | 44 +++++++++++++++-
6 files changed, 132 insertions(+), 18 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index a4e1eb24b8..fe785c6fea 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -89,6 +89,7 @@ object VeloxRuleApi {
c => HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf),
rewrites, offloads))
// Legacy: Post-transform rules.
+ injector.injectPostTransform(_ =>
AppendBatchResizeForShuffleInputAndOutput())
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
@@ -176,6 +177,7 @@ object VeloxRuleApi {
c => RasOffload.Rule(offload, validatorBuilder(c.glutenConf),
rewrites)))
// Gluten RAS: Post rules.
+ injector.injectPostTransform(_ =>
AppendBatchResizeForShuffleInputAndOutput())
injector.injectPostTransform(_ => RemoveTransitions)
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 44aa8e836f..f6c66585da 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -362,19 +362,6 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
}
}
- def maybeAddAppendBatchesExec(plan: SparkPlan): SparkPlan = {
- plan match {
- case shuffle: ColumnarShuffleExchangeExec
- if !shuffle.useSortBasedShuffle &&
- VeloxConfig.get.veloxResizeBatchesShuffleInput =>
- val range = VeloxConfig.get.veloxResizeBatchesShuffleInputRange
- val appendBatches =
- VeloxResizeBatchesExec(shuffle.child, range.min, range.max)
- shuffle.withNewChildren(Seq(appendBatches))
- case _ => plan
- }
- }
-
val child = shuffle.child
val newShuffle = shuffle.outputPartitioning match {
@@ -437,7 +424,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
case _ =>
ColumnarShuffleExchangeExec(shuffle, child, null)
}
- maybeAddAppendBatchesExec(newShuffle)
+ newShuffle
}
/** Generate ShuffledHashJoinExecTransformer. */
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index b94da4eafd..389db080ff 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -34,16 +34,19 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf)
{
def veloxResizeBatchesShuffleInput: Boolean =
getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT)
+ def veloxResizeBatchesShuffleOutput: Boolean =
+ getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT)
+
case class ResizeRange(min: Int, max: Int) {
assert(max >= min)
assert(min > 0, "Min batch size should be larger than 0")
assert(max > 0, "Max batch size should be larger than 0")
}
- def veloxResizeBatchesShuffleInputRange: ResizeRange = {
+ def veloxResizeBatchesShuffleInputOutputRange: ResizeRange = {
val standardSize = getConf(COLUMNAR_MAX_BATCH_SIZE)
val defaultMinSize: Int = (0.25 * standardSize).toInt.max(1)
- val minSize = getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
+ val minSize =
getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_OUTPUT_MIN_SIZE)
.getOrElse(defaultMinSize)
ResizeRange(minSize, Int.MaxValue)
}
@@ -294,6 +297,14 @@ object VeloxConfig {
.booleanConf
.createWithDefault(true)
+  val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT =
+    buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput")
+      .internal()
+      .doc(s"If true, combine small columnar batches right after shuffle read (AQE only). " +
+        s"The default minimum output batch size is equal to 0.25 * ${COLUMNAR_MAX_BATCH_SIZE.key}")
+      .booleanConf
+      .createWithDefault(false)
+
val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
.internal()
@@ -306,6 +317,21 @@ object VeloxConfig {
.intConf
.createOptional
+  val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_OUTPUT_MIN_SIZE =
+    buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOutput.minSize")
+      .internal()
+      .doc(
+        s"The minimum batch size for shuffle input and output. " +
+          s"If size of an input batch is " +
+          s"smaller than the value, it will be combined with other " +
+          s"batches before sending to shuffle. " +
+          s"The same applies for batches output by shuffle read. " +
+          s"Only functions when " +
+          s"${COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT.key} or " +
+          s"${COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT.key} is set to true. " +
+          s"Default value: 0.25 * <max batch size>")
+      .fallbackConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE)
+
val COLUMNAR_VELOX_ENABLE_USER_EXCEPTION_STACKTRACE =
buildConf("spark.gluten.sql.columnar.backend.velox.enableUserExceptionStacktrace")
.internal()
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index 9ed687d337..a8e0863639 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -89,6 +89,7 @@ case class VeloxResizeBatchesExec(
val out = Iterators
.wrap(appender.asScala)
+ .protectInvocationFlow()
.collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis))
.recyclePayload(_.close())
.recycleIterator {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
new file mode 100644
index 0000000000..e6ae9f1e54
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.config.VeloxConfig
+import org.apache.gluten.execution.VeloxResizeBatchesExec
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec,
ShuffleQueryStageExec}
+
+/**
+ * Appends [[VeloxResizeBatchesExec]] to the input of hash-based columnar shuffles and to the
+ * output of AQE columnar shuffle stages (reused ones too) to keep batch sizes in good shape.
+ */
+case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    val conf = VeloxConfig.get
+    val resizeInput = conf.veloxResizeBatchesShuffleInput
+    val resizeOutput = conf.veloxResizeBatchesShuffleOutput
+    if (!resizeInput && !resizeOutput) {
+      plan // Skip entirely; the min-size range (and its asserts) is only evaluated when enabled.
+    } else {
+      val range = conf.veloxResizeBatchesShuffleInputOutputRange
+      def resize(child: SparkPlan): SparkPlan = VeloxResizeBatchesExec(child, range.min, range.max)
+      plan.transformUp {
+        // A child that already resizes (e.g. an output-resized stage) is not wrapped again.
+        case shuffle: ColumnarShuffleExchangeExec
+            if resizeInput && !shuffle.useSortBasedShuffle &&
+              !shuffle.child.isInstanceOf[VeloxResizeBatchesExec] =>
+          shuffle.withNewChildren(Seq(resize(shuffle.child)))
+        case stage: ShuffleQueryStageExec if resizeOutput && isColumnarShuffle(stage) =>
+          resize(stage)
+        // Stages get wrapped first (bottom-up); hoist that resize above the AQEShuffleReadExec.
+        case read @ AQEShuffleReadExec(VeloxResizeBatchesExec(s: ShuffleQueryStageExec, _, _), _)
+            if resizeOutput && isColumnarShuffle(s) =>
+          resize(read.copy(child = s))
+      }
+    }
+  }
+
+  private def isColumnarShuffle(stage: ShuffleQueryStageExec): Boolean =
+    stage.shuffle.isInstanceOf[ColumnarShuffleExchangeExec] // `shuffle` unwraps ReusedExchangeExec
+}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 3e52ac8655..30ff88f343 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -16,14 +16,14 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.expression.VeloxDummyExpression
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
AQEShuffleReadExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.functions._
@@ -2052,4 +2052,44 @@ class MiscOperatorSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
}
+
+  test("Check VeloxResizeBatches is added in ShuffleRead") {
+    for (coalesceEnabled <- Seq(true, false)) {
+      // Node types expected as adjacent entries of the executed plan, innermost first.
+      val expectedChain: Seq[Class[_]] =
+        if (coalesceEnabled) {
+          // VeloxResizeBatches(AQEShuffleRead(ShuffleQueryStage(ColumnarShuffleExchange)))
+          Seq(
+            classOf[ColumnarShuffleExchangeExec],
+            classOf[ShuffleQueryStageExec],
+            classOf[AQEShuffleReadExec],
+            classOf[VeloxResizeBatchesExec])
+        } else {
+          // VeloxResizeBatches(ShuffleQueryStage(ColumnarShuffleExchange))
+          Seq(
+            classOf[ColumnarShuffleExchangeExec],
+            classOf[ShuffleQueryStageExec],
+            classOf[VeloxResizeBatchesExec])
+        }
+      val sqlConfs = Seq(
+        VeloxConfig.COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT.key -> "true",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "10",
+        SQLConf.COALESCE_PARTITIONS_ENABLED.key -> coalesceEnabled.toString)
+      withSQLConf(sqlConfs: _*) {
+        val query = "SELECT l_orderkey, count(1) from lineitem group by l_orderkey"
+        runQueryAndCompare(query) {
+          df =>
+            val windows = getExecutedPlan(df).sliding(expectedChain.size)
+            val found = windows.exists {
+              window =>
+                window.size == expectedChain.size &&
+                window.zip(expectedChain).forall {
+                  case (node, cls) => cls.isInstance(node)
+                }
+            }
+            assert(found)
+        }
+      }
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]