This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d2c0630143 [VL] Make the filename written by Iceberg Native consistent
with Java (#11435)
d2c0630143 is described below
commit d2c0630143f067ff8a0df764e5f85eaf76d2e4bc
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jan 30 21:14:55 2026 +0800
[VL] Make the filename written by Iceberg Native consistent with Java
(#11435)
---
.../connector/write/IcebergDataWriteFactory.scala | 24 +++++-
.../execution/AbstractIcebergWriteExec.scala | 3 +-
.../gluten/execution/IcebergWriteJniWrapper.java | 3 +
.../execution/enhanced/VeloxIcebergSuite.scala | 21 ++++++
.../gluten/backendsapi/velox/VeloxBackend.scala | 2 +-
.../sql/execution/VeloxParquetWriteSuite.scala | 15 +---
cpp/velox/compute/VeloxRuntime.cc | 5 +-
cpp/velox/compute/VeloxRuntime.h | 3 +
cpp/velox/compute/iceberg/IcebergWriter.cc | 88 +++++++++++++++++++++-
cpp/velox/compute/iceberg/IcebergWriter.h | 6 ++
cpp/velox/jni/VeloxJniWrapper.cc | 6 ++
cpp/velox/tests/iceberg/IcebergWriteTest.cc | 3 +
.../iceberg/spark/source/IcebergWriteUtil.scala | 10 +++
.../write/ColumnarBatchDataWriterFactory.java | 9 ++-
.../v2/ColumnarWriteToDataSourceV2Exec.scala | 2 +-
15 files changed, 176 insertions(+), 24 deletions(-)
diff --git
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
index 7004fa57ec..a9b6ec04c2 100644
---
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
+++
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
@@ -42,7 +42,8 @@ case class IcebergDataWriteFactory(
codec: String,
partitionSpec: PartitionSpec,
sortOrder: SortOrder,
- field: IcebergNestedField)
+ field: IcebergNestedField,
+ queryId: String)
extends ColumnarBatchDataWriterFactory {
/**
@@ -52,7 +53,7 @@ case class IcebergDataWriteFactory(
* <p> If this method fails (by throwing an exception), the corresponding
Spark write task would
* fail and get retried until hitting the maximum retry times.
*/
- override def createWriter(): DataWriter[ColumnarBatch] = {
+ override def createWriter(partitionId: Int, taskId: Long):
DataWriter[ColumnarBatch] = {
val fields = partitionSpec
.fields()
.stream()
@@ -63,8 +64,19 @@ case class IcebergDataWriteFactory(
.setSpecId(partitionSpec.specId())
.addAllFields(fields)
.build()
+ val epochId = 0
+ val operationId = queryId + "-" + epochId
val (writerHandle, jniWrapper) =
- getJniWrapper(schema, format, directory, codec, specProto, field)
+ getJniWrapper(
+ schema,
+ format,
+ directory,
+ codec,
+ partitionId,
+ taskId,
+ operationId,
+ specProto,
+ field)
IcebergColumnarBatchDataWriter(writerHandle, jniWrapper, format,
partitionSpec, sortOrder)
}
@@ -73,6 +85,9 @@ case class IcebergDataWriteFactory(
format: Int,
directory: String,
codec: String,
+ partitionId: Int,
+ taskId: Long,
+ operationId: String,
partitionSpec: IcebergPartitionSpec,
field: IcebergNestedField): (Long, IcebergWriteJniWrapper) = {
val schema = SparkArrowUtil.toArrowSchema(localSchema,
SQLConf.get.sessionLocalTimeZone)
@@ -87,6 +102,9 @@ case class IcebergDataWriteFactory(
format,
directory,
codec,
+ partitionId,
+ taskId,
+ operationId,
partitionSpec.toByteArray,
field.toByteArray)
cSchema.close()
diff --git
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
index 3ac518cf76..fb1b25856c 100644
---
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
+++
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
@@ -36,7 +36,8 @@ abstract class AbstractIcebergWriteExec extends
IcebergWriteExec {
getCodec,
getPartitionSpec,
IcebergWriteUtil.getSortOrder(write),
- nestedField
+ nestedField,
+ IcebergWriteUtil.getQueryId(write)
)
}
}
diff --git
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
index 54ae50e578..60bab59786 100644
---
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
+++
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java
@@ -31,6 +31,9 @@ public class IcebergWriteJniWrapper implements RuntimeAware {
public native long init(long cSchema, int format,
String directory,
String codec,
+ int partitionId,
+ long taskId,
+ String operationId,
byte[] partitionSpec,
byte[] field);
diff --git
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index 437828368f..a3a2bc2f2f 100644
---
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -326,4 +326,25 @@ class VeloxIcebergSuite extends IcebergSuite {
assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 0)
}
}
+
+ test("iceberg write file name") {
+ withTable("iceberg_tbl") {
+ spark.sql("create table if not exists iceberg_tbl (id int) using
iceberg")
+ spark.sql("insert into iceberg_tbl values 1")
+
+ val filePath = spark
+ .sql("select * from default.iceberg_tbl.files")
+ .select("file_path")
+ .collect()
+ .apply(0)
+ .getString(0)
+
+ val fileName = filePath.split('/').last
+ // Expected format:
{partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}.parquet
+ // Example: 00000-0-query_id-0-00001.parquet
+ assert(
+ fileName.matches("\\d{5}-\\d+-.*-\\d{5}\\.parquet"),
+ s"File name does not match expected format: $fileName")
+ }
+ }
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 945e245969..0698c72426 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -308,7 +308,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
val compressionCodec =
WriteFilesExecTransformer.getCompressionCodec(options)
if (unSupportedCompressions.contains(compressionCodec)) {
- Some("Brotli, lzo, lz4raw and lz4_raw compression codec is unsupported
in Velox backend.")
+ Some(s"$compressionCodec compression codec is unsupported in Velox
backend.")
} else {
None
}
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index 6c61afb962..1b0d0647dd 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -32,25 +32,14 @@ class VeloxParquetWriteSuite extends
VeloxWholeStageTransformerSuite with WriteU
override protected val fileFormat: String = "parquet"
// The parquet compression codec extensions
- private val parquetCompressionCodecExtensions = if (isSparkVersionGE("3.5"))
{
+ private val parquetCompressionCodecExtensions = {
Map(
"none" -> "",
"uncompressed" -> "",
"snappy" -> ".snappy",
"gzip" -> ".gz",
"lzo" -> ".lzo",
- "lz4" -> ".lz4hadoop", // Specific extension for version 3.5
- "brotli" -> ".br",
- "zstd" -> ".zstd"
- )
- } else {
- Map(
- "none" -> "",
- "uncompressed" -> "",
- "snappy" -> ".snappy",
- "gzip" -> ".gz",
- "lzo" -> ".lzo",
- "lz4" -> ".lz4",
+ "lz4" -> (if (isSparkVersionGE("3.5")) ".lz4hadoop" else ".lz4"),
"brotli" -> ".br",
"zstd" -> ".zstd"
)
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index 5aaf83d7f4..af0fde37ce 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -226,13 +226,16 @@ std::shared_ptr<IcebergWriter>
VeloxRuntime::createIcebergWriter(
int32_t format,
const std::string& outputDirectory,
facebook::velox::common::CompressionKind compressionKind,
+ int32_t partitionId,
+ int64_t taskId,
+ const std::string& operationId,
std::shared_ptr<const
facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& protoField,
const std::unordered_map<std::string, std::string>& sparkConfs) {
auto veloxPool = memoryManager()->getLeafMemoryPool();
auto connectorPool = memoryManager()->getAggregateMemoryPool();
return std::make_shared<IcebergWriter>(
- rowType, format, outputDirectory, compressionKind, spec, protoField,
sparkConfs, veloxPool, connectorPool);
+ rowType, format, outputDirectory, compressionKind, partitionId, taskId,
operationId, spec, protoField, sparkConfs, veloxPool, connectorPool);
}
#endif
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 59eb43028f..b39733e839 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -74,6 +74,9 @@ class VeloxRuntime final : public Runtime {
int32_t format,
const std::string& outputDirectory,
facebook::velox::common::CompressionKind compressionKind,
+ int32_t partitionId,
+ int64_t taskId,
+ const std::string& operationId,
std::shared_ptr<const
facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& protoField,
const std::unordered_map<std::string, std::string>& sparkConfs);
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc
b/cpp/velox/compute/iceberg/IcebergWriter.cc
index 2e9aa86605..1d6e7fa344 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -30,6 +30,77 @@ using namespace facebook::velox::connector::hive;
using namespace facebook::velox::connector::hive::iceberg;
namespace {
+// Custom Iceberg file name generator for Gluten
+class GlutenIcebergFileNameGenerator : public
connector::hive::FileNameGenerator {
+ public:
+ GlutenIcebergFileNameGenerator(
+ int32_t partitionId,
+ int64_t taskId,
+ const std::string& operationId,
+ dwio::common::FileFormat fileFormat)
+ : partitionId_(partitionId),
+ taskId_(taskId),
+ operationId_(operationId),
+ fileFormat_(fileFormat),
+ fileCount_(0) {}
+
+ std::pair<std::string, std::string> gen(
+ std::optional<uint32_t> bucketId,
+ const std::shared_ptr<const connector::hive::HiveInsertTableHandle>
insertTableHandle,
+ const connector::ConnectorQueryCtx& connectorQueryCtx,
+ bool commitRequired) const override {
+ auto targetFileName =
insertTableHandle->locationHandle()->targetFileName();
+ if (targetFileName.empty()) {
+ // Generate file name following Iceberg format:
+ // {partitionId:05d}-{taskId}-{operationId}-{fileCount:05d}{suffix}
+ fileCount_++;
+
+ std::string fileExtension;
+ switch (fileFormat_) {
+ case dwio::common::FileFormat::PARQUET:
+ fileExtension = ".parquet";
+ break;
+ case dwio::common::FileFormat::ORC:
+ fileExtension = ".orc";
+ break;
+ default:
+ fileExtension = ".parquet";
+ }
+
+ char buffer[256];
+ snprintf(
+ buffer,
+ sizeof(buffer),
+ "%05d-%" PRId64 "-%s-%05d%s",
+ partitionId_,
+ taskId_,
+ operationId_.c_str(),
+ fileCount_,
+ fileExtension.c_str());
+ targetFileName = std::string(buffer);
+ }
+
+ return {targetFileName, targetFileName};
+ }
+
+ folly::dynamic serialize() const override {
+ VELOX_UNREACHABLE("Unexpected code path, implement serialize() first.");
+ }
+
+ std::string toString() const override {
+ return fmt::format(
+ "GlutenIcebergFileNameGenerator(partitionId={}, taskId={},
operationId={})",
+ partitionId_, taskId_, operationId_);
+ }
+
+ private:
+ int32_t partitionId_;
+ int64_t taskId_;
+ std::string operationId_;
+ dwio::common::FileFormat fileFormat_;
+ mutable int32_t fileCount_;
+};
+
iceberg::IcebergNestedField convertToIcebergNestedField(const
gluten::IcebergNestedField& protoField) {
IcebergNestedField result;
result.id = protoField.id();
@@ -48,6 +119,9 @@ std::shared_ptr<IcebergInsertTableHandle>
createIcebergInsertTableHandle(
const std::string& outputDirectoryPath,
dwio::common::FileFormat fileFormat,
facebook::velox::common::CompressionKind compressionKind,
+ int32_t partitionId,
+ int64_t taskId,
+ const std::string& operationId,
std::shared_ptr<const IcebergPartitionSpec> spec,
const iceberg::IcebergNestedField& nestedField,
facebook::velox::memory::MemoryPool* pool) {
@@ -80,12 +154,17 @@ std::shared_ptr<IcebergInsertTableHandle>
createIcebergInsertTableHandle(
nestedField.children[i]));
}
}
+
+ auto fileNameGenerator = std::make_shared<const
GlutenIcebergFileNameGenerator>(
+ partitionId, taskId, operationId, fileFormat);
+
std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
std::make_shared<connector::hive::LocationHandle>(
outputDirectoryPath, outputDirectoryPath,
connector::hive::LocationHandle::TableType::kExisting);
const std::vector<IcebergSortingColumn> sortedBy;
+ const std::unordered_map<std::string, std::string> serdeParameters;
return std::make_shared<connector::hive::iceberg::IcebergInsertTableHandle>(
- columnHandles, locationHandle, spec, pool, fileFormat, sortedBy,
compressionKind);
+ columnHandles, locationHandle, spec, pool, fileFormat, sortedBy,
compressionKind, serdeParameters, fileNameGenerator);
}
} // namespace
@@ -96,12 +175,15 @@ IcebergWriter::IcebergWriter(
int32_t format,
const std::string& outputDirectory,
facebook::velox::common::CompressionKind compressionKind,
+ int32_t partitionId,
+ int64_t taskId,
+ const std::string& operationId,
std::shared_ptr<const iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& field,
const std::unordered_map<std::string, std::string>& sparkConfs,
std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool)
- : rowType_(rowType), field_(convertToIcebergNestedField(field)),
pool_(memoryPool), connectorPool_(connectorPool),
createTimeNs_(getCurrentTimeNano()) {
+ : rowType_(rowType), field_(convertToIcebergNestedField(field)),
partitionId_(partitionId), taskId_(taskId), operationId_(operationId),
pool_(memoryPool), connectorPool_(connectorPool),
createTimeNs_(getCurrentTimeNano()) {
auto veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>(sparkConfs));
connectorSessionProperties_ = createHiveConnectorSessionConfig(veloxCfg);
@@ -123,7 +205,7 @@ IcebergWriter::IcebergWriter(
dataSink_ = std::make_unique<IcebergDataSink>(
rowType_,
createIcebergInsertTableHandle(
- rowType_, outputDirectory, icebergFormatToVelox(format),
compressionKind, spec, field_, pool_.get()),
+ rowType_, outputDirectory, icebergFormatToVelox(format),
compressionKind, partitionId_, taskId_, operationId_, spec, field_,
pool_.get()),
connectorQueryCtx_.get(),
facebook::velox::connector::CommitStrategy::kNoCommit,
connectorConfig_);
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.h
b/cpp/velox/compute/iceberg/IcebergWriter.h
index f587195070..2fa13dcd69 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.h
+++ b/cpp/velox/compute/iceberg/IcebergWriter.h
@@ -43,6 +43,9 @@ class IcebergWriter {
int32_t format,
const std::string& outputDirectory,
facebook::velox::common::CompressionKind compressionKind,
+ int32_t partitionId,
+ int64_t taskId,
+ const std::string& operationId,
std::shared_ptr<const
facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& field,
const std::unordered_map<std::string, std::string>& sparkConfs,
@@ -58,6 +61,9 @@ class IcebergWriter {
private:
facebook::velox::RowTypePtr rowType_;
const facebook::velox::connector::hive::iceberg::IcebergNestedField field_;
+ int32_t partitionId_;
+ int64_t taskId_;
+ std::string operationId_;
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool_;
std::shared_ptr<facebook::velox::connector::hive::HiveConfig>
connectorConfig_;
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 5633084c04..74adb1dff5 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -842,6 +842,9 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_execution_IcebergWriteJniWrapper_
jint format,
jstring directory,
jstring codecJstr,
+ jint partitionId,
+ jlong taskId,
+ jstring operationId,
jbyteArray partition,
jbyteArray fieldBytes) {
JNI_METHOD_START
@@ -863,6 +866,9 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_execution_IcebergWriteJniWrapper_
format,
jStringToCString(env, directory),
facebook::velox::common::stringToCompressionKind(jStringToCString(env,
codecJstr)),
+ partitionId,
+ taskId,
+ jStringToCString(env, operationId),
spec,
protoField,
sparkConf));
diff --git a/cpp/velox/tests/iceberg/IcebergWriteTest.cc
b/cpp/velox/tests/iceberg/IcebergWriteTest.cc
index 574d749370..740fa66f3f 100644
--- a/cpp/velox/tests/iceberg/IcebergWriteTest.cc
+++ b/cpp/velox/tests/iceberg/IcebergWriteTest.cc
@@ -58,6 +58,9 @@ TEST_F(VeloxIcebergWriteTest, write) {
1,
tmpPath + "/iceberg_write_test_table",
common::CompressionKind::CompressionKind_ZSTD,
+ 0, // partitionId
+ 0, // taskId
+ folly::to<std::string>(folly::Random::rand64()), // operationId
partitionSpec,
root,
std::unordered_map<std::string, std::string>(),
diff --git
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
index c7c4c4e672..3f7ab278d0 100644
---
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
@@ -57,6 +57,12 @@ object IcebergWriteUtil {
field
}
+ private lazy val queryIdField = {
+ val field = classOf[SparkWrite].getDeclaredField("queryId")
+ field.setAccessible(true)
+ field
+ }
+
def supportsWrite(write: Write): Boolean = {
write.isInstanceOf[SparkWrite]
}
@@ -98,6 +104,10 @@ object IcebergWriteUtil {
fileFormatField.get(write).asInstanceOf[FileFormat]
}
+ def getQueryId(write: Write): String = {
+ queryIdField.get(write).asInstanceOf[String]
+ }
+
def getDirectory(write: Write): String = {
val loc = getTable(write).locationProvider().newDataLocation("")
loc.substring(0, loc.length - 1)
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
index 4254c5baff..676be92e6a 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.connector.write;
+import org.apache.spark.TaskContext;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
@@ -43,6 +44,12 @@ public interface ColumnarBatchDataWriterFactory extends
Serializable {
*
* <p>If this method fails (by throwing an exception), the corresponding
Spark write task would
* fail and get retried until hitting the maximum retry times.
+ *
+ * @param partitionId A unique id of the RDD partition that the returned
writer will process.
+ * Usually Spark processes many RDD partitions at the same time,
implementations should use
+ * the partition id to distinguish writers for different partitions.
+ * @param taskId The task id returned by {@link
TaskContext#taskAttemptId()}. Spark may run
+ * multiple tasks for the same partition (due to speculation or task
failures, for example).
*/
- DataWriter<ColumnarBatch> createWriter();
+ DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId);
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
index ef432aa119..b0d7049955 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala
@@ -46,7 +46,7 @@ trait WritingColumnarBatchSparkTask[W <:
DataWriter[ColumnarBatch]]
val partId = context.partitionId()
val taskId = context.taskAttemptId()
val attemptId = context.attemptNumber()
- val dataWriter = factory.createWriter().asInstanceOf[W]
+ val dataWriter = factory.createWriter(partId, taskId).asInstanceOf[W]
var count = 0
// write the data and commit this writer.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]