Copilot commented on code in PR #6365:
URL: https://github.com/apache/paimon/pull/6365#discussion_r2415666928
##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala:
##########
@@ -297,3 +302,133 @@ class PartitionedJsonTable(
partitionSchema())
}
}
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema:
StructType)
+ extends WriteBuilder {
+ override def build: Write = new Write() {
+ override def toBatch: BatchWrite = {
+ FormatTableBatchWrite(table, writeSchema)
+ }
+
+ override def toStreaming: StreamingWrite = {
+ throw new UnsupportedOperationException("FormatTable doesn't support
streaming write")
+ }
+ }
+}
+
+private case class FormatTableBatchWrite(table: FormatTable, writeSchema:
StructType)
+ extends BatchWrite
+ with Logging {
+
+ private val batchWriteBuilder =
table.newBatchWriteBuilder().asInstanceOf[FormatBatchWriteBuilder]
+
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory =
+ FormatTableWriterFactory(table, writeSchema, batchWriteBuilder)
+
+ override def useCommitCoordinator(): Boolean = false
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ logInfo(s"Committing to FormatTable ${table.name()}")
+
+ // For FormatTable, we don't use the batch commit mechanism from the
builder
+ // Instead, we directly execute the committers
Review Comment:
[nitpick] This comment explains what the code doesn't do but doesn't explain
why this approach was chosen or what the implications are. Consider adding
context about why the batch commit mechanism isn't used.
```suggestion
// For FormatTable, we don't use the batch commit mechanism from the builder.
// This is because FormatTable's committers may require direct execution to ensure
// compatibility with its internal commit logic, which may not align with the generic
// batch commit mechanism. By directly executing the committers, we maintain control
// over error handling and commit ordering, but this may reduce atomicity and could
// have implications for distributed failure recovery. Future improvements may revisit
// this approach if FormatTable's batch commit support matures.
// Instead, we directly execute the committers.
```
##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala:
##########
@@ -297,3 +302,133 @@ class PartitionedJsonTable(
partitionSchema())
}
}
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema:
StructType)
+ extends WriteBuilder {
+ override def build: Write = new Write() {
+ override def toBatch: BatchWrite = {
+ FormatTableBatchWrite(table, writeSchema)
+ }
+
+ override def toStreaming: StreamingWrite = {
+ throw new UnsupportedOperationException("FormatTable doesn't support
streaming write")
+ }
+ }
+}
+
+private case class FormatTableBatchWrite(table: FormatTable, writeSchema:
StructType)
+ extends BatchWrite
+ with Logging {
+
+ private val batchWriteBuilder =
table.newBatchWriteBuilder().asInstanceOf[FormatBatchWriteBuilder]
+
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory =
+ FormatTableWriterFactory(table, writeSchema, batchWriteBuilder)
+
+ override def useCommitCoordinator(): Boolean = false
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ logInfo(s"Committing to FormatTable ${table.name()}")
+
+ // For FormatTable, we don't use the batch commit mechanism from the
builder
+ // Instead, we directly execute the committers
+ val committers = messages
+ .collect {
+ case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+ case other =>
+ throw new IllegalArgumentException(s"${other.getClass.getName} is
not supported")
+ }
+ .flatten
+ .toSeq
+
+ try {
+ val start = System.currentTimeMillis()
+ committers.foreach(_.commit())
+ logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+ } catch {
+ case e: Exception =>
+ logError("Failed to commit FormatTable writes", e)
+ throw e
+ }
+ }
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit = {
+ logInfo(s"Aborting write to FormatTable ${table.name()}")
+ // FormatTable doesn't have specific cleanup requirements for now
Review Comment:
[nitpick] The phrase 'for now' in this comment suggests the behavior is temporary.
Consider documenting what cleanup might be needed in the future, or
removing 'for now' if this is the intended permanent behavior.
```suggestion
// FormatTable does not have specific cleanup requirements during abort
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]