This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c7cd7741bf [VL] Support Iceberg WriteToDataSourceV2 (#11533)
c7cd7741bf is described below
commit c7cd7741bf4443676f9cb10ffbd435de2182264d
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Feb 5 15:30:38 2026 +0800
[VL] Support Iceberg WriteToDataSourceV2 (#11533)
What changes are proposed in this pull request?
- Support WriteToDataSourceV2 for Iceberg, which is used by WriteToMicroBatchDataSource
- Introduce ColumnarStreamingDataWriterFactory as a companion to Spark's StreamingDataWriterFactory
- Introduce ColumnarMicroBatchWriterFactory as a companion to Spark's MicroBatchWriterFactory
- Offload Spark's WriteToDataSourceV2Exec to VeloxIcebergWriteToDataSourceV2Exec (see the usage sketch after this commit message)
How was this patch tested?
Added test case test("iceberg stream write to table")
Was this patch authored or co-authored using generative AI tooling?
No
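
For context, the plan node this change targets is produced by Structured Streaming's micro-batch writes. Below is a minimal sketch of the kind of query that now hits the offloaded path, based on the test added in this commit; the SparkSession setup, table name, checkpoint path and schema are illustrative, not part of the change.

    import org.apache.spark.sql.{SparkSession, SQLContext}
    import org.apache.spark.sql.execution.streaming.MemoryStream

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    spark.sql("CREATE TABLE iceberg_tbl (a INT, b STRING) USING iceberg")

    // Each micro-batch of this query is planned as WriteToDataSourceV2Exec wrapping a
    // MicroBatchWrite; with this commit Gluten can offload it to
    // VeloxIcebergWriteToDataSourceV2Exec when the underlying write is an Iceberg write.
    val inputData = MemoryStream[(Int, String)]
    val stream = inputData
      .toDS()
      .toDF("a", "b")
      .writeStream
      .option("checkpointLocation", "/tmp/iceberg-ckpt") // placeholder path
      .format("iceberg")
      .toTable("iceberg_tbl")

    inputData.addData((1, "a"))
    stream.processAllAvailable()
    stream.stop()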
---
.../connector/write/IcebergDataWriteFactory.scala | 13 +++-
.../execution/AbstractIcebergWriteExec.scala | 15 +++-
.../VeloxIcebergWriteToDataSourceV2Exec.scala | 84 ++++++++++++++++++++++
.../gluten/extension/OffloadIcebergWrite.scala | 19 +++--
.../execution/enhanced/VeloxIcebergSuite.scala | 38 +++++++++-
.../org/apache/spark/sql/gluten/TestUtils.scala | 10 ++-
.../gluten/backendsapi/velox/VeloxBackend.scala | 2 +
docs/Configuration.md | 1 +
.../write/ColumnarMicroBatchWriterFactory.java | 42 +++++++++++
.../write/ColumnarStreamingDataWriterFactory.java | 55 ++++++++++++++
.../gluten/backendsapi/BackendSettingsApi.scala | 2 +
.../org/apache/gluten/config/GlutenConfig.scala | 8 +++
.../execution/ColumnarV2TableWriteExec.scala | 29 ++++++--
.../extension/columnar/validator/Validators.scala | 4 +-
14 files changed, 303 insertions(+), 19 deletions(-)
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
index a9b6ec04c2..731a19330b 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
@@ -44,7 +44,12 @@ case class IcebergDataWriteFactory(
sortOrder: SortOrder,
field: IcebergNestedField,
queryId: String)
- extends ColumnarBatchDataWriterFactory {
+ extends ColumnarBatchDataWriterFactory
+ with ColumnarStreamingDataWriterFactory {
+
+ override def createWriter(partitionId: Int, taskId: Long): DataWriter[ColumnarBatch] = {
+ createWriter(partitionId, taskId, 0)
+ }
/**
* Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data
@@ -53,7 +58,10 @@ case class IcebergDataWriteFactory(
* <p> If this method fails (by throwing an exception), the corresponding Spark write task would
* fail and get retried until hitting the maximum retry times.
*/
- override def createWriter(partitionId: Int, taskId: Long): DataWriter[ColumnarBatch] = {
+ override def createWriter(
+ partitionId: Int,
+ taskId: Long,
+ epochId: Long): DataWriter[ColumnarBatch] = {
val fields = partitionSpec
.fields()
.stream()
@@ -64,7 +72,6 @@ case class IcebergDataWriteFactory(
.setSpecId(partitionSpec.specId())
.addAllFields(fields)
.build()
- val epochId = 0
val operationId = queryId + "-" + epochId
val (writerHandle, jniWrapper) =
getJniWrapper(
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
index fb1b25856c..12ed90bdd6 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.execution
import org.apache.gluten.IcebergNestedFieldVisitor
-import org.apache.gluten.connector.write.{ColumnarBatchDataWriterFactory, IcebergDataWriteFactory}
+import org.apache.gluten.connector.write.{ColumnarBatchDataWriterFactory, ColumnarStreamingDataWriterFactory, IcebergDataWriteFactory}
import org.apache.spark.sql.types.StructType
@@ -26,7 +26,8 @@ import org.apache.iceberg.types.TypeUtil
abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
- override protected def createFactory(schema: StructType): ColumnarBatchDataWriterFactory = {
+ // the writer factory works for both batch and streaming
+ private def createIcebergDataWriteFactory(schema: StructType): IcebergDataWriteFactory = {
val writeSchema = IcebergWriteUtil.getWriteSchema(write)
val nestedField = TypeUtil.visit(writeSchema, new IcebergNestedFieldVisitor)
IcebergDataWriteFactory(
@@ -40,4 +41,14 @@ abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
IcebergWriteUtil.getQueryId(write)
)
}
+
+ override protected def createBatchWriterFactory(
+ schema: StructType): ColumnarBatchDataWriterFactory = {
+ createIcebergDataWriteFactory(schema)
+ }
+
+ override protected def createStreamingWriterFactory(
+ schema: StructType): ColumnarStreamingDataWriterFactory = {
+ createIcebergDataWriteFactory(schema)
+ }
}
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergWriteToDataSourceV2Exec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergWriteToDataSourceV2Exec.scala
new file mode 100644
index 0000000000..37dce8525c
--- /dev/null
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergWriteToDataSourceV2Exec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+
+import org.apache.spark.sql.connector.metric.CustomMetric
+import org.apache.spark.sql.connector.write.BatchWrite
+import org.apache.spark.sql.connector.write.Write
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite
+
+import org.apache.iceberg.spark.source.IcebergWriteUtil.supportsWrite
+
+case class VeloxIcebergWriteToDataSourceV2Exec(
+ query: SparkPlan,
+ refreshCache: () => Unit,
+ write: Write,
+ override val batchWrite: BatchWrite,
+ writeMetrics: Seq[CustomMetric]
+) extends AbstractIcebergWriteExec {
+
+ override val customMetrics: Map[String, SQLMetric] = {
+ writeMetrics.map {
+ customMetric =>
+ customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
+ }.toMap ++ BackendsApiManager.getMetricsApiInstance.genBatchWriteMetrics(sparkContext)
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): IcebergWriteExec =
+ copy(query = newChild)
+}
+
+object VeloxIcebergWriteToDataSourceV2Exec {
+
+ private def extractOuterWrite(batchWrite: BatchWrite): Option[Write] = {
+ batchWrite match {
+ case microBatchWrite: MicroBatchWrite =>
+ try {
+ val streamWrite = microBatchWrite.writeSupport
+ val outerClassField = streamWrite.getClass.getDeclaredField("this$0")
+ outerClassField.setAccessible(true)
+ outerClassField.get(streamWrite) match {
+ case write: Write => Some(write)
+ case _ => None
+ }
+ } catch {
+ case _: Throwable => None
+ }
+ case _ => None
+ }
+ }
+
+ def apply(original: WriteToDataSourceV2Exec): Option[VeloxIcebergWriteToDataSourceV2Exec] = {
+ extractOuterWrite(original.batchWrite)
+ .filter(supportsWrite)
+ .map {
+ write =>
+ VeloxIcebergWriteToDataSourceV2Exec(
+ original.query,
+ original.refreshCache,
+ write,
+ original.batchWrite,
+ original.writeMetrics
+ )
+ }
+ }
+}
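
A note on extractOuterWrite above: it leans on the compiler-generated outer-instance reference that non-static inner classes carry, named "this$0" in Java-compiled classes such as Iceberg's StreamingWrite implementation. The snippet below is a self-contained sketch of the same reflection trick with made-up Outer/Inner classes; note that Scala names the field "$outer" where Java uses "this$0".

    // Hypothetical classes, for illustration only.
    class Outer {
      class Inner
      def makeInner: Inner = new Inner
    }

    object OuterViaReflection {
      def main(args: Array[String]): Unit = {
        val inner = new Outer().makeInner
        // Inner-class instances hold a compiler-generated reference to the enclosing
        // instance: "$outer" for Scala inner classes, "this$0" for Java inner classes.
        val field = inner.getClass.getDeclaredField("$outer")
        field.setAccessible(true)
        val enclosing = field.get(inner)
        println(enclosing.getClass.getName) // prints "Outer"
      }
    }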
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
index 1b00c1a788..5d6e09cfc9 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.extension
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.{VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec}
+import org.apache.gluten.execution._
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
@@ -25,7 +25,7 @@ import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
+import org.apache.spark.sql.execution.datasources.v2._
import org.apache.iceberg.spark.source.IcebergWriteUtil.supportsWrite
@@ -61,6 +61,14 @@ case class OffloadIcebergOverwritePartitionsDynamic() extends OffloadSingleNode
}
}
+case class OffloadIcebergWriteToDataSourceV2() extends OffloadSingleNode {
+ override def offload(plan: SparkPlan): SparkPlan = plan match {
+ case r: WriteToDataSourceV2Exec =>
+ VeloxIcebergWriteToDataSourceV2Exec(r).getOrElse(r)
+ case other => other
+ }
+}
+
object OffloadIcebergWrite {
def inject(injector: Injector): Unit = {
// Inject legacy rule.
@@ -70,7 +78,9 @@ object OffloadIcebergWrite {
OffloadIcebergAppend(),
OffloadIcebergReplaceData(),
OffloadIcebergOverwrite(),
- OffloadIcebergOverwritePartitionsDynamic())
+ OffloadIcebergOverwritePartitionsDynamic(),
+ OffloadIcebergWriteToDataSourceV2()
+ )
HeuristicTransform.Simple(
Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
offload
@@ -81,7 +91,8 @@ object OffloadIcebergWrite {
RasOffload.from[AppendDataExec](OffloadIcebergAppend()),
RasOffload.from[ReplaceDataExec](OffloadIcebergReplaceData()),
RasOffload.from[OverwriteByExpressionExec](OffloadIcebergOverwrite()),
- RasOffload.from[OverwritePartitionsDynamicExec](OffloadIcebergOverwritePartitionsDynamic())
+ RasOffload.from[OverwritePartitionsDynamicExec](OffloadIcebergOverwritePartitionsDynamic()),
+ RasOffload.from[WriteToDataSourceV2Exec](OffloadIcebergWriteToDataSourceV2())
)
offloads.foreach(
offload =>
diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index a3a2bc2f2f..c3d3c8edc6 100644
--- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -16,16 +16,19 @@
*/
package org.apache.gluten.execution.enhanced
-import org.apache.gluten.execution.{ColumnarToRowExecBase, IcebergSuite, VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec}
+import org.apache.gluten.execution._
import org.apache.gluten.tags.EnhancedFeaturesTest
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.gluten.TestUtils
@EnhancedFeaturesTest
class VeloxIcebergSuite extends IcebergSuite {
+ import testImplicits._
+
test("iceberg insert") {
withTable("iceberg_tb2") {
spark.sql("""
@@ -347,4 +350,37 @@ class VeloxIcebergSuite extends IcebergSuite {
s"File name does not match expected format: $fileName")
}
}
+
+ test("iceberg stream write to table") {
+ withTable("iceberg_tbl") {
+ withTempDir {
+ checkpointDir =>
+ spark.sql("CREATE TABLE iceberg_tbl (a INT, b STRING) USING iceberg")
+ TestUtils.checkExecutedPlanContains[VeloxIcebergWriteToDataSourceV2Exec](spark) {
+ val inputData = MemoryStream[(Int, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .format("iceberg")
+ .toTable("iceberg_tbl")
+
+ val query = () => spark.sql("SELECT * FROM iceberg_tbl ORDER BY a")
+ try {
+ inputData.addData((1, "a"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Seq(Row(1, "a")))
+
+ inputData.addData((2, "b"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Seq(Row(1, "a"), Row(2, "b")))
+ } finally {
+ stream.stop()
+ }
+ }
+
+ }
+ }
+ }
}
diff --git a/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala b/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
index 587c064b9c..dbd234e248 100644
--- a/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
+++ b/backends-velox/src-iceberg/test/scala/org/apache/spark/sql/gluten/TestUtils.scala
@@ -25,18 +25,24 @@ import scala.reflect.ClassTag
object TestUtils {
def checkExecutedPlanContains[T: ClassTag](spark: SparkSession, sqlStr: String): Unit = {
+ checkExecutedPlanContains(spark) {
+ spark.sql(sqlStr)
+ }
+ }
+
+ def checkExecutedPlanContains[T: ClassTag](spark: SparkSession)(action: => Unit): Unit = {
var found = false
val queryListener = new QueryExecutionListener {
override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
if (!found) {
- found = qe.executedPlan.find(implicitly[ClassTag[T]].runtimeClass.isInstance(_)).isDefined
+ found = qe.executedPlan.exists(implicitly[ClassTag[T]].runtimeClass.isInstance(_))
}
}
}
try {
spark.listenerManager.register(queryListener)
- spark.sql(sqlStr)
+ action
spark.sparkContext.listenerBus.waitUntilEmpty()
assert(found)
} finally {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 0698c72426..8a1a343087 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -590,6 +590,8 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportOverwritePartitionsDynamic(): Boolean = enableEnhancedFeatures()
+ override def supportWriteToDataSourceV2(): Boolean = enableEnhancedFeatures()
+
/** Velox does not support columnar shuffle with empty schema. */
override def supportEmptySchemaColumnarShuffle(): Boolean = false
}
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 8ef5060351..1372d98243 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -111,6 +111,7 @@ nav_order: 15
| spark.gluten.sql.columnar.wholeStage.fallback.threshold | -1 | The threshold for whether whole stage will fall back in AQE supported case by counting the number of ColumnarToRow & vanilla leaf node. |
| spark.gluten.sql.columnar.window | true | Enable or disable columnar window. |
| spark.gluten.sql.columnar.window.group.limit | true | Enable or disable columnar window group limit. |
+| spark.gluten.sql.columnar.writeToDataSourceV2 | true | Enable or disable columnar v2 command write to data source v2. |
| spark.gluten.sql.columnarSampleEnabled | false | Disable or enable columnar sample. |
| spark.gluten.sql.columnarToRowMemoryThreshold | 64MB | |
| spark.gluten.sql.countDistinctWithoutExpand | false | Convert Count Distinct to a UDAF called count_distinct to prevent SparkPlanner converting it to Expand+Count. WARNING: When enabled, count distinct queries will fail to fallback!!! |
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarMicroBatchWriterFactory.java b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarMicroBatchWriterFactory.java
new file mode 100644
index 0000000000..b2874e668b
--- /dev/null
+++ b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarMicroBatchWriterFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.connector.write;
+
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * ColumnarMicroBatchWriterFactory is used to create ColumnarMicroBatchWriter.
+ *
+ * <p>It is used in micro-batch mode.
+ */
+public class ColumnarMicroBatchWriterFactory implements ColumnarBatchDataWriterFactory {
+
+ private final long epochId;
+ private final ColumnarStreamingDataWriterFactory streamingWriterFactory;
+
+ public ColumnarMicroBatchWriterFactory(
+ long epochId, ColumnarStreamingDataWriterFactory streamingWriterFactory) {
+ this.epochId = epochId;
+ this.streamingWriterFactory = streamingWriterFactory;
+ }
+
+ @Override
+ public DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId) {
+ return streamingWriterFactory.createWriter(partitionId, taskId, epochId);
+ }
+}
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarStreamingDataWriterFactory.java b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarStreamingDataWriterFactory.java
new file mode 100644
index 0000000000..fdec18fe2d
--- /dev/null
+++ b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarStreamingDataWriterFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.connector.write;
+
+import org.apache.spark.TaskContext;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+import java.io.Serializable;
+
+/**
+ * A factory of {@link DataWriter}, which is responsible for creating and initializing the actual
+ * data writer at executor side.
+ *
+ * <p>Note that, the writer factory will be serialized and sent to executors, then the data writer
+ * will be created on executors and do the actual writing. So this interface must be serializable
+ * and {@link DataWriter} doesn't need to be.
+ *
+ * <p>A companion interface with Spark's row bases {@link StreamingDataWriterFactory}
+ */
+public interface ColumnarStreamingDataWriterFactory extends Serializable {
+
+ /**
+ * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data
+ * object instance when sending data to the data writer, for better performance. Data writers are
+ * responsible for defensive copies if necessary, e.g. copy the data before buffer it in a list.
+ *
+ * <p>If this method fails (by throwing an exception), the corresponding Spark write task would
+ * fail and get retried until hitting the maximum retry times.
+ *
+ * @param partitionId A unique id of the RDD partition that the returned writer will process.
+ * Usually Spark processes many RDD partitions at the same time, implementations should use
+ * the partition id to distinguish writers for different partitions.
+ * @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run
+ * multiple tasks for the same partition (due to speculation or task failures, for example).
+ * @param epochId A monotonically increasing id for streaming queries that are split in to
+ * discrete periods of execution.
+ */
+ DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId, long epochId);
+}
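
For a sense of what an implementation of this interface looks like, here is a minimal Scala sketch; the counting writer and factory are hypothetical and not part of this commit (the real implementation is IcebergDataWriteFactory above).

    import org.apache.gluten.connector.write.ColumnarStreamingDataWriterFactory
    import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Hypothetical factory: counts rows per epoch instead of writing real files.
    class CountingStreamingWriterFactory extends ColumnarStreamingDataWriterFactory {
      override def createWriter(
          partitionId: Int,
          taskId: Long,
          epochId: Long): DataWriter[ColumnarBatch] = new DataWriter[ColumnarBatch] {
        private var rows = 0L
        override def write(batch: ColumnarBatch): Unit = rows += batch.numRows()
        override def commit(): WriterCommitMessage = {
          // A real writer would describe the committed data files here.
          println(s"partition=$partitionId task=$taskId epoch=$epochId rows=$rows")
          new WriterCommitMessage {}
        }
        override def abort(): Unit = ()
        override def close(): Unit = ()
      }
    }

ColumnarMicroBatchWriterFactory (previous file) then pins such a factory to a single epoch so it can be used wherever a ColumnarBatchDataWriterFactory is expected.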
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 4f9354020d..8b32ebce2f 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -164,6 +164,8 @@ trait BackendSettingsApi {
def supportOverwritePartitionsDynamic(): Boolean = false
+ def supportWriteToDataSourceV2(): Boolean = false
+
/** Whether the backend supports columnar shuffle with empty schema. */
def supportEmptySchemaColumnarShuffle(): Boolean = true
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 6aee03ca49..ed2d549366 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -107,6 +107,8 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
def enableOverwritePartitionsDynamic: Boolean =
getConf(COLUMNAR_OVERWRIET_PARTITIONS_DYNAMIC_ENABLED)
+ def enableColumnarWriteToDataSourceV2: Boolean = getConf(COLUMNAR_WRITE_TO_DATASOURCE_V2_ENABLED)
+
def enableColumnarShuffledHashJoin: Boolean =
getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
def shuffledHashJoinOptimizeBuildSide: Boolean =
@@ -880,6 +882,12 @@ object GlutenConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(true)
+ val COLUMNAR_WRITE_TO_DATASOURCE_V2_ENABLED =
+ buildConf("spark.gluten.sql.columnar.writeToDataSourceV2")
+ .doc("Enable or disable columnar v2 command write to data source v2.")
+ .booleanConf
+ .createWithDefault(true)
+
val COLUMNAR_PREFER_STREAMING_AGGREGATE =
buildConf("spark.gluten.sql.columnar.preferStreamingAggregate")
.doc(
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
index 8c960e70dd..cb9470765b 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarV2TableWriteExec.scala
@@ -17,30 +17,39 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.connector.write.ColumnarBatchDataWriterFactory
+import org.apache.gluten.connector.write.{ColumnarBatchDataWriterFactory, ColumnarMicroBatchWriterFactory, ColumnarStreamingDataWriterFactory}
import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
import org.apache.gluten.extension.columnar.transition.Convention.RowType
import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.write.{BatchWrite, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, Write, WriterCommitMessage}
import org.apache.spark.sql.datasources.v2.{DataWritingColumnarBatchSparkTask, DataWritingColumnarBatchSparkTaskResult, StreamWriterCommitProgressUtil, WritingColumnarBatchSparkTask}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.LongAccumulator
-trait ColumnarV2TableWriteExec extends V2ExistingTableWriteExec with ValidatablePlan {
+trait ColumnarV2TableWriteExec extends V2TableWriteExec with ValidatablePlan {
+
+ def refreshCache: () => Unit
+
+ def write: Write
+
+ def batchWrite: BatchWrite = write.toBatch
def withNewQuery(newQuery: SparkPlan): SparkPlan =
withNewChildInternal(newQuery)
- protected def createFactory(schema: StructType): ColumnarBatchDataWriterFactory
+ protected def createBatchWriterFactory(schema: StructType): ColumnarBatchDataWriterFactory
+
+ protected def createStreamingWriterFactory(schema: StructType): ColumnarStreamingDataWriterFactory
override protected def run(): Seq[InternalRow] = {
- writeColumnarBatchWithV2(write.toBatch)
+ writeColumnarBatchWithV2(batchWrite)
refreshCache()
Nil
}
@@ -77,7 +86,15 @@ trait ColumnarV2TableWriteExec extends V2ExistingTableWriteExec with Validatable
// Avoid object not serializable issue.
val writeMetrics: Map[String, SQLMetric] = customMetrics
- val factory = createFactory(query.schema)
+ val factory = batchWrite match {
+ case m: MicroBatchWrite =>
+ val epochIdField = m.getClass.getDeclaredField("epochId")
+ epochIdField.setAccessible(true)
+ val epochId = epochIdField.getLong(m)
+ new ColumnarMicroBatchWriterFactory(epochId, createStreamingWriterFactory(query.schema))
+ case _ =>
+ createBatchWriterFactory(query.schema)
+ }
try {
sparkContext.runJob(
rdd,
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 14d4a7ffcf..caa6ba16ba 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, BatchScanExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, BatchScanExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.window.WindowExec
@@ -140,6 +140,7 @@ object Validators {
case p: OverwriteByExpressionExec if !settings.supportOverwriteByExpression() => fail(p)
case p: OverwritePartitionsDynamicExec if !settings.supportOverwritePartitionsDynamic() =>
fail(p)
+ case p: WriteToDataSourceV2Exec if !settings.supportWriteToDataSourceV2() => fail(p)
case _ => pass()
}
}
@@ -164,6 +165,7 @@ object Validators {
case p: OverwriteByExpressionExec if !glutenConf.enableOverwriteByExpression => fail(p)
case p: OverwritePartitionsDynamicExec if !glutenConf.enableOverwritePartitionsDynamic =>
fail(p)
+ case p: WriteToDataSourceV2Exec if !glutenConf.enableColumnarWriteToDataSourceV2 => fail(p)
case p @ (_: LocalLimitExec | _: GlobalLimitExec) if !glutenConf.enableColumnarLimit =>
fail(p)
case p: GenerateExec if !glutenConf.enableColumnarGenerate => fail(p)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]