This is an automated email from the ASF dual-hosted git repository.

jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d101cb80e4 [CORE] Refactor columnar noop write rule (#8422)
d101cb80e4 is described below

commit d101cb80e4ab973f3bee2ba3c4983ffe53759678
Author: jackylee <[email protected]>
AuthorDate: Thu Jan 9 16:23:50 2025 +0800

    [CORE] Refactor columnar noop write rule (#8422)
---
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |  2 +
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  3 +
 .../datasources/GlutenWriterColumnarRules.scala    | 59 ++++++--------
 .../datasources/noop/GlutenNoopWriterRule.scala    | 42 ++++++++++
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  3 +
 .../gluten/utils/velox/VeloxTestSettings.scala     |  3 -
 .../ClickHouseAdaptiveQueryExecSuite.scala         | 89 +++++++++++++++++++++-
 .../velox/VeloxAdaptiveQueryExecSuite.scala        | 89 +++++++++++++++++++++-
 .../GlutenWriterColumnarRulesSuite.scala           | 54 -------------
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  3 +
 .../gluten/utils/velox/VeloxTestSettings.scala     |  3 -
 .../ClickHouseAdaptiveQueryExecSuite.scala         | 89 +++++++++++++++++++++-
 .../velox/VeloxAdaptiveQueryExecSuite.scala        | 89 +++++++++++++++++++++-
 .../GlutenWriterColumnarRulesSuite.scala           | 54 -------------
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  2 +
 .../gluten/utils/velox/VeloxTestSettings.scala     |  3 +-
 .../ClickHouseAdaptiveQueryExecSuite.scala         | 38 ++++++++-
 .../velox/VeloxAdaptiveQueryExecSuite.scala        | 38 ++++++++-
 .../utils/clickhouse/ClickHouseTestSettings.scala  |  2 +
 .../gluten/utils/velox/VeloxTestSettings.scala     |  1 -
 .../ClickHouseAdaptiveQueryExecSuite.scala         | 38 ++++++++-
 .../velox/VeloxAdaptiveQueryExecSuite.scala        | 38 ++++++++-
 .../datasources/GlutenNoopWriterRuleSuite.scala    | 70 +++++++++++++++++
 23 files changed, 652 insertions(+), 160 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 139b8b3131..21ae342a22 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.delta.DeltaLogFileIndex
 import org.apache.spark.sql.delta.rules.CHOptimizeMetadataOnlyDeltaQuery
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.noop.GlutenNoopWriterRule
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.util.SparkPlanRules
 
@@ -132,6 +133,7 @@ object CHRuleApi {
       c =>
         intercept(
           
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session)))
+    injector.injectPost(c => GlutenNoopWriterRule.apply(c.session))
 
     // Gluten columnar: Final rules.
     injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 4f19375e85..f3c75cd983 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -36,6 +36,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
+import org.apache.spark.sql.execution.datasources.noop.GlutenNoopWriterRule
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.joins.BaseJoinExec
@@ -110,6 +111,7 @@ object VeloxRuleApi {
       .getExtendedColumnarPostRules()
       .foreach(each => injector.injectPost(c => each(c.session)))
     injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf))
+    injector.injectPost(c => GlutenNoopWriterRule(c.session))
 
     // Gluten columnar: Final rules.
     injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
@@ -188,6 +190,7 @@ object VeloxRuleApi {
       .getExtendedColumnarPostRules()
       .foreach(each => injector.injectPostTransform(c => each(c.session)))
     injector.injectPostTransform(c => 
ColumnarCollapseTransformStages(c.glutenConf))
+    injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
     injector.injectPostTransform(c => 
RemoveGlutenTableCacheColumnarToRow(c.session))
     injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, 
c.session))
     injector.injectPostTransform(_ => RemoveFallbackTagRule())
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 126417bf18..54b5a34639 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import 
org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, 
DataWritingCommand, DataWritingCommandExec}
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
OverwriteByExpressionExec}
 import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, 
InsertIntoHiveDirCommand, InsertIntoHiveTable}
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -133,19 +132,33 @@ object GlutenWriterColumnarRules {
     }
   }
 
-  case class NativeWritePostRule(session: SparkSession) extends 
Rule[SparkPlan] {
+  private[datasources] def injectFakeRowAdaptor(command: SparkPlan, child: 
SparkPlan): SparkPlan = {
+    child match {
+      // if the child is columnar, we can just wrap & transfer the columnar 
data
+      case c2r: ColumnarToRowExecBase =>
+        command.withNewChildren(Array(FakeRowAdaptor(c2r.child)))
+      // If the child is aqe, we make aqe "support columnar",
+      // then aqe itself will guarantee to generate columnar outputs.
+      // So FakeRowAdaptor will always consume columnar data,
+      // thus avoiding the case of c2r->aqe->r2c->writer
+      case aqe: AdaptiveSparkPlanExec =>
+        command.withNewChildren(
+          Array(
+            FakeRowAdaptor(
+              AdaptiveSparkPlanExec(
+                aqe.inputPlan,
+                aqe.context,
+                aqe.preprocessingRules,
+                aqe.isSubquery,
+                supportsColumnar = true
+              ))))
+      case other => command.withNewChildren(Array(FakeRowAdaptor(other)))
+    }
+  }
 
-    private val NOOP_WRITE = 
"org.apache.spark.sql.execution.datasources.noop.NoopWrite$"
+  case class NativeWritePostRule(session: SparkSession) extends 
Rule[SparkPlan] {
 
     override def apply(p: SparkPlan): SparkPlan = p match {
-      case rc @ AppendDataExec(_, _, write)
-          if write.getClass.getName == NOOP_WRITE &&
-            BackendsApiManager.getSettings.enableNativeWriteFiles() =>
-        injectFakeRowAdaptor(rc, rc.child)
-      case rc @ OverwriteByExpressionExec(_, _, write)
-          if write.getClass.getName == NOOP_WRITE &&
-            BackendsApiManager.getSettings.enableNativeWriteFiles() =>
-        injectFakeRowAdaptor(rc, rc.child)
       case rc @ DataWritingCommandExec(cmd, child) =>
         // The same thread can set these properties in the last query 
submission.
         val fields = child.output.toStructType.fields
@@ -165,30 +178,6 @@ object GlutenWriterColumnarRules {
 
       case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply))
     }
-
-    private def injectFakeRowAdaptor(command: SparkPlan, child: SparkPlan): 
SparkPlan = {
-      child match {
-        // if the child is columnar, we can just wrap&transfer the columnar 
data
-        case c2r: ColumnarToRowExecBase =>
-          command.withNewChildren(Array(FakeRowAdaptor(c2r.child)))
-        // If the child is aqe, we make aqe "support columnar",
-        // then aqe itself will guarantee to generate columnar outputs.
-        // So FakeRowAdaptor will always consumes columnar data,
-        // thus avoiding the case of c2r->aqe->r2c->writer
-        case aqe: AdaptiveSparkPlanExec =>
-          command.withNewChildren(
-            Array(
-              FakeRowAdaptor(
-                AdaptiveSparkPlanExec(
-                  aqe.inputPlan,
-                  aqe.context,
-                  aqe.preprocessingRules,
-                  aqe.isSubquery,
-                  supportsColumnar = true
-                ))))
-        case other => command.withNewChildren(Array(FakeRowAdaptor(other)))
-      }
-    }
   }
 
   def injectSparkLocalProperty(spark: SparkSession, format: Option[String]): 
Unit = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala
new file mode 100644
index 0000000000..bedf006510
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.noop
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+import 
org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.injectFakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
OverwriteByExpressionExec}
+
+/**
+ * A rule that injects a FakeRowAdaptor for NoopWrite.
+ *
+ * The current V2 Command does not support columnar. Therefore, when its child 
node is a
+ * ColumnarNode, Vanilla Spark inserts a ColumnarToRow conversion between V2 
Command and its child.
+ * This rule replaces the inserted ColumnarToRow with a FakeRowAdaptor, 
effectively bypassing the
+ * ColumnarToRow operation for NoopWrite. Since NoopWrite does not actually 
perform any data
+ * operations, it can accept input data in either row-based or columnar format.
+ */
+case class GlutenNoopWriterRule(session: SparkSession) extends Rule[SparkPlan] 
{
+  override def apply(p: SparkPlan): SparkPlan = p match {
+    case rc @ AppendDataExec(_, _, NoopWrite) =>
+      injectFakeRowAdaptor(rc, rc.child)
+    case rc @ OverwriteByExpressionExec(_, _, NoopWrite) =>
+      injectFakeRowAdaptor(rc, rc.child)
+    case _ => p
+  }
+}
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index b985347f20..8c62e3b0fd 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1123,6 +1123,9 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("Change merge join to broadcast join without local shuffle read")
     .exclude(
       "Avoid changing merge join to broadcast join if too many empty 
partitions on build plan")
+    .exclude("SPARK-32932: Do not use local shuffle read at final stage on 
write command")
+    .exclude(
+      "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of 
v2 write commands")
     .exclude("SPARK-29544: adaptive skew join with different join types")
     .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
     .exclude("SPARK-32717: AQEOptimizer should respect excludedRules 
configuration")
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index f5071d2f3f..62ab868363 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -166,14 +166,12 @@ class VeloxTestSettings extends BackendTestSettings {
       "SPARK-30403",
       "SPARK-30719",
       "SPARK-31384",
-      "SPARK-30953",
       "SPARK-31658",
       "SPARK-32717",
       "SPARK-32649",
       "SPARK-34533",
       "SPARK-34781",
       "SPARK-35585",
-      "SPARK-32932",
       "SPARK-33494",
       // "SPARK-33933",
       "SPARK-31220",
@@ -1053,7 +1051,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenSupportsCatalogOptionsSuite]
   enableSuite[GlutenTableCapabilityCheckSuite]
   enableSuite[GlutenWriteDistributionAndOrderingSuite]
-  enableSuite[GlutenWriterColumnarRulesSuite]
   enableSuite[GlutenBucketedReadWithoutHiveSupportSuite]
     // Exclude the following suite for plan changed from SMJ to SHJ.
     .exclude("avoid shuffle when join 2 bucketed tables")
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 6d3c3e865d..928dc38985 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.adaptive.clickhouse
 
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, 
ShuffledHashJoinExecTransformerBase, SortExecTransformer, 
SortMergeJoinExecTransformer}
 
 import org.apache.spark.SparkConf
@@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, 
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
 
 import org.apache.log4j.Level
 
@@ -42,7 +49,7 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
 
   override def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+      .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
       .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
   }
 
@@ -1196,6 +1203,86 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
     }
   }
 
+  testGluten("SPARK-32932: Do not use local shuffle read at final stage on 
write command") {
+    withSQLConf(
+      SQLConf.PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+    ) {
+      val data =
+        for (
+          i <- 1L to 10L;
+          j <- 1L to 3L
+        ) yield (i, j)
+
+      val df = data.toDF("i", "j").repartition($"j")
+      var noLocalread: Boolean = false
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          qe.executedPlan match {
+            case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
+              noLocalread = collect(plan) {
+                case exec: AQEShuffleReadExec if exec.isLocalRead => exec
+              }.isEmpty
+            case _ => // ignore other events
+          }
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+
+      withTable("t") {
+        df.write.partitionBy("j").saveAsTable("t")
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(noLocalread)
+        noLocalread = false
+      }
+
+      // Test DataSource v2
+      val format = classOf[NoopDataSource].getName
+      df.write.format(format).mode("overwrite").save()
+      sparkContext.listenerBus.waitUntilEmpty()
+      assert(noLocalread)
+      noLocalread = false
+
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
+  testGluten(
+    "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 
write commands") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      var plan: SparkPlan = null
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          plan = qe.executedPlan
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+      withTable("t1") {
+        val format = classOf[NoopDataSource].getName
+        Seq((0, 1)).toDF("x", 
"y").write.format(format).mode("overwrite").save()
+
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(plan.isInstanceOf[V2TableWriteExec])
+        val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+        assert(childPlan.isInstanceOf[FakeRowAdaptor])
+        
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+        spark.listenerManager.unregister(listener)
+      }
+    }
+  }
+
   testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
       Seq("REPARTITION", "REBALANCE(key)")
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index f8b6092a46..ce9513c8cc 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.adaptive.velox
 
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, 
ShuffledHashJoinExecTransformerBase, SortExecTransformer, 
SortMergeJoinExecTransformer}
 
 import org.apache.spark.SparkConf
@@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, 
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
 
 import org.apache.log4j.Level
 
@@ -41,7 +48,7 @@ class VeloxAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with GlutenSQLT
 
   override def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+      .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
       .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
   }
 
@@ -1175,6 +1182,86 @@ class VeloxAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
+  testGluten("SPARK-32932: Do not use local shuffle read at final stage on 
write command") {
+    withSQLConf(
+      SQLConf.PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+    ) {
+      val data =
+        for (
+          i <- 1L to 10L;
+          j <- 1L to 3L
+        ) yield (i, j)
+
+      val df = data.toDF("i", "j").repartition($"j")
+      var noLocalread: Boolean = false
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          qe.executedPlan match {
+            case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
+              noLocalread = collect(plan) {
+                case exec: AQEShuffleReadExec if exec.isLocalRead => exec
+              }.isEmpty
+            case _ => // ignore other events
+          }
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+
+      withTable("t") {
+        df.write.partitionBy("j").saveAsTable("t")
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(noLocalread)
+        noLocalread = false
+      }
+
+      // Test DataSource v2
+      val format = classOf[NoopDataSource].getName
+      df.write.format(format).mode("overwrite").save()
+      sparkContext.listenerBus.waitUntilEmpty()
+      assert(noLocalread)
+      noLocalread = false
+
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
+  testGluten(
+    "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 
write commands") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      var plan: SparkPlan = null
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          plan = qe.executedPlan
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+      withTable("t1") {
+        val format = classOf[NoopDataSource].getName
+        Seq((0, 1)).toDF("x", 
"y").write.format(format).mode("overwrite").save()
+
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(plan.isInstanceOf[V2TableWriteExec])
+        val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+        assert(childPlan.isInstanceOf[FakeRowAdaptor])
+        
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+        spark.listenerManager.unregister(listener)
+      }
+    }
+  }
+
   testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
       Seq("REPARTITION", "REBALANCE(key)")
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
deleted file mode 100644
index 10abca1c6d..0000000000
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode}
-import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.QueryExecutionListener
-
-class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait {
-
-  class WriterColumnarListener extends QueryExecutionListener {
-    var fakeRowAdaptor: Option[FakeRowAdaptor] = None
-
-    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
-      fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor 
=> f }
-    }
-
-    override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {}
-  }
-
-  testGluten("writing to noop") {
-    withTempDir {
-      dir =>
-        withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "true") {
-          spark.range(0, 
100).write.mode(SaveMode.Overwrite).parquet(dir.getPath)
-          val listener = new WriterColumnarListener
-          spark.listenerManager.register(listener)
-          try {
-            
spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save()
-            spark.sparkContext.listenerBus.waitUntilEmpty()
-            assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not 
found.")
-          } finally {
-            spark.listenerManager.unregister(listener)
-          }
-        }
-    }
-  }
-}
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 219eb0d0b9..f91841b991 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1119,6 +1119,9 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("Change merge join to broadcast join without local shuffle read")
     .exclude(
       "Avoid changing merge join to broadcast join if too many empty 
partitions on build plan")
+    .exclude("SPARK-32932: Do not use local shuffle read at final stage on 
write command")
+    .exclude(
+      "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of 
v2 write commands")
     .exclude("SPARK-37753: Allow changing outer join to broadcast join even if 
too many empty partitions on broadcast side")
     .exclude("SPARK-29544: adaptive skew join with different join types")
     .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index d3bc3846d8..72b77ae1f9 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -70,7 +70,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenSupportsCatalogOptionsSuite]
   enableSuite[GlutenTableCapabilityCheckSuite]
   enableSuite[GlutenWriteDistributionAndOrderingSuite]
-  enableSuite[GlutenWriterColumnarRulesSuite]
 
   enableSuite[GlutenQueryCompilationErrorsDSv2Suite]
 
@@ -191,14 +190,12 @@ class VeloxTestSettings extends BackendTestSettings {
       "SPARK-30403",
       "SPARK-30719",
       "SPARK-31384",
-      "SPARK-30953",
       "SPARK-31658",
       "SPARK-32717",
       "SPARK-32649",
       "SPARK-34533",
       "SPARK-34781",
       "SPARK-35585",
-      "SPARK-32932",
       "SPARK-33494",
       "SPARK-33933",
       "SPARK-31220",
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 441f3a60a3..779d264114 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.adaptive.clickhouse
 
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, 
ShuffledHashJoinExecTransformerBase, SortExecTransformer, 
SortMergeJoinExecTransformerBase}
 
 import org.apache.spark.SparkConf
@@ -24,14 +25,20 @@ import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, 
Row}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, 
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
 
 import org.apache.logging.log4j.Level
 
@@ -40,7 +47,7 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
 
   override def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+      .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
       .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
   }
 
@@ -1193,6 +1200,86 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
     }
   }
 
+  testGluten("SPARK-32932: Do not use local shuffle read at final stage on 
write command") {
+    withSQLConf(
+      SQLConf.PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+    ) {
+      val data =
+        for (
+          i <- 1L to 10L;
+          j <- 1L to 3L
+        ) yield (i, j)
+
+      val df = data.toDF("i", "j").repartition($"j")
+      var noLocalread: Boolean = false
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          qe.executedPlan match {
+            case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
+              noLocalread = collect(plan) {
+                case exec: AQEShuffleReadExec if exec.isLocalRead => exec
+              }.isEmpty
+            case _ => // ignore other events
+          }
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+
+      withTable("t") {
+        df.write.partitionBy("j").saveAsTable("t")
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(noLocalread)
+        noLocalread = false
+      }
+
+      // Test DataSource v2
+      val format = classOf[NoopDataSource].getName
+      df.write.format(format).mode("overwrite").save()
+      sparkContext.listenerBus.waitUntilEmpty()
+      assert(noLocalread)
+      noLocalread = false
+
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
+  testGluten(
+    "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 
write commands") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      var plan: SparkPlan = null
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          plan = qe.executedPlan
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+      withTable("t1") {
+        val format = classOf[NoopDataSource].getName
+        Seq((0, 1)).toDF("x", 
"y").write.format(format).mode("overwrite").save()
+
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(plan.isInstanceOf[V2TableWriteExec])
+        val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+        assert(childPlan.isInstanceOf[FakeRowAdaptor])
+        
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+        spark.listenerManager.unregister(listener)
+      }
+    }
+  }
+
   testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
       Seq("REPARTITION", "REBALANCE(key)")
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index 729a12f56c..f9f0723e00 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.adaptive.velox
 
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, 
ShuffledHashJoinExecTransformerBase, SortExecTransformer, 
SortMergeJoinExecTransformer}
 
 import org.apache.spark.SparkConf
@@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, 
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
 
 import org.apache.logging.log4j.Level
 
@@ -41,7 +48,7 @@ class VeloxAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with GlutenSQLT
 
   override def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+      .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
       .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
   }
 
@@ -1179,6 +1186,86 @@ class VeloxAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
+  testGluten("SPARK-32932: Do not use local shuffle read at final stage on 
write command") {
+    withSQLConf(
+      SQLConf.PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+    ) {
+      val data =
+        for (
+          i <- 1L to 10L;
+          j <- 1L to 3L
+        ) yield (i, j)
+
+      val df = data.toDF("i", "j").repartition($"j")
+      var noLocalread: Boolean = false
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          qe.executedPlan match {
+            case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) =>
+              noLocalread = collect(plan) {
+                case exec: AQEShuffleReadExec if exec.isLocalRead => exec
+              }.isEmpty
+            case _ => // ignore other events
+          }
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+
+      withTable("t") {
+        df.write.partitionBy("j").saveAsTable("t")
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(noLocalread)
+        noLocalread = false
+      }
+
+      // Test DataSource v2
+      val format = classOf[NoopDataSource].getName
+      df.write.format(format).mode("overwrite").save()
+      sparkContext.listenerBus.waitUntilEmpty()
+      assert(noLocalread)
+      noLocalread = false
+
+      spark.listenerManager.unregister(listener)
+    }
+  }
+
+  testGluten(
+    "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 
write commands") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      var plan: SparkPlan = null
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          plan = qe.executedPlan
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+      withTable("t1") {
+        val format = classOf[NoopDataSource].getName
+        Seq((0, 1)).toDF("x", 
"y").write.format(format).mode("overwrite").save()
+
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(plan.isInstanceOf[V2TableWriteExec])
+        val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+        assert(childPlan.isInstanceOf[FakeRowAdaptor])
+        
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+        spark.listenerManager.unregister(listener)
+      }
+    }
+  }
+
   testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
       Seq("REPARTITION", "REBALANCE(key)")
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
deleted file mode 100644
index 10abca1c6d..0000000000
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode}
-import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.QueryExecutionListener
-
-class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait {
-
-  class WriterColumnarListener extends QueryExecutionListener {
-    var fakeRowAdaptor: Option[FakeRowAdaptor] = None
-
-    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
-      fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor 
=> f }
-    }
-
-    override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {}
-  }
-
-  testGluten("writing to noop") {
-    withTempDir {
-      dir =>
-        withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "true") {
-          spark.range(0, 
100).write.mode(SaveMode.Overwrite).parquet(dir.getPath)
-          val listener = new WriterColumnarListener
-          spark.listenerManager.register(listener)
-          try {
-            
spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save()
-            spark.sparkContext.listenerBus.waitUntilEmpty()
-            assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not 
found.")
-          } finally {
-            spark.listenerManager.unregister(listener)
-          }
-        }
-    }
-  }
-}
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 56c95ae1bd..9ebcadf531 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -983,6 +983,8 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("Change merge join to broadcast join without local shuffle read")
     .exclude(
       "Avoid changing merge join to broadcast join if too many empty 
partitions on build plan")
+    .exclude(
+      "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of 
v2 write commands")
     .exclude("SPARK-37753: Allow changing outer join to broadcast join even if 
too many empty partitions on broadcast side")
     .exclude("SPARK-29544: adaptive skew join with different join types")
     .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index cc9746dcdb..94d3a1f6e8 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -25,7 +25,7 @@ import 
org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalog
 import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, 
GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, 
GlutenQueryParsingErrorsSuite}
 import org.apache.spark.sql.execution.{FallbackStrategiesSuite, 
GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, 
GlutenExchangeSuite, GlutenLocalBroadcastExchangeSuite, 
GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, 
GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, 
GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
 import 
org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite
-import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, 
GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, 
GlutenFileFormatWriterSuite, GlutenFileIndexSuite, 
GlutenFileMetadataStructRowIndexSuite, GlutenFileMetadataStructSuite, 
GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, 
GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, 
GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, 
GlutenOrcCodecSuite,  [...]
+import org.apache.spark.sql.execution.datasources._
 import 
org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite
 import 
org.apache.spark.sql.execution.datasources.csv.{GlutenCSVLegacyTimeParserSuite, 
GlutenCSVv1Suite, GlutenCSVv2Suite}
 import 
org.apache.spark.sql.execution.datasources.exchange.GlutenValidateRequirementsSuite
@@ -182,7 +182,6 @@ class VeloxTestSettings extends BackendTestSettings {
       "SPARK-30403",
       "SPARK-30719",
       "SPARK-31384",
-      "SPARK-30953",
       "SPARK-31658",
       "SPARK-32717",
       "SPARK-32649",
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 49d47fa65b..2bd5a96dad 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.adaptive.clickhouse
 
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, 
ShuffledHashJoinExecTransformerBase, SortExecTransformer, 
SortMergeJoinExecTransformer}
 
 import org.apache.spark.SparkConf
@@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, 
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
@@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
 
 import org.apache.logging.log4j.Level
 
@@ -41,7 +46,7 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
 
   override def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+      .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
       .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
   }
 
@@ -1182,6 +1187,37 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
     }
   }
 
+  testGluten(
+    "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 
write commands") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      var plan: SparkPlan = null
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          plan = qe.executedPlan
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+      withTable("t1") {
+        val format = classOf[NoopDataSource].getName
+        Seq((0, 1)).toDF("x", 
"y").write.format(format).mode("overwrite").save()
+
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(plan.isInstanceOf[V2TableWriteExec])
+        val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+        assert(childPlan.isInstanceOf[FakeRowAdaptor])
+        
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+        spark.listenerManager.unregister(listener)
+      }
+    }
+  }
+
   testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
       Seq("REPARTITION", "REBALANCE(key)")
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index 729a12f56c..6a3d6da27c 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.adaptive.velox
 
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, 
ShuffledHashJoinExecTransformerBase, SortExecTransformer, 
SortMergeJoinExecTransformer}
 
 import org.apache.spark.SparkConf
@@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, 
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
@@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
 
 import org.apache.logging.log4j.Level
 
@@ -41,7 +46,7 @@ class VeloxAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with GlutenSQLT
 
   override def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+      .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
       .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
   }
 
@@ -1179,6 +1184,37 @@ class VeloxAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
+  testGluten(
+    "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 
write commands") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      var plan: SparkPlan = null
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          plan = qe.executedPlan
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+      withTable("t1") {
+        val format = classOf[NoopDataSource].getName
+        Seq((0, 1)).toDF("x", 
"y").write.format(format).mode("overwrite").save()
+
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(plan.isInstanceOf[V2TableWriteExec])
+        val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+        assert(childPlan.isInstanceOf[FakeRowAdaptor])
+        
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+        spark.listenerManager.unregister(listener)
+      }
+    }
+  }
+
   testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
       Seq("REPARTITION", "REBALANCE(key)")
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index bdb160ad24..f482ad921e 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -983,6 +983,8 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("Change merge join to broadcast join without local shuffle read")
     .exclude(
       "Avoid changing merge join to broadcast join if too many empty 
partitions on build plan")
+    .exclude(
+      "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of 
v2 write commands")
     .exclude("SPARK-37753: Allow changing outer join to broadcast join even if 
too many empty partitions on broadcast side")
     .exclude("SPARK-29544: adaptive skew join with different join types")
     .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan")
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 71786c9132..73c4d43ced 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -185,7 +185,6 @@ class VeloxTestSettings extends BackendTestSettings {
       "SPARK-30403",
       "SPARK-30719",
       "SPARK-31384",
-      "SPARK-30953",
       "SPARK-31658",
       "SPARK-32717",
       "SPARK-32649",
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
index 2e5df7b859..bd941586d7 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.adaptive.clickhouse
 
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, 
ShuffledHashJoinExecTransformerBase, SortExecTransformer, 
SortMergeJoinExecTransformer}
 
 import org.apache.spark.SparkConf
@@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, 
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
@@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
 
 import org.apache.logging.log4j.Level
 
@@ -41,7 +46,7 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
 
   override def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+      .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
       .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
   }
 
@@ -1197,6 +1202,37 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute
     }
   }
 
+  testGluten(
+    "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 
write commands") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      var plan: SparkPlan = null
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, 
durationNs: Long): Unit = {
+          plan = qe.executedPlan
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+      withTable("t1") {
+        val format = classOf[NoopDataSource].getName
+        Seq((0, 1)).toDF("x", 
"y").write.format(format).mode("overwrite").save()
+
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(plan.isInstanceOf[V2TableWriteExec])
+        val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+        assert(childPlan.isInstanceOf[FakeRowAdaptor])
+        
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+        spark.listenerManager.unregister(listener)
+      }
+    }
+  }
+
   testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
       Seq("REPARTITION", "REBALANCE(key)")
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index 729a12f56c..6a3d6da27c 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.adaptive.velox
 
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, 
ShuffledHashJoinExecTransformerBase, SortExecTransformer, 
SortMergeJoinExecTransformer}
 
 import org.apache.spark.SparkConf
@@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, 
BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
@@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestData.TestData
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.QueryExecutionListener
 
 import org.apache.logging.log4j.Level
 
@@ -41,7 +46,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
 
   override def sparkConf: SparkConf = {
     super.sparkConf
-      .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false")
+      .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false")
       .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
   }
 
@@ -1179,6 +1184,37 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
+  testGluten(
+    "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      var plan: SparkPlan = null
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+          plan = qe.executedPlan
+        }
+        override def onFailure(
+            funcName: String,
+            qe: QueryExecution,
+            exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+      withTable("t1") {
+        val format = classOf[NoopDataSource].getName
+        Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save()
+
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(plan.isInstanceOf[V2TableWriteExec])
+        val childPlan = plan.asInstanceOf[V2TableWriteExec].child
+        assert(childPlan.isInstanceOf[FakeRowAdaptor])
+        assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec])
+
+        spark.listenerManager.unregister(listener)
+      }
+    }
+  }
+
   testGluten("SPARK-35650: Coalesce number of partitions by AEQ") {
     withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
       Seq("REPARTITION", "REBALANCE(key)")
diff --git a/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala b/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala
new file mode 100644
index 0000000000..ebf17444e6
--- /dev/null
+++ b/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.datasources
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{GlutenQueryTest, SaveMode}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.util.QueryExecutionListener
+
+class GlutenNoopWriterRuleSuite extends GlutenQueryTest with SharedSparkSession {
+
+  override def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+      .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+      .set("spark.default.parallelism", "1")
+      .set("spark.memory.offHeap.enabled", "true")
+      .set("spark.memory.offHeap.size", "1024MB")
+      .set("spark.ui.enabled", "false")
+      .set("spark.gluten.ui.enabled", "false")
+    if (BackendTestUtils.isCHBackendLoaded()) {
+      conf.set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath)
+    }
+    conf
+  }
+
+  class WriterColumnarListener extends QueryExecutionListener {
+    var fakeRowAdaptor: Option[FakeRowAdaptor] = None
+
+    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+      fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f }
+    }
+
+    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+  }
+
+  test("writing to noop") {
+    withTempDir {
+      dir =>
+        spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath)
+        val listener = new WriterColumnarListener
+        spark.listenerManager.register(listener)
+        try {
+          spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save()
+          spark.sparkContext.listenerBus.waitUntilEmpty()
+          assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.")
+        } finally {
+          spark.listenerManager.unregister(listener)
+        }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to