This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 8456498eb5 [GLUTEN-10457][VL] Iceberg supports copy on write (#10458)
8456498eb5 is described below

commit 8456498eb529c7c562ff3b598703ff12e6e28b8c
Author: Jin Chengcheng <[email protected]>
AuthorDate: Wed Aug 20 14:55:45 2025 +0100

    [GLUTEN-10457][VL] Iceberg supports copy on write (#10458)
---
 .../gluten/execution/OffloadIcebergWrite.scala     | 28 +++++++-----
 .../execution/VeloxIcebergReplaceDataExec.scala    | 51 ++++++++++++++++++++++
 .../execution/enhanced/VeloxIcebergSuite.scala     | 44 ++++++++++++++++++-
 .../gluten/backendsapi/velox/VeloxBackend.scala    |  3 ++
 docs/Configuration.md                              |  1 +
 .../gluten/backendsapi/BackendSettingsApi.scala    |  2 +
 .../org/apache/gluten/config/GlutenConfig.scala    |  9 ++++
 .../extension/columnar/validator/Validators.scala  |  3 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 31 +++++++++++++
 9 files changed, 160 insertions(+), 12 deletions(-)

diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
index 1b8aff0208..f8f7abdaa4 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
@@ -24,7 +24,7 @@ import 
org.apache.gluten.extension.columnar.validator.Validators
 import org.apache.gluten.extension.injector.Injector
 
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.v2.AppendDataExec
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
ReplaceDataExec}
 
 case class OffloadIcebergWrite() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
@@ -34,25 +34,33 @@ case class OffloadIcebergWrite() extends OffloadSingleNode {
   }
 }
 
+case class OffloadIcebergDelete() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = plan match {
+    case r: ReplaceDataExec =>
+      VeloxIcebergReplaceDataExec(r)
+    case other => other
+  }
+}
+
 object OffloadIcebergWrite {
   def inject(injector: Injector): Unit = {
     // Inject legacy rule.
     injector.gluten.legacy.injectTransform {
       c =>
-        val offload = Seq(OffloadIcebergWrite())
+        val offload = Seq(OffloadIcebergWrite(), OffloadIcebergDelete())
         HeuristicTransform.Simple(
           Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
           offload
         )
     }
 
-    // Inject RAS rule.
-    injector.gluten.ras.injectRasRule {
-      c =>
-        RasOffload.Rule(
-          RasOffload.from[AppendDataExec](OffloadIcebergWrite()),
-          Validators.newValidator(new GlutenConfig(c.sqlConf)),
-          Nil)
-    }
+    val offloads: Seq[RasOffload] = Seq(
+      RasOffload.from[AppendDataExec](OffloadIcebergWrite()),
+      RasOffload.from[ReplaceDataExec](OffloadIcebergDelete())
+    )
+    offloads.foreach(
+      offload =>
+        injector.gluten.ras.injectRasRule(
+          c => RasOffload.Rule(offload, Validators.newValidator(new 
GlutenConfig(c.sqlConf)), Nil)))
   }
 }
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergReplaceDataExec.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergReplaceDataExec.scala
new file mode 100644
index 0000000000..720ee09daf
--- /dev/null
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergReplaceDataExec.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.connector.write.{ColumnarBatchDataWriterFactory, 
IcebergDataWriteFactory}
+
+import org.apache.spark.sql.connector.write.Write
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.types.StructType
+
+import org.apache.iceberg.spark.source.IcebergWriteUtil
+
+case class VeloxIcebergReplaceDataExec(query: SparkPlan, refreshCache: () => 
Unit, write: Write)
+  extends IcebergAppendDataExec {
+
+  override protected def withNewChildInternal(newChild: SparkPlan): 
IcebergAppendDataExec =
+    copy(query = newChild)
+
+  override def createFactory(schema: StructType): 
ColumnarBatchDataWriterFactory =
+    IcebergDataWriteFactory(
+      schema,
+      getFileFormat(IcebergWriteUtil.getFileFormat(write)),
+      IcebergWriteUtil.getDirectory(write),
+      getCodec,
+      getPartitionSpec)
+}
+
+object VeloxIcebergReplaceDataExec {
+  def apply(original: ReplaceDataExec): VeloxIcebergReplaceDataExec = {
+    VeloxIcebergReplaceDataExec(
+      original.query,
+      original.refreshCache,
+      original.write
+    )
+  }
+}
diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index affb30040a..06c8ee0c0a 100644
--- 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution.enhanced
 
-import org.apache.gluten.execution.{IcebergAppendDataExec, IcebergSuite}
+import org.apache.gluten.execution.{IcebergAppendDataExec, IcebergSuite, 
VeloxIcebergReplaceDataExec}
 import org.apache.gluten.tags.EnhancedFeaturesTest
 
 import org.apache.spark.sql.execution.CommandResultExec
@@ -71,4 +71,46 @@ class VeloxIcebergSuite extends IcebergSuite {
       assert(result(0).get(1) == 189)
     }
   }
+
+  test("iceberg read cow table - delete") {
+    withTable("iceberg_cow_tb") {
+      spark.sql("""
+                  |create table iceberg_cow_tb (
+                  |  id int,
+                  |  name string,
+                  |  p string
+                  |) using iceberg
+                  |tblproperties (
+                  |  'format-version' = '2',
+                  |  'write.delete.mode' = 'copy-on-write',
+                  |  'write.update.mode' = 'copy-on-writ',
+                  |  'write.merge.mode' = 'copy-on-writ'
+                  |);
+                  |""".stripMargin)
+
+      // Insert some test rows.
+      spark.sql("""
+                  |insert into table iceberg_cow_tb
+                  |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+                  |       (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+                  |""".stripMargin)
+
+      // Delete row.
+      val df = spark.sql(
+        """
+          |delete from iceberg_cow_tb where name = 'a1';
+          |""".stripMargin
+      )
+      assert(
+        df.queryExecution.executedPlan
+          .asInstanceOf[CommandResultExec]
+          .commandPhysicalPlan
+          .isInstanceOf[VeloxIcebergReplaceDataExec])
+      val selectDf = spark.sql("""
+                                 |select * from iceberg_cow_tb;
+                                 |""".stripMargin)
+      val result = selectDf.collect()
+      assert(result.length == 5)
+    }
+  }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index bb6b7597bc..9821efa66e 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -559,4 +559,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def supportAppendDataExec(): Boolean =
     GlutenConfig.get.enableAppendData && enableEnhancedFeatures()
+
+  override def supportReplaceDataExec(): Boolean =
+    GlutenConfig.get.enableReplaceData && enableEnhancedFeatures()
 }
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 4b1013f8e2..082ae13945 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -105,6 +105,7 @@ nav_order: 15
 | spark.gluten.sql.columnar.project.collapse                         | true    
          | Combines two columnar project operators into one and perform alias 
substitution                                                                    
                                                                                
                                                                                
                                                                                
               [...]
 | spark.gluten.sql.columnar.query.fallback.threshold                 | -1      
          | The threshold for whether query will fall back by counting the 
number of ColumnarToRow & vanilla leaf node.                                    
                                                                                
                                                                                
                                                                                
                   [...]
 | spark.gluten.sql.columnar.range                                    | true    
          | Enable or disable columnar range.                                   
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| spark.gluten.sql.columnar.replaceData                              | true    
          | Enable or disable columnar v2 command replace data.                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.scanOnly                                 | false   
          | When enabled, only scan and the filter after scan will be offloaded 
to native.                                                                      
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.shuffle                                  | true    
          | Enable or disable columnar shuffle.                                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled        | true    
          | If enabled, fall back to ColumnarShuffleManager when celeborn 
service is unavailable.Otherwise, throw an exception.                           
                                                                                
                                                                                
                                                                                
                    [...]
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 4f917de7f4..f4b9c46df1 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -161,4 +161,6 @@ trait BackendSettingsApi {
   def enableEnhancedFeatures(): Boolean = false
 
   def supportAppendDataExec(): Boolean = false
+
+  def supportReplaceDataExec(): Boolean = false
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 07a862e175..9d91bb6379 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -93,6 +93,8 @@ class GlutenConfig(conf: SQLConf) extends 
GlutenCoreConfig(conf) {
 
   def enableAppendData: Boolean = getConf(COLUMNAR_APPEND_DATA_ENABLED)
 
+  def enableReplaceData: Boolean = getConf(COLUMNAR_REPLACE_DATA_ENABLED)
+
   def enableColumnarShuffledHashJoin: Boolean = 
getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
 
   def shuffledHashJoinOptimizeBuildSide: Boolean =
@@ -878,6 +880,13 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
+  val COLUMNAR_REPLACE_DATA_ENABLED =
+    buildConf("spark.gluten.sql.columnar.replaceData")
+      .internal()
+      .doc("Enable or disable columnar v2 command replace data.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMNAR_PREFER_STREAMING_AGGREGATE =
     buildConf("spark.gluten.sql.columnar.preferStreamingAggregate")
       .internal()
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 57df0aa95d..33f4927017 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
BatchScanExec}
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
BatchScanExec, ReplaceDataExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
@@ -137,6 +137,7 @@ object Validators {
       case p: CartesianProductExec if !settings.supportCartesianProductExec() 
=> fail(p)
       case p: TakeOrderedAndProjectExec if 
!settings.supportColumnarShuffleExec() => fail(p)
       case p: AppendDataExec if !settings.supportAppendDataExec() => fail(p)
+      case p: ReplaceDataExec if !settings.supportReplaceDataExec() => fail(p)
       case _ => pass()
     }
   }
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
new file mode 100644
index 0000000000..8084836510
--- /dev/null
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.connector.write.Write
+import org.apache.spark.sql.execution.SparkPlan
+
+/** Physical plan node to replace data in existing tables. */
+case class ReplaceDataExec(query: SparkPlan, refreshCache: () => Unit, write: 
Write)
+  extends V2ExistingTableWriteExec {
+
+  override val stringArgs: Iterator[Any] = Iterator(query, write)
+
+  override protected def withNewChildInternal(newChild: SparkPlan): 
ReplaceDataExec = {
+    copy(query = newChild)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to