This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 353169e5cd [GLUTEN-6920][CORE] Redesign and move trait `GlutenPlan` to
`gluten-core` (#8036)
353169e5cd is described below
commit 353169e5cd3541cf56ee8e5bdd01e73a5d157aaf
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Nov 27 13:37:06 2024 +0800
[GLUTEN-6920][CORE] Redesign and move trait `GlutenPlan` to `gluten-core`
(#8036)
---
.../extension/FallbackBroadcastHashJoinRules.scala | 8 +-
.../GlutenClickHouseTPCDSMetricsSuite.scala | 2 +-
.../metrics/GlutenClickHouseTPCHMetricsSuite.scala | 2 +-
...ckHouseTPCHColumnarShuffleParquetAQESuite.scala | 2 +-
.../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 2 +-
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 1 -
.../execution/ColumnarPartialProjectExec.scala | 12 ++-
.../gluten/execution/RowToVeloxColumnarExec.scala | 1 -
.../gluten/execution/VeloxResizeBatchesExec.scala | 7 +-
.../ArrowConvertorRule.scala | 3 +-
.../api/python/ColumnarArrowEvalPythonExec.scala | 10 ++-
.../spark/sql/execution/BaseArrowScanExec.scala | 4 +-
.../apache/gluten/execution/FallbackSuite.scala | 1 -
.../gluten/execution/ColumnarToColumnarExec.scala | 14 +---
.../org/apache/gluten/execution/GlutenPlan.scala | 60 +++++++++++++
.../enumerated/planner/plan/GlutenPlanModel.scala | 5 +-
.../enumerated/planner/property/Conv.scala | 2 +-
.../extension/columnar/transition/Convention.scala | 26 +++---
.../columnar/transition/ConventionFunc.scala | 73 +++++++---------
.../columnar/transition/ConventionReq.scala | 6 +-
.../columnar/transition/Transitions.scala | 8 +-
.../BasicPhysicalOperatorTransformer.scala | 9 +-
.../CartesianProductExecTransformer.scala | 5 +-
.../gluten/execution/ColumnarCoalesceExec.scala | 9 +-
.../gluten/execution/ColumnarToRowExecBase.scala | 19 ++---
.../gluten/execution/RowToColumnarExecBase.scala | 9 +-
.../TakeOrderedAndProjectExecTransformer.scala | 7 +-
.../gluten/execution/WholeStageTransformer.scala | 87 ++++++++++++++++++-
.../org/apache/gluten/extension/GlutenPlan.scala | 98 ----------------------
.../extension/columnar/enumerated/RasOffload.scala | 6 +-
.../extension/columnar/enumerated/RemoveSort.scala | 3 +-
.../columnar/heuristic/ExpandFallbackPolicy.scala | 4 +-
.../columnar/offload/OffloadSingleNodeRules.scala | 4 +-
.../extension/columnar/validator/Validators.scala | 2 +-
.../execution/ColumnarBroadcastExchangeExec.scala | 9 +-
.../ColumnarCollapseTransformStages.scala | 15 +++-
.../execution/ColumnarShuffleExchangeExec.scala | 10 ++-
.../execution/ColumnarSubqueryBroadcastExec.scala | 7 +-
.../sql/execution/ColumnarWriteFilesExec.scala | 17 ++--
.../spark/sql/execution/GlutenExplainUtils.scala | 3 +-
.../sql/execution/GlutenFallbackReporter.scala | 2 +-
.../spark/sql/execution/GlutenImplicits.scala | 6 +-
.../datasources/GlutenWriterColumnarRules.scala | 5 +-
.../columnar/transition/TransitionSuite.scala | 4 +-
.../columnar/transition/TransitionSuiteBase.scala | 5 +-
.../org/apache/gluten/test/FallbackUtil.scala | 2 +-
.../org/apache/spark/sql/GlutenQueryTest.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 11 ++-
.../sql/statistics/SparkFunctionStatistics.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 11 ++-
.../sql/statistics/SparkFunctionStatistics.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 11 ++-
.../spark/sql/sources/GlutenInsertSuite.scala | 2 +-
.../sql/statistics/SparkFunctionStatistics.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 11 ++-
.../spark/sql/sources/GlutenInsertSuite.scala | 2 +-
.../sql/statistics/SparkFunctionStatistics.scala | 2 +-
.../sql/GlutenExpressionDataTypesValidation.scala | 6 +-
58 files changed, 381 insertions(+), 279 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
index 4af2d0fc0c..6a788617a6 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
@@ -116,19 +116,19 @@ case class
FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
if (FallbackTags.nonEmpty(bnlj)) {
ValidationResult.failed("broadcast join is already tagged as not
transformable")
} else {
- val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+ val bnljTransformer =
BackendsApiManager.getSparkPlanExecApiInstance
.genBroadcastNestedLoopJoinExecTransformer(
bnlj.left,
bnlj.right,
bnlj.buildSide,
bnlj.joinType,
bnlj.condition)
- val isTransformable = transformer.doValidate()
- if (isTransformable.ok()) {
+ val isBnljTransformable = bnljTransformer.doValidate()
+ if (isBnljTransformable.ok()) {
val exchangeTransformer = ColumnarBroadcastExchangeExec(mode,
child)
exchangeTransformer.doValidate()
} else {
- isTransformable
+ isBnljTransformable
}
}
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
index cbf1caf44e..02445270d4 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.execution.metrics
import org.apache.gluten.execution.{ColumnarNativeIterator,
GlutenClickHouseTPCDSAbstractSuite, WholeStageTransformer}
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.Attribute
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
index 3cfb8cc4fc..015da0dfae 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.execution.metrics
import org.apache.gluten.execution._
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.Attribute
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index 885e50b046..1c3e4de4c6 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.execution.tpch
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution._
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.utils.Arm
import org.apache.spark.SparkConf
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 4a2b7040fa..fbefc054fa 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.execution.tpch
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.execution._
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.DataFrame
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 0a62a41baa..7841e6cd94 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.columnarbatch.VeloxBatch
-import org.apache.gluten.datasource.ArrowConvertorRule
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
import
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index 84de41daa0..20744f531b 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -17,9 +17,11 @@
package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
import org.apache.gluten.expression.{ArrowProjection, ExpressionUtils}
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -31,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias,
Attribute, AttributeRef
import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan,
UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.hive.HiveUdfUtil
-import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType,
DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType,
NullType, ShortType, StringType, TimestampType, YearMonthIntervalType}
+import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import scala.collection.mutable.ListBuffer
@@ -51,7 +53,7 @@ import scala.collection.mutable.ListBuffer
case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)(
replacedAliasUdf: Seq[Alias])
extends UnaryExecNode
- with GlutenPlan {
+ with ValidatablePlan {
private val projectAttributes: ListBuffer[Attribute] = ListBuffer()
private val projectIndexInChild: ListBuffer[Int] = ListBuffer()
@@ -73,6 +75,10 @@ case class ColumnarPartialProjectExec(original: ProjectExec,
child: SparkPlan)(
override def output: Seq[Attribute] = child.output ++
replacedAliasUdf.map(_.toAttribute)
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
final override def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(
s"${this.getClass.getSimpleName} doesn't support doExecute")
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index a853778484..9921ffbfab 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -43,7 +43,6 @@ import org.apache.arrow.memory.ArrowBuf
import scala.collection.mutable.ListBuffer
case class RowToVeloxColumnarExec(child: SparkPlan) extends
RowToColumnarExecBase(child = child) {
-
override def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index 5283ab61e3..9ed687d337 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -16,7 +16,8 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.utils.VeloxBatchResizer
@@ -53,6 +54,10 @@ case class VeloxResizeBatchesExec(
"selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append
/ split batches")
)
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
similarity index 97%
rename from
backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
rename to
backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
index b1b0b813f6..5e02cf54b0 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.datasource
+package org.apache.gluten.extension
import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.datasource.ArrowCSVFileFormat
import org.apache.gluten.datasource.v2.ArrowCSVTable
import org.apache.gluten.sql.shims.SparkShimLoader
diff --git
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index a2eee53660..548dec13be 100644
---
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -19,9 +19,9 @@ package org.apache.spark.api.python
import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
-import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
+import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.utils.PullOutProjectHelper
@@ -213,11 +213,13 @@ case class ColumnarArrowEvalPythonExec(
evalType: Int)
extends EvalPythonExec
with GlutenPlan
- with KnownChildrenConventions {
+ with KnownChildConvention {
override def batchType(): Convention.BatchType = ArrowJavaBatch
- override def requiredChildrenConventions(): Seq[ConventionReq] = List(
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
+ override def requiredChildConvention(): Seq[ConventionReq] = List(
ConventionReq.of(ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(ArrowJavaBatch)))
override lazy val metrics = Map(
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
index 6617e8b138..1aacc1b954 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
@@ -17,11 +17,13 @@
package org.apache.spark.sql.execution
import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention
trait BaseArrowScanExec extends GlutenPlan {
final override def batchType(): Convention.BatchType = {
ArrowBatches.ArrowJavaBatch
}
+
+ final override def rowType0(): Convention.RowType = Convention.RowType.None
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 0f7b70bf00..0f94b8648e 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec,
ColumnarShuffleExchangeExec, SparkPlan}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
index 7ca4b36b06..e27b6a4e2f 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -30,9 +30,7 @@ import java.util.concurrent.atomic.AtomicLong
abstract class ColumnarToColumnarExec(from: Convention.BatchType, to:
Convention.BatchType)
extends ColumnarToColumnarTransition
- with Convention.KnownBatchType
- with Convention.KnownRowTypeForSpark33AndLater
- with ConventionReq.KnownChildrenConventions {
+ with GlutenPlan {
def child: SparkPlan
protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch]
@@ -46,21 +44,13 @@ abstract class ColumnarToColumnarExec(from:
Convention.BatchType, to: Convention
"selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to
convert batches")
)
- final override val supportsColumnar: Boolean = {
- batchType() != Convention.BatchType.None
- }
-
override def batchType(): Convention.BatchType = to
- final override val supportsRowBased: Boolean = {
- rowType() != Convention.RowType.None
- }
-
override def rowType0(): Convention.RowType = {
Convention.RowType.None
}
- override def requiredChildrenConventions(): Seq[ConventionReq] = List(
+ override def requiredChildConvention(): Seq[ConventionReq] = List(
ConventionReq.of(ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(from)))
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
new file mode 100644
index 0000000000..460326d8d9
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
+
+import org.apache.spark.sql.execution.SparkPlan
+
+trait GlutenPlan
+ extends SparkPlan
+ with Convention.KnownBatchType
+ with Convention.KnownRowTypeForSpark33AndLater
+ with GlutenPlan.SupportsRowBasedCompatible
+ with ConventionReq.KnownChildConvention {
+
+ final override val supportsColumnar: Boolean = {
+ batchType() != Convention.BatchType.None
+ }
+
+ override def batchType(): Convention.BatchType
+
+ final override val supportsRowBased: Boolean = {
+ rowType() != Convention.RowType.None
+ }
+
+ override def rowType0(): Convention.RowType
+
+ override def requiredChildConvention(): Seq[ConventionReq] = {
+ // In the normal case, children's convention should follow parent node's
convention.
+ val childReq = Convention.of(rowType(), batchType()).asReq()
+ Seq.tabulate(children.size)(
+ _ => {
+ childReq
+ })
+ }
+}
+
+object GlutenPlan {
+ // To be compatible with Spark (version < 3.3)
+ trait SupportsRowBasedCompatible {
+ def supportsRowBased(): Boolean = {
+ throw new GlutenException("Illegal state: The method is not expected to
be called")
+ }
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index 568ea50396..20a238a47d 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.extension.columnar.enumerated.planner.plan
+import org.apache.gluten.execution.GlutenPlan
import
org.apache.gluten.extension.columnar.enumerated.planner.metadata.GlutenMetadata
import org.apache.gluten.extension.columnar.enumerated.planner.property.{Conv,
ConvDef}
import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
@@ -37,13 +38,15 @@ object GlutenPlanModel {
PlanModelImpl
}
+ // TODO: Make this inherit from GlutenPlan.
case class GroupLeafExec(
groupId: Int,
metadata: GlutenMetadata,
constraintSet: PropertySet[SparkPlan])
extends LeafExecNode
with Convention.KnownBatchType
- with Convention.KnownRowTypeForSpark33AndLater {
+ with Convention.KnownRowTypeForSpark33AndLater
+ with GlutenPlan.SupportsRowBasedCompatible {
private val req: Conv.Req =
constraintSet.get(ConvDef).asInstanceOf[Conv.Req]
override protected def doExecute(): RDD[InternalRow] = throw new
IllegalStateException()
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
index 7b2b801ac9..cfb32e7644 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
@@ -92,7 +92,7 @@ object ConvDef extends PropertyDef[SparkPlan, Conv] {
override def getChildrenConstraints(
constraint: Property[SparkPlan],
plan: SparkPlan): Seq[Conv] = {
- val out = List.tabulate(plan.children.size)(_ =>
Conv.req(ConventionReq.get(plan)))
+ val out = ConventionReq.get(plan).map(Conv.req)
out
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index b57f3e0c0a..b9fbe023b2 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.exception.GlutenException
-
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec,
SparkPlan}
import org.apache.spark.util.SparkVersionUtil
@@ -55,6 +53,19 @@ object Convention {
}
Convention.of(rowType(), batchType())
}
+
+ def asReq(): ConventionReq = {
+ val rowTypeReq = conv.rowType match {
+ case Convention.RowType.None => ConventionReq.RowType.Any
+ case r => ConventionReq.RowType.Is(r)
+ }
+
+ val batchTypeReq = conv.batchType match {
+ case Convention.BatchType.None => ConventionReq.BatchType.Any
+ case b => ConventionReq.BatchType.Is(b)
+ }
+ ConventionReq.of(rowTypeReq, batchTypeReq)
+ }
}
private case class Impl(override val rowType: RowType, override val
batchType: BatchType)
@@ -142,19 +153,10 @@ object Convention {
def batchType(): BatchType
}
- sealed trait KnownRowType extends KnownRowType.SupportsRowBasedCompatible {
+ sealed trait KnownRowType {
def rowType(): RowType
}
- object KnownRowType {
- // To be compatible with Spark (version < 3.3)
- sealed trait SupportsRowBasedCompatible {
- def supportsRowBased(): Boolean = {
- throw new GlutenException("Illegal state: The method is not expected
to be called")
- }
- }
- }
-
trait KnownRowTypeForSpark33AndLater extends KnownRowType {
this: SparkPlan =>
import KnownRowTypeForSpark33AndLater._
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index 5cb3d44a15..2be9271a1f 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.extension.columnar.transition
import org.apache.gluten.backend.Backend
-import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
+import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnionExec}
@@ -28,14 +28,14 @@ import
org.apache.spark.sql.execution.exchange.ReusedExchangeExec
/** ConventionFunc is a utility to derive [[Convention]] or [[ConventionReq]]
from a query plan. */
sealed trait ConventionFunc {
def conventionOf(plan: SparkPlan): Convention
- def conventionReqOf(plan: SparkPlan): ConventionReq
+ def conventionReqOf(plan: SparkPlan): Seq[ConventionReq]
}
object ConventionFunc {
trait Override {
def rowTypeOf: PartialFunction[SparkPlan, Convention.RowType] =
PartialFunction.empty
def batchTypeOf: PartialFunction[SparkPlan, Convention.BatchType] =
PartialFunction.empty
- def conventionReqOf: PartialFunction[SparkPlan, ConventionReq] =
PartialFunction.empty
+ def conventionReqOf: PartialFunction[SparkPlan, Seq[ConventionReq]] =
PartialFunction.empty
}
object Override {
@@ -72,7 +72,6 @@ object ConventionFunc {
}
private class BuiltinFunc(o: Override) extends ConventionFunc {
- import BuiltinFunc._
override def conventionOf(plan: SparkPlan): Convention = {
val conv = conventionOf0(plan)
conv
@@ -158,61 +157,45 @@ object ConventionFunc {
}
}
- override def conventionReqOf(plan: SparkPlan): ConventionReq = {
+ override def conventionReqOf(plan: SparkPlan): Seq[ConventionReq] = {
val req = o.conventionReqOf.applyOrElse(plan, conventionReqOf0)
+ assert(req.size == plan.children.size)
req
}
- private def conventionReqOf0(plan: SparkPlan): ConventionReq = plan match {
- case k: KnownChildrenConventions =>
- val reqs = k.requiredChildrenConventions().distinct
- // This can be a temporary restriction.
- assert(
- reqs.size == 1,
- "KnownChildrenConventions#requiredChildrenConventions should output
the same element" +
- " for all children")
- reqs.head
+ private def conventionReqOf0(plan: SparkPlan): Seq[ConventionReq] = plan
match {
+ case k: KnownChildConvention =>
+ val reqs = k.requiredChildConvention()
+ reqs
case RowToColumnarLike(_) =>
- ConventionReq.of(
- ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
- ConventionReq.BatchType.Any)
+ Seq(
+ ConventionReq.of(
+ ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+ ConventionReq.BatchType.Any))
case ColumnarToRowExec(_) =>
- ConventionReq.of(
- ConventionReq.RowType.Any,
- ConventionReq.BatchType.Is(Convention.BatchType.VanillaBatch))
+ Seq(
+ ConventionReq.of(
+ ConventionReq.RowType.Any,
+ ConventionReq.BatchType.Is(Convention.BatchType.VanillaBatch)))
case write: DataWritingCommandExec if
SparkShimLoader.getSparkShims.isPlannedV1Write(write) =>
// To align with
ApplyColumnarRulesAndInsertTransitions#insertTransitions
- ConventionReq.any
+ Seq(ConventionReq.any)
case u: UnionExec =>
// We force vanilla union to output row data to get the best
compatibility with vanilla
// Spark.
// As a result it's a common practice to rewrite it with GlutenPlan
for offloading.
- ConventionReq.of(
- ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
- ConventionReq.BatchType.Any)
+ Seq.tabulate(u.children.size)(
+ _ =>
+ ConventionReq.of(
+ ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+ ConventionReq.BatchType.Any))
case other =>
// In the normal case, children's convention should follow parent
node's convention.
- // Note, we don't have to consider C2R / R2C here since they are
already removed by
- // RemoveTransitions.
- val thisConv = conventionOf0(other)
- thisConv.asReq()
- }
- }
-
- private object BuiltinFunc {
- implicit private class ConventionOps(conv: Convention) {
- def asReq(): ConventionReq = {
- val rowTypeReq = conv.rowType match {
- case Convention.RowType.None => ConventionReq.RowType.Any
- case r => ConventionReq.RowType.Is(r)
- }
-
- val batchTypeReq = conv.batchType match {
- case Convention.BatchType.None => ConventionReq.BatchType.Any
- case b => ConventionReq.BatchType.Is(b)
- }
- ConventionReq.of(rowTypeReq, batchTypeReq)
- }
+ val childReq = conventionOf0(other).asReq()
+ Seq.tabulate(other.children.size)(
+ _ => {
+ childReq
+ })
}
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
index a081f21434..86637f2d5a 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
@@ -55,12 +55,12 @@ object ConventionReq {
val row: ConventionReq = ofRow(RowType.Is(Convention.RowType.VanillaRow))
val vanillaBatch: ConventionReq =
ofBatch(BatchType.Is(Convention.BatchType.VanillaBatch))
- def get(plan: SparkPlan): ConventionReq =
ConventionFunc.create().conventionReqOf(plan)
+ def get(plan: SparkPlan): Seq[ConventionReq] =
ConventionFunc.create().conventionReqOf(plan)
def of(rowType: RowType, batchType: BatchType): ConventionReq =
Impl(rowType, batchType)
def ofRow(rowType: RowType): ConventionReq = Impl(rowType, BatchType.Any)
def ofBatch(batchType: BatchType): ConventionReq = Impl(RowType.Any,
batchType)
- trait KnownChildrenConventions {
- def requiredChildrenConventions(): Seq[ConventionReq]
+ trait KnownChildConvention {
+ def requiredChildConvention(): Seq[ConventionReq]
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index 10d50f453d..cec09ee7a1 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -42,13 +42,13 @@ case class InsertTransitions(convReq: ConventionReq)
extends Rule[SparkPlan] {
if (node.children.isEmpty) {
return node
}
- val convReq = convFunc.conventionReqOf(node)
- val newChildren = node.children.map {
- child =>
+ val convReqs = convFunc.conventionReqOf(node)
+ val newChildren = node.children.zip(convReqs).map {
+ case (child, convReq) =>
val from = convFunc.conventionOf(child)
if (from.isNone) {
// For example, a union op with row child and columnar child at the
same time,
- // The plan is actually not executable and we cannot tell about its
convention.
+ // The plan is actually not executable, and we cannot tell about its
convention.
child
} else {
val transition =
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 2830ef404c..f9755605ca 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -19,7 +19,8 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter,
ExpressionTransformer}
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.`type`.TypeBuilder
import org.apache.gluten.substrait.SubstraitContext
@@ -261,13 +262,17 @@ abstract class ProjectExecTransformerBase(val list:
Seq[NamedExpression], val in
}
// An alternative to UnionExec.
-case class ColumnarUnionExec(children: Seq[SparkPlan]) extends GlutenPlan {
+case class ColumnarUnionExec(children: Seq[SparkPlan]) extends ValidatablePlan
{
children.foreach {
case w: WholeStageTransformer =>
w.setOutputSchemaForPlan(output)
case _ =>
}
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
override def output: Seq[Attribute] = {
children.map(_.output).transpose.map {
attrs =>
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 28fb691896..3e3169aa55 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -18,7 +18,8 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.ExpressionConverter
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.rel.RelBuilder
@@ -45,6 +46,8 @@ import java.io.{IOException, ObjectOutputStream}
*/
case class ColumnarCartesianProductBridge(child: SparkPlan) extends
UnaryExecNode with GlutenPlan {
override def output: Seq[Attribute] = child.output
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] =
throw new UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
child.executeColumnar()
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
index 3b13207c93..107bf544cd 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
@@ -16,7 +16,8 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
@@ -28,7 +29,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCoalesceExec(numPartitions: Int, child: SparkPlan)
extends UnaryExecNode
- with GlutenPlan {
+ with ValidatablePlan {
override def output: Seq[Attribute] = child.output
@@ -36,6 +37,10 @@ case class ColumnarCoalesceExec(numPartitions: Int, child:
SparkPlan)
if (numPartitions == 1) SinglePartition else
UnknownPartitioning(numPartitions)
}
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException()
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
index fae3115981..8bf1fbe447 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
@@ -17,9 +17,8 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
-import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
+import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
@@ -30,8 +29,8 @@ import
org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}
abstract class ColumnarToRowExecBase(child: SparkPlan)
extends ColumnarToRowTransition
- with GlutenPlan
- with KnownChildrenConventions {
+ with KnownChildConvention
+ with ValidatablePlan {
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
@transient override lazy val metrics =
@@ -47,6 +46,12 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)
override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
+ override def requiredChildConvention(): Seq[ConventionReq] = {
+ List(
+ ConventionReq.ofBatch(
+
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
+ }
+
override def doExecuteBroadcast[T](): Broadcast[T] = {
// Require for explicit implementation, otherwise throw error.
super.doExecuteBroadcast[T]()
@@ -57,10 +62,4 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)
override def doExecute(): RDD[InternalRow] = {
doExecuteInternal()
}
-
- override def requiredChildrenConventions(): Seq[ConventionReq] = {
- List(
- ConventionReq.ofBatch(
-
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
- }
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
index 2a52616361..9f36b8fc67 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
@@ -17,8 +17,7 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
@@ -46,8 +45,14 @@ abstract class RowToColumnarExecBase(child: SparkPlan)
final override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+
override def rowType0(): Convention.RowType = Convention.RowType.None
+ override def requiredChildConvention(): Seq[ConventionReq] = {
+ Seq(ConventionReq.row)
+ }
+
final override def doExecute(): RDD[InternalRow] = {
child.execute()
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
index c960bda249..f19960ec1c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
@@ -17,7 +17,8 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -40,9 +41,11 @@ case class TakeOrderedAndProjectExecTransformer(
child: SparkPlan,
offset: Int = 0)
extends UnaryExecNode
- with GlutenPlan {
+ with ValidatablePlan {
override def outputPartitioning: Partitioning = SinglePartition
override def outputOrdering: Seq[SortOrder] = sortOrder
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override def output: Seq[Attribute] = {
projectList.map(_.toAttribute)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 6414b67a80..dbfc11c136 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -18,14 +18,17 @@ package org.apache.gluten.execution
import org.apache.gluten.{GlutenConfig, GlutenNumaBindingInfo}
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.exception.{GlutenException, GlutenNotSupportException}
import org.apache.gluten.expression._
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater}
import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode}
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode}
import org.apache.gluten.substrait.rel.{LocalFilesNode, RelNode, SplitInfo}
+import org.apache.gluten.test.TestStats
import org.apache.gluten.utils.SubstraitPlanPrinterUtil
import org.apache.spark._
@@ -53,7 +56,74 @@ case class TransformContext(outputAttributes:
Seq[Attribute], root: RelNode)
case class WholeStageTransformContext(root: PlanNode, substraitContext:
SubstraitContext = null)
-trait TransformSupport extends GlutenPlan {
+/**
+ * Base interface for a Gluten query plan that is also open to validation
calls.
+ *
+ * Since https://github.com/apache/incubator-gluten/pull/2185.
+ */
+trait ValidatablePlan extends GlutenPlan with LogLevelUtil {
+ protected def glutenConf: GlutenConfig = GlutenConfig.getConf
+
+ protected lazy val enableNativeValidation = glutenConf.enableNativeValidation
+
+ /**
+ * Validate whether this SparkPlan can be transformed into a Substrait
node in native code.
+ */
+ final def doValidate(): ValidationResult = {
+ val schemaValidationResult = BackendsApiManager.getValidatorApiInstance
+ .doSchemaValidate(schema)
+ .map {
+ reason =>
+ ValidationResult.failed(s"Found schema check failure for $schema,
due to: $reason")
+ }
+ .getOrElse(ValidationResult.succeeded)
+ if (!schemaValidationResult.ok()) {
+ TestStats.addFallBackClassName(this.getClass.toString)
+ return schemaValidationResult
+ }
+ try {
+ TransformerState.enterValidation
+ val res = doValidateInternal()
+ if (!res.ok()) {
+ TestStats.addFallBackClassName(this.getClass.toString)
+ }
+ res
+ } catch {
+ case e @ (_: GlutenNotSupportException | _:
UnsupportedOperationException) =>
+ if (!e.isInstanceOf[GlutenNotSupportException]) {
+ logDebug(s"Just a warning. This exception perhaps needs to be
fixed.", e)
+ }
+ // FIXME: Use a validation-specific method to catch validation failures
+ TestStats.addFallBackClassName(this.getClass.toString)
+ logValidationMessage(
+ s"Validation failed with exception for plan: $nodeName, due to:
${e.getMessage}",
+ e)
+ ValidationResult.failed(e.getMessage)
+ } finally {
+ TransformerState.finishValidation
+ }
+ }
+
+ protected def doValidateInternal(): ValidationResult =
ValidationResult.succeeded
+
+ private def logValidationMessage(msg: => String, e: Throwable): Unit = {
+ if (glutenConf.printStackOnValidationFailure) {
+ logOnLevel(glutenConf.validationLogLevel, msg, e)
+ } else {
+ logOnLevel(glutenConf.validationLogLevel, msg)
+ }
+ }
+}
+
+/** Base interface for a query plan that can be interpreted to Substrait
representation. */
+trait TransformSupport extends ValidatablePlan {
+ override def batchType(): Convention.BatchType = {
+ BackendsApiManager.getSettings.primaryBatchType
+ }
+
+ override def rowType0(): Convention.RowType = {
+ Convention.RowType.None
+ }
final override def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(
@@ -68,6 +138,17 @@ trait TransformSupport extends GlutenPlan {
*/
def columnarInputRDDs: Seq[RDD[ColumnarBatch]]
+ // Since https://github.com/apache/incubator-gluten/pull/2185.
+ protected def doNativeValidation(context: SubstraitContext, node: RelNode):
ValidationResult = {
+ if (node != null && glutenConf.enableNativeValidation) {
+ val planNode = PlanBuilder.makePlan(context, Lists.newArrayList(node))
+ BackendsApiManager.getValidatorApiInstance
+ .doNativeValidateWithFailureReason(planNode)
+ } else {
+ ValidationResult.succeeded
+ }
+ }
+
final def transform(context: SubstraitContext): TransformContext = {
if (isCanonicalizedPlan) {
throw new IllegalStateException(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 3639ac522f..b49917c59f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -16,25 +16,12 @@
*/
package org.apache.gluten.extension
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.expression.TransformerState
import org.apache.gluten.extension.columnar.FallbackTag
import org.apache.gluten.extension.columnar.FallbackTag.{Appendable, Converter}
import org.apache.gluten.extension.columnar.FallbackTags.add
-import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.extension.columnar.validator.Validator
-import org.apache.gluten.logging.LogLevelUtil
-import org.apache.gluten.substrait.SubstraitContext
-import org.apache.gluten.substrait.plan.PlanBuilder
-import org.apache.gluten.substrait.rel.RelNode
-import org.apache.gluten.test.TestStats
import org.apache.spark.sql.catalyst.trees.TreeNode
-import org.apache.spark.sql.execution.SparkPlan
-
-import com.google.common.collect.Lists
sealed trait ValidationResult {
def ok(): Boolean
@@ -80,88 +67,3 @@ object ValidationResult {
}
}
}
-
-/** Every Gluten Operator should extend this trait. */
-trait GlutenPlan
- extends SparkPlan
- with Convention.KnownBatchType
- with Convention.KnownRowTypeForSpark33AndLater
- with LogLevelUtil {
- protected lazy val enableNativeValidation = glutenConf.enableNativeValidation
-
- protected def glutenConf: GlutenConfig = GlutenConfig.getConf
-
- /**
- * Validate whether this SparkPlan supports to be transformed into substrait
node in Native Code.
- */
- final def doValidate(): ValidationResult = {
- val schemaValidationResult = BackendsApiManager.getValidatorApiInstance
- .doSchemaValidate(schema)
- .map {
- reason =>
- ValidationResult.failed(s"Found schema check failure for $schema,
due to: $reason")
- }
- .getOrElse(ValidationResult.succeeded)
- if (!schemaValidationResult.ok()) {
- TestStats.addFallBackClassName(this.getClass.toString)
- return schemaValidationResult
- }
- try {
- TransformerState.enterValidation
- val res = doValidateInternal()
- if (!res.ok()) {
- TestStats.addFallBackClassName(this.getClass.toString)
- }
- res
- } catch {
- case e @ (_: GlutenNotSupportException | _:
UnsupportedOperationException) =>
- if (!e.isInstanceOf[GlutenNotSupportException]) {
- logDebug(s"Just a warning. This exception perhaps needs to be
fixed.", e)
- }
- // FIXME: Use a validation-specific method to catch validation failures
- TestStats.addFallBackClassName(this.getClass.toString)
- logValidationMessage(
- s"Validation failed with exception for plan: $nodeName, due to:
${e.getMessage}",
- e)
- ValidationResult.failed(e.getMessage)
- } finally {
- TransformerState.finishValidation
- }
- }
-
- final override val supportsColumnar: Boolean = {
- batchType() != Convention.BatchType.None
- }
-
- override def batchType(): Convention.BatchType = {
- BackendsApiManager.getSettings.primaryBatchType
- }
-
- final override val supportsRowBased: Boolean = {
- rowType() != Convention.RowType.None
- }
-
- override def rowType0(): Convention.RowType = {
- Convention.RowType.None
- }
-
- protected def doValidateInternal(): ValidationResult =
ValidationResult.succeeded
-
- protected def doNativeValidation(context: SubstraitContext, node: RelNode):
ValidationResult = {
- if (node != null && glutenConf.enableNativeValidation) {
- val planNode = PlanBuilder.makePlan(context, Lists.newArrayList(node))
- BackendsApiManager.getValidatorApiInstance
- .doNativeValidateWithFailureReason(planNode)
- } else {
- ValidationResult.succeeded
- }
- }
-
- private def logValidationMessage(msg: => String, e: Throwable): Unit = {
- if (glutenConf.printStackOnValidationFailure) {
- logOnLevel(glutenConf.validationLogLevel, msg, e)
- } else {
- logOnLevel(glutenConf.validationLogLevel, msg)
- }
- }
-}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
index 49401797f9..982abc16c5 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.enumerated
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{GlutenPlan, ValidatablePlan}
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
import org.apache.gluten.extension.columnar.rewrite.RewriteSingleNode
import org.apache.gluten.extension.columnar.validator.Validator
@@ -119,7 +119,9 @@ object RasOffload {
validator.validate(from) match {
case Validator.Passed =>
val offloadedPlan = base.offload(from)
- val offloadedNodes = offloadedPlan.collect[GlutenPlan] { case
t: GlutenPlan => t }
+ val offloadedNodes = offloadedPlan.collect[ValidatablePlan] {
+ case t: ValidatablePlan => t
+ }
val outComes =
offloadedNodes.map(_.doValidate()).filter(!_.ok())
if (outComes.nonEmpty) {
// 5. If native validation fails on at least one of the
offloaded nodes, return
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveSort.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveSort.scala
index 5b5d5e541e..fd56240616 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveSort.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveSort.scala
@@ -16,8 +16,7 @@
*/
package org.apache.gluten.extension.columnar.enumerated
-import org.apache.gluten.execution.{HashAggregateExecBaseTransformer,
ShuffledHashJoinExecTransformerBase, SortExecTransformer}
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{GlutenPlan,
HashAggregateExecBaseTransformer, ShuffledHashJoinExecTransformerBase,
SortExecTransformer}
import org.apache.gluten.ras.path.Pattern._
import org.apache.gluten.ras.path.Pattern.Matchers._
import org.apache.gluten.ras.rule.{RasRule, Shape}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
index 44ed81f565..3418d3dddc 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
@@ -17,12 +17,11 @@
package org.apache.gluten.extension.columnar.heuristic
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.columnar.{FallbackTag, FallbackTags}
import org.apache.gluten.extension.columnar.FallbackTags.add
import org.apache.gluten.extension.columnar.transition.{BackendTransitions,
ColumnarToRowLike, RowToColumnarLike}
import org.apache.gluten.utils.PlanUtil
-
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.execution._
@@ -35,6 +34,7 @@ import org.apache.spark.sql.execution.exchange.Exchange
+
// format: off
/**
* Note, this rule should only fallback to row-based plan if there is no harm.
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 0fee61acac..fa698cd244 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
-import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -330,8 +329,7 @@ object OffloadOthers {
child)
case p if !p.isInstanceOf[GlutenPlan] =>
logDebug(s"Transformation for ${p.getClass} is currently not
supported.")
- val children = plan.children
- p.withNewChildren(children)
+ p
case other => other
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 7f41e81072..7e7d732c29 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.validator
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi}
import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.execution.{BasicScanExecTransformer,
ColumnarCoalesceExec, ColumnarUnionExec, ExpandExecTransformer,
HashAggregateExecBaseTransformer, LimitExecTransformer, ProjectExecTransformer,
ScanTransformerFactory, SortExecTransformer,
TakeOrderedAndProjectExecTransformer, WindowExecTransformer,
WindowGroupLimitExecTransformer, WriteFilesExecTransformer}
+import org.apache.gluten.execution._
import org.apache.gluten.expression.ExpressionUtils
import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.extension.columnar.offload.OffloadJoin
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 01a4380a14..1de490ad61 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.ValidatablePlan
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -41,7 +42,7 @@ import scala.util.control.NonFatal
case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
extends BroadcastExchangeLike
- with GlutenPlan {
+ with ValidatablePlan {
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
@transient override lazy val metrics: Map[String, SQLMetric] =
@@ -125,6 +126,10 @@ case class ColumnarBroadcastExchangeExec(mode:
BroadcastMode, child: SparkPlan)
override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
override def doCanonicalize(): SparkPlan = {
val canonicalized =
BackendsApiManager.getSparkPlanExecApiInstance.doCanonicalizeForBroadcastMode(mode)
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index 9ec078e003..67895b439c 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution._
-import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.rel.RelBuilder
@@ -173,13 +173,22 @@ case class ColumnarCollapseTransformStages(
}
}
+// TODO: Make this inherit from GlutenPlan.
case class ColumnarInputAdapter(child: SparkPlan)
extends InputAdapterGenerateTreeStringShim
- with Convention.KnownBatchType {
+ with Convention.KnownBatchType
+ with Convention.KnownRowTypeForSpark33AndLater
+ with GlutenPlan.SupportsRowBasedCompatible
+ with ConventionReq.KnownChildConvention {
override def output: Seq[Attribute] = child.output
- override val supportsColumnar: Boolean = true
+ final override val supportsColumnar: Boolean = true
+ final override val supportsRowBased: Boolean = false
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def requiredChildConvention(): Seq[ConventionReq] = Seq(
+ ConventionReq.ofBatch(
+
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
child.executeColumnar()
override def outputPartitioning: Partitioning = child.outputPartitioning
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index d4b33be292..007a93bd06 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.execution.ValidatablePlan
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark._
@@ -45,7 +47,7 @@ case class ColumnarShuffleExchangeExec(
projectOutputAttributes: Seq[Attribute],
advisoryPartitionSize: Option[Long] = None)
extends ShuffleExchangeLike
- with GlutenPlan {
+ with ValidatablePlan {
private[sql] lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
@@ -148,6 +150,10 @@ case class ColumnarShuffleExchangeExec(
super.stringArgs ++ Iterator(s"[shuffle_writer_type=$shuffleWriterType]")
}
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
override def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException()
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
index 2c1edd04bb..6275fbb3aa 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.spark.rdd.RDD
@@ -107,6 +108,10 @@ case class ColumnarSubqueryBroadcastExec(
relationFuture
}
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(
"SubqueryBroadcastExec does not support the execute() code path.")
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index 25d6c4ed61..45f2026370 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -18,9 +18,9 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
import org.apache.gluten.extension.columnar.transition.Convention.RowType
-import org.apache.gluten.extension.columnar.transition.ConventionReq
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.TaskContext
@@ -44,17 +44,19 @@ abstract class ColumnarWriteFilesExec protected (
override val right: SparkPlan)
extends BinaryExecNode
with GlutenPlan
- with ConventionReq.KnownChildrenConventions
with ColumnarWriteFilesExec.ExecuteWriteCompatible {
val child: SparkPlan = left
override lazy val references: AttributeSet = AttributeSet.empty
- override def requiredChildrenConventions(): Seq[ConventionReq] = {
- List(
- ConventionReq.ofBatch(
-
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
+ override def requiredChildConvention(): Seq[ConventionReq] = {
+ val req = ConventionReq.ofBatch(
+
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType))
+ Seq.tabulate(2)(
+ _ => {
+ req
+ })
}
/**
@@ -67,6 +69,7 @@ abstract class ColumnarWriteFilesExec protected (
*
* Since https://github.com/apache/incubator-gluten/pull/6745.
*/
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
override def rowType0(): RowType = {
RowType.VanillaRow
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index d6167c931c..7a33350524 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -16,8 +16,7 @@
*/
package org.apache.spark.sql.execution
-import org.apache.gluten.execution.WholeStageTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.PlanUtil
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
index 481e16b0a5..c28395941b 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.events.GlutenPlanFallbackEvent
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.logging.LogLevelUtil
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 435bf9239e..709673feab 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -16,16 +16,14 @@
*/
package org.apache.spark.sql.execution
-import org.apache.gluten.execution.WholeStageTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
import org.apache.gluten.utils.PlanUtil
-
import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AQEShuffleReadExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec,
AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.{DataWritingCommandExec,
ExecutedCommandExec}
import org.apache.spark.sql.execution.datasources.WriteFilesExec
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index fb42c55ba0..126417bf18 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -17,9 +17,8 @@
package org.apache.spark.sql.execution.datasources
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.ColumnarToRowExecBase
+import org.apache.gluten.execution.{ColumnarToRowExecBase, GlutenPlan}
import org.apache.gluten.execution.datasource.GlutenFormatFactory
-import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.{Convention,
Transitions}
import org.apache.spark.rdd.RDD
@@ -61,6 +60,8 @@ case class FakeRowAdaptor(child: SparkPlan)
override def output: Seq[Attribute] = child.output
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+
override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
override protected def doExecute(): RDD[InternalRow] = {
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
index 5daca9bede..fec36ac1ac 100644
---
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
@@ -17,8 +17,7 @@
package org.apache.gluten.extension.columnar.transition
import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.ColumnarToColumnarExec
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{ColumnarToColumnarExec, GlutenPlan}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -117,6 +116,7 @@ object TransitionSuite extends TransitionSuiteBase {
extends RowToColumnarTransition
with GlutenPlan {
override def batchType(): Convention.BatchType = toBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
copy(child = newChild)
override protected def doExecute(): RDD[InternalRow] =
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
index 43805b3d65..7ab9b5d0cf 100644
---
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -27,6 +27,7 @@ trait TransitionSuiteBase {
case class BatchLeaf(override val batchType: Convention.BatchType)
extends LeafExecNode
with GlutenPlan {
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
@@ -36,6 +37,7 @@ trait TransitionSuiteBase {
case class BatchUnary(override val batchType: Convention.BatchType, override
val child: SparkPlan)
extends UnaryExecNode
with GlutenPlan {
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
copy(child = newChild)
@@ -51,6 +53,7 @@ trait TransitionSuiteBase {
override val right: SparkPlan)
extends BinaryExecNode
with GlutenPlan {
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def withNewChildrenInternal(
newLeft: SparkPlan,
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
index 3d26dd16c4..2a0ecaf92c 100644
--- a/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
+++ b/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.test
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution._
diff --git
a/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
b/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
index 164083a8d8..8507233a57 100644
--- a/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
+++ b/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
@@ -21,8 +21,8 @@ package org.apache.spark.sql
* 1. We need to modify the way org.apache.spark.sql.CHQueryTest#compare
compares double
*/
import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.execution.TransformSupport
-import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SPARK_VERSION_SHORT
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 7d4315e8d7..754c292418 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,13 +17,14 @@
package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.{BasicScanExecTransformer, GlutenPlan}
+import org.apache.gluten.extension.GlutenSessionExtensions
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.RemoveFallbackTagRule
import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy,
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
+import org.apache.gluten.extension.columnar.transition.{Convention,
InsertBackendTransitions}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -200,6 +201,8 @@ private object FallbackStrategiesSuite {
// For replacing LeafOp.
case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
}
@@ -208,6 +211,8 @@ private object FallbackStrategiesSuite {
case class UnaryOp1Transformer(override val child: SparkPlan)
extends UnaryExecNode
with GlutenPlan {
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan):
UnaryOp1Transformer =
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
index 7a3c4d1056..e525eb1a9c 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.statistics
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
import org.apache.spark.sql.{GlutenTestConstants, QueryTest, SparkSession}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index e8cc7898c2..88e0ecf65a 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,13 +17,14 @@
package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.{BasicScanExecTransformer, GlutenPlan}
+import org.apache.gluten.extension.GlutenSessionExtensions
import org.apache.gluten.extension.columnar.{FallbackTags,
RemoveFallbackTagRule}
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy,
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
+import org.apache.gluten.extension.columnar.transition.{Convention,
InsertBackendTransitions}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -229,6 +230,8 @@ private object FallbackStrategiesSuite {
// For replacing LeafOp.
case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
}
@@ -237,6 +240,8 @@ private object FallbackStrategiesSuite {
case class UnaryOp1Transformer(override val child: SparkPlan)
extends UnaryExecNode
with GlutenPlan {
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan):
UnaryOp1Transformer =
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
index 40874cd69d..34e97273d6 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.statistics
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
import org.apache.spark.sql.{GlutenTestConstants, QueryTest, SparkSession}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index e8cc7898c2..88e0ecf65a 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,13 +17,14 @@
package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.{BasicScanExecTransformer, GlutenPlan}
+import org.apache.gluten.extension.GlutenSessionExtensions
import org.apache.gluten.extension.columnar.{FallbackTags,
RemoveFallbackTagRule}
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy,
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
+import org.apache.gluten.extension.columnar.transition.{Convention,
InsertBackendTransitions}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -229,6 +230,8 @@ private object FallbackStrategiesSuite {
// For replacing LeafOp.
case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
}
@@ -237,6 +240,8 @@ private object FallbackStrategiesSuite {
case class UnaryOp1Transformer(override val child: SparkPlan)
extends UnaryExecNode
with GlutenPlan {
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan):
UnaryOp1Transformer =
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index 74c4df1977..e6cc2937a4 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.sources
import org.apache.gluten.execution.{ProjectExecTransformer,
SortExecTransformer}
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.spark.SparkConf
import org.apache.spark.executor.OutputMetrics
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
index e5e122c906..f7cc114859 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.statistics
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
import org.apache.spark.sql.{GlutenTestConstants, QueryTest, SparkSession}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 1d45e8a672..8908047a33 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,13 +17,14 @@
package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.{BasicScanExecTransformer, GlutenPlan}
+import org.apache.gluten.extension.GlutenSessionExtensions
import org.apache.gluten.extension.columnar.{FallbackTags,
RemoveFallbackTagRule}
import
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
import
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy,
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
+import org.apache.gluten.extension.columnar.transition.{Convention,
InsertBackendTransitions}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -230,6 +231,8 @@ private object FallbackStrategiesSuite {
// For replacing LeafOp.
case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = Seq.empty
}
@@ -238,6 +241,8 @@ private object FallbackStrategiesSuite {
case class UnaryOp1Transformer(override val child: SparkPlan)
extends UnaryExecNode
with GlutenPlan {
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: SparkPlan):
UnaryOp1Transformer =
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index 1cb905e10a..38e032aec3 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.sources
import org.apache.gluten.GlutenColumnarWriteTestSupport
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.execution.SortExecTransformer
-import org.apache.gluten.extension.GlutenPlan
import org.apache.spark.SparkConf
import org.apache.spark.executor.OutputMetrics
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
index e5e122c906..f7cc114859 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.statistics
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
import org.apache.spark.sql.{GlutenTestConstants, QueryTest, SparkSession}
diff --git
a/gluten-ut/test/src/test/scala/org/apache/spark/sql/GlutenExpressionDataTypesValidation.scala
b/gluten-ut/test/src/test/scala/org/apache/spark/sql/GlutenExpressionDataTypesValidation.scala
index d2a9611471..2088c90c01 100644
---
a/gluten-ut/test/src/test/scala/org/apache/spark/sql/GlutenExpressionDataTypesValidation.scala
+++
b/gluten-ut/test/src/test/scala/org/apache/spark/sql/GlutenExpressionDataTypesValidation.scala
@@ -17,8 +17,7 @@
package org.apache.spark.sql
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.{ProjectExecTransformer,
WholeStageTransformerSuite}
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{ProjectExecTransformer, TransformSupport,
WholeStageTransformerSuite}
import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
import org.apache.spark.SparkConf
@@ -100,7 +99,8 @@ class GlutenExpressionDataTypesValidation extends
WholeStageTransformerSuite {
case _ => throw new UnsupportedOperationException("Not supported type: "
+ t)
}
}
- def generateGlutenProjectPlan(expr: Expression): GlutenPlan = {
+
+ def generateGlutenProjectPlan(expr: Expression): TransformSupport = {
val namedExpr = Seq(Alias(expr, "r")())
ProjectExecTransformer(namedExpr, DummyPlan())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]