This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c7cd7741bf [VL] Support Iceberg WriteToDataSourceV2 (#11533)
c7cd7741bf is described below

commit c7cd7741bf4443676f9cb10ffbd435de2182264d
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Feb 5 15:30:38 2026 +0800

    [VL] Support Iceberg WriteToDataSourceV2 (#11533)
    
    What changes are proposed in this pull request?
    Support WriteToDataSourceV2 for Iceberg, which is used by WriteToMicroBatchDataSource
    
    Introduce ColumnarStreamingDataWriterFactory as a companion to Spark's StreamingDataWriterFactory
    Introduce ColumnarMicroBatchWriterFactory as a companion to Spark's MicroBatchWriterFactory
    Offload Spark's WriteToDataSourceV2Exec to VeloxIcebergWriteToDataSourceV2Exec
    How was this patch tested?
    Added a test case: test("iceberg stream write to table")
    
    Was this patch authored or co-authored using generative AI tooling?
    No
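    
    A minimal usage sketch of the streaming write path this change enables (not part of
    this patch; the table name, checkpoint path, and SparkSession setup are illustrative).
    With spark.gluten.sql.columnar.writeToDataSourceV2=true (the default) and enhanced
    features enabled, the WriteToDataSourceV2Exec planned for each micro-batch is expected
    to be offloaded to VeloxIcebergWriteToDataSourceV2Exec:
    
        import org.apache.spark.sql.{SparkSession, SQLContext}
        import org.apache.spark.sql.execution.streaming.MemoryStream
    
        val spark = SparkSession.builder().getOrCreate()
        import spark.implicits._
        // MemoryStream needs an implicit SQLContext and an Encoder for the element type.
        implicit val sqlContext: SQLContext = spark.sqlContext
    
        spark.sql("CREATE TABLE demo_tbl (a INT, b STRING) USING iceberg")
    
        val input = MemoryStream[(Int, String)]
        val stream = input
          .toDS()
          .toDF("a", "b")
          .writeStream
          .option("checkpointLocation", "/tmp/demo-checkpoint") // illustrative path
          .format("iceberg")
          .toTable("demo_tbl") // planned via WriteToMicroBatchDataSource / WriteToDataSourceV2
    
        input.addData((1, "a"))
        stream.processAllAvailable() // each micro-batch goes through the (offloaded) v2 write
        stream.stop()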
---
 .../connector/write/IcebergDataWriteFactory.scala  | 13 +++-
 .../execution/AbstractIcebergWriteExec.scala       | 15 +++-
 .../VeloxIcebergWriteToDataSourceV2Exec.scala      | 84 ++++++++++++++++++++++
 .../gluten/extension/OffloadIcebergWrite.scala     | 19 +++--
 .../execution/enhanced/VeloxIcebergSuite.scala     | 38 +++++++++-
 .../org/apache/spark/sql/gluten/TestUtils.scala    | 10 ++-
 .../gluten/backendsapi/velox/VeloxBackend.scala    |  2 +
 docs/Configuration.md                              |  1 +
 .../write/ColumnarMicroBatchWriterFactory.java     | 42 +++++++++++
 .../write/ColumnarStreamingDataWriterFactory.java  | 55 ++++++++++++++
 .../gluten/backendsapi/BackendSettingsApi.scala    |  2 +
 .../org/apache/gluten/config/GlutenConfig.scala    |  8 +++
 .../execution/ColumnarV2TableWriteExec.scala       | 29 ++++++--
 .../extension/columnar/validator/Validators.scala  |  4 +-
 14 files changed, 303 insertions(+), 19 deletions(-)

diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
index a9b6ec04c2..731a19330b 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
@@ -44,7 +44,12 @@ case class IcebergDataWriteFactory(
     sortOrder: SortOrder,
     field: IcebergNestedField,
     queryId: String)
-  extends ColumnarBatchDataWriterFactory {
+  extends ColumnarBatchDataWriterFactory
+  with ColumnarStreamingDataWriterFactory {
+
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[ColumnarBatch] = {
+    createWriter(partitionId, taskId, 0)
+  }
 
   /**
    * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data
@@ -53,7 +58,10 @@ case class IcebergDataWriteFactory(
    * <p> If this method fails (by throwing an exception), the corresponding Spark write task would
    * fail and get retried until hitting the maximum retry times.
    */
-  override def createWriter(partitionId: Int, taskId: Long): DataWriter[ColumnarBatch] = {
+  override def createWriter(
+      partitionId: Int,
+      taskId: Long,
+      epochId: Long): DataWriter[ColumnarBatch] = {
     val fields = partitionSpec
       .fields()
       .stream()
@@ -64,7 +72,6 @@ case class IcebergDataWriteFactory(
       .setSpecId(partitionSpec.specId())
       .addAllFields(fields)
       .build()
-    val epochId = 0
     val operationId = queryId + "-" + epochId
     val (writerHandle, jniWrapper) =
       getJniWrapper(
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
index fb1b25856c..12ed90bdd6 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.IcebergNestedFieldVisitor
-import org.apache.gluten.connector.write.{ColumnarBatchDataWriterFactory, IcebergDataWriteFactory}
+import org.apache.gluten.connector.write.{ColumnarBatchDataWriterFactory, ColumnarStreamingDataWriterFactory, IcebergDataWriteFactory}
 
 import org.apache.spark.sql.types.StructType
 
@@ -26,7 +26,8 @@ import org.apache.iceberg.types.TypeUtil
 
 abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
 
-  override protected def createFactory(schema: StructType): ColumnarBatchDataWriterFactory = {
+  // the writer factory works for both batch and streaming
+  private def createIcebergDataWriteFactory(schema: StructType): IcebergDataWriteFactory = {
     val writeSchema = IcebergWriteUtil.getWriteSchema(write)
     val nestedField = TypeUtil.visit(writeSchema, new IcebergNestedFieldVisitor)
     IcebergDataWriteFactory(
@@ -40,4 +41,14 @@ abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
       IcebergWriteUtil.getQueryId(write)
     )
   }
+
+  override protected def createBatchWriterFactory(
+      schema: StructType): ColumnarBatchDataWriterFactory = {
+    createIcebergDataWriteFactory(schema)
+  }
+
+  override protected def createStreamingWriterFactory(
+      schema: StructType): ColumnarStreamingDataWriterFactory = {
+    createIcebergDataWriteFactory(schema)
+  }
 }
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergWriteToDataSourceV2Exec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergWriteToDataSourceV2Exec.scala
new file mode 100644
index 0000000000..37dce8525c
--- /dev/null
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergWriteToDataSourceV2Exec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+
+import org.apache.spark.sql.connector.metric.CustomMetric
+import org.apache.spark.sql.connector.write.BatchWrite
+import org.apache.spark.sql.connector.write.Write
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite
+
+import org.apache.iceberg.spark.source.IcebergWriteUtil.supportsWrite
+
+case class VeloxIcebergWriteToDataSourceV2Exec(
+    query: SparkPlan,
+    refreshCache: () => Unit,
+    write: Write,
+    override val batchWrite: BatchWrite,
+    writeMetrics: Seq[CustomMetric]
+) extends AbstractIcebergWriteExec {
+
+  override val customMetrics: Map[String, SQLMetric] = {
+    writeMetrics.map {
+      customMetric =>
+        customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
+    }.toMap ++ BackendsApiManager.getMetricsApiInstance.genBatchWriteMetrics(sparkContext)
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): IcebergWriteExec =
+    copy(query = newChild)
+}
+
+object VeloxIcebergWriteToDataSourceV2Exec {
+
+  private def extractOuterWrite(batchWrite: BatchWrite): Option[Write] = {
+    batchWrite match {
+      case microBatchWrite: MicroBatchWrite =>
+        try {
+          val streamWrite = microBatchWrite.writeSupport
+          val outerClassField = streamWrite.getClass.getDeclaredField("this$0")
+          outerClassField.setAccessible(true)
+          outerClassField.get(streamWrite) match {
+            case write: Write => Some(write)
+            case _ => None
+          }
+        } catch {
+          case _: Throwable => None
+        }
+      case _ => None
+    }
+  }
+
+  def apply(original: WriteToDataSourceV2Exec): Option[VeloxIcebergWriteToDataSourceV2Exec] = {
+    extractOuterWrite(original.batchWrite)
+      .filter(supportsWrite)
+      .map {
+        write =>
+          VeloxIcebergWriteToDataSourceV2Exec(
+            original.query,
+            original.refreshCache,
+            write,
+            original.batchWrite,
+            original.writeMetrics
+          )
+      }
+  }
+}
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
index 1b00c1a788..5d6e09cfc9 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.{VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec}
+import org.apache.gluten.execution._
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
 import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
@@ -25,7 +25,7 @@ import org.apache.gluten.extension.columnar.validator.Validators
 import org.apache.gluten.extension.injector.Injector
 
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
+import org.apache.spark.sql.execution.datasources.v2._
 
 import org.apache.iceberg.spark.source.IcebergWriteUtil.supportsWrite
 
@@ -61,6 +61,14 @@ case class OffloadIcebergOverwritePartitionsDynamic() extends OffloadSingleNode
   }
 }
 
+case class OffloadIcebergWriteToDataSourceV2() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = plan match {
+    case r: WriteToDataSourceV2Exec =>
+      VeloxIcebergWriteToDataSourceV2Exec(r).getOrElse(r)
+    case other => other
+  }
+}
+
 object OffloadIcebergWrite {
   def inject(injector: Injector): Unit = {
     // Inject legacy rule.
@@ -70,7 +78,9 @@ object OffloadIcebergWrite {
           OffloadIcebergAppend(),
           OffloadIcebergReplaceData(),
           OffloadIcebergOverwrite(),
-          OffloadIcebergOverwritePartitionsDynamic())
+          OffloadIcebergOverwritePartitionsDynamic(),
+          OffloadIcebergWriteToDataSourceV2()
+        )
         HeuristicTransform.Simple(
           Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
           offload
@@ -81,7 +91,8 @@ object OffloadIcebergWrite {
       RasOffload.from[AppendDataExec](OffloadIcebergAppend()),
       RasOffload.from[ReplaceDataExec](OffloadIcebergReplaceData()),
       RasOffload.from[OverwriteByExpressionExec](OffloadIcebergOverwrite()),
-      RasOffload.from[OverwritePartitionsDynamicExec](OffloadIcebergOverwritePartitionsDynamic())
+      RasOffload.from[OverwritePartitionsDynamicExec](OffloadIcebergOverwritePartitionsDynamic()),
+      RasOffload.from[WriteToDataSourceV2Exec](OffloadIcebergWriteToDataSourceV2())
     )
     offloads.foreach(
       offload =>
diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index a3a2bc2f2f..c3d3c8edc6 100644
--- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -16,16 +16,19 @@
  */
 package org.apache.gluten.execution.enhanced
 
-import org.apache.gluten.execution.{ColumnarToRowExecBase, IcebergSuite, VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec}
+import org.apache.gluten.execution._
 import org.apache.gluten.tags.EnhancedFeaturesTest
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.gluten.TestUtils
 
 @EnhancedFeaturesTest
 class VeloxIcebergSuite extends IcebergSuite {
 
+  import testImplicits._
+
   test("iceberg insert") {
     withTable("iceberg_tb2") {
       spark.sql("""
@@ -347,4 +350,37 @@ class VeloxIcebergSuite extends IcebergSuite {
         s"File name does not match expected format: $fileName")
     }
   }
+
+  test("iceberg stream write to table") {
+    withTable("iceberg_tbl") {
+      withTempDir {
+        checkpointDir =>
+          spark.sql("CREATE TABLE iceberg_tbl (a INT, b STRING) USING iceberg")
+          TestUtils.checkExecutedPlanContains[VeloxIcebergWriteToDataSourceV2Exec](spark) {
+            val inputData = MemoryStream[(Int, String)]
+            val stream = inputData
+              .toDS()
+              .toDF("a", "b")
+              .writeStream
+              .option("checkpointLocation", checkpointDir.getCanonicalPath)
+              .format("iceberg")
+              .toTable("iceberg_tbl")
+
+            val query = () => spark.sql("SELECT * FROM iceberg_tbl ORDER BY a")
+            try {
+              inputData.addData((1, "a"))
+              stream.processAllAvailable()
+              checkAnswer(query(), Seq(Row(1, "a")))
+
+              inputData.addData((2, "b"))
+              stream.processAllAvailable()
+              checkAnswer(query(), Seq(Row(1, "a"), Row(2, "b")))
+            } finally {
+              stream.stop()
+            }
+          }
+
+      }
+    }
+  }
 }
diff --git a/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala b/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
index 587c064b9c..dbd234e248 100644
--- a/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
+++ b/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
@@ -25,18 +25,24 @@ import scala.reflect.ClassTag
 object TestUtils {
 
   def checkExecutedPlanContains[T: ClassTag](spark: SparkSession, sqlStr: String): Unit = {
+    checkExecutedPlanContains(spark) {
+      spark.sql(sqlStr)
+    }
+  }
+
+  def checkExecutedPlanContains[T: ClassTag](spark: SparkSession)(action: => Unit): Unit = {
     var found = false
     val queryListener = new QueryExecutionListener {
       override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
         if (!found) {
-          found = qe.executedPlan.find(implicitly[ClassTag[T]].runtimeClass.isInstance(_)).isDefined
+          found = qe.executedPlan.exists(implicitly[ClassTag[T]].runtimeClass.isInstance(_))
         }
       }
     }
     try {
       spark.listenerManager.register(queryListener)
-      spark.sql(sqlStr)
+      action
       spark.sparkContext.listenerBus.waitUntilEmpty()
       assert(found)
     } finally {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 0698c72426..8a1a343087 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -590,6 +590,8 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def supportOverwritePartitionsDynamic(): Boolean = enableEnhancedFeatures()
 
+  override def supportWriteToDataSourceV2(): Boolean = enableEnhancedFeatures()
+
   /** Velox does not support columnar shuffle with empty schema. */
   override def supportEmptySchemaColumnarShuffle(): Boolean = false
 }
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 8ef5060351..1372d98243 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -111,6 +111,7 @@ nav_order: 15
 | spark.gluten.sql.columnar.wholeStage.fallback.threshold            | -1               | The threshold for whether whole stage will fall back in AQE supported case by counting the number of ColumnarToRow & vanilla leaf node. |
 | spark.gluten.sql.columnar.window                                   | true             | Enable or disable columnar window. |
 | spark.gluten.sql.columnar.window.group.limit                       | true             | Enable or disable columnar window group limit. |
+| spark.gluten.sql.columnar.writeToDataSourceV2                      | true             | Enable or disable columnar v2 command write to data source v2. |
 | spark.gluten.sql.columnarSampleEnabled                             | false            | Disable or enable columnar sample. |
 | spark.gluten.sql.columnarToRowMemoryThreshold                      | 64MB             |
 | spark.gluten.sql.countDistinctWithoutExpand                        | false            | Convert Count Distinct to a UDAF called count_distinct to prevent SparkPlanner converting it to Expand+Count. WARNING: When enabled, count distinct queries will fail to fallback!!! |
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarMicroBatchWriterFactory.java b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarMicroBatchWriterFactory.java
new file mode 100644
index 0000000000..b2874e668b
--- /dev/null
+++ b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarMicroBatchWriterFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.connector.write;
+
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * ColumnarMicroBatchWriterFactory is used to create ColumnarMicroBatchWriter.
+ *
+ * <p>It is used in micro-batch mode.
+ */
+public class ColumnarMicroBatchWriterFactory implements ColumnarBatchDataWriterFactory {
+
+  private final long epochId;
+  private final ColumnarStreamingDataWriterFactory streamingWriterFactory;
+
+  public ColumnarMicroBatchWriterFactory(
+      long epochId, ColumnarStreamingDataWriterFactory streamingWriterFactory) {
+    this.epochId = epochId;
+    this.streamingWriterFactory = streamingWriterFactory;
+  }
+
+  @Override
+  public DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId) {
+    return streamingWriterFactory.createWriter(partitionId, taskId, epochId);
+  }
+}
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarStreamingDataWriterFactory.java b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarStreamingDataWriterFactory.java
new file mode 100644
index 0000000000..fdec18fe2d
--- /dev/null
+++ b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarStreamingDataWriterFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.connector.write;
+
+import org.apache.spark.TaskContext;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+import java.io.Serializable;
+
+/**
+ * A factory of {@link DataWriter}, which is responsible for creating and initializing the actual
+ * data writer at executor side.
+ *
+ * <p>Note that, the writer factory will be serialized and sent to executors, then the data writer
+ * will be created on executors and do the actual writing. So this interface must be serializable
+ * and {@link DataWriter} doesn't need to be.
+ *
+ * <p>A companion interface to Spark's row-based {@link StreamingDataWriterFactory}
+ */
+public interface ColumnarStreamingDataWriterFactory extends Serializable {
+
+  /**
+   * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data
+   * object instance when sending data to the data writer, for better performance. Data writers are
+   * responsible for defensive copies if necessary, e.g. copy the data before buffer it in a list.
+   *
+   * <p>If this method fails (by throwing an exception), the corresponding Spark write task would
+   * fail and get retried until hitting the maximum retry times.
+   *
+   * @param partitionId A unique id of the RDD partition that the returned writer will process.
+   *     Usually Spark processes many RDD partitions at the same time, implementations should use
+   *     the partition id to distinguish writers for different partitions.
+   * @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run
+   *     multiple tasks for the same partition (due to speculation or task failures, for example).
+   * @param epochId A monotonically increasing id for streaming queries that are split in to
+   *     discrete periods of execution.
+   */
+  DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId, long epochId);
+}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 4f9354020d..8b32ebce2f 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -164,6 +164,8 @@ trait BackendSettingsApi {
 
   def supportOverwritePartitionsDynamic(): Boolean = false
 
+  def supportWriteToDataSourceV2(): Boolean = false
+
   /** Whether the backend supports columnar shuffle with empty schema. */
   def supportEmptySchemaColumnarShuffle(): Boolean = true
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 6aee03ca49..ed2d549366 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -107,6 +107,8 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
   def enableOverwritePartitionsDynamic: Boolean =
     getConf(COLUMNAR_OVERWRIET_PARTITIONS_DYNAMIC_ENABLED)
 
+  def enableColumnarWriteToDataSourceV2: Boolean = getConf(COLUMNAR_WRITE_TO_DATASOURCE_V2_ENABLED)
+
   def enableColumnarShuffledHashJoin: Boolean = getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
 
   def shuffledHashJoinOptimizeBuildSide: Boolean =
@@ -880,6 +882,12 @@ object GlutenConfig extends ConfigRegistry {
       .booleanConf
       .createWithDefault(true)
 
+  val COLUMNAR_WRITE_TO_DATASOURCE_V2_ENABLED =
+    buildConf("spark.gluten.sql.columnar.writeToDataSourceV2")
+      .doc("Enable or disable columnar v2 command write to data source v2.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMNAR_PREFER_STREAMING_AGGREGATE =
     buildConf("spark.gluten.sql.columnar.preferStreamingAggregate")
       .doc(
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
index 8c960e70dd..cb9470765b 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
@@ -17,30 +17,39 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.connector.write.ColumnarBatchDataWriterFactory
+import org.apache.gluten.connector.write.{ColumnarBatchDataWriterFactory, ColumnarMicroBatchWriterFactory, ColumnarStreamingDataWriterFactory}
 import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
 import org.apache.gluten.extension.columnar.transition.Convention.RowType
 
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.write.{BatchWrite, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, Write, WriterCommitMessage}
 import org.apache.spark.sql.datasources.v2.{DataWritingColumnarBatchSparkTask, DataWritingColumnarBatchSparkTaskResult, StreamWriterCommitProgressUtil, WritingColumnarBatchSparkTask}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.LongAccumulator
 
-trait ColumnarV2TableWriteExec extends V2ExistingTableWriteExec with ValidatablePlan {
+trait ColumnarV2TableWriteExec extends V2TableWriteExec with ValidatablePlan {
+
+  def refreshCache: () => Unit
+
+  def write: Write
+
+  def batchWrite: BatchWrite = write.toBatch
 
   def withNewQuery(newQuery: SparkPlan): SparkPlan = withNewChildInternal(newQuery)
 
-  protected def createFactory(schema: StructType): ColumnarBatchDataWriterFactory
+  protected def createBatchWriterFactory(schema: StructType): ColumnarBatchDataWriterFactory
+
+  protected def createStreamingWriterFactory(schema: StructType): ColumnarStreamingDataWriterFactory
 
   override protected def run(): Seq[InternalRow] = {
-    writeColumnarBatchWithV2(write.toBatch)
+    writeColumnarBatchWithV2(batchWrite)
     refreshCache()
     Nil
   }
@@ -77,7 +86,15 @@ trait ColumnarV2TableWriteExec extends V2ExistingTableWriteExec with Validatable
 
     // Avoid object not serializable issue.
     val writeMetrics: Map[String, SQLMetric] = customMetrics
-    val factory = createFactory(query.schema)
+    val factory = batchWrite match {
+      case m: MicroBatchWrite =>
+        val epochIdField = m.getClass.getDeclaredField("epochId")
+        epochIdField.setAccessible(true)
+        val epochId = epochIdField.getLong(m)
+        new ColumnarMicroBatchWriterFactory(epochId, createStreamingWriterFactory(query.schema))
+      case _ =>
+        createBatchWriterFactory(query.schema)
+    }
     try {
       sparkContext.runJob(
         rdd,
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 14d4a7ffcf..caa6ba16ba 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, BatchScanExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, BatchScanExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
@@ -140,6 +140,7 @@ object Validators {
       case p: OverwriteByExpressionExec if !settings.supportOverwriteByExpression() => fail(p)
       case p: OverwritePartitionsDynamicExec if !settings.supportOverwritePartitionsDynamic() =>
         fail(p)
+      case p: WriteToDataSourceV2Exec if !settings.supportWriteToDataSourceV2() => fail(p)
       case _ => pass()
     }
   }
@@ -164,6 +165,7 @@ object Validators {
       case p: OverwriteByExpressionExec if !glutenConf.enableOverwriteByExpression => fail(p)
       case p: OverwritePartitionsDynamicExec if !glutenConf.enableOverwritePartitionsDynamic =>
         fail(p)
+      case p: WriteToDataSourceV2Exec if !glutenConf.enableColumnarWriteToDataSourceV2 => fail(p)
       case p @ (_: LocalLimitExec | _: GlobalLimitExec) if !glutenConf.enableColumnarLimit =>
         fail(p)
       case p: GenerateExec if !glutenConf.enableColumnarGenerate => fail(p)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
