This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 32e5fbae8e [GLUTEN-10713] Eliminate the c2r before iceberg partition 
write (#10714)
32e5fbae8e is described below

commit 32e5fbae8ee9e1df6371f6d950495ae3daaf6518
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Sep 19 16:56:52 2025 +0800

    [GLUTEN-10713] Eliminate the c2r before iceberg partition write (#10714)
    
    Currently, for an Iceberg partitioned write, an AQE node is inserted before 
the write node, which results in unnecessary columnar-to-row (c2r) and 
row-to-columnar (r2c) operations being added when we offload the native write.
---
 .../gluten/component/CHIcebergComponent.scala      |  2 +-
 .../gluten/component/VeloxIcebergComponent.scala   |  3 +-
 .../gluten/execution/IcebergWriteJniWrapper.java   |  1 -
 .../OffloadIcebergWrite.scala                      |  3 +-
 .../execution/enhanced/VeloxIcebergSuite.scala     | 50 ++++++++++++++++++++--
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  3 ++
 .../OffloadIcebergScan.scala                       |  3 +-
 .../execution/ColumnarV2TableWriteExec.scala       |  3 ++
 .../extension/columnar/V2WritePostRule.scala       | 40 +++++++++++++++++
 .../extension/columnar/validator/Validators.scala  |  2 +
 10 files changed, 101 insertions(+), 9 deletions(-)

diff --git 
a/backends-clickhouse/src-iceberg/main/scala/org/apache/gluten/component/CHIcebergComponent.scala
 
b/backends-clickhouse/src-iceberg/main/scala/org/apache/gluten/component/CHIcebergComponent.scala
index 8ee694cefd..7da4d1ec90 100644
--- 
a/backends-clickhouse/src-iceberg/main/scala/org/apache/gluten/component/CHIcebergComponent.scala
+++ 
b/backends-clickhouse/src-iceberg/main/scala/org/apache/gluten/component/CHIcebergComponent.scala
@@ -18,7 +18,7 @@
 package org.apache.gluten.component
 
 import org.apache.gluten.backendsapi.clickhouse.CHBackend
-import org.apache.gluten.execution.OffloadIcebergScan
+import org.apache.gluten.extension.OffloadIcebergScan
 import org.apache.gluten.extension.injector.Injector
 
 class CHIcebergComponent extends Component {
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/component/VeloxIcebergComponent.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/component/VeloxIcebergComponent.scala
index 775c054ca3..c977e17ab4 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/component/VeloxIcebergComponent.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/component/VeloxIcebergComponent.scala
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 package org.apache.gluten.component
+
 import org.apache.gluten.backendsapi.velox.VeloxBackend
-import org.apache.gluten.execution.{OffloadIcebergScan, OffloadIcebergWrite}
+import org.apache.gluten.extension.{OffloadIcebergScan, OffloadIcebergWrite}
 import org.apache.gluten.extension.injector.Injector
 
 class VeloxIcebergComponent extends Component {
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
index 25f2e7697c..a9192952cf 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.execution;
 
-import org.apache.gluten.proto.IcebergPartitionSpec;
 import org.apache.gluten.runtime.Runtime;
 import org.apache.gluten.runtime.RuntimeAware;
 
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
similarity index 94%
rename from 
backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
rename to 
backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
index 1e2beff366..98135fae49 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution
+package org.apache.gluten.extension
 
 import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.{VeloxIcebergAppendDataExec, 
VeloxIcebergOverwriteByExpressionExec, VeloxIcebergReplaceDataExec}
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
 import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index a7dfe4e79e..b7270e69a0 100644
--- 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.gluten.execution.enhanced
 
-import org.apache.gluten.execution.{IcebergSuite, VeloxIcebergAppendDataExec, 
VeloxIcebergOverwriteByExpressionExec, VeloxIcebergReplaceDataExec}
+import org.apache.gluten.execution.{ColumnarToRowExecBase, IcebergSuite, 
VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, 
VeloxIcebergReplaceDataExec}
 import org.apache.gluten.tags.EnhancedFeaturesTest
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.execution.CommandResultExec
 import org.apache.spark.sql.gluten.TestUtils
 
@@ -84,8 +84,8 @@ class VeloxIcebergSuite extends IcebergSuite {
                   |tblproperties (
                   |  'format-version' = '2',
                   |  'write.delete.mode' = 'copy-on-write',
-                  |  'write.update.mode' = 'copy-on-writ',
-                  |  'write.merge.mode' = 'copy-on-writ'
+                  |  'write.update.mode' = 'copy-on-write',
+                  |  'write.merge.mode' = 'copy-on-write'
                   |);
                   |""".stripMargin)
 
@@ -217,4 +217,46 @@ class VeloxIcebergSuite extends IcebergSuite {
       )
     }
   }
+
+  test("check iceberg write c2r") {
+    withTable("iceberg_tbl") {
+      spark.sql("""
+                  |create table if not exists iceberg_tbl (a int, pt int) 
using iceberg
+                  |tblproperties (
+                  |  'format-version' = '2',
+                  |  'write.delete.mode' = 'copy-on-write',
+                  |  'write.update.mode' = 'copy-on-write',
+                  |  'write.merge.mode' = 'copy-on-write'
+                  |)
+                  |partitioned by (pt)
+                  |""".stripMargin)
+
+      def checkColumnarToRow(df: DataFrame, num: Int): Unit = {
+        assert(
+          collect(
+            
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan)
 {
+            case p if p.isInstanceOf[ColumnarToRowExecBase] => p
+          }.size == num)
+      }
+
+      // insert partitioned table
+      var df = spark.sql("insert into table iceberg_tbl values (1, 1), (2, 1), 
(3, 1), (4, 2)")
+      checkAnswer(
+        spark.sql("select * from iceberg_tbl order by a"),
+        Seq(Row(1, 1), Row(2, 1), Row(3, 1), Row(4, 2)))
+      checkColumnarToRow(df, 0)
+
+      // delete partitioned table
+      df = spark.sql("delete from iceberg_tbl where a = 1")
+      checkAnswer(
+        spark.sql("select * from iceberg_tbl order by a"),
+        Seq(Row(2, 1), Row(3, 1), Row(4, 2)))
+      checkColumnarToRow(df, 0)
+
+      // overwrite partitioned table
+      df = spark.sql("insert overwrite table iceberg_tbl values (5, 1)")
+      checkAnswer(spark.sql("select * from iceberg_tbl order by a"), 
Seq(Row(5, 1)))
+      checkColumnarToRow(df, 0)
+    }
+  }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 50fced37ea..57d86a18d4 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.extension._
 import org.apache.gluten.extension.columnar._
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{PreventBatchTypeMismatchInTableCache,
 RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, 
RewriteSubqueryBroadcast}
+import org.apache.gluten.extension.columnar.V2WritePostRule
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
 import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, 
HeuristicTransform}
 import org.apache.gluten.extension.columnar.offload.{OffloadExchange, 
OffloadJoin, OffloadOthers}
@@ -122,6 +123,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(c => 
HashAggregateIgnoreNullKeysRule.apply(c.session))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(_ => CollectTailTransformerRule())
+    injector.injectPostTransform(_ => V2WritePostRule())
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatchType))
 
     // Gluten columnar: Fallback policies.
@@ -223,6 +225,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(c => 
HashAggregateIgnoreNullKeysRule.apply(c.session))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(_ => CollectTailTransformerRule())
+    injector.injectPostTransform(_ => V2WritePostRule())
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatchType))
     injector.injectPostTransform(c => RemoveTopmostColumnarToRow(c.session, 
c.caller.isAqe()))
     SparkShimLoader.getSparkShims
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/extension/OffloadIcebergScan.scala
similarity index 95%
rename from 
gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
rename to 
gluten-iceberg/src/main/scala/org/apache/gluten/extension/OffloadIcebergScan.scala
index 44b0597503..c6ff8e8b6c 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/extension/OffloadIcebergScan.scala
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution
+package org.apache.gluten.extension
 
 import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.IcebergScanTransformer
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
 import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
index 7265814de7..0479a2d36d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.write.{BatchWrite, WriterCommitMessage}
 import org.apache.spark.sql.datasources.v2.{DataWritingColumnarBatchSparkTask, 
DataWritingColumnarBatchSparkTaskResult, StreamWriterCommitProgressUtil, 
WritingColumnarBatchSparkTask}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
@@ -34,6 +35,8 @@ import org.apache.spark.util.LongAccumulator
 
 trait ColumnarV2TableWriteExec extends V2ExistingTableWriteExec with 
ValidatablePlan {
 
+  def withNewQuery(newQuery: SparkPlan): SparkPlan = 
withNewChildInternal(newQuery)
+
   protected def createFactory(schema: StructType): 
ColumnarBatchDataWriterFactory
 
   override protected def run(): Seq[InternalRow] = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/V2WritePostRule.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/V2WritePostRule.scala
new file mode 100644
index 0000000000..d7a4a4e4cc
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/V2WritePostRule.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.execution.ColumnarV2TableWriteExec
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+
+case class V2WritePostRule() extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+    case write: ColumnarV2TableWriteExec =>
+      /**
+       * If the columnar write's child is aqe, we make aqe "support columnar", 
then aqe itself will
+       * guarantee to generate columnar outputs. thus avoiding the case of 
c2r->aqe->r2c->writer.
+       */
+      write.query match {
+        case aqe: AdaptiveSparkPlanExec if !aqe.supportsColumnar =>
+          write.withNewQuery(aqe.copy(supportsColumnar = true))
+        case _ => write
+      }
+    case other => other
+  }
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 3be033451e..2eda0ee12c 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -158,6 +158,8 @@ object Validators {
       case p: ShuffleExchangeExec if !glutenConf.enableColumnarShuffle => 
fail(p)
       case p: BroadcastExchangeExec if 
!glutenConf.enableColumnarBroadcastExchange => fail(p)
       case p: AppendDataExec if !glutenConf.enableAppendData => fail(p)
+      case p: ReplaceDataExec if !glutenConf.enableReplaceData => fail(p)
+      case p: OverwriteByExpressionExec if 
!glutenConf.enableOverwriteByExpression => fail(p)
       case p @ (_: LocalLimitExec | _: GlobalLimitExec) if 
!glutenConf.enableColumnarLimit =>
         fail(p)
       case p: GenerateExec if !glutenConf.enableColumnarGenerate => fail(p)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to