Copilot commented on code in PR #6365:
URL: https://github.com/apache/paimon/pull/6365#discussion_r2415629715
##########
paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java:
##########
@@ -583,22 +583,27 @@ public void testFormatTableOnlyPartitionValueRead() throws Exception {
         Random random = new Random();
         String dbName = "test_db";
         catalog.createDatabase(dbName, true);
-        HadoopCompressionType compressionType = HadoopCompressionType.GZIP;
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f1", DataTypes.INT());
         schemaBuilder.column("f2", DataTypes.INT());
         schemaBuilder.column("dt", DataTypes.INT());
         schemaBuilder.column("dt2", DataTypes.VARCHAR(64));
         schemaBuilder.partitionKeys("dt", "dt2");
         schemaBuilder.option("type", "format-table");
-        schemaBuilder.option("file.compression", compressionType.value());
         schemaBuilder.option("format-table.partition-path-only-value", "true");
-        String[] formats = {"csv", "parquet", "json"};
+        Pair[] format2Compressions = {
+            Pair.of("csv", HadoopCompressionType.GZIP),
+            Pair.of("parquet", HadoopCompressionType.ZSTD),
+            Pair.of("json", HadoopCompressionType.GZIP),
+            Pair.of("orc", HadoopCompressionType.ZSTD)
+        };
         int dtPartitionValue = 10;
         String dt2PartitionValue = "2022-01-01";
-        for (String format : formats) {
-            Identifier identifier = Identifier.create(dbName, "partition_table_" + format);
-            schemaBuilder.option("file.format", format);
+        for (Pair<String, HadoopCompressionType> format2Compression : format2Compressions) {
+            Identifier identifier =
+                    Identifier.create(dbName, "partition_table_" + format2Compression.getKey());
+            schemaBuilder.option("file.compression", format2Compression.getValue().value());
+            schemaBuilder.option("file.format", format2Compression.getKey().toString());
Review Comment:
Calling toString() on a String is redundant. The getKey() method already
returns a String, so this should be `format2Compression.getKey()`.
```suggestion
            schemaBuilder.option("file.format", format2Compression.getKey());
```
##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala:
##########
@@ -297,3 +302,136 @@ class PartitionedJsonTable(
       partitionSchema())
   }
 }
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
+  extends WriteBuilder {
+  override def build: Write = new Write() {
+    override def toBatch: BatchWrite = {
+      FormatTableBatchWrite(table, writeSchema)
+    }
+
+    override def toStreaming: StreamingWrite = {
+      throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
+    }
+  }
+}
+
+private case class FormatTableBatchWrite(table: FormatTable, writeSchema: StructType)
+  extends BatchWrite
+  with Logging {
+
+  private val batchWriteBuilder =
+    table.newBatchWriteBuilder().asInstanceOf[FormatBatchWriteBuilder]
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
+    FormatTableWriterFactory(table, writeSchema, batchWriteBuilder)
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Committing to FormatTable ${table.name()}")
+
+    // For FormatTable, we don't use the batch commit mechanism from the builder
+    // Instead, we directly execute the committers
+    val committers = messages
+      .collect {
+        case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+        case other =>
+          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
+      }
+      .flatten
+      .toSeq
+
+    try {
+      val start = System.currentTimeMillis()
+      committers.foreach(_.commit())
+      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+    } catch {
+      case e: Exception =>
+        logError("Failed to commit FormatTable writes", e)
+        throw e
+    }
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Aborting write to FormatTable ${table.name()}")
+    // FormatTable doesn't have specific cleanup requirements for now
+  }
+}
+
+private case class FormatTableWriterFactory(
+    table: FormatTable,
+    writeSchema: StructType,
+    batchWriteBuilder: FormatBatchWriteBuilder)
+  extends DataWriterFactory {
+
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+    val formatTableWrite =
+      batchWriteBuilder
+        .newWrite()
+        .asInstanceOf[BatchTableWrite]
+    new FormatTableDataWriter(table, formatTableWrite, writeSchema)
+  }
+}
+
+private class FormatTableDataWriter(
+    table: FormatTable,
+    formatTableWrite: BatchTableWrite,
+    writeSchema: StructType)
+  extends DataWriter[InternalRow]
+  with Logging {
+
+  private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
+    val numFields = writeSchema.fields.length
+    record => {
+      new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
+    }
+  }
+
+  override def write(record: InternalRow): Unit = {
+    val paimonRow = rowConverter.apply(record)
+    formatTableWrite.write(paimonRow)
+  }
+
+  override def commit(): WriterCommitMessage = {
+    try {
+      val committers = formatTableWrite.prepareCommit().asScala.map {
+        case committer: TwoPhaseCommitMessage => committer.getCommitter
+        case _ => throw new IllegalArgumentException("Unsupported commit message")
+      }
+      // Execute committers immediately to avoid serialization issues
+      committers.foreach(_.commit())
+      // Return empty commit message since we already committed
+      FormatTableTaskCommit(Seq.empty)
Review Comment:
The commit logic bypasses Spark's normal two-phase commit protocol by executing the committers immediately in the DataWriter. This breaks Spark's atomicity guarantees: if the job fails after some tasks have committed but before the rest do, the table is left with partial writes. Consider implementing a proper two-phase commit by returning the committers in the commit message and executing them in the BatchWrite.commit() method on the driver.
```suggestion
      // Do NOT execute committers here; return them in the commit message for driver-side commit
      FormatTableTaskCommit(committers)
```
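
For illustration, here is a minimal Scala sketch of how the two phases could be split. `TaskCommitter`, `TaskCommitters`, `StagingDataWriter`, and `TwoPhaseBatchWrite` are hypothetical stand-ins (not the PR's `FormatTableTaskCommit` or Paimon's committer types), and the sketch assumes the per-task committers are serializable so they can travel from the executors to the driver:

```scala
// Sketch only: simplified stand-ins for the PR's types, showing where each commit phase runs.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{
  BatchWrite,
  DataWriter,
  DataWriterFactory,
  PhysicalWriteInfo,
  WriterCommitMessage
}

// Hypothetical per-task committer; assumed serializable so it can be shipped to the driver.
trait TaskCommitter extends Serializable {
  def commit(): Unit
  def abort(): Unit
}

// Task-side result: carries the committers instead of the effect of running them.
case class TaskCommitters(committers: Seq[TaskCommitter]) extends WriterCommitMessage

// Phase 1 (executors): stage the output, then hand the committers back to the driver.
abstract class StagingDataWriter extends DataWriter[InternalRow] {
  protected def stagedCommitters: Seq[TaskCommitter]
  override def commit(): WriterCommitMessage = TaskCommitters(stagedCommitters)
}

// Phase 2 (driver): nothing becomes visible until every task attempt has succeeded.
class TwoPhaseBatchWrite(writerFactory: DataWriterFactory) extends BatchWrite {

  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
    writerFactory

  override def commit(messages: Array[WriterCommitMessage]): Unit =
    messages.toSeq.collect { case m: TaskCommitters => m.committers }.flatten.foreach(_.commit())

  // If the job fails, discard whatever the tasks staged.
  override def abort(messages: Array[WriterCommitMessage]): Unit =
    messages.toSeq.collect { case m: TaskCommitters => m.committers }.flatten.foreach(_.abort())
}
```

With this split, a failed or speculatively re-executed task attempt leaves nothing visible, because only the driver-side commit publishes results.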
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]