Copilot commented on code in PR #6365:
URL: https://github.com/apache/paimon/pull/6365#discussion_r2415666928


##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala:
##########
@@ -297,3 +302,133 @@ class PartitionedJsonTable(
       partitionSchema())
   }
 }
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
+  extends WriteBuilder {
+  override def build: Write = new Write() {
+    override def toBatch: BatchWrite = {
+      FormatTableBatchWrite(table, writeSchema)
+    }
+
+    override def toStreaming: StreamingWrite = {
+      throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
+    }
+  }
+}
+
+private case class FormatTableBatchWrite(table: FormatTable, writeSchema: StructType)
+  extends BatchWrite
+  with Logging {
+
+  private val batchWriteBuilder = table.newBatchWriteBuilder().asInstanceOf[FormatBatchWriteBuilder]
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
+    FormatTableWriterFactory(table, writeSchema, batchWriteBuilder)
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Committing to FormatTable ${table.name()}")
+
+    // For FormatTable, we don't use the batch commit mechanism from the builder
+    // Instead, we directly execute the committers

Review Comment:
   [nitpick] This comment explains what the code doesn't do but doesn't explain 
why this approach was chosen or what the implications are. Consider adding 
context about why the batch commit mechanism isn't used.
   ```suggestion
    // For FormatTable, we don't use the batch commit mechanism from the builder.
    // This is because FormatTable's committers may require direct execution to ensure
    // compatibility with its internal commit logic, which may not align with the generic
    // batch commit mechanism. By directly executing the committers, we maintain control
    // over error handling and commit ordering, but this may reduce atomicity and could
    // have implications for distributed failure recovery. Future improvements may revisit
    // this approach if FormatTable's batch commit support matures.
    // Instead, we directly execute the committers.
   ```
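
   To make the trade-off concrete, here is a minimal, self-contained sketch of the sequential-commit pattern used in this hunk. The `Committer` trait is only a stand-in for the objects returned by `FormatTableTaskCommit.committers()`, not a Paimon API: if committer N fails, committers 0 until N have already published their output and there is no rollback, which is the reduced atomicity the suggestion refers to.

   ```scala
   // Hypothetical illustration only: `Committer` stands in for the objects
   // returned by FormatTableTaskCommit.committers(); it is not a Paimon type.
   trait Committer {
     def commit(): Unit
   }

   // Sequentially executes committers. A failure at index i leaves committers
   // 0 until i already committed with no rollback, so the write is only
   // partially visible; this is the atomicity concern noted above.
   def commitAll(committers: Seq[Committer]): Unit = {
     committers.zipWithIndex.foreach {
       case (committer, i) =>
         try committer.commit()
         catch {
           case e: Exception =>
             throw new RuntimeException(
               s"Commit failed at committer $i of ${committers.size}; " +
                 s"the first $i committers were already committed",
               e)
         }
     }
   }
   ```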



##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala:
##########
@@ -297,3 +302,133 @@ class PartitionedJsonTable(
       partitionSchema())
   }
 }
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
+  extends WriteBuilder {
+  override def build: Write = new Write() {
+    override def toBatch: BatchWrite = {
+      FormatTableBatchWrite(table, writeSchema)
+    }
+
+    override def toStreaming: StreamingWrite = {
+      throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
+    }
+  }
+}
+
+private case class FormatTableBatchWrite(table: FormatTable, writeSchema: StructType)
+  extends BatchWrite
+  with Logging {
+
+  private val batchWriteBuilder = table.newBatchWriteBuilder().asInstanceOf[FormatBatchWriteBuilder]
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
+    FormatTableWriterFactory(table, writeSchema, batchWriteBuilder)
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Committing to FormatTable ${table.name()}")
+
+    // For FormatTable, we don't use the batch commit mechanism from the builder
+    // Instead, we directly execute the committers
+    val committers = messages
+      .collect {
+        case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+        case other =>
+          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
+      }
+      .flatten
+      .toSeq
+
+    try {
+      val start = System.currentTimeMillis()
+      committers.foreach(_.commit())
+      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+    } catch {
+      case e: Exception =>
+        logError("Failed to commit FormatTable writes", e)
+        throw e
+    }
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Aborting write to FormatTable ${table.name()}")
+    // FormatTable doesn't have specific cleanup requirements for now

Review Comment:
   [nitpick] The comment 'for now' suggests this is temporary behavior. 
Consider documenting what cleanup requirements might be needed in the future or 
removing 'for now' if this is the intended permanent behavior.
   ```suggestion
       // FormatTable does not have specific cleanup requirements during abort
   ```
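
   If cleanup is ever added to `abort`, one plausible shape is a best-effort delete of files already written by the failed tasks. The sketch below is hypothetical: it assumes a commit message type that exposes written file paths (the current PR's `FormatTableTaskCommit` does not provide one) and uses the Hadoop `FileSystem` API for deletion.

   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.Path

   // Hypothetical message type: exists only to illustrate what abort-time
   // cleanup could look like if written file paths were tracked per task.
   case class WrittenFilesMessage(paths: Seq[String])

   // Best-effort cleanup: delete whatever the failed tasks already wrote and
   // swallow individual failures so abort itself never throws.
   def bestEffortCleanup(messages: Seq[WrittenFilesMessage], conf: Configuration): Unit = {
     messages.flatMap(_.paths).foreach { p =>
       try {
         val path = new Path(p)
         path.getFileSystem(conf).delete(path, false)
       } catch {
         case _: Exception => // ignore: abort must not fail the job further
       }
     }
   }
   ```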


