This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 10182a52d [CORE] Rework planner C2R / R2C code with new transition
facilities (#5767)
10182a52d is described below
commit 10182a52d409cb659169cbfe1d7f1869d9205dce
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon May 20 15:46:39 2024 +0800
[CORE] Rework planner C2R / R2C code with new transition facilities (#5767)
---
.../clickhouse/CHSparkPlanExecApi.scala | 24 +--
.../gluten/backendsapi/clickhouse/package.scala | 37 ++++
...nClickHouseTPCDSParquetGraceHashJoinSuite.scala | 2 +-
...nClickHouseTPCDSParquetSortMergeJoinSuite.scala | 2 +-
.../GlutenClickHouseTPCDSParquetSuite.scala | 2 +-
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 50 ++---
.../apache/gluten/backendsapi/velox/package.scala | 43 ++++
.../datasource/v2/ArrowBatchScanExec.scala | 6 +
.../python/ColumnarArrowEvalPythonExec.scala | 2 +-
.../sql/execution/ArrowFileSourceScanExec.scala | 6 +
.../sql/execution/VeloxParquetWriteSuite.scala | 2 +-
.../gluten/backendsapi/SparkPlanExecApi.scala | 21 +-
.../gluten/expression/ExpressionConverter.scala | 19 +-
.../gluten/extension/ColumnarOverrides.scala | 3 +-
.../org/apache/gluten/extension/GlutenPlan.scala | 17 +-
.../extension/columnar/ColumnarTransitions.scala | 109 ----------
.../extension/columnar/ExpandFallbackPolicy.scala | 15 +-
.../extension/columnar/MiscColumnarRules.scala | 32 +--
.../columnar/enumerated/EnumeratedApplier.scala | 8 +-
.../columnar/enumerated/PushFilterToScan.scala | 6 +
.../columnar/heuristic/HeuristicApplier.scala | 8 +-
.../extension/columnar/transition/Convention.scala | 113 ++++++++++
.../columnar/transition/ConventionFunc.scala | 115 ++++++++++
.../columnar/transition/ConventionReq.scala | 54 +++++
.../extension/columnar/transition/Transition.scala | 186 ++++++++++++++++
.../columnar/transition/Transitions.scala | 164 +++++++++++++++
.../extension/columnar/transition/package.scala | 58 +++++
.../gluten/planner/cost/GlutenCostModel.scala | 7 +-
.../gluten/planner/property/Convention.scala | 26 +--
.../scala/org/apache/gluten/utils/PlanUtil.scala | 46 +---
.../ColumnarCollapseTransformStages.scala | 15 +-
.../datasources/GlutenWriterColumnarRules.scala | 3 +-
.../execution/WholeStageTransformerSuite.scala | 3 +-
.../columnar/transition/TransitionSuite.scala | 234 +++++++++++++++++++++
.../org/apache/gluten/test}/FallbackUtil.scala | 8 +-
.../apache/gluten/columnarbatch/ArrowBatch.scala | 41 ++++
gluten-ut/pom.xml | 7 +
.../sql/execution/FallbackStrategiesSuite.scala | 2 +-
.../benchmarks/ParquetReadBenchmark.scala | 5 +-
.../spark/sql/GlutenStringFunctionsSuite.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../benchmarks/ParquetReadBenchmark.scala | 5 +-
.../spark/sql/GlutenStringFunctionsSuite.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../benchmarks/ParquetReadBenchmark.scala | 4 +-
.../spark/sql/GlutenStringFunctionsSuite.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 3 +-
.../benchmarks/ParquetReadBenchmark.scala | 5 +-
.../org/apache/gluten/sql/shims/SparkShims.scala | 2 +
.../gluten/sql/shims/spark34/Spark34Shims.scala | 6 +
.../gluten/sql/shims/spark35/Spark35Shims.scala | 8 +-
51 files changed, 1212 insertions(+), 334 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 465041621..cb706d817 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -25,6 +25,7 @@ import
org.apache.gluten.expression.ConverterUtils.FunctionConfig
import org.apache.gluten.extension.{CountDistinctWithoutExpand,
FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage,
RewriteToDateExpresstionRule}
import org.apache.gluten.extension.columnar.AddTransformHintRule
import
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.CHJoinValidateUtil
@@ -71,6 +72,9 @@ import scala.collection.mutable.ArrayBuffer
class CHSparkPlanExecApi extends SparkPlanExecApi {
+ /** The columnar-batch type this backend is using. */
+ override def batchType: Convention.BatchType = CHBatch
+
/** Transform GetArrayItem to Substrait. */
override def genGetArrayItemExpressionNode(
substraitExprName: String,
@@ -89,26 +93,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
ConverterUtils.getTypeNode(original.dataType, original.nullable))
}
- /**
- * Generate ColumnarToRowExecBase.
- *
- * @param child
- * @return
- */
- override def genColumnarToRowExec(child: SparkPlan): ColumnarToRowExecBase =
{
- CHColumnarToRowExec(child)
- }
-
- /**
- * Generate RowToColumnarExec.
- *
- * @param child
- * @return
- */
- override def genRowToColumnarExec(child: SparkPlan): RowToColumnarExecBase =
{
- RowToCHNativeColumnarExec(child)
- }
-
override def genProjectExecTransformer(
projectList: Seq[NamedExpression],
child: SparkPlan): ProjectExecTransformer = {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/package.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/package.scala
new file mode 100644
index 000000000..8704fac7b
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/package.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi
+
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.sql.execution.{CHColumnarToRowExec,
RowToCHNativeColumnarExec, SparkPlan}
+
+package object clickhouse {
+ case object CHBatch extends Convention.BatchType {
+ fromRow(
+ () =>
+ (plan: SparkPlan) => {
+ RowToCHNativeColumnarExec(plan)
+ })
+
+ toRow(
+ () =>
+ (plan: SparkPlan) => {
+ CHColumnarToRowExec(plan)
+ })
+ }
+}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
index 0fe04ea2a..0b7ad9a6d 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression,
Not}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
index bbedcda18..b1b9841a3 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
import org.apache.spark.SparkConf
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala
index 27efba648..a63e47888 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression,
Not}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index cf7d38d62..dc6bafdea 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -21,12 +21,13 @@ import org.apache.gluten.backendsapi.SparkPlanExecApi
import org.apache.gluten.datasource.ArrowConvertorRule
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
-import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
import org.apache.gluten.expression._
import org.apache.gluten.expression.ConverterUtils.FunctionConfig
import org.apache.gluten.expression.aggregate.{HLLAdapter,
VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
-import org.apache.gluten.extension.{ArrowScanReplaceRule,
BloomFilterMightContainJointRewriteRule, CollectRewriteRule,
FlushableHashAggregateRule, HLLRewriteRule}
+import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar.TransformHints
+import org.apache.gluten.extension.columnar.transition.Convention
+import
org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode, IfThenNode}
import org.apache.gluten.vectorized.{ColumnarBatchSerializer,
ColumnarBatchSerializeResult}
@@ -50,6 +51,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BuildSideRelation,
HashedRelationBroadcastMode}
@@ -73,6 +75,22 @@ import scala.collection.mutable.ListBuffer
class VeloxSparkPlanExecApi extends SparkPlanExecApi {
+ /** The columnar-batch type this backend is using. */
+ override def batchType: Convention.BatchType = {
+ VeloxBatch
+ }
+
+ /**
+ * Overrides
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is
using to
+ * determine the convention (its row-based processing / columnar-batch
processing support) of a
+ * plan with a user-defined function that accepts a plan then returns batch
type it outputs.
+ */
+ override def batchTypeFunc(): BatchOverride = {
+ case i: InMemoryTableScanExec
+ if
i.relation.cacheBuilder.serializer.isInstanceOf[ColumnarCachedBatchSerializer]
=>
+ VeloxBatch
+ }
+
/**
* Transform GetArrayItem to Substrait.
*
@@ -275,28 +293,6 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
GenericExpressionTransformer(substraitExprName, children, expr)
}
- /**
- * * Plans.
- */
-
- /**
- * Generate ColumnarToRowExecBase.
- *
- * @param child
- * @return
- */
- override def genColumnarToRowExec(child: SparkPlan): ColumnarToRowExecBase =
- VeloxColumnarToRowExec(child)
-
- /**
- * Generate RowToColumnarExec.
- *
- * @param child
- * @return
- */
- override def genRowToColumnarExec(child: SparkPlan): RowToColumnarExecBase =
- RowToVeloxColumnarExec(child)
-
/**
* Generate FilterExecTransformer.
*
@@ -857,10 +853,4 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
case other => other
}
}
-
- override def outputNativeColumnarSparkCompatibleData(plan: SparkPlan):
Boolean = plan match {
- case _: ArrowFileSourceScanExec => true
- case _: ArrowBatchScanExec => true
- case _ => false
- }
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/package.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/package.scala
new file mode 100644
index 000000000..8ab68b7fe
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/package.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi
+
+import org.apache.gluten.columnarbatch.ArrowBatch
+import org.apache.gluten.execution.{RowToVeloxColumnarExec,
VeloxColumnarToRowExec}
+import org.apache.gluten.extension.columnar.transition.{Convention,
TransitionDef}
+
+import org.apache.spark.sql.execution.SparkPlan
+
+package object velox {
+ case object VeloxBatch extends Convention.BatchType {
+ fromRow(
+ () =>
+ (plan: SparkPlan) => {
+ RowToVeloxColumnarExec(plan)
+ })
+
+ toRow(
+ () =>
+ (plan: SparkPlan) => {
+ VeloxColumnarToRowExec(plan)
+ })
+
+ // Velox batch is considered one-way compatible with Arrow batch.
+ // This is practically achieved by utilizing C++ API
VeloxColumnarBatch::from at runtime.
+ fromBatch(ArrowBatch, TransitionDef.empty)
+ }
+}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
index 3c1c53820..ee0acbf3f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
@@ -16,7 +16,9 @@
*/
package org.apache.gluten.execution.datasource.v2
+import org.apache.gluten.columnarbatch.ArrowBatch
import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -31,6 +33,10 @@ case class ArrowBatchScanExec(original: BatchScanExec)
@transient lazy val batch: Batch = original.batch
+ override protected def batchType0(): Convention.BatchType = {
+ ArrowBatch
+ }
+
override lazy val readerFactory: PartitionReaderFactory =
original.readerFactory
override lazy val inputRDD: RDD[InternalRow] = original.inputRDD
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
index d3112c974..fd8dfc25b 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
@@ -293,7 +293,7 @@ case class ColumnarArrowEvalPythonExec(
e =>
if (!e.isInstanceOf[AttributeReference]) {
throw new GlutenException(
- "ColumnarArrowEvalPythonExec should only has
[AttributeReference] inputs.")
+ "ColumnarArrowEvalPythonExec should only have
[AttributeReference] inputs.")
} else if (allInputs.exists(_.semanticEquals(e))) {
allInputs.indexWhere(_.semanticEquals(e))
} else {
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
index 133bf88b3..e3298d704 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
@@ -16,7 +16,9 @@
*/
package org.apache.spark.sql.execution
+import org.apache.gluten.columnarbatch.ArrowBatch
import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -39,6 +41,10 @@ case class ArrowFileSourceScanExec(original:
FileSourceScanExec)
override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()
+ override protected def batchType0(): Convention.BatchType = {
+ ArrowBatch
+ }
+
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
val scanTime = longMetric("scanTime")
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index c50c7bd75..2e4436a59 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
import org.apache.gluten.execution.VeloxWholeStageTransformerSuite
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
import org.apache.spark.SparkConf
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index c694d5804..a6228e671 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.backendsapi
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
+import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionFunc}
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode, WindowFunctionNode}
import org.apache.spark.ShuffleDependency
@@ -55,21 +56,15 @@ import scala.collection.JavaConverters._
trait SparkPlanExecApi {
- /**
- * Generate ColumnarToRowExecBase.
- *
- * @param child
- * @return
- */
- def genColumnarToRowExec(child: SparkPlan): ColumnarToRowExecBase
+ /** The columnar-batch type this backend is using. */
+ def batchType: Convention.BatchType
/**
- * Generate RowToColumnarExec.
- *
- * @param child
- * @return
+ * Overrides
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is
using to
+ * determine the convention (its row-based processing / columnar-batch
processing support) of a
+ * plan with a user-defined function that accepts a plan then returns batch
type it outputs.
*/
- def genRowToColumnarExec(child: SparkPlan): RowToColumnarExecBase
+ def batchTypeFunc(): ConventionFunc.BatchOverride = PartialFunction.empty
/**
* Generate FilterExecTransformer.
@@ -735,6 +730,4 @@ trait SparkPlanExecApi {
arrowEvalPythonExec
def maybeCollapseTakeOrderedAndProject(plan: SparkPlan): SparkPlan = plan
-
- def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = false
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 5aacbed05..b64a23e86 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -19,10 +19,10 @@ package org.apache.gluten.expression
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.execution.{ColumnarToRowExecBase,
WholeStageTransformer}
+import org.apache.gluten.extension.columnar.transition.Transitions
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.test.TestStats
-import org.apache.gluten.utils.{DecimalArithmeticUtil, PlanUtil}
+import org.apache.gluten.utils.DecimalArithmeticUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
@@ -680,20 +680,7 @@ object ExpressionConverter extends SQLConfHelper with
Logging {
def convertBroadcastExchangeToColumnar(
exchange: BroadcastExchangeExec): ColumnarBroadcastExchangeExec = {
- val newChild = exchange.child match {
- // get WholeStageTransformer directly
- case c2r: ColumnarToRowExecBase => c2r.child
- // in fallback case
- case plan: UnaryExecNode if !PlanUtil.isGlutenColumnarOp(plan) =>
- plan.child match {
- case _: ColumnarToRowExec =>
- val wholeStageTransformer =
exchange.find(_.isInstanceOf[WholeStageTransformer])
- wholeStageTransformer.getOrElse(
-
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan))
- case _ =>
-
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan)
- }
- }
+ val newChild = Transitions.toBackendBatchPlan(exchange.child)
ColumnarBroadcastExchangeExec(exchange.mode, newChild)
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
index 63127727b..067976b63 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.{GlutenConfig,
GlutenSparkExtensionsInjector}
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.enumerated.EnumeratedApplier
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.extension.columnar.transition.Transitions
import org.apache.gluten.utils.LogLevelUtil
import org.apache.spark.broadcast.Broadcast
@@ -115,7 +116,7 @@ case class ColumnarOverrideRules(session: SparkSession)
override def postColumnarTransitions: Rule[SparkPlan] = plan => {
val outputsColumnar = OutputsColumnarTester.inferOutputsColumnar(plan)
val unwrapped = OutputsColumnarTester.unwrap(plan)
- val vanillaPlan = ColumnarTransitions.insertTransitions(unwrapped,
outputsColumnar)
+ val vanillaPlan = Transitions.insertTransitions(unwrapped, outputsColumnar)
val applier: ColumnarRuleApplier = if (GlutenConfig.getConf.enableRas) {
new EnumeratedApplier(session)
} else {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 85901d21d..033e44b8c 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.expression.TransformerState
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.gluten.substrait.rel.RelNode
@@ -50,7 +51,7 @@ object ValidationResult {
}
/** Every Gluten Operator should extend this trait. */
-trait GlutenPlan extends SparkPlan with LogLevelUtil {
+trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with
LogLevelUtil {
private lazy val validationLogLevel = glutenConf.validationLogLevel
private lazy val printStackOnValidationFailure =
glutenConf.printStackOnValidationFailure
@@ -85,6 +86,20 @@ trait GlutenPlan extends SparkPlan with LogLevelUtil {
}
}
+ final override def batchType(): Convention.BatchType = {
+ if (!supportsColumnar) {
+ throw new UnsupportedOperationException(
+ s"Node $nodeName doesn't support columnar-batch processing")
+ }
+ val batchType = batchType0()
+ assert(batchType != Convention.BatchType.None)
+ batchType
+ }
+
+ protected def batchType0(): Convention.BatchType = {
+ BackendsApiManager.getSparkPlanExecApiInstance.batchType
+ }
+
protected def doValidateInternal(): ValidationResult = ValidationResult.ok
protected def doNativeValidation(context: SubstraitContext, node: RelNode):
ValidationResult = {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
deleted file mode 100644
index 5dd266433..000000000
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension.columnar
-
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.utils.PlanUtil
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
-import org.apache.spark.sql.execution.{ApplyColumnarRulesAndInsertTransitions,
ColumnarToRowExec, ColumnarToRowTransition, RowToColumnarExec,
RowToColumnarTransition, SparkPlan}
-
-/** See rule code from vanilla Spark:
[[ApplyColumnarRulesAndInsertTransitions]]. */
-case class InsertTransitions(outputsColumnar: Boolean) extends Rule[SparkPlan]
{
- private object RemoveRedundantTransitions extends Rule[SparkPlan] {
- override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
- case ColumnarToRowExec(RowToColumnarExec(child)) => child
- case RowToColumnarExec(ColumnarToRowExec(child)) => child
- }
- }
-
- private val rules = List(
- ApplyColumnarRulesAndInsertTransitions(List(), outputsColumnar),
- RemoveRedundantTransitions)
- override def apply(plan: SparkPlan): SparkPlan = rules.foldLeft(plan) {
- case (p, r) => r.apply(p)
- }
-}
-
-object RemoveTransitions extends Rule[SparkPlan] {
- import ColumnarTransitions._
- override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
- case ColumnarToRowLike(child) => child
- case RowToColumnarLike(child) => child
- }
-}
-
-// This rule will try to add RowToColumnarExecBase and ColumnarToRowExec
-// to support vanilla columnar operators.
-case class InsertColumnarToColumnarTransitions(session: SparkSession) extends
Rule[SparkPlan] {
- @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
-
- private def replaceWithVanillaColumnarToRow(p: SparkPlan): SparkPlan =
p.transformUp {
- case plan if PlanUtil.isGlutenColumnarOp(plan) =>
- plan.withNewChildren(plan.children.map {
- case child if PlanUtil.isVanillaColumnarOp(child) =>
- BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(
- ColumnarToRowExec(child))
- case other => other
- })
- }
-
- private def replaceWithVanillaRowToColumnar(p: SparkPlan): SparkPlan =
p.transformUp {
- case plan if PlanUtil.isVanillaColumnarOp(plan) =>
- plan.withNewChildren(plan.children.map {
- case child if PlanUtil.isGlutenColumnarOp(child) =>
- RowToColumnarExec(
-
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child))
- case other => other
- })
- }
-
- def apply(plan: SparkPlan): SparkPlan = {
- val newPlan =
replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan))
- planChangeLogger.logRule(ruleName, plan, newPlan)
- newPlan
- }
-}
-
-object ColumnarTransitions {
- def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
= {
- InsertTransitions(outputsColumnar).apply(plan)
- }
-
- // Extractor for Spark/Gluten's C2R
- object ColumnarToRowLike {
- def unapply(plan: SparkPlan): Option[SparkPlan] = {
- plan match {
- case c2r: ColumnarToRowTransition =>
- Some(c2r.child)
- case _ => None
- }
- }
- }
-
- // Extractor for Spark/Gluten's R2C
- object RowToColumnarLike {
- def unapply(plan: SparkPlan): Option[SparkPlan] = {
- plan match {
- case c2r: RowToColumnarTransition =>
- Some(c2r.child)
- case _ => None
- }
- }
- }
-}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
index 471141f49..6f8d7cde7 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.BroadcastHashJoinExecTransformerBase
import org.apache.gluten.extension.GlutenPlan
-import
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPostOverrides
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike,
RowToColumnarLike, Transitions}
import org.apache.gluten.utils.PlanUtil
import org.apache.spark.rdd.RDD
@@ -78,7 +78,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
case _: CommandResultExec | _: ExecutedCommandExec => // ignore
// we plan exchange to columnar exchange in columnar rules and the
exchange does not
// support columnar, so the output columnar is always false in AQE
postStageCreationRules
- case ColumnarToRowExec(s: Exchange) if isAdaptiveContext =>
+ case ColumnarToRowLike(s: Exchange) if isAdaptiveContext =>
countFallbackInternal(s)
case u: UnaryExecNode
if !PlanUtil.isGlutenColumnarOp(u) &&
PlanUtil.isGlutenTableCache(u.child) =>
@@ -86,15 +86,15 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
// which is a kind of `ColumnarToRowExec`.
transitionCost = transitionCost + 1
countFallbackInternal(u.child)
- case ColumnarToRowExec(p: GlutenPlan) =>
+ case ColumnarToRowLike(p: GlutenPlan) =>
logDebug(s"Find a columnar to row for gluten plan:\n$p")
transitionCost = transitionCost + 1
countFallbackInternal(p)
- case r: RowToColumnarExec =>
+ case RowToColumnarLike(child) =>
if (!ignoreRowToColumnar) {
transitionCost = transitionCost + 1
}
- countFallbackInternal(r.child)
+ countFallbackInternal(child)
case leafPlan: LeafExecNode if PlanUtil.isGlutenTableCache(leafPlan) =>
case leafPlan: LeafExecNode if !PlanUtil.isGlutenColumnarOp(leafPlan)
=>
// Possible fallback for leaf node.
@@ -236,9 +236,8 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
}
private def fallbackToRowBasedPlan(outputsColumnar: Boolean): SparkPlan = {
- val transformPostOverrides = TransformPostOverrides()
- val planWithTransitions =
ColumnarTransitions.insertTransitions(originalPlan, outputsColumnar)
- transformPostOverrides.apply(planWithTransitions)
+ val planWithTransitions = Transitions.insertTransitions(originalPlan,
outputsColumnar)
+ planWithTransitions
}
private def countTransitionCostForVanillaSparkPlan(plan: SparkPlan): Int = {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index 08c63000e..fab973ffb 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -16,8 +16,7 @@
*/
package org.apache.gluten.extension.columnar
-import org.apache.gluten.backendsapi.BackendsApiManager
-import
org.apache.gluten.extension.columnar.ColumnarTransitions.ColumnarToRowLike
+import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike
import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
import org.apache.spark.sql.SparkSession
@@ -59,35 +58,6 @@ object MiscColumnarRules {
}
}
- // This rule will try to convert the row-to-columnar and columnar-to-row
- // into native implementations.
- case class TransformPostOverrides() extends Rule[SparkPlan] {
- @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
-
- def replaceWithTransformerPlan(plan: SparkPlan): SparkPlan =
plan.transformDown {
- case RowToColumnarExec(child) =>
- logDebug(s"ColumnarPostOverrides RowToColumnarExec(${child.getClass})")
-
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(child)
- case c2r @ ColumnarToRowExec(child)
- if PlanUtil.outputNativeColumnarData(child) &&
- !PlanUtil.outputNativeColumnarSparkCompatibleData(child) =>
- logDebug(s"ColumnarPostOverrides ColumnarToRowExec(${child.getClass})")
- val nativeC2r =
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child)
- if (nativeC2r.doValidate().isValid) {
- nativeC2r
- } else {
- c2r
- }
- }
-
- // apply for the physical not final plan
- def apply(plan: SparkPlan): SparkPlan = {
- val newPlan = replaceWithTransformerPlan(plan)
- planChangeLogger.logRule(ruleName, plan, newPlan)
- newPlan
- }
- }
-
// Remove topmost columnar-to-row otherwise AQE throws error.
// See:
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec#newQueryStage
//
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 92d64abf3..3f8ee8706 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -19,7 +19,8 @@ package org.apache.gluten.extension.columnar.enumerated
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar._
-import
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
RemoveTopmostColumnarToRow, TransformPostOverrides}
+import
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
RemoveTopmostColumnarToRow}
+import org.apache.gluten.extension.columnar.transition.{InsertTransitions,
RemoveTransitions}
import org.apache.gluten.extension.columnar.util.AdaptiveContext
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
@@ -151,10 +152,7 @@ class EnumeratedApplier(session: SparkSession)
*/
private def postRules(): List[SparkSession => Rule[SparkPlan]] =
List(
- (_: SparkSession) => TransformPostOverrides(),
- (s: SparkSession) => InsertColumnarToColumnarTransitions(s),
- (s: SparkSession) => RemoveTopmostColumnarToRow(s,
adaptiveContext.isAdaptiveContext())
- ) :::
+ (s: SparkSession) => RemoveTopmostColumnarToRow(s,
adaptiveContext.isAdaptiveContext())) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules()
:::
List((_: SparkSession) =>
ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
SparkRuleUtil.extendedColumnarRules(session,
GlutenConfig.getConf.extendedColumnarPostRules)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
index 7306b734a..388668287 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
@@ -71,11 +71,17 @@ class PushFilterToScan(validator: Validator) extends
RasRule[SparkPlan] {
private object FilterAndScan {
def unapply(node: SparkPlan): Option[(FilterExec, SparkPlan)] = node match
{
case f @ FilterExec(cond, ColumnarToRowExec(scan)) =>
+ ensureScan(scan)
Some(f, scan)
case f @ FilterExec(cond, scan) =>
+ ensureScan(scan)
Some(f, scan)
case _ =>
None
}
+
+ private def ensureScan(node: SparkPlan): Unit = {
+ assert(node.isInstanceOf[FileSourceScanExec] ||
node.isInstanceOf[BatchScanExec])
+ }
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 0e905ced1..2b5b18abb 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -19,8 +19,9 @@ package org.apache.gluten.extension.columnar.heuristic
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar._
-import
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
RemoveTopmostColumnarToRow, TransformPostOverrides, TransformPreOverrides}
+import
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
RemoveTopmostColumnarToRow, TransformPreOverrides}
import
org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
+import org.apache.gluten.extension.columnar.transition.{InsertTransitions,
RemoveTransitions}
import org.apache.gluten.extension.columnar.util.AdaptiveContext
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
@@ -146,10 +147,7 @@ class HeuristicApplier(session: SparkSession)
*/
private def postRules(): List[SparkSession => Rule[SparkPlan]] =
List(
- (_: SparkSession) => TransformPostOverrides(),
- (s: SparkSession) => InsertColumnarToColumnarTransitions(s),
- (s: SparkSession) => RemoveTopmostColumnarToRow(s,
adaptiveContext.isAdaptiveContext())
- ) :::
+ (s: SparkSession) => RemoveTopmostColumnarToRow(s,
adaptiveContext.isAdaptiveContext())) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules()
:::
List((_: SparkSession) =>
ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
SparkRuleUtil.extendedColumnarRules(session,
GlutenConfig.getConf.extendedColumnarPostRules)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
new file mode 100644
index 000000000..2774497d9
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec,
SparkPlan}
+
+/**
+ * Convention of a query plan consists of the row data type and columnar data
type it is able to
+ * output.
+ */
+sealed trait Convention {
+ def rowType: Convention.RowType
+ def batchType: Convention.BatchType
+}
+
+object Convention {
+ implicit class ConventionOps(val conv: Convention) extends AnyVal {
+ def isNone: Boolean = {
+ conv.rowType == RowType.None && conv.batchType == BatchType.None
+ }
+
+ def &&(other: Convention): Convention = {
+ def rowType(): RowType = {
+ if (conv.rowType == other.rowType) {
+ return conv.rowType
+ }
+ RowType.None
+ }
+ def batchType(): BatchType = {
+ if (conv.batchType == other.batchType) {
+ return conv.batchType
+ }
+ BatchType.None
+ }
+ Convention.of(rowType(), batchType())
+ }
+ }
+
+ private case class Impl(override val rowType: RowType, override val
batchType: BatchType)
+ extends Convention
+
+ def get(plan: SparkPlan): Convention = {
+ ConventionFunc.create().conventionOf(plan)
+ }
+
+ def of(rowType: RowType, batchType: BatchType): Convention = {
+ Impl(rowType, batchType)
+ }
+
+ sealed trait RowType
+
+ object RowType {
+ // None indicates that the plan doesn't support row-based processing.
+ final case object None extends RowType
+ final case object VanillaRow extends RowType
+ }
+
+ trait BatchType {
+ final def fromRow(transitionDef: TransitionDef): Unit = {
+ Transition.factory.update().defineFromRowTransition(this, transitionDef)
+ }
+
+ final def toRow(transitionDef: TransitionDef): Unit = {
+ Transition.factory.update().defineToRowTransition(this, transitionDef)
+ }
+
+ final def fromBatch(from: BatchType, transitionDef: TransitionDef): Unit =
{
+ assert(from != this)
+ Transition.factory.update().defineBatchTransition(from, this,
transitionDef)
+ }
+
+ final def toBatch(to: BatchType, transitionDef: TransitionDef): Unit = {
+ assert(to != this)
+ Transition.factory.update().defineBatchTransition(this, to,
transitionDef)
+ }
+ }
+
+ object BatchType {
+ // None indicates that the plan doesn't support batch-based processing.
+ final case object None extends BatchType
+ final case object VanillaBatch extends BatchType {
+ fromRow(
+ () =>
+ (plan: SparkPlan) => {
+ RowToColumnarExec(plan)
+ })
+
+ toRow(
+ () =>
+ (plan: SparkPlan) => {
+ ColumnarToRowExec(plan)
+ })
+ }
+ }
+
+ trait KnownBatchType {
+ def batchType(): BatchType
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
new file mode 100644
index 000000000..28bd1d12c
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar.transition.Convention.{BatchType,
RowType}
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
QueryStageExec}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+
+/** ConventionFunc is a utility to derive [[Convention]] from a query plan. */
+trait ConventionFunc {
+ def conventionOf(plan: SparkPlan): Convention
+}
+
+object ConventionFunc {
+ type BatchOverride = PartialFunction[SparkPlan, BatchType]
+
+ // For testing, to make things work without a backend loaded.
+ private var ignoreBackend: Boolean = false
+
+ // Visible for testing
+ def ignoreBackend[T](body: => T): T = synchronized {
+ assert(!ignoreBackend)
+ ignoreBackend = true
+ try {
+ body
+ } finally {
+ ignoreBackend = false
+ }
+ }
+
+ def create(): ConventionFunc = {
+ synchronized {
+ if (ignoreBackend) {
+ // For testing
+ return new BuiltinFunc(PartialFunction.empty)
+ }
+ }
+ val batchOverride =
BackendsApiManager.getSparkPlanExecApiInstance.batchTypeFunc()
+ new BuiltinFunc(batchOverride)
+ }
+
+ private class BuiltinFunc(o: BatchOverride) extends ConventionFunc {
+
+ override def conventionOf(plan: SparkPlan): Convention = {
+ val conv = conventionOf0(plan)
+ conv
+ }
+
+ private def conventionOf0(plan: SparkPlan): Convention = plan match {
+ case p if canPropagateConvention(p) =>
+ val childrenConventions = p.children.map(conventionOf0).distinct
+ if (childrenConventions.size > 1) {
+ childrenConventions.reduce(_ && _)
+ } else {
+ assert(childrenConventions.size == 1)
+ childrenConventions.head
+ }
+ case q: QueryStageExec => conventionOf0(q.plan)
+ case r: ReusedExchangeExec => conventionOf0(r.child)
+ case a: AdaptiveSparkPlanExec =>
+ val rowType = rowTypeOf(a)
+ val batchType = if (a.supportsColumnar) {
+ // By default, we execute columnar AQE with backend batch output.
+ // See
org.apache.gluten.extension.columnar.transition.InsertTransitions.apply
+ BackendsApiManager.getSparkPlanExecApiInstance.batchType
+ } else {
+ BatchType.None
+ }
+ val conv = Convention.of(rowType, batchType)
+ conv
+ case other =>
+ val conv = Convention.of(rowTypeOf(other), batchTypeOf(other))
+ conv
+ }
+
+ private def rowTypeOf(plan: SparkPlan): RowType = {
+ if (!SparkShimLoader.getSparkShims.supportsRowBased(plan)) {
+ return RowType.None
+ }
+ RowType.VanillaRow
+ }
+
+ private def batchTypeOf(plan: SparkPlan): BatchType = {
+ if (!plan.supportsColumnar) {
+ return BatchType.None
+ }
+ o.applyOrElse(
+ plan,
+ (p: SparkPlan) =>
+ p match {
+ case g: Convention.KnownBatchType => g.batchType()
+ case _ => BatchType.VanillaBatch
+ }
+ )
+ }
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
new file mode 100644
index 000000000..aac2084a7
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+/**
+ * ConventionReq describes the requirement for [[Convention]]. This is mostly
used in determining
+ * the acceptable conventions of a parent plan node's children.
+ */
+sealed trait ConventionReq {
+ def requiredRowType: ConventionReq.RowType
+ def requiredBatchType: ConventionReq.BatchType
+}
+
+object ConventionReq {
+ sealed trait RowType
+
+ object RowType {
+ final case object Any extends RowType
+ final case class Is(t: Convention.RowType) extends RowType {
+ assert(t != Convention.RowType.None)
+ }
+ }
+
+ sealed trait BatchType
+
+ object BatchType {
+ final case object Any extends BatchType
+ final case class Is(t: Convention.BatchType) extends BatchType {
+ assert(t != Convention.BatchType.None)
+ }
+ }
+
+ private case class Impl(
+ override val requiredRowType: RowType,
+ override val requiredBatchType: BatchType
+ ) extends ConventionReq
+
+ val any: ConventionReq = Impl(RowType.Any, BatchType.Any)
+ def of(rowType: RowType, batchType: BatchType): ConventionReq = new
Impl(rowType, batchType)
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
new file mode 100644
index 000000000..9b745f94d
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+import org.apache.gluten.exception.GlutenException
+
+import org.apache.spark.sql.execution.SparkPlan
+
+import scala.collection.mutable
+
+/**
+ * Transition is a simple function to convert a query plan to the desired
[[ConventionReq]].
+ *
+ * Transitions can be registered through the utility APIs in
+ * [[org.apache.gluten.extension.columnar.transition.Convention.BatchType]]'s
definition.
+ */
+trait Transition {
+ def apply(plan: SparkPlan): SparkPlan
+}
+
+trait TransitionDef {
+ def create(): Transition
+}
+
+object TransitionDef {
+ val empty: TransitionDef = () => Transition.empty
+}
+
+object Transition {
+ val empty: Transition = (plan: SparkPlan) => plan
+ val factory: Factory = Factory.newBuiltin()
+
+ def notFound(plan: SparkPlan): GlutenException = {
+ new GlutenException(s"No viable transition found from plan's child to
itself: $plan")
+ }
+
+ def notFound(plan: SparkPlan, required: ConventionReq): GlutenException = {
+ new GlutenException(s"No viable transition to [$required] found for plan:
$plan")
+ }
+
+ private class ChainedTransition(first: Transition, second: Transition)
extends Transition {
+ override def apply(plan: SparkPlan): SparkPlan = {
+ second(first(plan))
+ }
+ }
+
+ private def chain(first: Transition, second: Transition): Transition = {
+ new ChainedTransition(first, second)
+ }
+
+ trait Factory {
+ final def findTransition(
+ from: Convention,
+ to: ConventionReq,
+ otherwise: Exception): Transition = {
+ findTransition(from, to) {
+ throw otherwise
+ }
+ }
+
+ protected def findTransition(from: Convention, to: ConventionReq)(
+ orElse: => Transition): Transition
+ private[transition] def update(): MutableFactory
+ }
+
+ trait MutableFactory extends Factory {
+ def defineFromRowTransition(to: Convention.BatchType, transitionDef:
TransitionDef): Unit
+ def defineToRowTransition(from: Convention.BatchType, transitionDef:
TransitionDef): Unit
+ def defineBatchTransition(
+ from: Convention.BatchType,
+ to: Convention.BatchType,
+ transitionDef: TransitionDef): Unit
+ }
+
+ private object Factory {
+ def newBuiltin(): Factory = {
+ new BuiltinFactory
+ }
+
+ private class BuiltinFactory extends MutableFactory {
+ private val fromRowTransitions: mutable.Map[Convention.BatchType,
TransitionDef] =
+ mutable.Map()
+ private val toRowTransitions: mutable.Map[Convention.BatchType,
TransitionDef] = mutable.Map()
+ private val batchTransitions
+ : mutable.Map[(Convention.BatchType, Convention.BatchType),
TransitionDef] =
+ mutable.Map()
+
+ override def defineFromRowTransition(
+ to: Convention.BatchType,
+ transitionDef: TransitionDef): Unit = {
+ assert(!fromRowTransitions.contains(to))
+ fromRowTransitions += to -> transitionDef
+ }
+
+ override def defineToRowTransition(
+ from: Convention.BatchType,
+ transitionDef: TransitionDef): Unit = {
+ assert(!toRowTransitions.contains(from))
+ toRowTransitions += from -> transitionDef
+ }
+
+ override def defineBatchTransition(
+ from: Convention.BatchType,
+ to: Convention.BatchType,
+ transitionDef: TransitionDef): Unit = {
+ assert(!batchTransitions.contains((from, to)))
+ batchTransitions += (from, to) -> transitionDef
+ }
+
+ override def findTransition(from: Convention, to: ConventionReq)(
+ orElse: => Transition): Transition = {
+ assert(
+ !from.isNone,
+ "#findTransition called with on a plan that doesn't support either
row or columnar " +
+ "output")
+ val out = (to.requiredRowType, to.requiredBatchType) match {
+ case (ConventionReq.RowType.Is(toRowType),
ConventionReq.BatchType.Is(toBatchType)) =>
+ if (from.rowType == toRowType && from.batchType == toBatchType) {
+ return Transition.empty
+ } else {
+ throw new UnsupportedOperationException(
+ "Transiting to plan that both have row and columnar-batch
output is not yet " +
+ "supported")
+ }
+ case (ConventionReq.RowType.Is(toRowType),
ConventionReq.BatchType.Any) =>
+ from.rowType match {
+ case Convention.RowType.None =>
+
toRowTransitions.get(from.batchType).map(_.create()).getOrElse(orElse)
+ case fromRowType =>
+ // We have only one single built-in row type.
+ assert(toRowType == fromRowType)
+ Transition.empty
+ }
+ case (ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(toBatchType)) =>
+ from.batchType match {
+ case Convention.BatchType.None =>
+
fromRowTransitions.get(toBatchType).map(_.create()).getOrElse(orElse)
+ case fromBatchType =>
+ if (toBatchType == fromBatchType) {
+ Transition.empty
+ } else {
+ // Batch type conversion needed.
+ //
+ // We first look up for batch-to-batch transition. If found
one, return that
+ // transition to caller. Otherwise, look for from/to row
transitions, then
+ // return a bridged batch-to-row-to-batch transition.
+ if (batchTransitions.contains((fromBatchType, toBatchType)))
{
+ // 1. Found batch-to-batch transition.
+ batchTransitions((fromBatchType, toBatchType)).create()
+ } else {
+ // 2. Otherwise, build up batch-to-row-to-batch transition.
+ val batchToRow =
+
toRowTransitions.get(fromBatchType).map(_.create()).getOrElse(orElse)
+ val rowToBatch =
+
fromRowTransitions.get(toBatchType).map(_.create()).getOrElse(orElse)
+ chain(batchToRow, rowToBatch)
+ }
+ }
+ }
+ case (ConventionReq.RowType.Any, ConventionReq.BatchType.Any) =>
+ Transition.empty
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Illegal convention requirement: $ConventionReq")
+ }
+ out
+ }
+
+ override private[transition] def update(): MutableFactory = this
+ }
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
new file mode 100644
index 000000000..e0758cff7
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnionExec}
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+
+import scala.annotation.tailrec
+
+case class InsertTransitions(outputsColumnar: Boolean) extends Rule[SparkPlan]
{
+ import InsertTransitions._
+ private val convFunc = ConventionFunc.create()
+
+ override def apply(plan: SparkPlan): SparkPlan = {
+ // Remove all transitions at first.
+ val removed = RemoveTransitions.apply(plan)
+ val filled = fillWithTransitions(removed)
+ if (!outputsColumnar) {
+ return Transitions.toRowPlan(filled)
+ }
+ Transitions.toBackendBatchPlan(filled)
+ }
+
+ private def fillWithTransitions(plan: SparkPlan): SparkPlan =
plan.transformUp {
+ case p => applyForNode(p)
+ }
+
+ private def applyForNode(node: SparkPlan): SparkPlan = {
+ if (node.children.isEmpty) {
+ return node
+ }
+ val convReq = childrenConvReqOf(node)
+ val newChildren = node.children.map {
+ child =>
+ val from = convFunc.conventionOf(child)
+ if (from.isNone) {
+ // For example, a union op with row child and columnar child at the
same time.
+ // The plan is actually not executable and we cannot tell about its
convention.
+ child
+ } else {
+ val transition =
+ Transition.factory.findTransition(from, convReq,
Transition.notFound(node))
+ val newChild = transition.apply(child)
+ newChild
+ }
+ }
+ node.withNewChildren(newChildren)
+ }
+
+ private def childrenConvReqOf(node: SparkPlan): ConventionReq = node match {
+ // TODO: Consider C2C transitions as well when we have some.
+ case ColumnarToRowLike(_) | RowToColumnarLike(_) =>
+ // We shouldn't meet C2R / R2C nodes here since they are already removed by
+ // RemoveTransitions.
+ // It's the current rule's mission to add C2Rs / R2Cs on demand.
+ throw new IllegalStateException("Unreachable code")
+ case write: DataWritingCommandExec if
SparkShimLoader.getSparkShims.isPlannedV1Write(write) =>
+ // To align with ApplyColumnarRulesAndInsertTransitions#insertTransitions
+ ConventionReq.any
+ case u: UnionExec =>
+ // We force vanilla union to output row data to get best compatibility
with vanilla Spark.
+ // As a result it's a common practice to rewrite it with GlutenPlan for
offloading.
+ ConventionReq.of(
+ ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+ ConventionReq.BatchType.Any)
+ case other =>
+ // In the normal case, children's convention should follow parent node's
convention.
+ // Note, we don't have to consider C2R / R2C here since they are already
removed by
+ // RemoveTransitions.
+ val thisConv = convFunc.conventionOf(other)
+ thisConv.asReq()
+ }
+}
+
+object InsertTransitions {
+ implicit private class ConventionOps(conv: Convention) {
+ def asReq(): ConventionReq = {
+ val rowTypeReq = conv.rowType match {
+ case Convention.RowType.None => ConventionReq.RowType.Any
+ case r => ConventionReq.RowType.Is(r)
+ }
+
+ val batchTypeReq = conv.batchType match {
+ case Convention.BatchType.None => ConventionReq.BatchType.Any
+ case b => ConventionReq.BatchType.Is(b)
+ }
+ ConventionReq.of(rowTypeReq, batchTypeReq)
+ }
+ }
+}
+
+object RemoveTransitions extends Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { case p
=> removeForNode(p) }
+
+ @tailrec
+ private[transition] def removeForNode(plan: SparkPlan): SparkPlan = plan
match {
+ // TODO: Consider C2C transitions as well when we have some.
+ case ColumnarToRowLike(child) => removeForNode(child)
+ case RowToColumnarLike(child) => removeForNode(child)
+ case other => other
+ }
+}
+
+object Transitions {
+ def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
= {
+ val out = InsertTransitions(outputsColumnar).apply(plan)
+ out
+ }
+
+ def toRowPlan(plan: SparkPlan): SparkPlan = {
+ val convFunc = ConventionFunc.create()
+ val req = ConventionReq.of(
+ ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+ ConventionReq.BatchType.Any)
+ val removed = RemoveTransitions.removeForNode(plan)
+ val transition = Transition.factory.findTransition(
+ convFunc.conventionOf(removed),
+ req,
+ Transition.notFound(removed, req))
+ val out = transition.apply(removed)
+ out
+ }
+
+ def toBackendBatchPlan(plan: SparkPlan): SparkPlan = {
+ val backendBatchType =
BackendsApiManager.getSparkPlanExecApiInstance.batchType
+ val out = toBatchPlan(plan, backendBatchType)
+ out
+ }
+
+ def toVanillaBatchPlan(plan: SparkPlan): SparkPlan = {
+ val out = toBatchPlan(plan, Convention.BatchType.VanillaBatch)
+ out
+ }
+
+ private def toBatchPlan(plan: SparkPlan, toBatchType: Convention.BatchType):
SparkPlan = {
+ val convFunc = ConventionFunc.create()
+ val req = ConventionReq.of(ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(toBatchType))
+ val removed = RemoveTransitions.removeForNode(plan)
+ val transition = Transition.factory.findTransition(
+ convFunc.conventionOf(removed),
+ req,
+ Transition.notFound(removed, req))
+ val out = transition.apply(removed)
+ out
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
new file mode 100644
index 000000000..b0d04c273
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
+import org.apache.spark.sql.execution.debug.DebugExec
+
+package object transition {
+ // These 5 plan operators (as of Spark 3.5) are operators that share the
+ // same convention as their children.
+ //
+ // Extend this list in the shim layer once Spark adds more.
+ def canPropagateConvention(plan: SparkPlan): Boolean = plan match {
+ case p: DebugExec => true
+ case p: UnionExec => true
+ case p: AQEShuffleReadExec => true
+ case p: InputAdapter => true
+ case p: WholeStageCodegenExec => true
+ case _ => false
+ }
+
+ // Extractor for Spark/Gluten's C2R
+ object ColumnarToRowLike {
+ def unapply(plan: SparkPlan): Option[SparkPlan] = {
+ plan match {
+ case c2r: ColumnarToRowTransition =>
+ Some(c2r.child)
+ case _ => None
+ }
+ }
+ }
+
+ // Extractor for Spark/Gluten's R2C
+ object RowToColumnarLike {
+ def unapply(plan: SparkPlan): Option[SparkPlan] = {
+ plan match {
+ case c2r: RowToColumnarTransition =>
+ Some(c2r.child)
+ case _ => None
+ }
+ }
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
b/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
index 2920c0a39..fa69eedb5 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
@@ -16,7 +16,8 @@
*/
package org.apache.gluten.planner.cost
-import org.apache.gluten.extension.columnar.{ColumnarTransitions, OffloadJoin}
+import org.apache.gluten.extension.columnar.OffloadJoin
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike,
RowToColumnarLike}
import org.apache.gluten.planner.plan.GlutenPlanModel.GroupLeafExec
import org.apache.gluten.ras.{Cost, CostModel}
import org.apache.gluten.utils.PlanUtil
@@ -63,8 +64,8 @@ object GlutenCostModel {
infLongCost
case ColumnarToRowExec(child) => 3L
case RowToColumnarExec(child) => 3L
- case ColumnarTransitions.ColumnarToRowLike(child) => 3L
- case ColumnarTransitions.RowToColumnarLike(child) => 3L
+ case ColumnarToRowLike(child) => 3L
+ case RowToColumnarLike(child) => 3L
case p if PlanUtil.isGlutenColumnarOp(p) => 2L
case p if PlanUtil.isVanillaColumnarOp(p) => 3L
// Other row ops. Usually a vanilla row op.
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/planner/property/Convention.scala
b/gluten-core/src/main/scala/org/apache/gluten/planner/property/Convention.scala
index 36751f4ca..5fe96ab79 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/planner/property/Convention.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/planner/property/Convention.scala
@@ -16,9 +16,9 @@
*/
package org.apache.gluten.planner.property
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.RowToColumnarExecBase
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.ColumnarTransitions
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike,
RowToColumnarLike, Transitions}
import org.apache.gluten.planner.plan.GlutenPlanModel.GroupLeafExec
import org.apache.gluten.ras.{Property, PropertyDef}
import org.apache.gluten.ras.rule.{RasRule, Shape, Shapes}
@@ -62,8 +62,8 @@ object ConventionDef extends PropertyDef[SparkPlan,
Convention] {
case g: GroupLeafExec => g.propertySet.get(ConventionDef)
case ColumnarToRowExec(child) => Conventions.ROW_BASED
case RowToColumnarExec(child) => Conventions.VANILLA_COLUMNAR
- case ColumnarTransitions.ColumnarToRowLike(child) => Conventions.ROW_BASED
- case ColumnarTransitions.RowToColumnarLike(child) =>
Conventions.GLUTEN_COLUMNAR
+ case ColumnarToRowLike(child) => Conventions.ROW_BASED
+ case RowToColumnarLike(child) => Conventions.GLUTEN_COLUMNAR
case q: QueryStageExec => conventionOf(q.plan)
case r: ReusedExchangeExec => conventionOf(r.child)
case a: AdaptiveSparkPlanExec => conventionOf(a.executedPlan)
@@ -82,8 +82,8 @@ object ConventionDef extends PropertyDef[SparkPlan,
Convention] {
constraint: Property[SparkPlan],
plan: SparkPlan): Seq[Convention] = plan match {
case ColumnarToRowExec(child) => Seq(Conventions.VANILLA_COLUMNAR)
- case ColumnarTransitions.ColumnarToRowLike(child) =>
Seq(Conventions.GLUTEN_COLUMNAR)
- case ColumnarTransitions.RowToColumnarLike(child) =>
Seq(Conventions.ROW_BASED)
+ case ColumnarToRowLike(child) => Seq(Conventions.GLUTEN_COLUMNAR)
+ case RowToColumnarLike(child) => Seq(Conventions.ROW_BASED)
case p if canPropagateConvention(p) =>
p.children.map(_ => constraint.asInstanceOf[Convention])
case other =>
@@ -127,22 +127,18 @@ case class ConventionEnforcerRule(reqConv: Convention)
extends RasRule[SparkPlan
case (Conventions.ROW_BASED, Conventions.VANILLA_COLUMNAR) =>
List(RowToColumnarExec(node))
case (Conventions.GLUTEN_COLUMNAR, Conventions.ROW_BASED) =>
-
List(BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(node))
+ List(Transitions.toRowPlan(node))
case (Conventions.ROW_BASED, Conventions.GLUTEN_COLUMNAR) =>
- val attempt =
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(node)
- if (attempt.doValidate().isValid) {
+ val attempt = Transitions.toBackendBatchPlan(node)
+ if (attempt.asInstanceOf[RowToColumnarExecBase].doValidate().isValid) {
List(attempt)
} else {
List.empty
}
case (Conventions.VANILLA_COLUMNAR, Conventions.GLUTEN_COLUMNAR) =>
- List(
- BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(
- ColumnarToRowExec(node)))
+ List(Transitions.toBackendBatchPlan(ColumnarToRowExec(node)))
case (Conventions.GLUTEN_COLUMNAR, Conventions.VANILLA_COLUMNAR) =>
- List(
- RowToColumnarExec(
-
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(node)))
+ List(RowToColumnarExec(Transitions.toRowPlan(node)))
case _ => List.empty
}
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
b/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
index 4c02687a6..fe7eb5566 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
@@ -17,18 +17,15 @@
package org.apache.gluten.utils
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.exchange._
object PlanUtil {
- def isGlutenTableCacheInternal(i: InMemoryTableScanExec): Boolean = {
- // `ColumnarCachedBatchSerializer` is at velox module, so use class name
here
- i.relation.cacheBuilder.serializer.getClass.getSimpleName ==
"ColumnarCachedBatchSerializer" &&
- i.supportsColumnar
+ private def isGlutenTableCacheInternal(i: InMemoryTableScanExec): Boolean = {
+ Convention.get(i).batchType ==
BackendsApiManager.getSparkPlanExecApiInstance.batchType
}
def isGlutenTableCache(plan: SparkPlan): Boolean = {
@@ -42,44 +39,11 @@ object PlanUtil {
}
}
- def outputNativeColumnarData(plan: SparkPlan): Boolean = {
- plan match {
- case a: AQEShuffleReadExec => outputNativeColumnarData(a.child)
- case s: QueryStageExec => outputNativeColumnarData(s.plan)
- case s: ReusedExchangeExec => outputNativeColumnarData(s.child)
- case s: InputAdapter => outputNativeColumnarData(s.child)
- case s: WholeStageCodegenExec => outputNativeColumnarData(s.child)
- case s: AdaptiveSparkPlanExec => outputNativeColumnarData(s.executedPlan)
- case i: InMemoryTableScanExec => PlanUtil.isGlutenTableCache(i)
- case _: GlutenPlan => true
- case _ => false
- }
- }
-
- def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = {
-
BackendsApiManager.getSparkPlanExecApiInstance.outputNativeColumnarSparkCompatibleData(plan)
- }
-
def isVanillaColumnarOp(plan: SparkPlan): Boolean = {
- plan match {
- case i: InMemoryTableScanExec =>
- if (PlanUtil.isGlutenTableCache(i)) {
- // `InMemoryTableScanExec` do not need extra RowToColumnar or
ColumnarToRow
- false
- } else {
- !plan.isInstanceOf[GlutenPlan] && plan.supportsColumnar
- }
- case a: AQEShuffleReadExec => isVanillaColumnarOp(a.child)
- case s: QueryStageExec => isVanillaColumnarOp(s.plan)
- case _: RowToColumnarExec => false
- case _: InputAdapter => false
- case _: WholeStageCodegenExec => false
- case r: ReusedExchangeExec => isVanillaColumnarOp(r.child)
- case _ => !plan.isInstanceOf[GlutenPlan] && plan.supportsColumnar
- }
+ Convention.get(plan).batchType == Convention.BatchType.VanillaBatch
}
def isGlutenColumnarOp(plan: SparkPlan): Boolean = {
- plan.isInstanceOf[GlutenPlan]
+ Convention.get(plan).batchType ==
BackendsApiManager.getSparkPlanExecApiInstance.batchType
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index fce07eab4..23746846e 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution._
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.rel.RelBuilder
@@ -158,16 +159,18 @@ case class ColumnarCollapseTransformStages(
}
}
-case class ColumnarInputAdapter(child: SparkPlan) extends UnaryExecNode {
+case class ColumnarInputAdapter(child: SparkPlan)
+ extends UnaryExecNode
+ with Convention.KnownBatchType {
override def output: Seq[Attribute] = child.output
- override def supportsColumnar: Boolean = child.supportsColumnar
- override protected def doExecute(): RDD[InternalRow] =
- child.execute()
+ override def supportsColumnar: Boolean = true
+ override def batchType(): Convention.BatchType =
+ BackendsApiManager.getSparkPlanExecApiInstance.batchType
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
child.executeColumnar()
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
- override def vectorTypes: Option[Seq[String]] = child.vectorTypes
- override protected[sql] def doExecuteBroadcast[T](): Broadcast[T] =
child.executeBroadcast()
+ override protected[sql] def doExecuteBroadcast[T](): Broadcast[T] =
child.doExecuteBroadcast()
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
// Node name's required to be "InputAdapter" to correctly draw UI graph.
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index f1adb09e2..859cca842 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.ColumnarToRowExecBase
import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Transitions
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
@@ -68,7 +69,7 @@ case class FakeRowAdaptor(child: SparkPlan)
if (child.supportsColumnar) {
child.executeColumnar()
} else {
- val r2c =
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(child)
+ val r2c = Transitions.toBackendBatchPlan(child)
r2c.executeColumnar()
}
}
diff --git
a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
index bb1867d96..8e8743857 100644
---
a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
+++
b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
@@ -17,7 +17,8 @@
package org.apache.gluten.execution
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.utils.{Arm, FallbackUtil}
+import org.apache.gluten.test.FallbackUtil
+import org.apache.gluten.utils.Arm
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
diff --git
a/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
b/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
new file mode 100644
index 000000000..5c6d692ae
--- /dev/null
+++
b/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.extension.GlutenPlan
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.test.SharedSparkSession
+
+class TransitionSuite extends SharedSparkSession {
+ import TransitionSuite._
+ test("Trivial C2R") {
+ val in = BatchLeaf(TypeA)
+ val out = ConventionFunc.ignoreBackend {
+ Transitions.insertTransitions(in, outputsColumnar = false)
+ }
+ assert(out == BatchToRow(TypeA, BatchLeaf(TypeA)))
+ }
+
+ test("Insert C2R") {
+ val in = RowUnary(BatchLeaf(TypeA))
+ val out = ConventionFunc.ignoreBackend {
+ Transitions.insertTransitions(in, outputsColumnar = false)
+ }
+ assert(out == RowUnary(BatchToRow(TypeA, BatchLeaf(TypeA))))
+ }
+
+ test("Insert R2C") {
+ val in = BatchUnary(TypeA, RowLeaf())
+ val out = ConventionFunc.ignoreBackend {
+ Transitions.insertTransitions(in, outputsColumnar = false)
+ }
+ assert(out == BatchToRow(TypeA, BatchUnary(TypeA, RowToBatch(TypeA,
RowLeaf()))))
+ }
+
+ test("Insert C2R2C") {
+ val in = BatchUnary(TypeA, BatchLeaf(TypeB))
+ val out = ConventionFunc.ignoreBackend {
+ Transitions.insertTransitions(in, outputsColumnar = false)
+ }
+ assert(
+ out == BatchToRow(
+ TypeA,
+ BatchUnary(TypeA, RowToBatch(TypeA, BatchToRow(TypeB,
BatchLeaf(TypeB))))))
+ }
+
+ test("Insert C2C") {
+ val in = BatchUnary(TypeA, BatchLeaf(TypeC))
+ val out = ConventionFunc.ignoreBackend {
+ Transitions.insertTransitions(in, outputsColumnar = false)
+ }
+ assert(
+ out == BatchToRow(
+ TypeA,
+ BatchUnary(TypeA, BatchToBatch(from = TypeC, to = TypeA,
BatchLeaf(TypeC)))))
+ }
+
+ test("No transitions found") {
+ val in = BatchUnary(TypeA, BatchLeaf(TypeD))
+ assertThrows[GlutenException] {
+ ConventionFunc.ignoreBackend {
+ Transitions.insertTransitions(in, outputsColumnar = false)
+ }
+ }
+ }
+}
+
+object TransitionSuite {
+ object TypeA extends Convention.BatchType {
+ fromRow(
+ () =>
+ (plan: SparkPlan) => {
+ RowToBatch(this, plan)
+ })
+
+ toRow(
+ () =>
+ (plan: SparkPlan) => {
+ BatchToRow(this, plan)
+ })
+ }
+
+ object TypeB extends Convention.BatchType {
+ fromRow(
+ () =>
+ (plan: SparkPlan) => {
+ RowToBatch(this, plan)
+ })
+
+ toRow(
+ () =>
+ (plan: SparkPlan) => {
+ BatchToRow(this, plan)
+ })
+ }
+
+ object TypeC extends Convention.BatchType {
+ fromRow(
+ () =>
+ (plan: SparkPlan) => {
+ RowToBatch(this, plan)
+ })
+
+ toRow(
+ () =>
+ (plan: SparkPlan) => {
+ BatchToRow(this, plan)
+ })
+
+ fromBatch(
+ TypeA,
+ () =>
+ (plan: SparkPlan) => {
+ BatchToBatch(TypeA, this, plan)
+ })
+
+ toBatch(
+ TypeA,
+ () =>
+ (plan: SparkPlan) => {
+ BatchToBatch(this, TypeA, plan)
+ })
+ }
+
+ object TypeD extends Convention.BatchType {}
+
+ case class RowToBatch(toBatchType: Convention.BatchType, override val child:
SparkPlan)
+ extends RowToColumnarTransition
+ with GlutenPlan {
+ override def supportsColumnar: Boolean = true
+ override protected def batchType0(): Convention.BatchType = toBatchType
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(child = newChild)
+ override protected def doExecute(): RDD[InternalRow] =
+ throw new UnsupportedOperationException()
+ override def output: Seq[Attribute] = child.output
+ }
+
+ case class BatchToRow(fromBatchType: Convention.BatchType, override val
child: SparkPlan)
+ extends ColumnarToRowTransition {
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(child = newChild)
+ override protected def doExecute(): RDD[InternalRow] =
+ throw new UnsupportedOperationException()
+ override def output: Seq[Attribute] = child.output
+ }
+
+ case class BatchToBatch(
+ from: Convention.BatchType,
+ to: Convention.BatchType,
+ override val child: SparkPlan)
+ extends UnaryExecNode
+ with GlutenPlan {
+ override def supportsColumnar: Boolean = true
+ override protected def batchType0(): Convention.BatchType = to
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(child = newChild)
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override def output: Seq[Attribute] = child.output
+ }
+
+ case class BatchLeaf(override val batchType0: Convention.BatchType)
+ extends LeafExecNode
+ with GlutenPlan {
+ override def supportsColumnar: Boolean = true
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override def output: Seq[Attribute] = List.empty
+ }
+
+ case class BatchUnary(
+ override val batchType0: Convention.BatchType,
+ override val child: SparkPlan)
+ extends UnaryExecNode
+ with GlutenPlan {
+ override def supportsColumnar: Boolean = true
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(child = newChild)
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override def output: Seq[Attribute] = child.output
+ }
+
+ case class BatchBinary(
+ override val batchType0: Convention.BatchType,
+ override val left: SparkPlan,
+ override val right: SparkPlan)
+ extends BinaryExecNode
+ with GlutenPlan {
+ override def supportsColumnar: Boolean = true
+ override protected def withNewChildrenInternal(
+ newLeft: SparkPlan,
+ newRight: SparkPlan): SparkPlan = copy(left = newLeft, right =
newRight)
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override def output: Seq[Attribute] = left.output ++ right.output
+ }
+
+ case class RowLeaf() extends LeafExecNode {
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override def output: Seq[Attribute] = List.empty
+ }
+
+ case class RowUnary(override val child: SparkPlan) extends UnaryExecNode {
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(child = newChild)
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override def output: Seq[Attribute] = child.output
+ }
+
+ case class RowBinary(override val left: SparkPlan, override val right:
SparkPlan)
+ extends BinaryExecNode {
+ override protected def withNewChildrenInternal(
+ newLeft: SparkPlan,
+ newRight: SparkPlan): SparkPlan = copy(left = newLeft, right =
newRight)
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override def output: Seq[Attribute] = left.output ++ right.output
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/utils/FallbackUtil.scala
b/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
similarity index 92%
rename from
gluten-core/src/main/scala/org/apache/gluten/utils/FallbackUtil.scala
rename to gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
index c40cdd675..d2626ab27 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/FallbackUtil.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.utils
+package org.apache.gluten.test
+
+import org.apache.gluten.extension.GlutenPlan
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution._
@@ -66,12 +68,12 @@ object FallbackUtil extends Logging with
AdaptiveSparkPlanHelper {
var fallbackOperator: Seq[SparkPlan] = null
if (plan.isInstanceOf[AdaptiveSparkPlanExec]) {
fallbackOperator = collectWithSubqueries(plan) {
- case plan if !PlanUtil.isGlutenColumnarOp(plan) && !skip(plan) =>
+ case plan if !plan.isInstanceOf[GlutenPlan] && !skip(plan) =>
plan
}
} else {
fallbackOperator = plan.collectWithSubqueries {
- case plan if !PlanUtil.isGlutenColumnarOp(plan) && !skip(plan) =>
+ case plan if !plan.isInstanceOf[GlutenPlan] && !skip(plan) =>
plan
}
}
diff --git
a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala
b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala
new file mode 100644
index 000000000..3f40793d9
--- /dev/null
+++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.columnarbatch
+
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
+
+/**
+ * ArrowBatch stands for Gluten's Arrow-based columnar batch implementation.
Vanilla Spark's
+ * ColumnarBatch consisting of
[[org.apache.spark.sql.vectorized.ArrowColumnVector]]s is still
+ * treated as [[Convention.BatchType.VanillaBatch]].
+ *
+ * As of now, ArrowBatch should have
[[org.apache.gluten.vectorized.ArrowWritableColumnVector]]s
+ * populated in it. ArrowBatch can be loaded from / offloaded to a native C++
ArrowColumnarBatch
+ * through the API in [[ColumnarBatches]]. After being offloaded, an ArrowBatch is
no longer considered a
+ * legal ArrowBatch and cannot be accepted by a trivial ColumnarToRowExec. To
follow that rule, any
+ * plan with this batch type should promise that it emits loaded batches only.
+ */
+object ArrowBatch extends Convention.BatchType {
+ toRow(
+ () =>
+ (plan: SparkPlan) => {
+ ColumnarToRowExec(plan)
+ })
+}
diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml
index 8015b5cec..2396087fc 100644
--- a/gluten-ut/pom.xml
+++ b/gluten-ut/pom.xml
@@ -42,6 +42,13 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index e2a99ef7e..7c7aa0879 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,8 +18,8 @@ package org.apache.spark.sql.execution
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.InsertTransitions
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.extension.columnar.transition.InsertTransitions
import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.rdd.RDD
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
index 89a0ee18a..ad08318bb 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.execution.benchmarks
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{FileSourceScanExecTransformer,
WholeStageTransformer}
+import org.apache.gluten.extension.columnar.transition.Transitions
import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
import org.apache.gluten.vectorized.JniLibLoader
@@ -124,8 +124,7 @@ object ParquetReadBenchmark extends SqlBasedBenchmark {
val newWholeStage = wholeStageTransform.withNewChildren(Seq(fileScan))
// generate ColumnarToRow
- val columnarToRowPlan =
-
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(newWholeStage)
+ val columnarToRowPlan = Transitions.toRowPlan(newWholeStage)
val newWholeStageRDD = newWholeStage.executeColumnar()
val newColumnarToRowRDD = columnarToRowPlan.execute()
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
index c5e4fa0a4..c58284e44 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
import org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper
import org.apache.spark.sql.functions._
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 31517a141..fff883d49 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
InsertTransitions, TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
TRANSFORM_UNSUPPORTED, TransformHints}
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.extension.columnar.transition.InsertTransitions
import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.rdd.RDD
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
index d99d52924..7d8a29204 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.execution.benchmarks
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{FileSourceScanExecTransformer,
WholeStageTransformer}
+import org.apache.gluten.extension.columnar.transition.Transitions
import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
import org.apache.gluten.vectorized.JniLibLoader
@@ -124,8 +124,7 @@ object ParquetReadBenchmark extends SqlBasedBenchmark {
val newWholeStage = wholeStageTransform.withNewChildren(Seq(fileScan))
// generate ColumnarToRow
- val columnarToRowPlan =
-
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(newWholeStage)
+ val columnarToRowPlan = Transitions.toRowPlan(newWholeStage)
val newWholeStageRDD = newWholeStage.executeColumnar()
val newColumnarToRowRDD = columnarToRowPlan.execute()
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
index c5e4fa0a4..c58284e44 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
import org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper
import org.apache.spark.sql.functions._
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 079620bf8..7976288dd 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
InsertTransitions, TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
TRANSFORM_UNSUPPORTED, TransformHints}
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.extension.columnar.transition.InsertTransitions
import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.rdd.RDD
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
index d99d52924..b5481f4d8 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.execution.benchmarks
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{FileSourceScanExecTransformer,
WholeStageTransformer}
+import org.apache.gluten.extension.columnar.transition.Transitions
import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
import org.apache.gluten.vectorized.JniLibLoader
@@ -125,7 +125,7 @@ object ParquetReadBenchmark extends SqlBasedBenchmark {
// generate ColumnarToRow
val columnarToRowPlan =
-
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(newWholeStage)
+ Transitions.toRowPlan(newWholeStage)
val newWholeStageRDD = newWholeStage.executeColumnar()
val newColumnarToRowRDD = columnarToRowPlan.execute()
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
index c5e4fa0a4..c58284e44 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
import org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper
import org.apache.spark.sql.functions._
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 079620bf8..7976288dd 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
InsertTransitions, TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
TRANSFORM_UNSUPPORTED, TransformHints}
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.extension.columnar.transition.InsertTransitions
import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.rdd.RDD
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
index d99d52924..7d8a29204 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.execution.benchmarks
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.{FileSourceScanExecTransformer,
WholeStageTransformer}
+import org.apache.gluten.extension.columnar.transition.Transitions
import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
import org.apache.gluten.vectorized.JniLibLoader
@@ -124,8 +124,7 @@ object ParquetReadBenchmark extends SqlBasedBenchmark {
val newWholeStage = wholeStageTransform.withNewChildren(Seq(fileScan))
// generate ColumnarToRow
- val columnarToRowPlan =
-
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(newWholeStage)
+ val columnarToRowPlan = Transitions.toRowPlan(newWholeStage)
val newWholeStageRDD = newWholeStage.executeColumnar()
val newColumnarToRowRDD = columnarToRowPlan.execute()
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index dbefc22ef..fd8cd24c3 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec,
GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec}
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD,
PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex,
WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
@@ -232,4 +233,5 @@ trait SparkShims {
def dateTimestampFormatInReadIsDefaultValue(csvOptions: CSVOptions,
timeZone: String): Boolean
+ def isPlannedV1Write(write: DataWritingCommandExec): Boolean = false
}
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 0b045972a..6d70d67f3 100644
---
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -40,11 +40,13 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
Scan}
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -420,4 +422,8 @@ class Spark34Shims extends SparkShims {
csvOptions.timestampFormatInRead == default.timestampFormatInRead &&
csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
}
+
+ override def isPlannedV1Write(write: DataWritingCommandExec): Boolean = {
+ write.cmd.isInstanceOf[V1WriteCommand] && SQLConf.get.plannedWriteEnabled
+ }
}
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index c839c8c2a..00f9d62fd 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -30,7 +30,7 @@ import
org.apache.spark.sql.catalyst.{ExtendedAnalysisException, InternalRow}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.expressions._
-import
org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate,
RegrIntercept, RegrR2, RegrReplacement, RegrSlope, RegrSXY,
TypedImperativeAggregate}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution, KeyGroupedPartitioning, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -40,12 +40,14 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition,
Scan}
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike,
ShuffleExchangeLike}
import org.apache.spark.sql.execution.window.{WindowGroupLimitExec,
WindowGroupLimitExecShim}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -448,4 +450,8 @@ class Spark35Shims extends SparkShims {
csvOptions.timestampFormatInRead == default.timestampFormatInRead &&
csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
}
+
+ override def isPlannedV1Write(write: DataWritingCommandExec): Boolean = {
+ write.cmd.isInstanceOf[V1WriteCommand] && SQLConf.get.plannedWriteEnabled
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]