This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8456498eb5 [GLUTEN-10457][VL] Iceberg supports copy on write (#10458)
8456498eb5 is described below
commit 8456498eb529c7c562ff3b598703ff12e6e28b8c
Author: Jin Chengcheng <[email protected]>
AuthorDate: Wed Aug 20 14:55:45 2025 +0100
[GLUTEN-10457][VL] Iceberg supports copy on write (#10458)
---
.../gluten/execution/OffloadIcebergWrite.scala | 28 +++++++-----
.../execution/VeloxIcebergReplaceDataExec.scala | 51 ++++++++++++++++++++++
.../execution/enhanced/VeloxIcebergSuite.scala | 44 ++++++++++++++++++-
.../gluten/backendsapi/velox/VeloxBackend.scala | 3 ++
docs/Configuration.md | 1 +
.../gluten/backendsapi/BackendSettingsApi.scala | 2 +
.../org/apache/gluten/config/GlutenConfig.scala | 9 ++++
.../extension/columnar/validator/Validators.scala | 3 +-
.../datasources/v2/WriteToDataSourceV2Exec.scala | 31 +++++++++++++
9 files changed, 160 insertions(+), 12 deletions(-)
diff --git
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
index 1b8aff0208..f8f7abdaa4 100644
---
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
+++
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/OffloadIcebergWrite.scala
@@ -24,7 +24,7 @@ import
org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.v2.AppendDataExec
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
ReplaceDataExec}
case class OffloadIcebergWrite() extends OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
@@ -34,25 +34,33 @@ case class OffloadIcebergWrite() extends OffloadSingleNode {
}
}
+case class OffloadIcebergDelete() extends OffloadSingleNode {
+ override def offload(plan: SparkPlan): SparkPlan = plan match {
+ case r: ReplaceDataExec =>
+ VeloxIcebergReplaceDataExec(r)
+ case other => other
+ }
+}
+
object OffloadIcebergWrite {
def inject(injector: Injector): Unit = {
// Inject legacy rule.
injector.gluten.legacy.injectTransform {
c =>
- val offload = Seq(OffloadIcebergWrite())
+ val offload = Seq(OffloadIcebergWrite(), OffloadIcebergDelete())
HeuristicTransform.Simple(
Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
offload
)
}
- // Inject RAS rule.
- injector.gluten.ras.injectRasRule {
- c =>
- RasOffload.Rule(
- RasOffload.from[AppendDataExec](OffloadIcebergWrite()),
- Validators.newValidator(new GlutenConfig(c.sqlConf)),
- Nil)
- }
+ val offloads: Seq[RasOffload] = Seq(
+ RasOffload.from[AppendDataExec](OffloadIcebergWrite()),
+ RasOffload.from[ReplaceDataExec](OffloadIcebergDelete())
+ )
+ offloads.foreach(
+ offload =>
+ injector.gluten.ras.injectRasRule(
+ c => RasOffload.Rule(offload, Validators.newValidator(new
GlutenConfig(c.sqlConf)), Nil)))
}
}
diff --git
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergReplaceDataExec.scala
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergReplaceDataExec.scala
new file mode 100644
index 0000000000..720ee09daf
--- /dev/null
+++
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergReplaceDataExec.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.connector.write.{ColumnarBatchDataWriterFactory,
IcebergDataWriteFactory}
+
+import org.apache.spark.sql.connector.write.Write
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.types.StructType
+
+import org.apache.iceberg.spark.source.IcebergWriteUtil
+
+case class VeloxIcebergReplaceDataExec(query: SparkPlan, refreshCache: () =>
Unit, write: Write)
+ extends IcebergAppendDataExec {
+
+ override protected def withNewChildInternal(newChild: SparkPlan):
IcebergAppendDataExec =
+ copy(query = newChild)
+
+ override def createFactory(schema: StructType):
ColumnarBatchDataWriterFactory =
+ IcebergDataWriteFactory(
+ schema,
+ getFileFormat(IcebergWriteUtil.getFileFormat(write)),
+ IcebergWriteUtil.getDirectory(write),
+ getCodec,
+ getPartitionSpec)
+}
+
+object VeloxIcebergReplaceDataExec {
+ def apply(original: ReplaceDataExec): VeloxIcebergReplaceDataExec = {
+ VeloxIcebergReplaceDataExec(
+ original.query,
+ original.refreshCache,
+ original.write
+ )
+ }
+}
diff --git
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index affb30040a..06c8ee0c0a 100644
---
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.enhanced
-import org.apache.gluten.execution.{IcebergAppendDataExec, IcebergSuite}
+import org.apache.gluten.execution.{IcebergAppendDataExec, IcebergSuite,
VeloxIcebergReplaceDataExec}
import org.apache.gluten.tags.EnhancedFeaturesTest
import org.apache.spark.sql.execution.CommandResultExec
@@ -71,4 +71,46 @@ class VeloxIcebergSuite extends IcebergSuite {
assert(result(0).get(1) == 189)
}
}
+
+ test("iceberg read cow table - delete") {
+ withTable("iceberg_cow_tb") {
+ spark.sql("""
+ |create table iceberg_cow_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'copy-on-write',
+ | 'write.update.mode' = 'copy-on-write',
+ | 'write.merge.mode' = 'copy-on-write'
+ |);
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql("""
+ |insert into table iceberg_cow_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+ | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+ |""".stripMargin)
+
+ // Delete row.
+ val df = spark.sql(
+ """
+ |delete from iceberg_cow_tb where name = 'a1';
+ |""".stripMargin
+ )
+ assert(
+ df.queryExecution.executedPlan
+ .asInstanceOf[CommandResultExec]
+ .commandPhysicalPlan
+ .isInstanceOf[VeloxIcebergReplaceDataExec])
+ val selectDf = spark.sql("""
+ |select * from iceberg_cow_tb;
+ |""".stripMargin)
+ val result = selectDf.collect()
+ assert(result.length == 5)
+ }
+ }
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index bb6b7597bc..9821efa66e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -559,4 +559,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportAppendDataExec(): Boolean =
GlutenConfig.get.enableAppendData && enableEnhancedFeatures()
+
+ override def supportReplaceDataExec(): Boolean =
+ GlutenConfig.get.enableReplaceData && enableEnhancedFeatures()
}
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 4b1013f8e2..082ae13945 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -105,6 +105,7 @@ nav_order: 15
| spark.gluten.sql.columnar.project.collapse | true
| Combines two columnar project operators into one and perform alias
substitution
[...]
| spark.gluten.sql.columnar.query.fallback.threshold | -1
| The threshold for whether query will fall back by counting the
number of ColumnarToRow & vanilla leaf node.
[...]
| spark.gluten.sql.columnar.range | true
| Enable or disable columnar range.
[...]
+| spark.gluten.sql.columnar.replaceData | true
| Enable or disable columnar v2 command replace data.
[...]
| spark.gluten.sql.columnar.scanOnly | false
| When enabled, only scan and the filter after scan will be offloaded
to native.
[...]
| spark.gluten.sql.columnar.shuffle | true
| Enable or disable columnar shuffle.
[...]
| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true
| If enabled, fall back to ColumnarShuffleManager when celeborn
service is unavailable.Otherwise, throw an exception.
[...]
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 4f917de7f4..f4b9c46df1 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -161,4 +161,6 @@ trait BackendSettingsApi {
def enableEnhancedFeatures(): Boolean = false
def supportAppendDataExec(): Boolean = false
+
+ def supportReplaceDataExec(): Boolean = false
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 07a862e175..9d91bb6379 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -93,6 +93,8 @@ class GlutenConfig(conf: SQLConf) extends
GlutenCoreConfig(conf) {
def enableAppendData: Boolean = getConf(COLUMNAR_APPEND_DATA_ENABLED)
+ def enableReplaceData: Boolean = getConf(COLUMNAR_REPLACE_DATA_ENABLED)
+
def enableColumnarShuffledHashJoin: Boolean =
getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
def shuffledHashJoinOptimizeBuildSide: Boolean =
@@ -878,6 +880,13 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
+ val COLUMNAR_REPLACE_DATA_ENABLED =
+ buildConf("spark.gluten.sql.columnar.replaceData")
+ .internal()
+ .doc("Enable or disable columnar v2 command replace data.")
+ .booleanConf
+ .createWithDefault(true)
+
val COLUMNAR_PREFER_STREAMING_AGGREGATE =
buildConf("spark.gluten.sql.columnar.preferStreamingAggregate")
.internal()
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 57df0aa95d..33f4927017 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
BatchScanExec}
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
BatchScanExec, ReplaceDataExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.window.WindowExec
@@ -137,6 +137,7 @@ object Validators {
case p: CartesianProductExec if !settings.supportCartesianProductExec()
=> fail(p)
case p: TakeOrderedAndProjectExec if
!settings.supportColumnarShuffleExec() => fail(p)
case p: AppendDataExec if !settings.supportAppendDataExec() => fail(p)
+ case p: ReplaceDataExec if !settings.supportReplaceDataExec() => fail(p)
case _ => pass()
}
}
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
new file mode 100644
index 0000000000..8084836510
--- /dev/null
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.connector.write.Write
+import org.apache.spark.sql.execution.SparkPlan
+
+/** Physical plan node to replace data in existing tables. */
+case class ReplaceDataExec(query: SparkPlan, refreshCache: () => Unit, write:
Write)
+ extends V2ExistingTableWriteExec {
+
+ override val stringArgs: Iterator[Any] = Iterator(query, write)
+
+ override protected def withNewChildInternal(newChild: SparkPlan):
ReplaceDataExec = {
+ copy(query = newChild)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]