This is an automated email from the ASF dual-hosted git repository.
jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d101cb80e4 [CORE] Refactor columnar noop write rule (#8422)
d101cb80e4 is described below
commit d101cb80e4ab973f3bee2ba3c4983ffe53759678
Author: jackylee <[email protected]>
AuthorDate: Thu Jan 9 16:23:50 2025 +0800
[CORE] Refactor columnar noop write rule (#8422)
---
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 2 +
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 3 +
.../datasources/GlutenWriterColumnarRules.scala | 59 ++++++--------
.../datasources/noop/GlutenNoopWriterRule.scala | 42 ++++++++++
.../utils/clickhouse/ClickHouseTestSettings.scala | 3 +
.../gluten/utils/velox/VeloxTestSettings.scala | 3 -
.../ClickHouseAdaptiveQueryExecSuite.scala | 89 +++++++++++++++++++++-
.../velox/VeloxAdaptiveQueryExecSuite.scala | 89 +++++++++++++++++++++-
.../GlutenWriterColumnarRulesSuite.scala | 54 -------------
.../utils/clickhouse/ClickHouseTestSettings.scala | 3 +
.../gluten/utils/velox/VeloxTestSettings.scala | 3 -
.../ClickHouseAdaptiveQueryExecSuite.scala | 89 +++++++++++++++++++++-
.../velox/VeloxAdaptiveQueryExecSuite.scala | 89 +++++++++++++++++++++-
.../GlutenWriterColumnarRulesSuite.scala | 54 -------------
.../utils/clickhouse/ClickHouseTestSettings.scala | 2 +
.../gluten/utils/velox/VeloxTestSettings.scala | 3 +-
.../ClickHouseAdaptiveQueryExecSuite.scala | 38 ++++++++-
.../velox/VeloxAdaptiveQueryExecSuite.scala | 38 ++++++++-
.../utils/clickhouse/ClickHouseTestSettings.scala | 2 +
.../gluten/utils/velox/VeloxTestSettings.scala | 1 -
.../ClickHouseAdaptiveQueryExecSuite.scala | 38 ++++++++-
.../velox/VeloxAdaptiveQueryExecSuite.scala | 38 ++++++++-
.../datasources/GlutenNoopWriterRuleSuite.scala | 70 +++++++++++++++++
23 files changed, 652 insertions(+), 160 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 139b8b3131..21ae342a22 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.delta.DeltaLogFileIndex
import org.apache.spark.sql.delta.rules.CHOptimizeMetadataOnlyDeltaQuery
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.noop.GlutenNoopWriterRule
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.util.SparkPlanRules
@@ -132,6 +133,7 @@ object CHRuleApi {
c =>
intercept(
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session)))
+ injector.injectPost(c => GlutenNoopWriterRule.apply(c.session))
// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 4f19375e85..f3c75cd983 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -36,6 +36,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.WriteFilesExec
+import org.apache.spark.sql.execution.datasources.noop.GlutenNoopWriterRule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.joins.BaseJoinExec
@@ -110,6 +111,7 @@ object VeloxRuleApi {
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => each(c.session)))
injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf))
+ injector.injectPost(c => GlutenNoopWriterRule(c.session))
// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
@@ -188,6 +190,7 @@ object VeloxRuleApi {
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPostTransform(c => each(c.session)))
injector.injectPostTransform(c =>
ColumnarCollapseTransformStages(c.glutenConf))
+ injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
injector.injectPostTransform(c =>
RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf,
c.session))
injector.injectPostTransform(_ => RemoveFallbackTagRule())
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 126417bf18..54b5a34639 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import
org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand,
DataWritingCommand, DataWritingCommandExec}
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
OverwriteByExpressionExec}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand,
InsertIntoHiveDirCommand, InsertIntoHiveTable}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -133,19 +132,33 @@ object GlutenWriterColumnarRules {
}
}
- case class NativeWritePostRule(session: SparkSession) extends
Rule[SparkPlan] {
+ private[datasources] def injectFakeRowAdaptor(command: SparkPlan, child:
SparkPlan): SparkPlan = {
+ child match {
+ // if the child is columnar, we can just wrap & transfer the columnar
data
+ case c2r: ColumnarToRowExecBase =>
+ command.withNewChildren(Array(FakeRowAdaptor(c2r.child)))
+ // If the child is aqe, we make aqe "support columnar",
+ // then aqe itself will guarantee to generate columnar outputs.
+ // So FakeRowAdaptor will always consume columnar data,
+ // thus avoiding the case of c2r->aqe->r2c->writer
+ case aqe: AdaptiveSparkPlanExec =>
+ command.withNewChildren(
+ Array(
+ FakeRowAdaptor(
+ AdaptiveSparkPlanExec(
+ aqe.inputPlan,
+ aqe.context,
+ aqe.preprocessingRules,
+ aqe.isSubquery,
+ supportsColumnar = true
+ ))))
+ case other => command.withNewChildren(Array(FakeRowAdaptor(other)))
+ }
+ }
- private val NOOP_WRITE =
"org.apache.spark.sql.execution.datasources.noop.NoopWrite$"
+ case class NativeWritePostRule(session: SparkSession) extends
Rule[SparkPlan] {
override def apply(p: SparkPlan): SparkPlan = p match {
- case rc @ AppendDataExec(_, _, write)
- if write.getClass.getName == NOOP_WRITE &&
- BackendsApiManager.getSettings.enableNativeWriteFiles() =>
- injectFakeRowAdaptor(rc, rc.child)
- case rc @ OverwriteByExpressionExec(_, _, write)
- if write.getClass.getName == NOOP_WRITE &&
- BackendsApiManager.getSettings.enableNativeWriteFiles() =>
- injectFakeRowAdaptor(rc, rc.child)
case rc @ DataWritingCommandExec(cmd, child) =>
// The same thread can set these properties in the last query
submission.
val fields = child.output.toStructType.fields
@@ -165,30 +178,6 @@ object GlutenWriterColumnarRules {
case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply))
}
-
- private def injectFakeRowAdaptor(command: SparkPlan, child: SparkPlan):
SparkPlan = {
- child match {
- // if the child is columnar, we can just wrap&transfer the columnar
data
- case c2r: ColumnarToRowExecBase =>
- command.withNewChildren(Array(FakeRowAdaptor(c2r.child)))
- // If the child is aqe, we make aqe "support columnar",
- // then aqe itself will guarantee to generate columnar outputs.
- // So FakeRowAdaptor will always consumes columnar data,
- // thus avoiding the case of c2r->aqe->r2c->writer
- case aqe: AdaptiveSparkPlanExec =>
- command.withNewChildren(
- Array(
- FakeRowAdaptor(
- AdaptiveSparkPlanExec(
- aqe.inputPlan,
- aqe.context,
- aqe.preprocessingRules,
- aqe.isSubquery,
- supportsColumnar = true
- ))))
- case other => command.withNewChildren(Array(FakeRowAdaptor(other)))
- }
- }
}
def injectSparkLocalProperty(spark: SparkSession, format: Option[String]):
Unit = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala
new file mode 100644
index 0000000000..bedf006510
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.noop
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+import
org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.injectFakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
OverwriteByExpressionExec}
+
+/**
+ * A rule that injects a FakeRowAdaptor for NoopWrite.
+ *
+ * The current V2 Command does not support columnar. Therefore, when its child
node is a
+ * ColumnarNode, Vanilla Spark inserts a ColumnarToRow conversion between V2
Command and its child.
+ * This rule replaces the inserted ColumnarToRow with a FakeRowAdaptor,
effectively bypassing the
+ * ColumnarToRow operation for NoopWrite. Since NoopWrite does not actually
perform any data
+ * operations, it can accept input data in either row-based or columnar format.
+ */
+case class GlutenNoopWriterRule(session: SparkSession) extends Rule[SparkPlan]
{
+ override def apply(p: SparkPlan): SparkPlan = p match {
+ case rc @ AppendDataExec(_, _, NoopWrite) =>
+ injectFakeRowAdaptor(rc, rc.child)
+ case rc @ OverwriteByExpressionExec(_, _, NoopWrite) =>
+ injectFakeRowAdaptor(rc, rc.child)
+ case _ => p
+ }
+}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index b985347f20..8c62e3b0fd 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1123,6 +1123,9 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("Change merge join to broadcast join without local shuffle read")
.exclude(
"Avoid changing merge join to broadcast join if too many empty
partitions on build plan")
+ .exclude("SPARK-32932: Do not use local shuffle read at final stage on
write command")
+ .exclude(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of
v2 write commands")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
.exclude("SPARK-32717: AQEOptimizer should respect excludedRules
configuration")
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index f5071d2f3f..62ab868363 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -166,14 +166,12 @@ class VeloxTestSettings extends BackendTestSettings {
"SPARK-30403",
"SPARK-30719",
"SPARK-31384",
- "SPARK-30953",
"SPARK-31658",
"SPARK-32717",
"SPARK-32649",
"SPARK-34533",
"SPARK-34781",
"SPARK-35585",
- "SPARK-32932",
"SPARK-33494",
// "SPARK-33933",
"SPARK-31220",
@@ -1053,7 +1051,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenSupportsCatalogOptionsSuite]
enableSuite[GlutenTableCapabilityCheckSuite]
enableSuite[GlutenWriteDistributionAndOrderingSuite]
- enableSuite[GlutenWriterColumnarRulesSuite]
enableSuite[GlutenBucketedReadWithoutHiveSupportSuite]
// Exclude the following suite for plan changed from SMJ to SHJ.
.exclude("avoid shuffle when join 2 bucketed tables")
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 6d3c3e865d..928dc38985 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.adaptive.clickhouse
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase,
ShuffledHashJoinExecTransformerBase, SortExecTransformer,
SortMergeJoinExecTransformer}
import org.apache.spark.SparkConf
@@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
import
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.log4j.Level
@@ -42,7 +49,7 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
override def sparkConf: SparkConf = {
super.sparkConf
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+ .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
}
@@ -1196,6 +1203,86 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
}
}
+ testGluten("SPARK-32932: Do not use local shuffle read at final stage on
write command") {
+ withSQLConf(
+ SQLConf.PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.DYNAMIC.toString,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+ ) {
+ val data =
+ for (
+ i <- 1L to 10L;
+ j <- 1L to 3L
+ ) yield (i, j)
+
+ val df = data.toDF("i", "j").repartition($"j")
+ var noLocalread: Boolean = false
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ qe.executedPlan match {
+ case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
+ noLocalread = collect(plan) {
+ case exec: AQEShuffleReadExec if exec.isLocalRead => exec
+ }.isEmpty
+ case _ => // ignore other events
+ }
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+
+ withTable("t") {
+ df.write.partitionBy("j").saveAsTable("t")
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(noLocalread)
+ noLocalread = false
+ }
+
+ // Test DataSource v2
+ val format = classOf[NoopDataSource].getName
+ df.write.format(format).mode("overwrite").save()
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(noLocalread)
+ noLocalread = false
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+
+ testGluten(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2
write commands") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ var plan: SparkPlan = null
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ plan = qe.executedPlan
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+ withTable("t1") {
+ val format = classOf[NoopDataSource].getName
+ Seq((0, 1)).toDF("x",
"y").write.format(format).mode("overwrite").save()
+
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(plan.isInstanceOf[V2TableWriteExec])
+ val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+ assert(childPlan.isInstanceOf[FakeRowAdaptor])
+
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+ }
+
testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
Seq("REPARTITION", "REBALANCE(key)")
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index f8b6092a46..ce9513c8cc 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.adaptive.velox
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase,
ShuffledHashJoinExecTransformerBase, SortExecTransformer,
SortMergeJoinExecTransformer}
import org.apache.spark.SparkConf
@@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
import
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.log4j.Level
@@ -41,7 +48,7 @@ class VeloxAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with GlutenSQLT
override def sparkConf: SparkConf = {
super.sparkConf
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+ .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
}
@@ -1175,6 +1182,86 @@ class VeloxAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with GlutenSQLT
}
}
+ testGluten("SPARK-32932: Do not use local shuffle read at final stage on
write command") {
+ withSQLConf(
+ SQLConf.PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.DYNAMIC.toString,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+ ) {
+ val data =
+ for (
+ i <- 1L to 10L;
+ j <- 1L to 3L
+ ) yield (i, j)
+
+ val df = data.toDF("i", "j").repartition($"j")
+ var noLocalread: Boolean = false
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ qe.executedPlan match {
+ case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
+ noLocalread = collect(plan) {
+ case exec: AQEShuffleReadExec if exec.isLocalRead => exec
+ }.isEmpty
+ case _ => // ignore other events
+ }
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+
+ withTable("t") {
+ df.write.partitionBy("j").saveAsTable("t")
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(noLocalread)
+ noLocalread = false
+ }
+
+ // Test DataSource v2
+ val format = classOf[NoopDataSource].getName
+ df.write.format(format).mode("overwrite").save()
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(noLocalread)
+ noLocalread = false
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+
+ testGluten(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2
write commands") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ var plan: SparkPlan = null
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ plan = qe.executedPlan
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+ withTable("t1") {
+ val format = classOf[NoopDataSource].getName
+ Seq((0, 1)).toDF("x",
"y").write.format(format).mode("overwrite").save()
+
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(plan.isInstanceOf[V2TableWriteExec])
+ val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+ assert(childPlan.isInstanceOf[FakeRowAdaptor])
+
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+ }
+
testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
Seq("REPARTITION", "REBALANCE(key)")
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
deleted file mode 100644
index 10abca1c6d..0000000000
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode}
-import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.QueryExecutionListener
-
-class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait {
-
- class WriterColumnarListener extends QueryExecutionListener {
- var fakeRowAdaptor: Option[FakeRowAdaptor] = None
-
- override def onSuccess(funcName: String, qe: QueryExecution, durationNs:
Long): Unit = {
- fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor
=> f }
- }
-
- override def onFailure(funcName: String, qe: QueryExecution, exception:
Exception): Unit = {}
- }
-
- testGluten("writing to noop") {
- withTempDir {
- dir =>
- withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "true") {
- spark.range(0,
100).write.mode(SaveMode.Overwrite).parquet(dir.getPath)
- val listener = new WriterColumnarListener
- spark.listenerManager.register(listener)
- try {
-
spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save()
- spark.sparkContext.listenerBus.waitUntilEmpty()
- assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not
found.")
- } finally {
- spark.listenerManager.unregister(listener)
- }
- }
- }
- }
-}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 219eb0d0b9..f91841b991 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1119,6 +1119,9 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("Change merge join to broadcast join without local shuffle read")
.exclude(
"Avoid changing merge join to broadcast join if too many empty
partitions on build plan")
+ .exclude("SPARK-32932: Do not use local shuffle read at final stage on
write command")
+ .exclude(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of
v2 write commands")
.exclude("SPARK-37753: Allow changing outer join to broadcast join even if
too many empty partitions on broadcast side")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index d3bc3846d8..72b77ae1f9 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -70,7 +70,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenSupportsCatalogOptionsSuite]
enableSuite[GlutenTableCapabilityCheckSuite]
enableSuite[GlutenWriteDistributionAndOrderingSuite]
- enableSuite[GlutenWriterColumnarRulesSuite]
enableSuite[GlutenQueryCompilationErrorsDSv2Suite]
@@ -191,14 +190,12 @@ class VeloxTestSettings extends BackendTestSettings {
"SPARK-30403",
"SPARK-30719",
"SPARK-31384",
- "SPARK-30953",
"SPARK-31658",
"SPARK-32717",
"SPARK-32649",
"SPARK-34533",
"SPARK-34781",
"SPARK-35585",
- "SPARK-32932",
"SPARK-33494",
"SPARK-33933",
"SPARK-31220",
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 441f3a60a3..779d264114 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.adaptive.clickhouse
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase,
ShuffledHashJoinExecTransformerBase, SortExecTransformer,
SortMergeJoinExecTransformerBase}
import org.apache.spark.SparkConf
@@ -24,14 +25,20 @@ import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait,
Row}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
import
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.logging.log4j.Level
@@ -40,7 +47,7 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
override def sparkConf: SparkConf = {
super.sparkConf
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+ .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
}
@@ -1193,6 +1200,86 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
}
}
+ testGluten("SPARK-32932: Do not use local shuffle read at final stage on
write command") {
+ withSQLConf(
+ SQLConf.PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.DYNAMIC.toString,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+ ) {
+ val data =
+ for (
+ i <- 1L to 10L;
+ j <- 1L to 3L
+ ) yield (i, j)
+
+ val df = data.toDF("i", "j").repartition($"j")
+ var noLocalread: Boolean = false
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ qe.executedPlan match {
+ case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
+ noLocalread = collect(plan) {
+ case exec: AQEShuffleReadExec if exec.isLocalRead => exec
+ }.isEmpty
+ case _ => // ignore other events
+ }
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+
+ withTable("t") {
+ df.write.partitionBy("j").saveAsTable("t")
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(noLocalread)
+ noLocalread = false
+ }
+
+ // Test DataSource v2
+ val format = classOf[NoopDataSource].getName
+ df.write.format(format).mode("overwrite").save()
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(noLocalread)
+ noLocalread = false
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+
+ testGluten(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2
write commands") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ var plan: SparkPlan = null
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ plan = qe.executedPlan
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+ withTable("t1") {
+ val format = classOf[NoopDataSource].getName
+ Seq((0, 1)).toDF("x",
"y").write.format(format).mode("overwrite").save()
+
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(plan.isInstanceOf[V2TableWriteExec])
+ val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+ assert(childPlan.isInstanceOf[FakeRowAdaptor])
+
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+ }
+
testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
Seq("REPARTITION", "REBALANCE(key)")
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index 729a12f56c..f9f0723e00 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.adaptive.velox
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase,
ShuffledHashJoinExecTransformerBase, SortExecTransformer,
SortMergeJoinExecTransformer}
import org.apache.spark.SparkConf
@@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
import
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.logging.log4j.Level
@@ -41,7 +48,7 @@ class VeloxAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with GlutenSQLT
override def sparkConf: SparkConf = {
super.sparkConf
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+ .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
}
@@ -1179,6 +1186,86 @@ class VeloxAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with GlutenSQLT
}
}
+ testGluten("SPARK-32932: Do not use local shuffle read at final stage on
write command") {
+ withSQLConf(
+ SQLConf.PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.DYNAMIC.toString,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+ ) {
+ val data =
+ for (
+ i <- 1L to 10L;
+ j <- 1L to 3L
+ ) yield (i, j)
+
+ val df = data.toDF("i", "j").repartition($"j")
+ var noLocalread: Boolean = false
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ qe.executedPlan match {
+ case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
+ noLocalread = collect(plan) {
+ case exec: AQEShuffleReadExec if exec.isLocalRead => exec
+ }.isEmpty
+ case _ => // ignore other events
+ }
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+
+ withTable("t") {
+ df.write.partitionBy("j").saveAsTable("t")
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(noLocalread)
+ noLocalread = false
+ }
+
+ // Test DataSource v2
+ val format = classOf[NoopDataSource].getName
+ df.write.format(format).mode("overwrite").save()
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(noLocalread)
+ noLocalread = false
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+
+ testGluten(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2
write commands") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ var plan: SparkPlan = null
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ plan = qe.executedPlan
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+ withTable("t1") {
+ val format = classOf[NoopDataSource].getName
+ Seq((0, 1)).toDF("x",
"y").write.format(format).mode("overwrite").save()
+
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(plan.isInstanceOf[V2TableWriteExec])
+ val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+ assert(childPlan.isInstanceOf[FakeRowAdaptor])
+
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+ }
+
testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
Seq("REPARTITION", "REBALANCE(key)")
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
deleted file mode 100644
index 10abca1c6d..0000000000
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode}
-import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.QueryExecutionListener
-
-class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait {
-
- class WriterColumnarListener extends QueryExecutionListener {
- var fakeRowAdaptor: Option[FakeRowAdaptor] = None
-
- override def onSuccess(funcName: String, qe: QueryExecution, durationNs:
Long): Unit = {
- fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor
=> f }
- }
-
- override def onFailure(funcName: String, qe: QueryExecution, exception:
Exception): Unit = {}
- }
-
- testGluten("writing to noop") {
- withTempDir {
- dir =>
- withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "true") {
- spark.range(0,
100).write.mode(SaveMode.Overwrite).parquet(dir.getPath)
- val listener = new WriterColumnarListener
- spark.listenerManager.register(listener)
- try {
-
spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save()
- spark.sparkContext.listenerBus.waitUntilEmpty()
- assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not
found.")
- } finally {
- spark.listenerManager.unregister(listener)
- }
- }
- }
- }
-}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 56c95ae1bd..9ebcadf531 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -983,6 +983,8 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("Change merge join to broadcast join without local shuffle read")
.exclude(
"Avoid changing merge join to broadcast join if too many empty
partitions on build plan")
+ .exclude(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of
v2 write commands")
.exclude("SPARK-37753: Allow changing outer join to broadcast join even if
too many empty partitions on broadcast side")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index cc9746dcdb..94d3a1f6e8 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -25,7 +25,7 @@ import
org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalog
import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite,
GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite,
GlutenQueryParsingErrorsSuite}
import org.apache.spark.sql.execution.{FallbackStrategiesSuite,
GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite,
GlutenExchangeSuite, GlutenLocalBroadcastExchangeSuite,
GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite,
GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite,
GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
import
org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite
-import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite,
GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite,
GlutenFileFormatWriterSuite, GlutenFileIndexSuite,
GlutenFileMetadataStructRowIndexSuite, GlutenFileMetadataStructSuite,
GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite,
GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite,
GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite,
GlutenOrcCodecSuite, [...]
+import org.apache.spark.sql.execution.datasources._
import
org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite
import
org.apache.spark.sql.execution.datasources.csv.{GlutenCSVLegacyTimeParserSuite,
GlutenCSVv1Suite, GlutenCSVv2Suite}
import
org.apache.spark.sql.execution.datasources.exchange.GlutenValidateRequirementsSuite
@@ -182,7 +182,6 @@ class VeloxTestSettings extends BackendTestSettings {
"SPARK-30403",
"SPARK-30719",
"SPARK-31384",
- "SPARK-30953",
"SPARK-31658",
"SPARK-32717",
"SPARK-32649",
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 49d47fa65b..2bd5a96dad 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.adaptive.clickhouse
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase,
ShuffledHashJoinExecTransformerBase, SortExecTransformer,
SortMergeJoinExecTransformer}
import org.apache.spark.SparkConf
@@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
@@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.logging.log4j.Level
@@ -41,7 +46,7 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
override def sparkConf: SparkConf = {
super.sparkConf
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+ .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
}
@@ -1182,6 +1187,37 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
}
}
+ testGluten(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2
write commands") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ var plan: SparkPlan = null
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ plan = qe.executedPlan
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+ withTable("t1") {
+ val format = classOf[NoopDataSource].getName
+ Seq((0, 1)).toDF("x",
"y").write.format(format).mode("overwrite").save()
+
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(plan.isInstanceOf[V2TableWriteExec])
+ val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+ assert(childPlan.isInstanceOf[FakeRowAdaptor])
+
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+ }
+
testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
Seq("REPARTITION", "REBALANCE(key)")
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index 729a12f56c..6a3d6da27c 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.adaptive.velox
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase,
ShuffledHashJoinExecTransformerBase, SortExecTransformer,
SortMergeJoinExecTransformer}
import org.apache.spark.SparkConf
@@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
@@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.logging.log4j.Level
@@ -41,7 +46,7 @@ class VeloxAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with GlutenSQLT
override def sparkConf: SparkConf = {
super.sparkConf
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+ .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
}
@@ -1179,6 +1184,37 @@ class VeloxAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with GlutenSQLT
}
}
+ testGluten(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2
write commands") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ var plan: SparkPlan = null
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ plan = qe.executedPlan
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+ withTable("t1") {
+ val format = classOf[NoopDataSource].getName
+ Seq((0, 1)).toDF("x",
"y").write.format(format).mode("overwrite").save()
+
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(plan.isInstanceOf[V2TableWriteExec])
+ val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+ assert(childPlan.isInstanceOf[FakeRowAdaptor])
+
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+ }
+
testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
Seq("REPARTITION", "REBALANCE(key)")
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index bdb160ad24..f482ad921e 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -983,6 +983,8 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("Change merge join to broadcast join without local shuffle read")
.exclude(
"Avoid changing merge join to broadcast join if too many empty
partitions on build plan")
+ .exclude(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of
v2 write commands")
.exclude("SPARK-37753: Allow changing outer join to broadcast join even if
too many empty partitions on broadcast side")
.exclude("SPARK-29544: adaptive skew join with different join types")
.exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 71786c9132..73c4d43ced 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -185,7 +185,6 @@ class VeloxTestSettings extends BackendTestSettings {
"SPARK-30403",
"SPARK-30719",
"SPARK-31384",
- "SPARK-30953",
"SPARK-31658",
"SPARK-32717",
"SPARK-32649",
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 2e5df7b859..bd941586d7 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.adaptive.clickhouse
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase,
ShuffledHashJoinExecTransformerBase, SortExecTransformer,
SortMergeJoinExecTransformer}
import org.apache.spark.SparkConf
@@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
@@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.logging.log4j.Level
@@ -41,7 +46,7 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
override def sparkConf: SparkConf = {
super.sparkConf
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+ .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
}
@@ -1197,6 +1202,37 @@ class ClickHouseAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with Glute
}
}
+ testGluten(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2
write commands") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ var plan: SparkPlan = null
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ plan = qe.executedPlan
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+ withTable("t1") {
+ val format = classOf[NoopDataSource].getName
+ Seq((0, 1)).toDF("x",
"y").write.format(format).mode("overwrite").save()
+
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(plan.isInstanceOf[V2TableWriteExec])
+ val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+ assert(childPlan.isInstanceOf[FakeRowAdaptor])
+
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+ }
+
testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
Seq("REPARTITION", "REBALANCE(key)")
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index 729a12f56c..6a3d6da27c 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.adaptive.velox
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase,
ShuffledHashJoinExecTransformerBase, SortExecTransformer,
SortMergeJoinExecTransformer}
import org.apache.spark.SparkConf
@@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
@@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.logging.log4j.Level
@@ -41,7 +46,7 @@ class VeloxAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with GlutenSQLT
override def sparkConf: SparkConf = {
super.sparkConf
- .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+ .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
}
@@ -1179,6 +1184,37 @@ class VeloxAdaptiveQueryExecSuite extends
AdaptiveQueryExecSuite with GlutenSQLT
}
}
+ testGluten(
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2
write commands") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ var plan: SparkPlan = null
+ val listener = new QueryExecutionListener {
+ override def onSuccess(funcName: String, qe: QueryExecution,
durationNs: Long): Unit = {
+ plan = qe.executedPlan
+ }
+ override def onFailure(
+ funcName: String,
+ qe: QueryExecution,
+ exception: Exception): Unit = {}
+ }
+ spark.listenerManager.register(listener)
+ withTable("t1") {
+ val format = classOf[NoopDataSource].getName
+ Seq((0, 1)).toDF("x",
"y").write.format(format).mode("overwrite").save()
+
+ sparkContext.listenerBus.waitUntilEmpty()
+ assert(plan.isInstanceOf[V2TableWriteExec])
+ val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+ assert(childPlan.isInstanceOf[FakeRowAdaptor])
+
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+ spark.listenerManager.unregister(listener)
+ }
+ }
+ }
+
testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
Seq("REPARTITION", "REBALANCE(key)")
diff --git
a/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala
b/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala
new file mode 100644
index 0000000000..ebf17444e6
--- /dev/null
+++
b/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.datasources
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{GlutenQueryTest, SaveMode}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.util.QueryExecutionListener
+
+class GlutenNoopWriterRuleSuite extends GlutenQueryTest with
SharedSparkSession {
+
+ override def sparkConf: SparkConf = {
+ val conf = super.sparkConf
+ .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+ .set("spark.default.parallelism", "1")
+ .set("spark.memory.offHeap.enabled", "true")
+ .set("spark.memory.offHeap.size", "1024MB")
+ .set("spark.ui.enabled", "false")
+ .set("spark.gluten.ui.enabled", "false")
+ if (BackendTestUtils.isCHBackendLoaded()) {
+ conf.set(GlutenConfig.GLUTEN_LIB_PATH,
SystemParameters.getClickHouseLibPath)
+ }
+ conf
+ }
+
+ class WriterColumnarListener extends QueryExecutionListener {
+ var fakeRowAdaptor: Option[FakeRowAdaptor] = None
+
+ override def onSuccess(funcName: String, qe: QueryExecution, durationNs:
Long): Unit = {
+ fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor
=> f }
+ }
+
+ override def onFailure(funcName: String, qe: QueryExecution, exception:
Exception): Unit = {}
+ }
+
+ test("writing to noop") {
+ withTempDir {
+ dir =>
+ spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath)
+ val listener = new WriterColumnarListener
+ spark.listenerManager.register(listener)
+ try {
+
spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save()
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not
found.")
+ } finally {
+ spark.listenerManager.unregister(listener)
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]