Copilot commented on code in PR #6365:
URL: https://github.com/apache/paimon/pull/6365#discussion_r2431248841


##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala:
##########
@@ -297,3 +302,140 @@ class PartitionedJsonTable(
       partitionSchema())
   }
 }
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
+  extends WriteBuilder {
+  override def build: Write = new Write() {
+    override def toBatch: BatchWrite = {
+      FormatTableBatchWrite(table, writeSchema)
+    }
+
+    override def toStreaming: StreamingWrite = {
+      throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
+    }
+  }
+}
+
+private case class FormatTableBatchWrite(table: FormatTable, writeSchema: StructType)
+  extends BatchWrite
+  with Logging {
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
+    FormatTableWriterFactory(table, writeSchema)
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Committing to FormatTable ${table.name()}")
+
+    val committers = messages
+      .collect {
+        case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+        case other =>
+          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
+      }
+      .flatten
+      .toSeq
+
+    try {
+      val start = System.currentTimeMillis()
+      committers.foreach(_.commit())
+      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+    } catch {
+      case e: Exception =>
+        logError("Failed to commit FormatTable writes", e)
+        throw e
+    }
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Aborting write to FormatTable ${table.name()}")
+    val committers = messages.collect {
+      case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+    }.flatten
+
+    committers.foreach {
+      committer =>
+        try {
+          committer.discard()
+        } catch {
+          case e: Exception => logWarning(s"Failed to abort committer: ${e.getMessage}")
+        }
+    }
+  }
+}
+
+private case class FormatTableWriterFactory(table: FormatTable, writeSchema: StructType)
+  extends DataWriterFactory {
+
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+    val formatTableWrite = table.newBatchWriteBuilder().newWrite()
+    new FormatTableDataWriter(table, formatTableWrite, writeSchema)
+  }
+}
+
+private class FormatTableDataWriter(
+    table: FormatTable,
+    formatTableWrite: BatchTableWrite,
+    writeSchema: StructType)
+  extends DataWriter[InternalRow]
+  with Logging {
+
+  private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
+    val numFields = writeSchema.fields.length
+    record => {
+      new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
+    }
+  }

Review Comment:
   The magic number -1 passed to the SparkInternalRowWrapper constructor should be replaced with a named constant, or documented with a comment explaining its purpose.
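
   One possible shape for the fix, as a rough sketch: the constant name NoRowKindFieldIndex is hypothetical, and it assumes (not confirmed by this diff) that the first constructor argument of SparkInternalRowWrapper is a row-kind field index where -1 means the write schema carries no row-kind field:

       // Hypothetical named constant; documents the assumption that -1 tells
       // SparkInternalRowWrapper there is no row-kind field in the schema.
       private val NoRowKindFieldIndex: Int = -1

       private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
         val numFields = writeSchema.fields.length
         record => new SparkInternalRowWrapper(NoRowKindFieldIndex, writeSchema, numFields).replace(record)
       }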


