This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9b9f63f7ca [GLUTEN-8497][VL] Fix columnar batch type mismatch in table
cache (#9230)
9b9f63f7ca is described below
commit 9b9f63f7cae4c78503e413217507666a259c0c86
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Apr 10 09:15:57 2025 +0100
[GLUTEN-8497][VL] Fix columnar batch type mismatch in table cache (#9230)
---
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 6 ++-
.../gluten/execution/VeloxColumnarCacheSuite.scala | 7 +--
.../extension/columnar/MiscColumnarRules.scala | 53 +++++++++++++++++++++-
3 files changed, 58 insertions(+), 8 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 04dfbf2558..f14d08ed86 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
-import
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
+import
org.apache.gluten.extension.columnar.MiscColumnarRules.{PreventBatchTypeMismatchInTableCache,
RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow,
RewriteSubqueryBroadcast}
import org.apache.gluten.extension.columnar.enumerated.{RasOffload, RemoveSort}
import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy,
HeuristicTransform}
import org.apache.gluten.extension.columnar.offload.{OffloadExchange,
OffloadJoin, OffloadOthers}
@@ -115,6 +115,8 @@ object VeloxRuleApi {
// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
+ injector.injectFinal(
+ c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(),
Set(VeloxBatch)))
injector.injectFinal(c =>
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
@@ -193,6 +195,8 @@ object VeloxRuleApi {
injector.injectPostTransform(c =>
ColumnarCollapseTransformStages(c.glutenConf))
injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
injector.injectPostTransform(c =>
RemoveGlutenTableCacheColumnarToRow(c.session))
+ injector.injectPostTransform(
+ c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(),
Set(VeloxBatch)))
injector.injectPostTransform(c =>
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf,
c.session))
injector.injectPostTransform(_ => RemoveFallbackTagRule())
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index 8c7be883bb..f595420793 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -101,7 +101,7 @@ class VeloxColumnarCacheSuite extends
VeloxWholeStageTransformerSuite with Adapt
}
}
- // TODO: Fix this case. See
https://github.com/apache/incubator-gluten/issues/8497.
+ // See issue https://github.com/apache/incubator-gluten/issues/8497.
testWithSpecifiedSparkVersion("Input fallen back vanilla Spark columnar
scan", Some("3.3")) {
def withId(id: Int): Metadata =
new MetadataBuilder().putLong("parquet.field.id", id).build()
@@ -127,10 +127,7 @@ class VeloxColumnarCacheSuite extends
VeloxWholeStageTransformerSuite with Adapt
.parquet(dir.getCanonicalPath)
val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
df.cache()
- // FIXME: The following call will throw since
ColumnarCachedBatchSerializer will be
- // confused by the input vanilla Parquet scan when its
#convertColumnarBatchToCachedBatch
- // method is called.
- assertThrows[Exception](df.collect())
+ assert(df.collect().length == 60175)
}
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index e11c613954..3a112d8ac3 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -17,13 +17,18 @@
package org.apache.gluten.extension.columnar
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike,
Transitions}
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike,
Convention, Transitions}
+import org.apache.gluten.extension.columnar.transition.Convention.BatchType
import org.apache.gluten.utils.PlanUtil
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
SortOrder}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight,
BuildSide}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
BroadcastQueryStageExec}
@@ -183,4 +188,48 @@ object MiscColumnarRules {
}
}
}
+
+ // Because of the hard-coded C2R removal code in
+ //
org.apache.spark.sql.execution.columnar.InMemoryRelation.convertToColumnarIfPossible
+ // from Spark, this rule can be used when we have to make sure the columnar
query plan
+ // inside the C2R is recognizable by the user-specified columnar batch
serializer.
+ case class PreventBatchTypeMismatchInTableCache(
+ isCalledByTableCachePlaning: Boolean,
+ allowedBatchTypes: Set[BatchType])
+ extends Rule[SparkPlan] {
+ import PreventColumnarTypeMismatchInTableCache._
+ override def apply(plan: SparkPlan): SparkPlan = {
+ if (!isCalledByTableCachePlaning) {
+ return plan
+ }
+ plan match {
+ case c2r @ ColumnarToRowLike(columnarPlan: SparkPlan)
+ if
!allowedBatchTypes.contains(Convention.get(columnarPlan).batchType) =>
+ // If the output batch type of 'columnarPlan' is not allowed
(usually because it's not
+ // supported by a user-specified columnar batch serializer),
+ // We add a transparent row-based unary node to prevent the C2R from
being removed by
+ // Spark code in
+ //
org.apache.spark.sql.execution.columnar.InMemoryRelation.convertToColumnarIfPossible.
+ ColumnarToRowRemovalGuard(c2r)
+ case other => other
+ }
+ }
+
+ private object PreventColumnarTypeMismatchInTableCache {
+ // Having this unary node on the top of the query plan would prevent the
c2r from being
+ // removed by Spark code in
+ //
org.apache.spark.sql.execution.columnar.InMemoryRelation.convertToColumnarIfPossible.
+ case class ColumnarToRowRemovalGuard(c2r: SparkPlan) extends
UnaryExecNode {
+ override def supportsColumnar: Boolean = false
+ override protected def doExecute(): RDD[InternalRow] = c2r.execute()
+ override def doExecuteBroadcast[T](): Broadcast[T] =
c2r.executeBroadcast()
+ override def output: Seq[Attribute] = c2r.output
+ override def outputPartitioning: Partitioning = c2r.outputPartitioning
+ override def outputOrdering: Seq[SortOrder] = c2r.outputOrdering
+ override def child: SparkPlan = c2r
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(c2r = newChild)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]