Zouxxyy commented on code in PR #11435:
URL: https://github.com/apache/incubator-gluten/pull/11435#discussion_r2717131868


##########
gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/ColumnarWriteToDataSourceV2Exec.scala:
##########
@@ -46,7 +46,7 @@ trait WritingColumnarBatchSparkTask[W <: DataWriter[ColumnarBatch]]
     val partId = context.partitionId()
     val taskId = context.taskAttemptId()
     val attemptId = context.attemptNumber()
-    val dataWriter = factory.createWriter().asInstanceOf[W]
+    val dataWriter = factory.createWriter(partId, taskId).asInstanceOf[W]

Review Comment:
   I tried to obtain them from `taskInfo_` here:
   ```c++
   std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
       RowTypePtr rowType,
       int32_t format,
       const std::string& outputDirectory,
       facebook::velox::common::CompressionKind compressionKind,
       const std::string& operationId,
       std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
       const gluten::IcebergNestedField& protoField,
       const std::unordered_map<std::string, std::string>& sparkConfs) {
     GLUTEN_CHECK(taskInfo_.has_value(), "Task info must be set before creating IcebergWriter");
     auto veloxPool = memoryManager()->getLeafMemoryPool();
     auto connectorPool = memoryManager()->getAggregateMemoryPool();
     return std::make_shared<IcebergWriter>(
         rowType, format, outputDirectory, compressionKind,
         taskInfo_->partitionId, taskInfo_->taskId, operationId, spec, protoField,
         sparkConfs, veloxPool, connectorPool);
   }
   ```
   
   However, that fails with the error below. Perhaps `taskInfo_` has not been 
set yet at the point where the Iceberg writer is created.
   
   ```
   22:18:47.228 ERROR org.apache.spark.task.TaskResources: Task 0 failed by error: 
   org.apache.gluten.exception.GlutenException: Task info must be set before creating IcebergWriter
        at org.apache.gluten.execution.IcebergWriteJniWrapper.init(Native Method)
        at org.apache.gluten.connector.write.IcebergDataWriteFactory.getJniWrapper(IcebergDataWriteFactory.scala:103)
        at org.apache.gluten.connector.write.IcebergDataWriteFactory.createWriter(IcebergDataWriteFactory.scala:77)
        at org.apache.spark.sql.datasources.v2.WritingColumnarBatchSparkTask.run(ColumnarWriteToDataSourceV2Exec.scala:49)
        at org.apache.spark.sql.datasources.v2.WritingColumnarBatchSparkTask.run$(ColumnarWriteToDataSourceV2Exec.scala:39)
        at org.apache.spark.sql.datasources.v2.DataWritingColumnarBatchSparkTask$.run(ColumnarWriteToDataSourceV2Exec.scala:93)
   ```
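   
   To illustrate the ordering problem I suspect, here is a minimal sketch. 
`TaskInfo`, `FakeRuntime`, and `TaskInfoOrderingSketch` are hypothetical 
stand-ins, not Gluten classes. It only assumes the runtime keeps an optional 
task info and checks it when a writer is created.
   
   ```java
   import java.util.Optional;

   // Hypothetical stand-in for the task info held by the runtime.
   class TaskInfo {
     final int partitionId;
     final long taskId;

     TaskInfo(int partitionId, long taskId) {
       this.partitionId = partitionId;
       this.taskId = taskId;
     }
   }

   // Hypothetical stand-in for the runtime; not Gluten's VeloxRuntime.
   class FakeRuntime {
     private Optional<TaskInfo> taskInfo = Optional.empty();

     void setTaskInfo(TaskInfo info) {
       taskInfo = Optional.of(info);
     }

     // Mirrors the check above: creating a writer requires task info to be set.
     String createWriter() {
       TaskInfo info = taskInfo.orElseThrow(
           () -> new IllegalStateException(
               "Task info must be set before creating IcebergWriter"));
       return "writer-" + info.partitionId + "-" + info.taskId;
     }
   }

   class TaskInfoOrderingSketch {
     public static void main(String[] args) {
       FakeRuntime runtime = new FakeRuntime();
       try {
         // The writer is requested before anything has set the task info.
         runtime.createWriter();
       } catch (IllegalStateException e) {
         System.out.println("failed: " + e.getMessage());
       }
       // Once the task info is set, the same call succeeds.
       runtime.setTaskInfo(new TaskInfo(0, 42L));
       System.out.println(runtime.createWriter());
     }
   }
   ```
   
   If the real call order matches the first half of `main`, the check would 
fail no matter how the writer itself is implemented.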
   
   That said, I think my current implementation is also fine, especially the 
change to the `ColumnarBatchDataWriterFactory` interface, which now fully 
aligns with Spark's `DataWriterFactory`.
   
   Gluten's `ColumnarBatchDataWriterFactory`
   
   ```java
   public interface ColumnarBatchDataWriterFactory extends Serializable {
     DataWriter<ColumnarBatch> createWriter(int partitionId, long taskId);
   }
   ```
   
   Spark's `DataWriterFactory`
   
   ```java
   public interface DataWriterFactory extends Serializable {
     DataWriter<InternalRow> createWriter(int partitionId, long taskId);
   }
   ```
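   
   As a rough sketch of why the aligned signature helps: the ids reach the 
writer as plain arguments, so nothing depends on runtime-side task info. 
`Writer`, `WriterFactory`, and `FactoryAlignmentSketch` are hypothetical 
stand-ins so this compiles without Spark on the classpath. They are not the 
real Spark or Gluten types.
   
   ```java
   import java.io.Serializable;

   // Hypothetical stand-in for a data writer.
   interface Writer<T> {
     void write(T record);
   }

   // Same shape as the two interfaces above: the ids arrive as arguments.
   interface WriterFactory<T> extends Serializable {
     Writer<T> createWriter(int partitionId, long taskId);
   }

   class FactoryAlignmentSketch {
     // Builds a label from the ids; purely illustrative.
     static String describe(int partitionId, long taskId) {
       return "part-" + partitionId + "-task-" + taskId;
     }

     public static void main(String[] args) {
       StringBuilder log = new StringBuilder();
       // The factory gets both ids at creation time and passes them to the writer.
       WriterFactory<String> factory = (partitionId, taskId) -> {
         String label = describe(partitionId, taskId);
         return record -> log.append(label).append(':').append(record).append('\n');
       };
       Writer<String> writer = factory.createWriter(3, 17L);
       writer.write("row");
       System.out.print(log);
     }
   }
   ```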



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

