This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b9f63f7ca [GLUTEN-8497][VL] Fix columnar batch type mismatch in table 
cache (#9230)
9b9f63f7ca is described below

commit 9b9f63f7cae4c78503e413217507666a259c0c86
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Apr 10 09:15:57 2025 +0100

    [GLUTEN-8497][VL] Fix columnar batch type mismatch in table cache (#9230)
---
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  6 ++-
 .../gluten/execution/VeloxColumnarCacheSuite.scala |  7 +--
 .../extension/columnar/MiscColumnarRules.scala     | 53 +++++++++++++++++++++-
 3 files changed, 58 insertions(+), 8 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 04dfbf2558..f14d08ed86 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.columnarbatch.VeloxBatch
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.extension._
 import org.apache.gluten.extension.columnar._
-import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
+import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{PreventBatchTypeMismatchInTableCache,
 RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, 
RewriteSubqueryBroadcast}
 import org.apache.gluten.extension.columnar.enumerated.{RasOffload, RemoveSort}
 import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, 
HeuristicTransform}
 import org.apache.gluten.extension.columnar.offload.{OffloadExchange, 
OffloadJoin, OffloadOthers}
@@ -115,6 +115,8 @@ object VeloxRuleApi {
 
     // Gluten columnar: Final rules.
     injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
+    injector.injectFinal(
+      c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), 
Set(VeloxBatch)))
     injector.injectFinal(c => 
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
     injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
     injector.injectFinal(_ => RemoveFallbackTagRule())
@@ -193,6 +195,8 @@ object VeloxRuleApi {
     injector.injectPostTransform(c => 
ColumnarCollapseTransformStages(c.glutenConf))
     injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
     injector.injectPostTransform(c => 
RemoveGlutenTableCacheColumnarToRow(c.session))
+    injector.injectPostTransform(
+      c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), 
Set(VeloxBatch)))
     injector.injectPostTransform(c => 
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
     injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, 
c.session))
     injector.injectPostTransform(_ => RemoveFallbackTagRule())
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
index 8c7be883bb..f595420793 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxColumnarCacheSuite.scala
@@ -101,7 +101,7 @@ class VeloxColumnarCacheSuite extends 
VeloxWholeStageTransformerSuite with Adapt
     }
   }
 
-  // TODO: Fix this case. See 
https://github.com/apache/incubator-gluten/issues/8497.
+  // See issue https://github.com/apache/incubator-gluten/issues/8497.
   testWithSpecifiedSparkVersion("Input fallen back vanilla Spark columnar 
scan", Some("3.3")) {
     def withId(id: Int): Metadata =
       new MetadataBuilder().putLong("parquet.field.id", id).build()
@@ -127,10 +127,7 @@ class VeloxColumnarCacheSuite extends 
VeloxWholeStageTransformerSuite with Adapt
             .parquet(dir.getCanonicalPath)
           val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
           df.cache()
-          // FIXME: The following call will throw since 
ColumnarCachedBatchSerializer will be
-          //  confused by the input vanilla Parquet scan when its 
#convertColumnarBatchToCachedBatch
-          //  method is called.
-          assertThrows[Exception](df.collect())
+          assert(df.collect().length == 60175)
         }
     }
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index e11c613954..3a112d8ac3 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -17,13 +17,18 @@
 package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, 
Transitions}
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, 
Convention, Transitions}
+import org.apache.gluten.extension.columnar.transition.Convention.BatchType
 import org.apache.gluten.utils.PlanUtil
 
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
SortOrder}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
BuildSide}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
BroadcastQueryStageExec}
@@ -183,4 +188,48 @@ object MiscColumnarRules {
         }
     }
   }
+
+  // Because of the hard-coded C2R removal code in
+  // 
org.apache.spark.sql.execution.columnar.InMemoryRelation.convertToColumnarIfPossible
+  // from Spark, this rule can be used when we have to make sure the columnar 
query plan
+  // inside the C2R is recognizable by the user-specified columnar batch 
serializer.
+  case class PreventBatchTypeMismatchInTableCache(
+      isCalledByTableCachePlaning: Boolean,
+      allowedBatchTypes: Set[BatchType])
+    extends Rule[SparkPlan] {
+    import PreventColumnarTypeMismatchInTableCache._
+    override def apply(plan: SparkPlan): SparkPlan = {
+      if (!isCalledByTableCachePlaning) {
+        return plan
+      }
+      plan match {
+        case c2r @ ColumnarToRowLike(columnarPlan: SparkPlan)
+            if 
!allowedBatchTypes.contains(Convention.get(columnarPlan).batchType) =>
+          // If the output batch type of 'columnarPlan' is not allowed 
(usually because it's not
+          // supported by a user-specified columnar batch serializer),
+          // we add a transparent row-based unary node to prevent the C2R from 
being removed by
+          // Spark code in
+          // 
org.apache.spark.sql.execution.columnar.InMemoryRelation.convertToColumnarIfPossible.
+          ColumnarToRowRemovalGuard(c2r)
+        case other => other
+      }
+    }
+
+    private object PreventColumnarTypeMismatchInTableCache {
+      // Having this unary node at the top of the query plan prevents the 
C2R from being
+      // removed by Spark code in
+      // 
org.apache.spark.sql.execution.columnar.InMemoryRelation.convertToColumnarIfPossible.
+      case class ColumnarToRowRemovalGuard(c2r: SparkPlan) extends 
UnaryExecNode {
+        override def supportsColumnar: Boolean = false
+        override protected def doExecute(): RDD[InternalRow] = c2r.execute()
+        override def doExecuteBroadcast[T](): Broadcast[T] = 
c2r.executeBroadcast()
+        override def output: Seq[Attribute] = c2r.output
+        override def outputPartitioning: Partitioning = c2r.outputPartitioning
+        override def outputOrdering: Seq[SortOrder] = c2r.outputOrdering
+        override def child: SparkPlan = c2r
+        override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
+          copy(c2r = newChild)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to