This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e5b4b4ce08 [GLUTEN-6920][CORE] Move API `Backend#defaultBatchType`
down to `BackendSettingsApi` in module gluten-substrait (#8016)
e5b4b4ce08 is described below
commit e5b4b4ce081f99090ac532b3d16ea31b2de12aac
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Nov 25 14:29:54 2024 +0800
[GLUTEN-6920][CORE] Move API `Backend#defaultBatchType` down to
`BackendSettingsApi` in module gluten-substrait (#8016)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 17 ++++++-
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 3 +-
.../gluten/backendsapi/velox/VeloxBackend.scala | 10 +++--
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 5 ++-
.../execution/ColumnarPartialProjectExec.scala | 2 -
.../gluten/execution/VeloxResizeBatchesExec.scala | 1 -
.../api/python/ColumnarArrowEvalPythonExec.scala | 3 +-
.../sql/execution/ArrowFileSourceScanExec.scala | 2 -
.../spark/sql/execution/BaseArrowScanExec.scala | 3 +-
.../columnar/transition/VeloxTransitionSuite.scala | 46 +++++++++----------
.../scala/org/apache/gluten/backend/Backend.scala | 5 +--
.../gluten/execution/ColumnarToColumnarExec.scala | 21 ++++++---
.../gluten/extension/GlutenColumnarRule.scala | 9 ++--
.../columnar/enumerated/EnumeratedTransform.scala | 4 +-
.../enumerated/planner/cost/LongCostModel.scala | 9 ++--
.../enumerated/planner/plan/GlutenPlanModel.scala | 19 ++++----
.../enumerated/planner/property/Conv.scala | 12 +++++
.../extension/columnar/transition/Convention.scala | 52 +++++++++++++++++++++-
.../columnar/transition/ConventionFunc.scala | 52 ++++++++++++++--------
.../columnar/transition/ConventionReq.scala | 13 +++---
.../extension/columnar/transition/Transition.scala | 19 +++++---
.../columnar/transition/Transitions.scala | 40 ++++++++---------
.../gluten/extension/injector/GlutenInjector.scala | 4 +-
.../org/apache/gluten/iterator/Iterators.scala | 2 +-
.../org/apache/spark/util/SparkVersionUtil.scala | 18 ++++----
.../gluten/backendsapi/BackendSettingsApi.scala | 5 +++
.../BasicPhysicalOperatorTransformer.scala | 2 -
.../CartesianProductExecTransformer.scala | 1 -
.../gluten/execution/ColumnarCoalesceExec.scala | 2 -
.../gluten/execution/ColumnarToRowExecBase.scala | 11 +++--
.../gluten/execution/RowToColumnarExecBase.scala | 5 ++-
.../TakeOrderedAndProjectExecTransformer.scala | 1 -
.../gluten/execution/WholeStageTransformer.scala | 2 -
.../org/apache/gluten/extension/GlutenPlan.scala | 30 ++++++++-----
.../extension/columnar/MiscColumnarRules.scala | 4 +-
.../columnar/heuristic/ExpandFallbackPolicy.scala | 7 ++-
.../columnar/transition/BackendTransitions.scala | 22 ++++++---
.../execution/ColumnarBroadcastExchangeExec.scala | 2 -
.../ColumnarCollapseTransformStages.scala | 5 +--
.../execution/ColumnarShuffleExchangeExec.scala | 1 -
.../sql/execution/ColumnarWriteFilesExec.scala | 14 +++---
.../datasources/GlutenWriterColumnarRules.scala | 6 ++-
.../columnar/transition/TransitionSuite.scala | 15 +++----
.../columnar/transition/TransitionSuiteBase.scala | 11 ++---
.../sql/execution/FallbackStrategiesSuite.scala | 20 ++++-----
.../sql/execution/FallbackStrategiesSuite.scala | 20 ++++-----
.../sql/execution/FallbackStrategiesSuite.scala | 20 ++++-----
.../benchmarks/ParquetReadBenchmark.scala | 3 +-
.../sql/execution/FallbackStrategiesSuite.scala | 20 ++++-----
.../sql/execution/AbstractFileSourceScanExec.scala | 7 ++-
.../sql/execution/AbstractFileSourceScanExec.scala | 7 ++-
.../sql/execution/AbstractFileSourceScanExec.scala | 12 ++---
.../sql/execution/AbstractFileSourceScanExec.scala | 12 ++---
53 files changed, 360 insertions(+), 278 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 061ec9856e..f6cacba42b 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.columnarbatch.CHBatch
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
-import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionFunc}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
@@ -34,6 +34,8 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -45,10 +47,11 @@ import java.util.Locale
import scala.util.control.Breaks.{break, breakable}
class CHBackend extends SubstraitBackend {
+ import CHBackend._
override def name(): String = CHConf.BACKEND_NAME
- override def defaultBatchType: Convention.BatchType = CHBatch
override def buildInfo(): Backend.BuildInfo =
Backend.BuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN")
+ override def convFuncOverride(): ConventionFunc.Override = new ConvFunc()
override def iteratorApi(): IteratorApi = new CHIteratorApi
override def sparkPlanExecApi(): SparkPlanExecApi = new CHSparkPlanExecApi
override def transformerApi(): TransformerApi = new CHTransformerApi
@@ -59,7 +62,17 @@ class CHBackend extends SubstraitBackend {
override def settings(): BackendSettingsApi = CHBackendSettings
}
+object CHBackend {
+ private class ConvFunc() extends ConventionFunc.Override {
+ override def batchTypeOf: PartialFunction[SparkPlan, Convention.BatchType]
= {
+ case a: AdaptiveSparkPlanExec if a.supportsColumnar =>
+ CHBatch
+ }
+ }
+}
+
object CHBackendSettings extends BackendSettingsApi with Logging {
+ override def primaryBatchType: Convention.BatchType = CHBatch
private val GLUTEN_CLICKHOUSE_SEP_SCAN_RDD =
"spark.gluten.sql.columnar.separate.scan.rdd.for.ch"
private val GLUTEN_CLICKHOUSE_SEP_SCAN_RDD_DEFAULT = "false"
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 081e815588..f6d2b85d9d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.RuleApi
+import org.apache.gluten.columnarbatch.CHBatch
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
import
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
@@ -114,7 +115,7 @@ object CHRuleApi {
intercept(
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
c.session)))
- injector.injectPostTransform(c => InsertTransitions(c.outputsColumnar))
+ injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, CHBatch))
// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 03d5aa2549..e05fd92e32 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -37,6 +37,7 @@ import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
CharVarcharUtils}
import org.apache.spark.sql.execution.{ColumnarCachedBatchSerializer,
SparkPlan}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat,
InsertIntoHadoopFsRelationCommand}
@@ -52,10 +53,9 @@ import scala.util.control.Breaks.breakable
class VeloxBackend extends SubstraitBackend {
import VeloxBackend._
override def name(): String = VeloxBackend.BACKEND_NAME
- override def defaultBatchType: Convention.BatchType = VeloxBatch
- override def convFuncOverride(): ConventionFunc.Override = new ConvFunc()
override def buildInfo(): Backend.BuildInfo =
Backend.BuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION,
VELOX_REVISION_TIME)
+ override def convFuncOverride(): ConventionFunc.Override = new ConvFunc()
override def iteratorApi(): IteratorApi = new VeloxIteratorApi
override def sparkPlanExecApi(): SparkPlanExecApi = new VeloxSparkPlanExecApi
override def transformerApi(): TransformerApi = new VeloxTransformerApi
@@ -72,6 +72,8 @@ object VeloxBackend {
private class ConvFunc() extends ConventionFunc.Override {
override def batchTypeOf: PartialFunction[SparkPlan, Convention.BatchType]
= {
+ case a: AdaptiveSparkPlanExec if a.supportsColumnar =>
+ VeloxBatch
case i: InMemoryTableScanExec
if i.supportsColumnar && i.relation.cacheBuilder.serializer
.isInstanceOf[ColumnarCachedBatchSerializer] =>
@@ -81,13 +83,15 @@ object VeloxBackend {
}
object VeloxBackendSettings extends BackendSettingsApi {
-
val SHUFFLE_SUPPORTED_CODEC = Set("lz4", "zstd")
val GLUTEN_VELOX_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX +
".udfLibraryPaths"
val GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX +
".driver.udfLibraryPaths"
val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX +
".internal.udfLibraryPaths"
val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX +
".udfAllowTypeConversion"
+ /** The columnar-batch type this backend is by default using. */
+ override def primaryBatchType: Convention.BatchType = VeloxBatch
+
override def validateScanExec(
format: ReadFileFormat,
fields: Array[StructField],
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index c5f46dae67..0a62a41baa 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.RuleApi
+import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.datasource.ArrowConvertorRule
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
@@ -101,7 +102,7 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => EliminateLocalSort)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c =>
FlushableHashAggregateRule.apply(c.session))
- injector.injectPostTransform(c => InsertTransitions(c.outputsColumnar))
+ injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(
@@ -187,7 +188,7 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => EliminateLocalSort)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c =>
FlushableHashAggregateRule.apply(c.session))
- injector.injectPostTransform(c => InsertTransitions(c.outputsColumnar))
+ injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
injector.injectPostTransform(
c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()))
SparkShimLoader.getSparkShims
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index d993e399db..84de41daa0 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -82,8 +82,6 @@ case class ColumnarPartialProjectExec(original: ProjectExec,
child: SparkPlan)(
replacedAliasUdf :: Nil
}
- final override val supportsColumnar: Boolean = true
-
private def validateExpression(expr: Expression): Boolean = {
expr.deterministic && !expr.isInstanceOf[LambdaFunction] && expr.children
.forall(validateExpression)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index 995582024b..5283ab61e3 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -53,7 +53,6 @@ case class VeloxResizeBatchesExec(
"selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append
/ split batches")
)
- override def supportsColumnar: Boolean = true
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index f1f5eb9062..a2eee53660 100644
---
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -214,9 +214,8 @@ case class ColumnarArrowEvalPythonExec(
extends EvalPythonExec
with GlutenPlan
with KnownChildrenConventions {
- override def supportsColumnar: Boolean = true
- override protected def batchType0(): Convention.BatchType = ArrowJavaBatch
+ override def batchType(): Convention.BatchType = ArrowJavaBatch
override def requiredChildrenConventions(): Seq[ConventionReq] = List(
ConventionReq.of(ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(ArrowJavaBatch)))
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
index 85bc682234..16b8fb0e9f 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
@@ -33,8 +33,6 @@ case class ArrowFileSourceScanExec(original:
FileSourceScanExec)
override def output: Seq[Attribute] = original.output
- override def supportsColumnar: Boolean = original.supportsColumnar
-
override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
index 38a6d1803d..6617e8b138 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
@@ -21,8 +21,7 @@ import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention
trait BaseArrowScanExec extends GlutenPlan {
-
- final override protected def batchType0(): Convention.BatchType = {
+ final override def batchType(): Convention.BatchType = {
ArrowBatches.ArrowJavaBatch
}
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
index d12faae0f7..e14ffd43d8 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
@@ -33,37 +33,37 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("Vanilla C2R - outputs row") {
val in = BatchLeaf(VanillaBatch)
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out == ColumnarToRowExec(BatchLeaf(VanillaBatch)))
}
test("Vanilla C2R - requires row input") {
val in = RowUnary(BatchLeaf(VanillaBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out == RowUnary(ColumnarToRowExec(BatchLeaf(VanillaBatch))))
}
test("Vanilla R2C - requires vanilla input") {
val in = BatchUnary(VanillaBatch, RowLeaf())
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch,
RowToColumnarExec(RowLeaf()))))
}
test("ArrowNative C2R - outputs row") {
val in = BatchLeaf(ArrowNativeBatch)
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out ==
ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch))))
}
test("ArrowNative C2R - requires row input") {
val in = RowUnary(BatchLeaf(ArrowNativeBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out ==
RowUnary(ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
}
test("ArrowNative R2C - requires Arrow input") {
val in = BatchUnary(ArrowNativeBatch, RowLeaf())
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(ArrowNativeBatch,
RowToVeloxColumnarExec(RowLeaf())))))
@@ -71,7 +71,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("ArrowNative-to-Velox C2C") {
val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowNativeBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
// No explicit transition needed for ArrowNative-to-Velox.
// FIXME: Add explicit transitions.
// See https://github.com/apache/incubator-gluten/issues/7313.
@@ -82,7 +82,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("Velox-to-ArrowNative C2C") {
val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(ArrowNativeBatch,
BatchLeaf(VeloxBatch)))))
@@ -90,7 +90,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("Vanilla-to-ArrowNative C2C") {
val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VanillaBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(
@@ -100,7 +100,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("ArrowNative-to-Vanilla C2C") {
val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowNativeBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
BatchUnary(VanillaBatch,
LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
@@ -108,19 +108,19 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("ArrowJava C2R - outputs row") {
val in = BatchLeaf(ArrowJavaBatch)
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out == ColumnarToRowExec(BatchLeaf(ArrowJavaBatch)))
}
test("ArrowJava C2R - requires row input") {
val in = RowUnary(BatchLeaf(ArrowJavaBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out == RowUnary(ColumnarToRowExec(BatchLeaf(ArrowJavaBatch))))
}
test("ArrowJava R2C - requires Arrow input") {
val in = BatchUnary(ArrowJavaBatch, RowLeaf())
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
BatchUnary(ArrowJavaBatch,
LoadArrowDataExec(RowToVeloxColumnarExec(RowLeaf())))))
@@ -128,7 +128,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("ArrowJava-to-Velox C2C") {
val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowJavaBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
BatchUnary(
@@ -138,7 +138,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("Velox-to-ArrowJava C2C") {
val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VeloxBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
BatchUnary(ArrowJavaBatch, LoadArrowDataExec(BatchLeaf(VeloxBatch)))))
@@ -146,7 +146,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("Vanilla-to-ArrowJava C2C") {
val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VanillaBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
BatchUnary(
@@ -156,37 +156,37 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("ArrowJava-to-Vanilla C2C") {
val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowJavaBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch,
BatchLeaf(ArrowJavaBatch))))
}
test("Velox C2R - outputs row") {
val in = BatchLeaf(VeloxBatch)
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out == VeloxColumnarToRowExec(BatchLeaf(VeloxBatch)))
}
test("Velox C2R - requires row input") {
val in = RowUnary(BatchLeaf(VeloxBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out == RowUnary(VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))
}
test("Velox R2C - outputs Velox") {
val in = RowLeaf()
- val out = Transitions.insertTransitions(in, outputsColumnar = true)
+ val out = BackendTransitions.insert(in, outputsColumnar = true)
assert(out == RowToVeloxColumnarExec(RowLeaf()))
}
test("Velox R2C - requires Velox input") {
val in = BatchUnary(VeloxBatch, RowLeaf())
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(out == VeloxColumnarToRowExec(BatchUnary(VeloxBatch,
RowToVeloxColumnarExec(RowLeaf()))))
}
test("Vanilla-to-Velox C2C") {
val in = BatchUnary(VeloxBatch, BatchLeaf(VanillaBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
BatchUnary(VeloxBatch,
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch))))))
@@ -194,7 +194,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("Velox-to-Vanilla C2C") {
val in = BatchUnary(VanillaBatch, BatchLeaf(VeloxBatch))
- val out = Transitions.insertTransitions(in, outputsColumnar = false)
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(BatchUnary(VanillaBatch,
LoadArrowDataExec(BatchLeaf(VeloxBatch)))))
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
index 346181e140..f406a6ac4d 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.backend
-import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionFunc}
+import org.apache.gluten.extension.columnar.transition.ConventionFunc
import org.apache.gluten.extension.injector.Injector
import org.apache.spark.SparkContext
@@ -39,9 +39,6 @@ trait Backend {
def onExecutorStart(pc: PluginContext): Unit = {}
def onExecutorShutdown(): Unit = {}
- /** The columnar-batch type this backend is by default using. */
- def defaultBatchType: Convention.BatchType
-
/**
* Overrides
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is
using to
* determine the convention (its row-based processing / columnar-batch
processing support) of a
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
index d0eacc1e4d..7ca4b36b06 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -17,8 +17,6 @@
package org.apache.gluten.execution
import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
-import
org.apache.gluten.extension.columnar.transition.Convention.KnownBatchType
-import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
import org.apache.gluten.iterator.Iterators
import org.apache.spark.rdd.RDD
@@ -32,8 +30,9 @@ import java.util.concurrent.atomic.AtomicLong
abstract class ColumnarToColumnarExec(from: Convention.BatchType, to:
Convention.BatchType)
extends ColumnarToColumnarTransition
- with KnownBatchType
- with KnownChildrenConventions {
+ with Convention.KnownBatchType
+ with Convention.KnownRowTypeForSpark33AndLater
+ with ConventionReq.KnownChildrenConventions {
def child: SparkPlan
protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch]
@@ -47,8 +46,20 @@ abstract class ColumnarToColumnarExec(from:
Convention.BatchType, to: Convention
"selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to
convert batches")
)
- override def supportsColumnar: Boolean = true
+ final override val supportsColumnar: Boolean = {
+ batchType() != Convention.BatchType.None
+ }
+
override def batchType(): Convention.BatchType = to
+
+ final override val supportsRowBased: Boolean = {
+ rowType() != Convention.RowType.None
+ }
+
+ override def rowType0(): Convention.RowType = {
+ Convention.RowType.None
+ }
+
override def requiredChildrenConventions(): Seq[ConventionReq] = List(
ConventionReq.of(ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(from)))
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
index 3378344253..5b440302a0 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
@@ -31,11 +31,9 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.vectorized.ColumnarBatch
object GlutenColumnarRule {
-
// Utilities to infer columnar rule's caller's property:
// ApplyColumnarRulesAndInsertTransitions#outputsColumnar.
-
- case class DummyRowOutputExec(override val child: SparkPlan) extends
UnaryExecNode {
+ private case class DummyRowOutputExec(override val child: SparkPlan) extends
UnaryExecNode {
override def supportsColumnar: Boolean = false
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
@@ -47,7 +45,7 @@ object GlutenColumnarRule {
copy(child = newChild)
}
- case class DummyColumnarOutputExec(override val child: SparkPlan) extends
UnaryExecNode {
+ private case class DummyColumnarOutputExec(override val child: SparkPlan)
extends UnaryExecNode {
override def supportsColumnar: Boolean = true
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
@@ -99,9 +97,8 @@ case class GlutenColumnarRule(
"This should not happen. Please leave an issue at" +
" https://github.com/apache/incubator-gluten.")
}
- val vanillaPlan = Transitions.insertTransitions(originalPlan,
outputsColumnar)
+ val vanillaPlan = Transitions.insert(originalPlan, outputsColumnar)
val applier = applierBuilder.apply(session)
applier.apply(vanillaPlan, outputsColumnar)
}
-
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index fad0ae386c..67399e25d4 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -56,10 +56,10 @@ case class EnumeratedTransform(costModel:
CostModel[SparkPlan], rules: Seq[RasRu
.create()
}
- private val reqConvention = Conv.any
+ private val convReq = Conv.any
override def apply(plan: SparkPlan): SparkPlan = {
- val constraintSet = PropertySet(List(reqConvention))
+ val constraintSet = PropertySet(Seq(convReq))
val planner = optimization.newPlanner(plan, constraintSet)
val out = planner.plan()
out
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala
index 1cfe132d84..393ac35de4 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala
@@ -69,19 +69,20 @@ object LongCostModel extends Logging {
*/
sealed trait Kind {
import Kind._
- values.synchronized {
+ all.synchronized {
val n = name()
- if (values.contains(n)) {
+ if (all.contains(n)) {
throw new GlutenException(s"Cost mode kind $n already registered")
}
- values += n -> this
+ all += n -> this
}
def name(): String
}
object Kind {
- val values: mutable.Map[String, Kind] = mutable.Map()
+ private val all: mutable.Map[String, Kind] = mutable.Map()
+ def values(): Map[String, Kind] = all.toMap
}
/**
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index fa92eacd4d..568ea50396 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -19,7 +19,6 @@ package
org.apache.gluten.extension.columnar.enumerated.planner.plan
import
org.apache.gluten.extension.columnar.enumerated.planner.metadata.GlutenMetadata
import org.apache.gluten.extension.columnar.enumerated.planner.property.{Conv,
ConvDef}
import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
-import
org.apache.gluten.extension.columnar.transition.Convention.{KnownBatchType,
KnownRowType}
import org.apache.gluten.ras.{Metadata, PlanModel}
import org.apache.gluten.ras.property.PropertySet
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -43,17 +42,13 @@ object GlutenPlanModel {
metadata: GlutenMetadata,
constraintSet: PropertySet[SparkPlan])
extends LeafExecNode
- with KnownBatchType
- with KnownRowType {
+ with Convention.KnownBatchType
+ with Convention.KnownRowTypeForSpark33AndLater {
private val req: Conv.Req =
constraintSet.get(ConvDef).asInstanceOf[Conv.Req]
override protected def doExecute(): RDD[InternalRow] = throw new
IllegalStateException()
override def output: Seq[Attribute] = metadata.schema().output
- override def supportsColumnar: Boolean = {
- batchType != Convention.BatchType.None
- }
-
override val batchType: Convention.BatchType = {
val out = req.req.requiredBatchType match {
case ConventionReq.BatchType.Any => Convention.BatchType.None
@@ -62,13 +57,21 @@ object GlutenPlanModel {
out
}
- override val rowType: Convention.RowType = {
+ final override val supportsColumnar: Boolean = {
+ batchType != Convention.BatchType.None
+ }
+
+ override val rowType0: Convention.RowType = {
val out = req.req.requiredRowType match {
case ConventionReq.RowType.Any => Convention.RowType.None
case ConventionReq.RowType.Is(r) => r
}
out
}
+
+ final override val supportsRowBased: Boolean = {
+ rowType() != Convention.RowType.None
+ }
}
private object PlanModelImpl extends PlanModel[SparkPlan] {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
index 831b212e1f..7b2b801ac9 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
@@ -29,6 +29,18 @@ sealed trait Conv extends Property[SparkPlan] {
}
override def satisfies(other: Property[SparkPlan]): Boolean = {
+ // The following enforces strict type checking against `this` and `other`
+ // to make sure:
+ //
+ // 1. `this`, which came from user implementation of
PropertyDef.getProperty, must be a `Prop`
+ // 2. `other` which came from user implementation of
PropertyDef.getChildrenConstraints,
+ // must be a `Req`
+ //
+ // If the user implementation doesn't follow the criteria, a cast error will
be thrown.
+ //
+ // This can be a common practice to implement a safe Property for RAS.
+ //
+ // TODO: Add a similar case to RAS UTs.
val req = other.asInstanceOf[Req]
if (req.isAny) {
return true
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index 840b62fb67..b57f3e0c0a 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -16,10 +16,15 @@
*/
package org.apache.gluten.extension.columnar.transition
+import org.apache.gluten.exception.GlutenException
+
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec,
SparkPlan}
+import org.apache.spark.util.SparkVersionUtil
import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.mutable
+
/**
* Convention of a query plan consists of the row data type and columnar data
type it supports to
* output.
@@ -74,6 +79,7 @@ object Convention {
}
trait BatchType extends TransitionGraph.Vertex with Serializable {
+ import BatchType._
private val initialized: AtomicBoolean = new AtomicBoolean(false)
final def ensureRegistered(): Unit = {
@@ -84,7 +90,8 @@ object Convention {
register()
}
- final private def register(): Unit = {
+ final private def register(): Unit = BatchType.synchronized {
+ assert(all.add(this))
Transition.graph.addVertex(this)
registerTransitions()
}
@@ -117,6 +124,8 @@ object Convention {
}
object BatchType {
+ private val all: mutable.Set[BatchType] = mutable.Set()
+ def values(): Set[BatchType] = all.toSet
// None indicates that the plan doesn't support batch-based processing.
final case object None extends BatchType {
override protected[this] def registerTransitions(): Unit = {}
@@ -133,7 +142,46 @@ object Convention {
def batchType(): BatchType
}
- trait KnownRowType {
+ sealed trait KnownRowType extends KnownRowType.SupportsRowBasedCompatible {
def rowType(): RowType
}
+
+ object KnownRowType {
+ // To be compatible with Spark (version < 3.3)
+ sealed trait SupportsRowBasedCompatible {
+ def supportsRowBased(): Boolean = {
+ throw new GlutenException("Illegal state: The method is not expected
to be called")
+ }
+ }
+ }
+
+ trait KnownRowTypeForSpark33AndLater extends KnownRowType {
+ this: SparkPlan =>
+ import KnownRowTypeForSpark33AndLater._
+
+ final override def rowType(): RowType = {
+ if (lteSpark32) {
+ // It's known that in Spark 3.2, one Spark plan node is considered
either only having
+ // row-based support or only having columnar support at a time.
+ // Hence, if the plan supports columnar output, we'd disable its
row-based support.
+ // The same for the opposite.
+ if (supportsColumnar) {
+ Convention.RowType.None
+ } else {
+ Convention.RowType.VanillaRow
+ }
+ } else {
+ rowType0()
+ }
+ }
+
+ def rowType0(): RowType
+ }
+
+ object KnownRowTypeForSpark33AndLater {
+ private val lteSpark32: Boolean = {
+ val v = SparkVersionUtil.majorMinorVersion()
+ SparkVersionUtil.compareMajorMinorVersion(v, (3, 2)) <= 0
+ }
+ }
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index c3feefe943..5cb3d44a15 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -21,7 +21,7 @@ import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildr
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnionExec}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -89,17 +89,6 @@ object ConventionFunc {
}
case q: QueryStageExec => conventionOf0(q.plan)
case r: ReusedExchangeExec => conventionOf0(r.child)
- case a: AdaptiveSparkPlanExec =>
- val rowType = rowTypeOf(a)
- val batchType = if (a.supportsColumnar) {
- // By default, we execute columnar AQE with backend batch output.
- // See
org.apache.gluten.extension.columnar.transition.InsertTransitions.apply
- Backend.get().defaultBatchType
- } else {
- Convention.BatchType.None
- }
- val conv = Convention.of(rowType, batchType)
- conv
case other =>
val conv = Convention.of(rowTypeOf(other), batchTypeOf(other))
conv
@@ -119,12 +108,24 @@ object ConventionFunc {
case _ =>
Convention.RowType.None
}
- assert(
- out == Convention.RowType.None ||
plan.isInstanceOf[Convention.KnownRowType] ||
- SparkShimLoader.getSparkShims.supportsRowBased(plan))
+ checkRowType(plan, out)
out
}
+ private def checkRowType(plan: SparkPlan, rowType: Convention.RowType):
Unit = {
+ if (SparkShimLoader.getSparkShims.supportsRowBased(plan)) {
+ assert(
+ rowType != Convention.RowType.None,
+ s"Plan ${plan.nodeName} supports row-based execution, " +
+ s"however #rowTypeOf returns None")
+ } else {
+ assert(
+ rowType == Convention.RowType.None,
+ s"Plan ${plan.nodeName} doesn't support row-based " +
+ s"execution, however #rowTypeOf returns $rowType")
+ }
+ }
+
private def batchTypeOf(plan: SparkPlan): Convention.BatchType = {
val out = o.batchTypeOf.applyOrElse(plan, batchTypeOf0)
out
@@ -139,10 +140,24 @@ object ConventionFunc {
case _ =>
Convention.BatchType.None
}
- assert(out == Convention.BatchType.None || plan.supportsColumnar)
+ checkBatchType(plan, out)
out
}
+ private def checkBatchType(plan: SparkPlan, batchType:
Convention.BatchType): Unit = {
+ if (plan.supportsColumnar) {
+ assert(
+ batchType != Convention.BatchType.None,
+ s"Plan ${plan.nodeName} supports columnar " +
+ s"execution, however #batchTypeOf returns None")
+ } else {
+ assert(
+ batchType == Convention.BatchType.None,
+ s"Plan ${plan.nodeName} doesn't support " +
+ s"columnar execution, however #batchTypeOf returns $batchType")
+ }
+ }
+
override def conventionReqOf(plan: SparkPlan): ConventionReq = {
val req = o.conventionReqOf.applyOrElse(plan, conventionReqOf0)
req
@@ -169,14 +184,15 @@ object ConventionFunc {
// To align with
ApplyColumnarRulesAndInsertTransitions#insertTransitions
ConventionReq.any
case u: UnionExec =>
- // We force vanilla union to output row data to get best compatibility
with vanilla Spark.
+ // We force vanilla union to output row data to get the best
compatibility with vanilla
+ // Spark.
// As a result it's a common practice to rewrite it with GlutenPlan
for offloading.
ConventionReq.of(
ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
ConventionReq.BatchType.Any)
case other =>
// In the normal case, children's convention should follow parent
node's convention.
- // Note, we don't have consider C2R / R2C here since they are already
removed by
+ // Note, we don't have to consider C2R / R2C here since they are
already removed by
// RemoveTransitions.
val thisConv = conventionOf0(other)
thisConv.asReq()
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
index ce613bf7db..a081f21434 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.backend.Backend
-
import org.apache.spark.sql.execution.SparkPlan
/**
@@ -53,15 +51,14 @@ object ConventionReq {
override val requiredBatchType: BatchType
) extends ConventionReq
- val any: ConventionReq = Impl(RowType.Any, BatchType.Any)
- val row: ConventionReq = Impl(RowType.Is(Convention.RowType.VanillaRow),
BatchType.Any)
- val vanillaBatch: ConventionReq =
- Impl(RowType.Any, BatchType.Is(Convention.BatchType.VanillaBatch))
- lazy val backendBatch: ConventionReq =
- Impl(RowType.Any, BatchType.Is(Backend.get().defaultBatchType))
+ val any: ConventionReq = of(RowType.Any, BatchType.Any)
+ val row: ConventionReq = ofRow(RowType.Is(Convention.RowType.VanillaRow))
+ val vanillaBatch: ConventionReq =
ofBatch(BatchType.Is(Convention.BatchType.VanillaBatch))
def get(plan: SparkPlan): ConventionReq =
ConventionFunc.create().conventionReqOf(plan)
def of(rowType: RowType, batchType: BatchType): ConventionReq =
Impl(rowType, batchType)
+ def ofRow(rowType: RowType): ConventionReq = Impl(rowType, BatchType.Any)
+ def ofBatch(batchType: BatchType): ConventionReq = Impl(RowType.Any,
batchType)
trait KnownChildrenConventions {
def requiredChildrenConventions(): Seq[ConventionReq]
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
index ced9378ad6..0a7f635b8b 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
@@ -101,22 +101,27 @@ object Transition {
case (ConventionReq.RowType.Is(toRowType),
ConventionReq.BatchType.Any) =>
from.rowType match {
case Convention.RowType.None =>
+ // Input query plan doesn't have recognizable row-based output,
+ // find columnar-to-row transition.
graph.transitionOfOption(from.batchType,
toRowType).getOrElse(orElse)
- case fromRowType =>
+ case fromRowType if toRowType == fromRowType =>
// We have only one built-in row type.
- assert(toRowType == fromRowType)
Transition.empty
+ case _ =>
+ throw new UnsupportedOperationException(
+ "Row-to-row transition is not yet supported")
}
case (ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(toBatchType)) =>
from.batchType match {
case Convention.BatchType.None =>
+ // Input query plan doesn't have recognizable columnar output,
+ // find row-to-columnar transition.
graph.transitionOfOption(from.rowType,
toBatchType).getOrElse(orElse)
+ case fromBatchType if toBatchType == fromBatchType =>
+ Transition.empty
case fromBatchType =>
- if (toBatchType == fromBatchType) {
- Transition.empty
- } else {
- graph.transitionOfOption(fromBatchType,
toBatchType).getOrElse(orElse)
- }
+ // Find columnar-to-columnar transition.
+ graph.transitionOfOption(fromBatchType,
toBatchType).getOrElse(orElse)
}
case (ConventionReq.RowType.Any, ConventionReq.BatchType.Any) =>
Transition.empty
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index 2f2840b52b..10d50f453d 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -16,24 +16,22 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.extension.columnar.transition.Convention.BatchType
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import scala.annotation.tailrec
-case class InsertTransitions(outputsColumnar: Boolean) extends Rule[SparkPlan]
{
+case class InsertTransitions(convReq: ConventionReq) extends Rule[SparkPlan] {
private val convFunc = ConventionFunc.create()
override def apply(plan: SparkPlan): SparkPlan = {
// Remove all transitions at first.
val removed = RemoveTransitions.apply(plan)
val filled = fillWithTransitions(removed)
- if (!outputsColumnar) {
- return Transitions.toRowPlan(filled)
- }
- Transitions.toBackendBatchPlan(filled)
+ val out = Transitions.enforceReq(filled, convReq)
+ out
}
private def fillWithTransitions(plan: SparkPlan): SparkPlan =
plan.transformUp {
@@ -63,6 +61,17 @@ case class InsertTransitions(outputsColumnar: Boolean)
extends Rule[SparkPlan] {
}
}
+object InsertTransitions {
+ def create(outputsColumnar: Boolean, batchType: BatchType):
InsertTransitions = {
+ val conventionReq = if (outputsColumnar) {
+ ConventionReq.ofBatch(ConventionReq.BatchType.Is(batchType))
+ } else {
+ ConventionReq.row
+ }
+ InsertTransitions(conventionReq)
+ }
+}
+
object RemoveTransitions extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { case p
=> removeForNode(p) }
@@ -76,8 +85,8 @@ object RemoveTransitions extends Rule[SparkPlan] {
}
object Transitions {
- def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
= {
- InsertTransitions(outputsColumnar).apply(plan)
+ def insert(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+ InsertTransitions.create(outputsColumnar,
BatchType.VanillaBatch).apply(plan)
}
def toRowPlan(plan: SparkPlan): SparkPlan = {
@@ -88,24 +97,13 @@ object Transitions {
ConventionReq.BatchType.Any))
}
- def toBackendBatchPlan(plan: SparkPlan): SparkPlan = {
- val backendBatchType = Backend.get().defaultBatchType
- val out = toBatchPlan(plan, backendBatchType)
- out
- }
-
- def toVanillaBatchPlan(plan: SparkPlan): SparkPlan = {
- val out = toBatchPlan(plan, Convention.BatchType.VanillaBatch)
- out
- }
-
- private def toBatchPlan(plan: SparkPlan, toBatchType: Convention.BatchType):
SparkPlan = {
+ def toBatchPlan(plan: SparkPlan, toBatchType: Convention.BatchType):
SparkPlan = {
enforceReq(
plan,
ConventionReq.of(ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(toBatchType)))
}
- private def enforceReq(plan: SparkPlan, req: ConventionReq): SparkPlan = {
+ def enforceReq(plan: SparkPlan, req: ConventionReq): SparkPlan = {
val convFunc = ConventionFunc.create()
val removed = RemoveTransitions.removeForNode(plan)
val transition = Transition
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
index 89314dfcf3..efe584d441 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
@@ -150,8 +150,8 @@ object GlutenInjector {
private def findCostModel(
registry: LongCostModel.Registry,
aliasOrClass: String): CostModel[SparkPlan] = {
- if (LongCostModel.Kind.values.contains(aliasOrClass)) {
- val kind = LongCostModel.Kind.values(aliasOrClass)
+ if (LongCostModel.Kind.values().contains(aliasOrClass)) {
+ val kind = LongCostModel.Kind.values()(aliasOrClass)
val model = registry.get(kind)
return model
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/iterator/Iterators.scala
b/gluten-core/src/main/scala/org/apache/gluten/iterator/Iterators.scala
index 2de1c7b4ed..ef8ca2c974 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/iterator/Iterators.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/iterator/Iterators.scala
@@ -41,7 +41,7 @@ object Iterators {
}
def wrap[A](in: Iterator[A]): WrapperBuilder[A] = {
- wrap(V1, in)
+ wrap(DEFAULT_VERSION, in)
}
def wrap[A](version: Version, in: Iterator[A]): WrapperBuilder[A] = {
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
similarity index 64%
copy from
backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
copy to gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
index 38a6d1803d..6864d3caa0 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
@@ -14,15 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution
+package org.apache.spark.util
-import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
-
-trait BaseArrowScanExec extends GlutenPlan {
+object SparkVersionUtil {
+ def majorMinorVersion(): (Int, Int) = {
+ VersionUtils.majorMinorVersion(org.apache.spark.SPARK_VERSION)
+ }
- final override protected def batchType0(): Convention.BatchType = {
- ArrowBatches.ArrowJavaBatch
+ // Returns x where: x < 0 if one < other, x == 0 if one == other, x > 0 if one >
other.
+ def compareMajorMinorVersion(one: (Int, Int), other: (Int, Int)): Int = {
+ val base = 1000
+ assert(one._2 < base && other._2 < base)
+ one._1 * base + one._2 - (other._1 * base + other._2)
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 506424b79c..1eb69da6e5 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi
import org.apache.gluten.GlutenConfig
import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -28,6 +29,10 @@ import
org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopF
import org.apache.spark.sql.types.StructField
trait BackendSettingsApi {
+
+ /** The columnar-batch type this backend is by default using. */
+ def primaryBatchType: Convention.BatchType
+
def validateScanExec(
format: ReadFileFormat,
fields: Array[StructField],
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 76505da3e0..dbe667ebb2 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -263,8 +263,6 @@ case class ColumnarUnionExec(children: Seq[SparkPlan])
extends GlutenPlan {
case _ =>
}
- override def supportsColumnar: Boolean = true
-
override def output: Seq[Attribute] = {
children.map(_.output).transpose.map {
attrs =>
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 7f7e54e9c7..28fb691896 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -45,7 +45,6 @@ import java.io.{IOException, ObjectOutputStream}
*/
case class ColumnarCartesianProductBridge(child: SparkPlan) extends
UnaryExecNode with GlutenPlan {
override def output: Seq[Attribute] = child.output
- override def supportsColumnar: Boolean = true
override protected def doExecute(): RDD[InternalRow] =
throw new UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
child.executeColumnar()
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
index f40a7f8f07..3b13207c93 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
@@ -30,8 +30,6 @@ case class ColumnarCoalesceExec(numPartitions: Int, child:
SparkPlan)
extends UnaryExecNode
with GlutenPlan {
- override def supportsColumnar: Boolean = true
-
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
index fd86106bf3..fae3115981 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.ConventionReq
+import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
import org.apache.spark.broadcast.Broadcast
@@ -43,6 +43,10 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)
final override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+ override def batchType(): Convention.BatchType = Convention.BatchType.None
+
+ override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
+
override def doExecuteBroadcast[T](): Broadcast[T] = {
// Require for explicit implementation, otherwise throw error.
super.doExecuteBroadcast[T]()
@@ -55,7 +59,8 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)
}
override def requiredChildrenConventions(): Seq[ConventionReq] = {
- List(ConventionReq.backendBatch)
+ List(
+ ConventionReq.ofBatch(
+
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
}
-
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
index f4dd160b58..2a52616361 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
@@ -45,6 +46,8 @@ abstract class RowToColumnarExecBase(child: SparkPlan)
final override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
final override def doExecute(): RDD[InternalRow] = {
child.execute()
}
@@ -54,8 +57,6 @@ abstract class RowToColumnarExecBase(child: SparkPlan)
super.doExecuteBroadcast[T]()
}
- final override def supportsColumnar: Boolean = true
-
def doExecuteColumnarInternal(): RDD[ColumnarBatch]
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
index c4e192d183..c960bda249 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
@@ -43,7 +43,6 @@ case class TakeOrderedAndProjectExecTransformer(
with GlutenPlan {
override def outputPartitioning: Partitioning = SinglePartition
override def outputOrdering: Seq[SortOrder] = sortOrder
- override def supportsColumnar: Boolean = true
override def output: Seq[Attribute] = {
projectList.map(_.toAttribute)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index beb7fe5f99..e8a42883a5 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -60,8 +60,6 @@ trait TransformSupport extends GlutenPlan {
s"${this.getClass.getSimpleName} doesn't support doExecute")
}
- final override lazy val supportsColumnar: Boolean = true
-
/**
* Returns all the RDDs of ColumnarBatch which generates the input rows.
*
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index a6ca9b1dcc..3639ac522f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -17,11 +17,11 @@
package org.apache.gluten.extension
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.expression.TransformerState
import org.apache.gluten.extension.columnar.FallbackTag
+import org.apache.gluten.extension.columnar.FallbackTag.{Appendable, Converter}
import org.apache.gluten.extension.columnar.FallbackTags.add
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.extension.columnar.validator.Validator
@@ -34,7 +34,6 @@ import org.apache.gluten.test.TestStats
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.execution.SparkPlan
-import FallbackTag.{Appendable, Converter}
import com.google.common.collect.Lists
sealed trait ValidationResult {
@@ -83,7 +82,11 @@ object ValidationResult {
}
/** Every Gluten Operator should extend this trait. */
-trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with
LogLevelUtil {
+trait GlutenPlan
+ extends SparkPlan
+ with Convention.KnownBatchType
+ with Convention.KnownRowTypeForSpark33AndLater
+ with LogLevelUtil {
protected lazy val enableNativeValidation = glutenConf.enableNativeValidation
protected def glutenConf: GlutenConfig = GlutenConfig.getConf
@@ -126,17 +129,20 @@ trait GlutenPlan extends SparkPlan with
Convention.KnownBatchType with LogLevelU
}
}
- final override def batchType(): Convention.BatchType = {
- if (!supportsColumnar) {
- return Convention.BatchType.None
- }
- val batchType = batchType0()
- assert(batchType != Convention.BatchType.None)
- batchType
+ final override val supportsColumnar: Boolean = {
+ batchType() != Convention.BatchType.None
+ }
+
+ override def batchType(): Convention.BatchType = {
+ BackendsApiManager.getSettings.primaryBatchType
+ }
+
+ final override val supportsRowBased: Boolean = {
+ rowType() != Convention.RowType.None
}
- protected def batchType0(): Convention.BatchType = {
- Backend.get().defaultBatchType
+ override def rowType0(): Convention.RowType = {
+ Convention.RowType.None
}
protected def doValidateInternal(): ValidationResult =
ValidationResult.succeeded
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index a199b5920c..e11c613954 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.extension.columnar
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike,
Transitions}
import org.apache.gluten.utils.PlanUtil
@@ -106,7 +107,8 @@ object MiscColumnarRules {
private def toColumnarBroadcastExchange(
exchange: BroadcastExchangeExec): ColumnarBroadcastExchangeExec = {
- val newChild = Transitions.toBackendBatchPlan(exchange.child)
+ val newChild =
+ Transitions.toBatchPlan(exchange.child,
BackendsApiManager.getSettings.primaryBatchType)
ColumnarBroadcastExchangeExec(exchange.mode, newChild)
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
index e1c8b6f41f..44ed81f565 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.{FallbackTag, FallbackTags}
import org.apache.gluten.extension.columnar.FallbackTags.add
-import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike, Transitions}
+import org.apache.gluten.extension.columnar.transition.{BackendTransitions, ColumnarToRowLike, RowToColumnarLike}
import org.apache.gluten.utils.PlanUtil
import org.apache.spark.sql.catalyst.rules.Rule
@@ -32,6 +32,9 @@ import
org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.exchange.Exchange
+
+
+
// format: off
/**
* Note, this rule should only fallback to row-based plan if there is no harm.
@@ -226,7 +229,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
case _ =>
}
- val planWithTransitions = Transitions.insertTransitions(originalPlan, outputsColumnar)
+ val planWithTransitions = BackendTransitions.insert(originalPlan, outputsColumnar)
planWithTransitions
}
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/BackendTransitions.scala
similarity index 56%
copy from
backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
copy to
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/BackendTransitions.scala
index 38a6d1803d..86d4b40d55 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/BackendTransitions.scala
@@ -14,15 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution
+package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.backendsapi.BackendsApiManager
-trait BaseArrowScanExec extends GlutenPlan {
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
- final override protected def batchType0(): Convention.BatchType = {
- ArrowBatches.ArrowJavaBatch
+case class InsertBackendTransitions(outputsColumnar: Boolean) extends Rule[SparkPlan] {
+ def apply(plan: SparkPlan): SparkPlan = {
+ InsertTransitions
+ .create(outputsColumnar, BackendsApiManager.getSettings.primaryBatchType)
+ .apply(plan)
+ }
+}
+
+object BackendTransitions {
+ def insert(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+ InsertBackendTransitions(outputsColumnar)(plan)
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index d55733fe4e..01a4380a14 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -121,8 +121,6 @@ case class ColumnarBroadcastExchangeExec(mode:
BroadcastMode, child: SparkPlan)
@transient
private val timeout: Long = SQLConf.get.broadcastTimeout
- override def supportsColumnar: Boolean = true
-
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index 5ea5e4159d..9ec078e003 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Backend
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution._
import org.apache.gluten.extension.columnar.transition.Convention
@@ -178,9 +177,9 @@ case class ColumnarInputAdapter(child: SparkPlan)
extends InputAdapterGenerateTreeStringShim
with Convention.KnownBatchType {
override def output: Seq[Attribute] = child.output
- override def supportsColumnar: Boolean = true
+ override val supportsColumnar: Boolean = true
override def batchType(): Convention.BatchType =
- Backend.get().defaultBatchType
+ BackendsApiManager.getSettings.primaryBatchType
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
child.executeColumnar()
override def outputPartitioning: Partitioning = child.outputPartitioning
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index 4f62377b09..d4b33be292 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -126,7 +126,6 @@ case class ColumnarShuffleExchangeExec(
override def nodeName: String = "ColumnarExchange"
- override def supportsColumnar: Boolean = true
override def numMappers: Int = shuffleDependency.rdd.getNumPartitions
override def numPartitions: Int = shuffleDependency.partitioner.numPartitions
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index fcd82d8c19..25d6c4ed61 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -19,9 +19,8 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention.{KnownRowType, RowType}
+import org.apache.gluten.extension.columnar.transition.Convention.RowType
import org.apache.gluten.extension.columnar.transition.ConventionReq
-import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.TaskContext
@@ -45,18 +44,17 @@ abstract class ColumnarWriteFilesExec protected (
override val right: SparkPlan)
extends BinaryExecNode
with GlutenPlan
- with KnownChildrenConventions
- with KnownRowType
+ with ConventionReq.KnownChildrenConventions
with ColumnarWriteFilesExec.ExecuteWriteCompatible {
val child: SparkPlan = left
override lazy val references: AttributeSet = AttributeSet.empty
- override def supportsColumnar: Boolean = true
-
override def requiredChildrenConventions(): Seq[ConventionReq] = {
- List(ConventionReq.backendBatch)
+ List(
+ ConventionReq.ofBatch(
+ ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
}
/**
@@ -69,7 +67,7 @@ abstract class ColumnarWriteFilesExec protected (
*
* Since https://github.com/apache/incubator-gluten/pull/6745.
*/
- override def rowType(): RowType = {
+ override def rowType0(): RowType = {
RowType.VanillaRow
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 917f6c03df..fb42c55ba0 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.ColumnarToRowExecBase
import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Transitions
+import org.apache.gluten.extension.columnar.transition.{Convention,
Transitions}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
@@ -61,6 +61,8 @@ case class FakeRowAdaptor(child: SparkPlan)
override def output: Seq[Attribute] = child.output
+ override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
+
override protected def doExecute(): RDD[InternalRow] = {
doExecuteColumnar().map(cb => new FakeRow(cb))
}
@@ -74,7 +76,7 @@ case class FakeRowAdaptor(child: SparkPlan)
if (child.supportsColumnar) {
child.executeColumnar()
} else {
- val r2c = Transitions.toBackendBatchPlan(child)
+ val r2c = Transitions.toBatchPlan(child, BackendsApiManager.getSettings.primaryBatchType)
r2c.executeColumnar()
}
}
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
index 9712bd2c21..5daca9bede 100644
---
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
@@ -32,7 +32,7 @@ class TransitionSuite extends SharedSparkSession {
test("Trivial C2R") {
val in = BatchLeaf(TypeA)
val out = ConventionFunc.ignoreBackend {
- Transitions.insertTransitions(in, outputsColumnar = false)
+ Transitions.insert(in, outputsColumnar = false)
}
assert(out == BatchToRow(TypeA, BatchLeaf(TypeA)))
}
@@ -40,7 +40,7 @@ class TransitionSuite extends SharedSparkSession {
test("Insert C2R") {
val in = RowUnary(BatchLeaf(TypeA))
val out = ConventionFunc.ignoreBackend {
- Transitions.insertTransitions(in, outputsColumnar = false)
+ Transitions.insert(in, outputsColumnar = false)
}
assert(out == RowUnary(BatchToRow(TypeA, BatchLeaf(TypeA))))
}
@@ -48,7 +48,7 @@ class TransitionSuite extends SharedSparkSession {
test("Insert R2C") {
val in = BatchUnary(TypeA, RowLeaf())
val out = ConventionFunc.ignoreBackend {
- Transitions.insertTransitions(in, outputsColumnar = false)
+ Transitions.insert(in, outputsColumnar = false)
}
assert(out == BatchToRow(TypeA, BatchUnary(TypeA, RowToBatch(TypeA,
RowLeaf()))))
}
@@ -56,7 +56,7 @@ class TransitionSuite extends SharedSparkSession {
test("Insert C2R2C") {
val in = BatchUnary(TypeA, BatchLeaf(TypeB))
val out = ConventionFunc.ignoreBackend {
- Transitions.insertTransitions(in, outputsColumnar = false)
+ Transitions.insert(in, outputsColumnar = false)
}
assert(
out == BatchToRow(
@@ -67,7 +67,7 @@ class TransitionSuite extends SharedSparkSession {
test("Insert C2C") {
val in = BatchUnary(TypeA, BatchLeaf(TypeC))
val out = ConventionFunc.ignoreBackend {
- Transitions.insertTransitions(in, outputsColumnar = false)
+ Transitions.insert(in, outputsColumnar = false)
}
assert(
out == BatchToRow(
@@ -79,7 +79,7 @@ class TransitionSuite extends SharedSparkSession {
val in = BatchUnary(TypeA, BatchLeaf(TypeD))
assertThrows[GlutenException] {
ConventionFunc.ignoreBackend {
- Transitions.insertTransitions(in, outputsColumnar = false)
+ Transitions.insert(in, outputsColumnar = false)
}
}
}
@@ -116,8 +116,7 @@ object TransitionSuite extends TransitionSuiteBase {
case class RowToBatch(toBatchType: Convention.BatchType, override val child:
SparkPlan)
extends RowToColumnarTransition
with GlutenPlan {
- override def supportsColumnar: Boolean = true
- override protected def batchType0(): Convention.BatchType = toBatchType
+ override def batchType(): Convention.BatchType = toBatchType
override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
copy(child = newChild)
override protected def doExecute(): RDD[InternalRow] =
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
index d82cc3aac9..43805b3d65 100644
---
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
@@ -24,22 +24,18 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{BinaryExecNode, LeafExecNode,
SparkPlan, UnaryExecNode}
trait TransitionSuiteBase {
- case class BatchLeaf(override val batchType0: Convention.BatchType)
+ case class BatchLeaf(override val batchType: Convention.BatchType)
extends LeafExecNode
with GlutenPlan {
- override def supportsColumnar: Boolean = true
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = List.empty
}
- case class BatchUnary(
- override val batchType0: Convention.BatchType,
- override val child: SparkPlan)
+ case class BatchUnary(override val batchType: Convention.BatchType, override
val child: SparkPlan)
extends UnaryExecNode
with GlutenPlan {
- override def supportsColumnar: Boolean = true
override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
copy(child = newChild)
@@ -50,12 +46,11 @@ trait TransitionSuiteBase {
}
case class BatchBinary(
- override val batchType0: Convention.BatchType,
+ override val batchType: Convention.BatchType,
override val left: SparkPlan,
override val right: SparkPlan)
extends BinaryExecNode
with GlutenPlan {
- override def supportsColumnar: Boolean = true
override protected def withNewChildrenInternal(
newLeft: SparkPlan,
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 7abe8228fa..7d4315e8d7 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -23,7 +23,7 @@ import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.RemoveFallbackTagRule
import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy,
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertTransitions
+import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -44,7 +44,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
assert(outputPlan == originalPlan)
@@ -61,7 +61,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
@@ -79,7 +79,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to get the plan with columnar rule applied.
@@ -99,7 +99,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
@@ -119,7 +119,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to get the plan with columnar rule applied.
@@ -199,17 +199,13 @@ private object FallbackStrategiesSuite {
}
// For replacing LeafOp.
- case class LeafOpTransformer(override val supportsColumnar: Boolean = true)
- extends LeafExecNode
- with GlutenPlan {
+ case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
}
// For replacing UnaryOp1.
- case class UnaryOp1Transformer(
- override val child: SparkPlan,
- override val supportsColumnar: Boolean = true)
+ case class UnaryOp1Transformer(override val child: SparkPlan)
extends UnaryExecNode
with GlutenPlan {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index d03619a2e0..e8cc7898c2 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.extension.columnar.{FallbackTags,
RemoveFallbackTagRule
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy,
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertTransitions
+import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -43,7 +43,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
assert(outputPlan == originalPlan)
@@ -60,7 +60,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
@@ -78,7 +78,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to get the plan with columnar rule applied.
@@ -98,7 +98,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
@@ -118,7 +118,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to get the plan with columnar rule applied.
@@ -228,17 +228,13 @@ private object FallbackStrategiesSuite {
}
// For replacing LeafOp.
- case class LeafOpTransformer(override val supportsColumnar: Boolean = true)
- extends LeafExecNode
- with GlutenPlan {
+ case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
}
// For replacing UnaryOp1.
- case class UnaryOp1Transformer(
- override val child: SparkPlan,
- override val supportsColumnar: Boolean = true)
+ case class UnaryOp1Transformer(override val child: SparkPlan)
extends UnaryExecNode
with GlutenPlan {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index d03619a2e0..e8cc7898c2 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.extension.columnar.{FallbackTags,
RemoveFallbackTagRule
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy,
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertTransitions
+import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -43,7 +43,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
assert(outputPlan == originalPlan)
@@ -60,7 +60,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
@@ -78,7 +78,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to get the plan with columnar rule applied.
@@ -98,7 +98,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
@@ -118,7 +118,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to get the plan with columnar rule applied.
@@ -228,17 +228,13 @@ private object FallbackStrategiesSuite {
}
// For replacing LeafOp.
- case class LeafOpTransformer(override val supportsColumnar: Boolean = true)
- extends LeafExecNode
- with GlutenPlan {
+ case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
}
// For replacing UnaryOp1.
- case class UnaryOp1Transformer(
- override val child: SparkPlan,
- override val supportsColumnar: Boolean = true)
+ case class UnaryOp1Transformer(override val child: SparkPlan)
extends UnaryExecNode
with GlutenPlan {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
index 640b8cdc6f..cdfb63aa20 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.benchmarks
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{FileSourceScanExecTransformer,
WholeStageTransformer}
import org.apache.gluten.extension.columnar.transition.Transitions
import org.apache.gluten.jni.JniLibLoader
@@ -125,7 +126,7 @@ object ParquetReadBenchmark extends SqlBasedBenchmark {
// generate ColumnarToRow
val columnarToRowPlan =
- Transitions.toBackendBatchPlan(newWholeStage)
+ Transitions.toBatchPlan(newWholeStage, BackendsApiManager.getSettings.primaryBatchType)
val newWholeStageRDD = newWholeStage.executeColumnar()
val newColumnarToRowRDD = columnarToRowPlan.execute()
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index f43652a7d4..1d45e8a672 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.extension.columnar.{FallbackTags,
RemoveFallbackTagRule
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy,
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertTransitions
+import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -44,7 +44,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
assert(outputPlan == originalPlan)
@@ -61,7 +61,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
@@ -79,7 +79,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to get the plan with columnar rule applied.
@@ -99,7 +99,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to fall back the entire plan.
@@ -119,7 +119,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
_ => {
UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
},
- c => InsertTransitions(c.outputsColumnar)))
+ c => InsertBackendTransitions(c.outputsColumnar)))
.enableAdaptiveContext()
val outputPlan = rule.apply(originalPlan, false)
// Expect to get the plan with columnar rule applied.
@@ -229,17 +229,13 @@ private object FallbackStrategiesSuite {
}
// For replacing LeafOp.
- case class LeafOpTransformer(override val supportsColumnar: Boolean = true)
- extends LeafExecNode
- with GlutenPlan {
+ case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
}
// For replacing UnaryOp1.
- case class UnaryOp1Transformer(
- override val child: SparkPlan,
- override val supportsColumnar: Boolean = true)
+ case class UnaryOp1Transformer(override val child: SparkPlan)
extends UnaryExecNode
with GlutenPlan {
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
index 2bdde3b4aa..a3bd5079b0 100644
---
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -71,10 +71,9 @@ abstract class AbstractFileSourceScanExec(
disableBucketedScan: Boolean = false)
extends DataSourceScanExec {
- // Note that some vals referring the file-based relation are lazy
intentionally
- // so that this plan can be canonicalized on executor side too. See
SPARK-23731.
- override lazy val supportsColumnar: Boolean = {
- relation.fileFormat.supportBatch(relation.sparkSession, schema)
+ override def supportsColumnar: Boolean = {
+ // The value should be defined in GlutenPlan.
+ throw new UnsupportedOperationException("Unreachable code")
}
private lazy val needsUnsafeRowConversion: Boolean = {
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
index 6b3d679fcd..c885f0cf44 100644
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -75,10 +75,9 @@ abstract class AbstractFileSourceScanExec(
lazy val metadataColumns: Seq[AttributeReference] =
output.collect { case FileSourceMetadataAttribute(attr) => attr }
- // Note that some vals referring the file-based relation are lazy
intentionally
- // so that this plan can be canonicalized on executor side too. See
SPARK-23731.
- override lazy val supportsColumnar: Boolean = {
- relation.fileFormat.supportBatch(relation.sparkSession, schema)
+ override def supportsColumnar: Boolean = {
+ // The value should be defined in GlutenPlan.
+ throw new UnsupportedOperationException("Unreachable code")
}
private lazy val needsUnsafeRowConversion: Boolean = {
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
index 5e75186186..53ea6f543a 100644
---
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -67,15 +67,9 @@ abstract class AbstractFileSourceScanExec(
override val disableBucketedScan: Boolean = false)
extends FileSourceScanLike {
- // Note that some vals referring the file-based relation are lazy
intentionally
- // so that this plan can be canonicalized on executor side too. See
SPARK-23731.
- override lazy val supportsColumnar: Boolean = {
- val conf = relation.sparkSession.sessionState.conf
- // Only output columnar if there is WSCG to read it.
- val requiredWholeStageCodegenSettings =
- conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf,
schema)
- requiredWholeStageCodegenSettings &&
- relation.fileFormat.supportBatch(relation.sparkSession, schema)
+ override def supportsColumnar: Boolean = {
+ // The value should be defined in GlutenPlan.
+ throw new UnsupportedOperationException("Unreachable code")
}
private lazy val needsUnsafeRowConversion: Boolean = {
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
index 32cbd435b0..c8dbcc2fed 100644
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -67,15 +67,9 @@ abstract class AbstractFileSourceScanExec(
override val disableBucketedScan: Boolean = false)
extends FileSourceScanLike {
- // Note that some vals referring the file-based relation are lazy
intentionally
- // so that this plan can be canonicalized on executor side too. See
SPARK-23731.
- override lazy val supportsColumnar: Boolean = {
- val conf = relation.sparkSession.sessionState.conf
- // Only output columnar if there is WSCG to read it.
- val requiredWholeStageCodegenSettings =
- conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf,
schema)
- requiredWholeStageCodegenSettings &&
- relation.fileFormat.supportBatch(relation.sparkSession, schema)
+ override def supportsColumnar: Boolean = {
+ // The value should be defined in GlutenPlan.
+ throw new UnsupportedOperationException("Unreachable code")
}
private lazy val needsUnsafeRowConversion: Boolean = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]