Zouxxyy commented on code in PR #11435:
URL: https://github.com/apache/incubator-gluten/pull/11435#discussion_r2717131868
##########
gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala:
##########
@@ -46,7 +46,7 @@ trait WritingColumnarBatchSparkTask[W <: DataWriter[ColumnarBatch]]
val partId = context.partitionId()
val taskId = context.taskAttemptId()
val attemptId = context.attemptNumber()
- val dataWriter = factory.createWriter().asInstanceOf[W]
+ val dataWriter = factory.createWriter(partId, taskId).asInstanceOf[W]
Review Comment:
I tried to obtain them here, from `taskInfo_` on the native side:
```c++
std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
    RowTypePtr rowType,
    int32_t format,
    const std::string& outputDirectory,
    facebook::velox::common::CompressionKind compressionKind,
    const std::string& operationId,
    std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
    const gluten::IcebergNestedField& protoField,
    const std::unordered_map<std::string, std::string>& sparkConfs) {
  GLUTEN_CHECK(taskInfo_.has_value(), "Task info must be set before creating IcebergWriter");
  auto veloxPool = memoryManager()->getLeafMemoryPool();
  auto connectorPool = memoryManager()->getAggregateMemoryPool();
  return std::make_shared<IcebergWriter>(
      rowType, format, outputDirectory, compressionKind,
      taskInfo_->partitionId, taskInfo_->taskId, operationId, spec, protoField,
      sparkConfs, veloxPool, connectorPool);
}
```
but got the error below; perhaps the timing of setting taskInfo is inconsistent with when the Iceberg writer is created.
```
22:18:47.228 ERROR org.apache.spark.task.TaskResources: Task 0 failed by error:
org.apache.gluten.exception.GlutenException: Task info must be set before creating IcebergWriter
    at org.apache.gluten.execution.IcebergWriteJniWrapper.init(Native Method)
    at org.apache.gluten.connector.write.IcebergDataWriteFactory.getJniWrapper(IcebergDataWriteFactory.scala:103)
    at org.apache.gluten.connector.write.IcebergDataWriteFactory.createWriter(IcebergDataWriteFactory.scala:77)
    at org.apache.spark.sql.datasources.v2.WritingColumnarBatchSparkTask.run(ColumnarWriteToDataSourceV2Exec.scala:49)
    at org.apache.spark.sql.datasources.v2.WritingColumnarBatchSparkTask.run$(ColumnarWriteToDataSourceV2Exec.scala:39)
    at org.apache.spark.sql.datasources.v2.DataWritingColumnarBatchSparkTask$.run(ColumnarWriteToDataSourceV2Exec.scala:93)
```
Actually, I think my implementation may be fine as it is, especially the adjustment to the `ColumnarBatchDataWriterFactory` interface: it now fully aligns with Spark's `DataWriterFactory`.
Gluten's `ColumnarBatchDataWriterFactory`
```java
public interface ColumnarBatchDataWriterFactory extends Serializable {
  DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId);
}
```
Spark's `DataWriterFactory`
```java
public interface DataWriterFactory extends Serializable {
  DataWriter<InternalRow> createWriter(int partitionId, long taskId);
}
```
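For illustration, a minimal sketch of a factory written against the adjusted interface could look like the following. This is hypothetical: `ExampleColumnarWriterFactory` and `createNativeWriter` are placeholder names, not existing Gluten classes, and a real factory would construct the JNI-backed writer instead.
```java
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Hypothetical sketch of a factory implementing the adjusted interface above.
public class ExampleColumnarWriterFactory implements ColumnarBatchDataWriterFactory {
  @Override
  public DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId) {
    // partitionId and taskId are passed in by the Spark task (see the diff above),
    // so the native side no longer has to derive them from taskInfo_.
    return createNativeWriter(partitionId, taskId);
  }

  // Placeholder for whatever actually constructs the native-backed writer.
  private DataWriter<ColumnarBatch> createNativeWriter(int partitionId, long taskId) {
    throw new UnsupportedOperationException("illustrative sketch only");
  }
}
```
With this shape, the call site in `WritingColumnarBatchSparkTask` can simply forward `context.partitionId()` and `context.taskAttemptId()`, exactly as the diff above does.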
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]