This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d2c0630143 [VL] Make the filename written by Iceberg Native consistent with in Java (#11435)
d2c0630143 is described below

commit d2c0630143f067ff8a0df764e5f85eaf76d2e4bc
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jan 30 21:14:55 2026 +0800

    [VL] Make the filename written by Iceberg Native consistent with in Java (#11435)
---
 .../connector/write/IcebergDataWriteFactory.scala  | 24 +++++-
 .../execution/AbstractIcebergWriteExec.scala       |  3 +-
 .../gluten/execution/IcebergWriteJniWrapper.java   |  3 +
 .../execution/enhanced/VeloxIcebergSuite.scala     | 21 ++++++
 .../gluten/backendsapi/velox/VeloxBackend.scala    |  2 +-
 .../sql/execution/VeloxParquetWriteSuite.scala     | 15 +---
 cpp/velox/compute/VeloxRuntime.cc                  |  5 +-
 cpp/velox/compute/VeloxRuntime.h                   |  3 +
 cpp/velox/compute/iceberg/IcebergWriter.cc         | 88 +++++++++++++++++++++-
 cpp/velox/compute/iceberg/IcebergWriter.h          |  6 ++
 cpp/velox/jni/VeloxJniWrapper.cc                   |  6 ++
 cpp/velox/tests/iceberg/IcebergWriteTest.cc        |  3 +
 .../iceberg/spark/source/IcebergWriteUtil.scala    | 10 +++
 .../write/ColumnarBatchDataWriterFactory.java      |  9 ++-
 .../v2/ColumnarWriteToDataSourceV2Exec.scala       |  2 +-
 15 files changed, 176 insertions(+), 24 deletions(-)

diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
index 7004fa57ec..a9b6ec04c2 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
@@ -42,7 +42,8 @@ case class IcebergDataWriteFactory(
     codec: String,
     partitionSpec: PartitionSpec,
     sortOrder: SortOrder,
-    field: IcebergNestedField)
+    field: IcebergNestedField,
+    queryId: String)
   extends ColumnarBatchDataWriterFactory {
 
   /**
@@ -52,7 +53,7 @@ case class IcebergDataWriteFactory(
    * <p> If this method fails (by throwing an exception), the corresponding Spark write task would
    * fail and get retried until hitting the maximum retry times.
    */
-  override def createWriter(): DataWriter[ColumnarBatch] = {
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[ColumnarBatch] = {
     val fields = partitionSpec
       .fields()
       .stream()
@@ -63,8 +64,19 @@ case class IcebergDataWriteFactory(
       .setSpecId(partitionSpec.specId())
       .addAllFields(fields)
       .build()
+    val epochId = 0
+    val operationId = queryId + "-" + epochId
     val (writerHandle, jniWrapper) =
-      getJniWrapper(schema, format, directory, codec, specProto, field)
+      getJniWrapper(
+        schema,
+        format,
+        directory,
+        codec,
+        partitionId,
+        taskId,
+        operationId,
+        specProto,
+        field)
     IcebergColumnarBatchDataWriter(writerHandle, jniWrapper, format, 
partitionSpec, sortOrder)
   }
 
@@ -73,6 +85,9 @@ case class IcebergDataWriteFactory(
       format: Int,
       directory: String,
       codec: String,
+      partitionId: Int,
+      taskId: Long,
+      operationId: String,
       partitionSpec: IcebergPartitionSpec,
       field: IcebergNestedField): (Long, IcebergWriteJniWrapper) = {
     val schema = SparkArrowUtil.toArrowSchema(localSchema, 
SQLConf.get.sessionLocalTimeZone)
@@ -87,6 +102,9 @@ case class IcebergDataWriteFactory(
         format,
         directory,
         codec,
+        partitionId,
+        taskId,
+        operationId,
         partitionSpec.toByteArray,
         field.toByteArray)
     cSchema.close()
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
index 3ac518cf76..fb1b25856c 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
@@ -36,7 +36,8 @@ abstract class AbstractIcebergWriteExec extends 
IcebergWriteExec {
       getCodec,
       getPartitionSpec,
       IcebergWriteUtil.getSortOrder(write),
-      nestedField
+      nestedField,
+      IcebergWriteUtil.getQueryId(write)
     )
   }
 }
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
index 54ae50e578..60bab59786 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
@@ -31,6 +31,9 @@ public class IcebergWriteJniWrapper implements RuntimeAware {
   public native long init(long cSchema, int format,
                           String directory,
                           String codec,
+                          int partitionId,
+                          long taskId,
+                          String operationId,
                           byte[] partitionSpec,
                           byte[] field);
 
diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index 437828368f..a3a2bc2f2f 100644
--- 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -326,4 +326,25 @@ class VeloxIcebergSuite extends IcebergSuite {
       assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 0)
     }
   }
+
+  test("iceberg write file name") {
+    withTable("iceberg_tbl") {
+      spark.sql("create table if not exists iceberg_tbl (id int) using iceberg")
+      spark.sql("insert into iceberg_tbl values 1")
+
+      val filePath = spark
+        .sql("select * from default.iceberg_tbl.files")
+        .select("file_path")
+        .collect()
+        .apply(0)
+        .getString(0)
+
+      val fileName = filePath.split('/').last
+      // Expected format: {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}.parquet
+      // Example: 00000-0-query_id-0-00001.parquet
+      assert(
+        fileName.matches("\\d{5}-\\d+-.*-\\d{5}\\.parquet"),
+        s"File name does not match expected format: $fileName")
+    }
+  }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 945e245969..0698c72426 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -308,7 +308,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
       val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
       val compressionCodec = 
WriteFilesExecTransformer.getCompressionCodec(options)
       if (unSupportedCompressions.contains(compressionCodec)) {
-        Some("Brotli, lzo, lz4raw and lz4_raw compression codec is unsupported in Velox backend.")
+        Some(s"$compressionCodec compression codec is unsupported in Velox backend.")
       } else {
         None
       }
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index 6c61afb962..1b0d0647dd 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -32,25 +32,14 @@ class VeloxParquetWriteSuite extends 
VeloxWholeStageTransformerSuite with WriteU
   override protected val fileFormat: String = "parquet"
 
   // The parquet compression codec extensions
-  private val parquetCompressionCodecExtensions = if (isSparkVersionGE("3.5")) {
+  private val parquetCompressionCodecExtensions = {
     Map(
       "none" -> "",
       "uncompressed" -> "",
       "snappy" -> ".snappy",
       "gzip" -> ".gz",
       "lzo" -> ".lzo",
-      "lz4" -> ".lz4hadoop", // Specific extension for version 3.5
-      "brotli" -> ".br",
-      "zstd" -> ".zstd"
-    )
-  } else {
-    Map(
-      "none" -> "",
-      "uncompressed" -> "",
-      "snappy" -> ".snappy",
-      "gzip" -> ".gz",
-      "lzo" -> ".lzo",
-      "lz4" -> ".lz4",
+      "lz4" -> (if (isSparkVersionGE("3.5")) ".lz4hadoop" else ".lz4"),
       "brotli" -> ".br",
       "zstd" -> ".zstd"
     )
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index 5aaf83d7f4..af0fde37ce 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -226,13 +226,16 @@ std::shared_ptr<IcebergWriter> 
VeloxRuntime::createIcebergWriter(
     int32_t format,
     const std::string& outputDirectory,
     facebook::velox::common::CompressionKind compressionKind,
+    int32_t partitionId,
+    int64_t taskId,
+    const std::string& operationId,
     std::shared_ptr<const 
facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
     const gluten::IcebergNestedField& protoField,
     const std::unordered_map<std::string, std::string>& sparkConfs) {
   auto veloxPool = memoryManager()->getLeafMemoryPool();
   auto connectorPool = memoryManager()->getAggregateMemoryPool();
   return std::make_shared<IcebergWriter>(
-      rowType, format, outputDirectory, compressionKind, spec, protoField, 
sparkConfs, veloxPool, connectorPool);
+      rowType, format, outputDirectory, compressionKind, partitionId, taskId, 
operationId, spec, protoField, sparkConfs, veloxPool, connectorPool);
 }
 #endif
 
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 59eb43028f..b39733e839 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -74,6 +74,9 @@ class VeloxRuntime final : public Runtime {
       int32_t format,
       const std::string& outputDirectory,
       facebook::velox::common::CompressionKind compressionKind,
+      int32_t partitionId,
+      int64_t taskId,
+      const std::string& operationId,
       std::shared_ptr<const 
facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
       const gluten::IcebergNestedField& protoField,
       const std::unordered_map<std::string, std::string>& sparkConfs);
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc 
b/cpp/velox/compute/iceberg/IcebergWriter.cc
index 2e9aa86605..1d6e7fa344 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -30,6 +30,77 @@ using namespace facebook::velox::connector::hive;
 using namespace facebook::velox::connector::hive::iceberg;
 namespace {
 
+// Custom Iceberg file name generator for Gluten
+class GlutenIcebergFileNameGenerator : public connector::hive::FileNameGenerator {
+ public:
+  GlutenIcebergFileNameGenerator(
+      int32_t partitionId,
+      int64_t taskId,
+      const std::string& operationId,
+      dwio::common::FileFormat fileFormat)
+      : partitionId_(partitionId),
+        taskId_(taskId),
+        operationId_(operationId),
+        fileFormat_(fileFormat),
+        fileCount_(0) {}
+
+  std::pair<std::string, std::string> gen(
+      std::optional<uint32_t> bucketId,
+      const std::shared_ptr<const connector::hive::HiveInsertTableHandle> insertTableHandle,
+      const connector::ConnectorQueryCtx& connectorQueryCtx,
+      bool commitRequired) const override {
+    auto targetFileName = insertTableHandle->locationHandle()->targetFileName();
+    if (targetFileName.empty()) {
+      // Generate file name following Iceberg format:
+      // {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}{suffix}
+      fileCount_++;
+
+      std::string fileExtension;
+      switch (fileFormat_) {
+        case dwio::common::FileFormat::PARQUET:
+          fileExtension = ".parquet";
+          break;
+        case dwio::common::FileFormat::ORC:
+          fileExtension = ".orc";
+          break;
+        default:
+          fileExtension = ".parquet";
+      }
+
+      char buffer[256];
+      snprintf(
+          buffer,
+          sizeof(buffer),
+          "%05d-%" PRId64 "-%s-%05d%s",
+          partitionId_,
+          taskId_,
+          operationId_.c_str(),
+          fileCount_,
+          fileExtension.c_str());
+      targetFileName = std::string(buffer);
+    }
+
+    return {targetFileName, targetFileName};
+  }
+
+  folly::dynamic serialize() const override {
+    VELOX_UNREACHABLE("Unexpected code path, implement serialize() first.");
+  }
+
+  std::string toString() const override {
+    return fmt::format(
+        "GlutenIcebergFileNameGenerator(partitionId={}, taskId={}, operationId={})",
+        partitionId_, taskId_, operationId_);
+  }
+
+ private:
+  int32_t partitionId_;
+  int64_t taskId_;
+  std::string operationId_;
+  dwio::common::FileFormat fileFormat_;
+  mutable int32_t fileCount_;
+};
+
 iceberg::IcebergNestedField convertToIcebergNestedField(const 
gluten::IcebergNestedField& protoField) {
   IcebergNestedField result;
   result.id = protoField.id();
@@ -48,6 +119,9 @@ std::shared_ptr<IcebergInsertTableHandle> 
createIcebergInsertTableHandle(
     const std::string& outputDirectoryPath,
     dwio::common::FileFormat fileFormat,
     facebook::velox::common::CompressionKind compressionKind,
+    int32_t partitionId,
+    int64_t taskId,
+    const std::string& operationId,
     std::shared_ptr<const IcebergPartitionSpec> spec,
     const iceberg::IcebergNestedField& nestedField,
     facebook::velox::memory::MemoryPool* pool) {
@@ -80,12 +154,17 @@ std::shared_ptr<IcebergInsertTableHandle> 
createIcebergInsertTableHandle(
               nestedField.children[i]));
     }
   }
+  
+  auto fileNameGenerator = std::make_shared<const 
GlutenIcebergFileNameGenerator>(
+      partitionId, taskId, operationId, fileFormat);
+  
   std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
       std::make_shared<connector::hive::LocationHandle>(
           outputDirectoryPath, outputDirectoryPath, 
connector::hive::LocationHandle::TableType::kExisting);
   const std::vector<IcebergSortingColumn> sortedBy;
+  const std::unordered_map<std::string, std::string> serdeParameters;
   return std::make_shared<connector::hive::iceberg::IcebergInsertTableHandle>(
-      columnHandles, locationHandle, spec, pool, fileFormat, sortedBy, 
compressionKind);
+      columnHandles, locationHandle, spec, pool, fileFormat, sortedBy, 
compressionKind, serdeParameters, fileNameGenerator);
 }
 
 } // namespace
@@ -96,12 +175,15 @@ IcebergWriter::IcebergWriter(
     int32_t format,
     const std::string& outputDirectory,
     facebook::velox::common::CompressionKind compressionKind,
+    int32_t partitionId,
+    int64_t taskId,
+    const std::string& operationId,
     std::shared_ptr<const iceberg::IcebergPartitionSpec> spec,
     const gluten::IcebergNestedField& field,
     const std::unordered_map<std::string, std::string>& sparkConfs,
     std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool,
     std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool)
-    : rowType_(rowType), field_(convertToIcebergNestedField(field)), 
pool_(memoryPool), connectorPool_(connectorPool), 
createTimeNs_(getCurrentTimeNano()) {
+    : rowType_(rowType), field_(convertToIcebergNestedField(field)), 
partitionId_(partitionId), taskId_(taskId), operationId_(operationId), 
pool_(memoryPool), connectorPool_(connectorPool), 
createTimeNs_(getCurrentTimeNano()) {
   auto veloxCfg =
       
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
 std::string>(sparkConfs));
   connectorSessionProperties_ = createHiveConnectorSessionConfig(veloxCfg);
@@ -123,7 +205,7 @@ IcebergWriter::IcebergWriter(
   dataSink_ = std::make_unique<IcebergDataSink>(
       rowType_,
       createIcebergInsertTableHandle(
-          rowType_, outputDirectory, icebergFormatToVelox(format), 
compressionKind, spec, field_, pool_.get()),
+          rowType_, outputDirectory, icebergFormatToVelox(format), 
compressionKind, partitionId_, taskId_, operationId_, spec, field_, 
pool_.get()),
       connectorQueryCtx_.get(),
       facebook::velox::connector::CommitStrategy::kNoCommit,
       connectorConfig_);
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.h 
b/cpp/velox/compute/iceberg/IcebergWriter.h
index f587195070..2fa13dcd69 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.h
+++ b/cpp/velox/compute/iceberg/IcebergWriter.h
@@ -43,6 +43,9 @@ class IcebergWriter {
       int32_t format,
       const std::string& outputDirectory,
       facebook::velox::common::CompressionKind compressionKind,
+      int32_t partitionId,
+      int64_t taskId,
+      const std::string& operationId,
       std::shared_ptr<const 
facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
       const gluten::IcebergNestedField& field,
       const std::unordered_map<std::string, std::string>& sparkConfs,
@@ -58,6 +61,9 @@ class IcebergWriter {
  private:
   facebook::velox::RowTypePtr rowType_;
   const facebook::velox::connector::hive::iceberg::IcebergNestedField field_;
+  int32_t partitionId_;
+  int64_t taskId_;
+  std::string operationId_;
   std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
   std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool_;
   std::shared_ptr<facebook::velox::connector::hive::HiveConfig> 
connectorConfig_;
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 5633084c04..74adb1dff5 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -842,6 +842,9 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_execution_IcebergWriteJniWrapper_
     jint format,
     jstring directory,
     jstring codecJstr,
+    jint partitionId,
+    jlong taskId,
+    jstring operationId,
     jbyteArray partition,
     jbyteArray fieldBytes) {
   JNI_METHOD_START
@@ -863,6 +866,9 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_execution_IcebergWriteJniWrapper_
       format,
       jStringToCString(env, directory),
       facebook::velox::common::stringToCompressionKind(jStringToCString(env, 
codecJstr)),
+      partitionId,
+      taskId,
+      jStringToCString(env, operationId),
       spec,
       protoField,
       sparkConf));
diff --git a/cpp/velox/tests/iceberg/IcebergWriteTest.cc 
b/cpp/velox/tests/iceberg/IcebergWriteTest.cc
index 574d749370..740fa66f3f 100644
--- a/cpp/velox/tests/iceberg/IcebergWriteTest.cc
+++ b/cpp/velox/tests/iceberg/IcebergWriteTest.cc
@@ -58,6 +58,9 @@ TEST_F(VeloxIcebergWriteTest, write) {
       1,
       tmpPath + "/iceberg_write_test_table",
       common::CompressionKind::CompressionKind_ZSTD,
+      0, // partitionId
+      0, // taskId
+      folly::to<std::string>(folly::Random::rand64()), // operationId
       partitionSpec,
       root,
       std::unordered_map<std::string, std::string>(),
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
 
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
index c7c4c4e672..3f7ab278d0 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
@@ -57,6 +57,12 @@ object IcebergWriteUtil {
     field
   }
 
+  private lazy val queryIdField = {
+    val field = classOf[SparkWrite].getDeclaredField("queryId")
+    field.setAccessible(true)
+    field
+  }
+
   def supportsWrite(write: Write): Boolean = {
     write.isInstanceOf[SparkWrite]
   }
@@ -98,6 +104,10 @@ object IcebergWriteUtil {
     fileFormatField.get(write).asInstanceOf[FileFormat]
   }
 
+  def getQueryId(write: Write): String = {
+    queryIdField.get(write).asInstanceOf[String]
+  }
+
   def getDirectory(write: Write): String = {
     val loc = getTable(write).locationProvider().newDataLocation("")
     loc.substring(0, loc.length - 1)
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
 
b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
index 4254c5baff..676be92e6a 100644
--- 
a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.connector.write;
 
+import org.apache.spark.TaskContext;
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.write.DataWriter;
 import org.apache.spark.sql.connector.write.DataWriterFactory;
@@ -43,6 +44,12 @@ public interface ColumnarBatchDataWriterFactory extends 
Serializable {
    *
    * <p>If this method fails (by throwing an exception), the corresponding 
Spark write task would
    * fail and get retried until hitting the maximum retry times.
+   *
+   * @param partitionId A unique id of the RDD partition that the returned writer will process.
+   *     Usually Spark processes many RDD partitions at the same time, implementations should use
+   *     the partition id to distinguish writers for different partitions.
+   * @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run
+   *     multiple tasks for the same partition (due to speculation or task failures, for example).
    */
-  DataWriter<ColumnarBatch> createWriter();
+  DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId);
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
index ef432aa119..b0d7049955 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
@@ -46,7 +46,7 @@ trait WritingColumnarBatchSparkTask[W <: 
DataWriter[ColumnarBatch]]
     val partId = context.partitionId()
     val taskId = context.taskAttemptId()
     val attemptId = context.attemptNumber()
-    val dataWriter = factory.createWriter().asInstanceOf[W]
+    val dataWriter = factory.createWriter(partId, taskId).asInstanceOf[W]
 
     var count = 0
     // write the data and commit this writer.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to