This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e02f035848 [core] spark: support format table write (#6365)
e02f035848 is described below

commit e02f035848a862cd8456ca48c029b824c5de1ec8
Author: jerry <[email protected]>
AuthorDate: Thu Oct 16 20:34:14 2025 +0800

    [core] spark: support format table write (#6365)
---
 .../fs/MultiPartUploadTwoPhaseOutputStream.java    |   2 +
 .../paimon/fs/RenamingTwoPhaseOutputStream.java    |   2 +
 .../org/apache/paimon/fs/TwoPhaseOutputStream.java |   3 +-
 .../org/apache/paimon/catalog/CatalogTestBase.java |  38 ++++--
 .../format/orc/writer/RowDataVectorizer.java       |   2 +-
 .../paimon/format/text/HadoopCompressionUtils.java |   7 +
 .../spark/sql/execution/PaimonFormatTable.scala    | 152 ++++++++++++++++++++-
 .../paimon/spark/table/PaimonFormatTableTest.scala |  65 +++++----
 8 files changed, 225 insertions(+), 46 deletions(-)
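
For context, this change adds BATCH_WRITE support to Spark format tables. A minimal usage sketch, assuming a Spark session against a Paimon REST catalog with a test_db database (table and column names below are illustrative, not part of this commit); it mirrors the updated tests further down:

    // Create a CSV format table handled by the Paimon format-table implementation,
    // then write through the new batch write path and read the rows back.
    spark.sql(
      "CREATE TABLE test_db.fmt_demo (id INT, name STRING) USING csv " +
        "TBLPROPERTIES ('file.compression'='none', 'format-table.implementation'='paimon')")
    spark.sql("INSERT INTO test_db.fmt_demo VALUES (1, 'a'), (2, 'b')")
    spark.sql("SELECT * FROM test_db.fmt_demo").show()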

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
index 362635b4df..2bfd0bec72 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
@@ -158,6 +158,8 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
 
     private static class MultiPartUploadCommitter<T, C> implements Committer {
 
+        private static final long serialVersionUID = 1L;
+
         private final MultiPartUploadStore<T, C> multiPartUploadStore;
         private final String uploadId;
         private final String objectName;
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
index 0da827d920..48a7c4f912 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
@@ -96,6 +96,8 @@ public class RenamingTwoPhaseOutputStream extends TwoPhaseOutputStream {
     /** Committer implementation that renames temporary file to target path. */
     private static class TempFileCommitter implements Committer {
 
+        private static final long serialVersionUID = 1L;
+
         private final FileIO fileIO;
         private final Path tempPath;
         private final Path targetPath;
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
index 401486e91b..3b8c18a70e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.fs;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 /** TwoPhaseOutputStream provides a way to write to a file and get a committer that can commit. */
 public abstract class TwoPhaseOutputStream extends PositionOutputStream {
@@ -35,7 +36,7 @@ public abstract class TwoPhaseOutputStream extends PositionOutputStream {
     public abstract Committer closeForCommit() throws IOException;
 
     /** A committer interface that can commit or discard the written data. */
-    public interface Committer {
+    public interface Committer extends Serializable {
 
         /**
          * Commits the written data, making it visible.
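
The Committer returned by closeForCommit() is now Serializable, so Spark can create it on an executor and finish the write on the driver. A rough sketch of that contract, assuming a concrete stream such as RenamingTwoPhaseOutputStream has already been obtained elsewhere (the helper name below is illustrative):

    import org.apache.paimon.fs.TwoPhaseOutputStream

    // Executor side: write staged data, then get a serializable committer back.
    def writeTask(out: TwoPhaseOutputStream): TwoPhaseOutputStream.Committer = {
      out.write("1|a\n2|b".getBytes("UTF-8"))
      out.closeForCommit() // data stays invisible until committed
    }
    // Driver side, after collecting the WriterCommitMessages:
    //   committer.commit()   -> publish the staged file
    //   committer.discard()  -> drop the staged data on abort
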
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index a515e96ae0..ea3e2ebd86 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -583,7 +583,6 @@ public abstract class CatalogTestBase {
         Random random = new Random();
         String dbName = "test_db";
         catalog.createDatabase(dbName, true);
-        HadoopCompressionType compressionType = HadoopCompressionType.GZIP;
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f1", DataTypes.INT());
         schemaBuilder.column("f2", DataTypes.INT());
@@ -591,14 +590,20 @@ public abstract class CatalogTestBase {
         schemaBuilder.column("dt2", DataTypes.VARCHAR(64));
         schemaBuilder.partitionKeys("dt", "dt2");
         schemaBuilder.option("type", "format-table");
-        schemaBuilder.option("file.compression", compressionType.value());
         schemaBuilder.option("format-table.partition-path-only-value", "true");
-        String[] formats = {"csv", "parquet", "json"};
+        Pair[] format2Compressions = {
+            Pair.of("csv", HadoopCompressionType.GZIP),
+            Pair.of("parquet", HadoopCompressionType.ZSTD),
+            Pair.of("json", HadoopCompressionType.GZIP),
+            Pair.of("orc", HadoopCompressionType.ZSTD)
+        };
         int dtPartitionValue = 10;
         String dt2PartitionValue = "2022-01-01";
-        for (String format : formats) {
-            Identifier identifier = Identifier.create(dbName, "partition_table_" + format);
-            schemaBuilder.option("file.format", format);
+        for (Pair<String, HadoopCompressionType> format2Compression : format2Compressions) {
+            Identifier identifier =
+                    Identifier.create(dbName, "partition_table_" + format2Compression.getKey());
+            schemaBuilder.option("file.compression", 
format2Compression.getValue().value());
+            schemaBuilder.option("file.format", format2Compression.getKey());
             catalog.createTable(identifier, schemaBuilder.build(), true);
             FormatTable table = (FormatTable) catalog.getTable(identifier);
             int size = 5;
@@ -619,7 +624,7 @@ public abstract class CatalogTestBase {
             partitionSpec.put("dt2", dt2PartitionValue + 1);
             List<InternalRow> readFilterData = read(table, null, null, partitionSpec, null);
             assertThat(readFilterData).isEmpty();
-            catalog.dropTable(Identifier.create(dbName, format), true);
+            catalog.dropTable(identifier, true);
         }
     }
 
@@ -633,21 +638,26 @@ public abstract class CatalogTestBase {
         String dbName = "test_db";
         catalog.createDatabase(dbName, true);
         int partitionValue = 10;
-        HadoopCompressionType compressionType = HadoopCompressionType.GZIP;
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f1", DataTypes.INT());
         schemaBuilder.column("f2", DataTypes.INT());
         schemaBuilder.column("dt", DataTypes.INT());
         schemaBuilder.option("type", "format-table");
         schemaBuilder.option("target-file-size", "1 kb");
-        schemaBuilder.option("file.compression", compressionType.value());
-        String[] formats = {"csv", "parquet", "json"};
-        for (String format : formats) {
+        Pair[] format2Compressions = {
+            Pair.of("csv", HadoopCompressionType.GZIP),
+            Pair.of("parquet", HadoopCompressionType.ZSTD),
+            Pair.of("json", HadoopCompressionType.GZIP),
+            Pair.of("orc", HadoopCompressionType.ZSTD)
+        };
+        for (Pair<String, HadoopCompressionType> format2Compression : format2Compressions) {
             if (partitioned) {
                 schemaBuilder.partitionKeys("dt");
             }
-            Identifier identifier = Identifier.create(dbName, "table_" + format);
-            schemaBuilder.option("file.format", format);
+            Identifier identifier =
+                    Identifier.create(dbName, "table_" + format2Compression.getKey());
+            schemaBuilder.option("file.format", format2Compression.getKey());
+            schemaBuilder.option("file.compression", 
format2Compression.getValue().value());
             catalog.createTable(identifier, schemaBuilder.build(), true);
             FormatTable table = (FormatTable) catalog.getTable(identifier);
             int[] projection = new int[] {1, 2};
@@ -696,7 +706,7 @@ public abstract class CatalogTestBase {
                        read(table, partitionFilterPredicate, projection, null, null);
                assertThat(readPartitionAndNoPartitionFilterData).hasSize(size);
             }
-            catalog.dropTable(Identifier.create(dbName, format), true);
+            catalog.dropTable(identifier, true);
         }
     }
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
index 5cb37f191a..e36fbdfee9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
@@ -53,7 +53,7 @@ public class RowDataVectorizer extends Vectorizer<InternalRow> {
     @Override
     public void vectorize(InternalRow row, VectorizedRowBatch batch) {
         int rowId = batch.size++;
-        for (int i = 0; i < row.getFieldCount(); ++i) {
+        for (int i = 0; i < fieldNames.length; ++i) {
             ColumnVector fieldColumn = batch.cols[i];
             if (row.isNullAt(i)) {
                 if (!isNullable[i]) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
index 7ff2f97e17..0a292c4d8b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -101,6 +102,12 @@ public class HadoopCompressionUtils {
             Class<?> codecClass = Class.forName(codecName);
             CompressionCodec codec =
                     (CompressionCodec) codecClass.getDeclaredConstructor().newInstance();
+
+            // To fix npe when the codec implements Configurable
+            if (codec instanceof Configurable) {
+                ((Configurable) codec).setConf(new Configuration());
+            }
+
             codec.createOutputStream(new java.io.ByteArrayOutputStream());
             return Optional.of(codec);
         } catch (Exception | UnsatisfiedLinkError e) {
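
The setConf call above matters because Hadoop codecs that implement Configurable (for example GzipCodec via DefaultCodec) dereference their configuration when creating streams, and a reflectively constructed instance has none. A small illustrative sketch of the same guard, not code from this commit:

    import org.apache.hadoop.conf.{Configurable, Configuration}
    import org.apache.hadoop.io.compress.CompressionCodec

    // Reflectively construct a codec, give Configurable codecs a Configuration,
    // then probe it by creating an output stream.
    val codec = Class.forName("org.apache.hadoop.io.compress.GzipCodec")
      .getDeclaredConstructor().newInstance().asInstanceOf[CompressionCodec]
    codec match {
      case c: Configurable => c.setConf(new Configuration()) // avoids the NPE
      case _ =>
    }
    codec.createOutputStream(new java.io.ByteArrayOutputStream())
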
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
index 5167c4c1bd..aa444aba92 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
@@ -18,17 +18,22 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.paimon.spark.{PaimonFormatTableScanBuilder, SparkTypeUtils}
+import org.apache.paimon.fs.TwoPhaseOutputStream
+import org.apache.paimon.spark.{PaimonFormatTableScanBuilder, SparkInternalRowWrapper, SparkTypeUtils}
 import org.apache.paimon.table.FormatTable
+import org.apache.paimon.table.format.{FormatBatchWriteBuilder, TwoPhaseCommitMessage}
+import org.apache.paimon.table.sink.BatchTableWrite
 
 import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, TableCapability}
-import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE}
 import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTable}
 import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
@@ -186,7 +191,7 @@ case class PaimonFormatTable(
   }
 
   override def capabilities(): util.Set[TableCapability] = {
-    util.EnumSet.of(BATCH_READ)
+    util.EnumSet.of(BATCH_READ, BATCH_WRITE)
   }
 
   override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
@@ -194,7 +199,7 @@ case class PaimonFormatTable(
   }
 
   override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): WriteBuilder = {
-    throw new UnsupportedOperationException()
+    PaimonFormatTableWriterBuilder(table, schema)
   }
 }
 
@@ -297,3 +302,140 @@ class PartitionedJsonTable(
       partitionSchema())
   }
 }
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
+  extends WriteBuilder {
+  override def build: Write = new Write() {
+    override def toBatch: BatchWrite = {
+      FormatTableBatchWrite(table, writeSchema)
+    }
+
+    override def toStreaming: StreamingWrite = {
+      throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
+    }
+  }
+}
+
+private case class FormatTableBatchWrite(table: FormatTable, writeSchema: StructType)
+  extends BatchWrite
+  with Logging {
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
+    FormatTableWriterFactory(table, writeSchema)
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Committing to FormatTable ${table.name()}")
+
+    val committers = messages
+      .collect {
+        case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+        case other =>
+          throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
+      }
+      .flatten
+      .toSeq
+
+    try {
+      val start = System.currentTimeMillis()
+      committers.foreach(_.commit())
+      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+    } catch {
+      case e: Exception =>
+        logError("Failed to commit FormatTable writes", e)
+        throw e
+    }
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Aborting write to FormatTable ${table.name()}")
+    val committers = messages.collect {
+      case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+    }.flatten
+
+    committers.foreach {
+      committer =>
+        try {
+          committer.discard()
+        } catch {
+          case e: Exception => logWarning(s"Failed to abort committer: ${e.getMessage}")
+        }
+    }
+  }
+}
+
+private case class FormatTableWriterFactory(table: FormatTable, writeSchema: StructType)
+  extends DataWriterFactory {
+
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+    val formatTableWrite = table.newBatchWriteBuilder().newWrite()
+    new FormatTableDataWriter(table, formatTableWrite, writeSchema)
+  }
+}
+
+private class FormatTableDataWriter(
+    table: FormatTable,
+    formatTableWrite: BatchTableWrite,
+    writeSchema: StructType)
+  extends DataWriter[InternalRow]
+  with Logging {
+
+  private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
+    val numFields = writeSchema.fields.length
+    record => {
+      new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
+    }
+  }
+
+  override def write(record: InternalRow): Unit = {
+    val paimonRow = rowConverter.apply(record)
+    formatTableWrite.write(paimonRow)
+  }
+
+  override def commit(): WriterCommitMessage = {
+    try {
+      val committers = formatTableWrite
+        .prepareCommit()
+        .asScala
+        .map {
+          case committer: TwoPhaseCommitMessage => committer.getCommitter
+          case other =>
+            throw new IllegalArgumentException(
+              "Unsupported commit message type: " + 
other.getClass.getSimpleName)
+        }
+        .toSeq
+      FormatTableTaskCommit(committers)
+    } finally {
+      close()
+    }
+  }
+
+  override def abort(): Unit = {
+    logInfo("Aborting FormatTable data writer")
+    close()
+  }
+
+  override def close(): Unit = {
+    try {
+      formatTableWrite.close()
+    } catch {
+      case e: Exception =>
+        logError("Error closing FormatTableDataWriter", e)
+        throw new RuntimeException(e)
+    }
+  }
+}
+
+/** Commit message container for FormatTable writes, holding committers that need to be executed. */
+class FormatTableTaskCommit private (private val _committers: Seq[TwoPhaseOutputStream.Committer])
+  extends WriterCommitMessage {
+
+  def committers(): Seq[TwoPhaseOutputStream.Committer] = _committers
+}
+
+object FormatTableTaskCommit {
+  def apply(committers: Seq[TwoPhaseOutputStream.Committer]): FormatTableTaskCommit = {
+    new FormatTableTaskCommit(committers)
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index c01898a7c8..a78813bc0f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.spark.table
 
-import org.apache.paimon.CoreOptions
 import org.apache.paimon.catalog.Identifier
 import org.apache.paimon.fs.Path
 import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
@@ -39,10 +38,9 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
     val tableName = "paimon_format_test_csv_malformed"
     withTable(tableName) {
       sql(
-        s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV 
OPTIONS ('" +
-          s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
-          s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
-              
.key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}') 
PARTITIONED BY (`ds` bigint)")
+        s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV 
TBLPROPERTIES (" +
+          s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
+          "'format-table.implementation'='paimon') PARTITIONED BY (`ds` 
bigint)")
       val table =
         paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
       val partition = 20250920
@@ -67,18 +65,15 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
     val tableName = "paimon_format_test_csv_options"
     withTable(tableName) {
       sql(
-        s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV OPTIONS ('" +
-          s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
-          s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
-              .key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}') PARTITIONED BY (`ds` bigint)")
+        s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
+          s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
+          "'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
       val table =
         paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
+      table.fileIO().mkdirs(new Path(table.location()))
       val partition = 20250920
-      val csvFile =
-        new Path(
-          table.location(),
-          s"ds=$partition/part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
-      table.fileIO().writeFile(csvFile, "1|asfasdfsdf\n2|asfasdfsdf", false)
+      sql(
+        s"INSERT INTO $tableName VALUES (1, 'asfasdfsdf', $partition), (2, 
'asfasdfsdf', $partition)")
       checkAnswer(
         sql(s"SELECT * FROM $tableName"),
         Seq(Row(1, "asfasdfsdf", partition), Row(2, "asfasdfsdf", partition))
@@ -86,7 +81,31 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
     }
   }
 
-  test("PaimonFormatTableRead: read non-partitioned table") {
+  test("PaimonFormatTable: csv with partition path only value") {
+    val tableName = "paimon_format_test_partition_path_only_value"
+    withTable(tableName) {
+      sql(
+        s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES 
(" +
+          s"'file.compression'='none','format-table.implementation'='paimon'," 
+
+          "'format-table.partition-path-only-value'='true') PARTITIONED BY 
(`ds` bigint)")
+      val table =
+        paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
+      table.fileIO().mkdirs(new Path(table.location()))
+      val partition = 20250920
+      sql(
+        s"INSERT INTO $tableName VALUES (1, 'asfasdfsdf', $partition), (2, 
'asfasdfsdf', $partition)")
+      checkAnswer(
+        sql(s"SELECT ds, f0 FROM $tableName"),
+        Seq(Row(partition, 1), Row(partition, 2))
+      )
+      checkAnswer(
+        sql(s"SELECT ds, f0 FROM $tableName where ds = $partition order by f0 
limit 1"),
+        Seq(Row(partition, 1))
+      )
+    }
+  }
+
+  test("PaimonFormatTable: non-partitioned table") {
     for {
       (format, compression) <- Seq(
         ("csv", "gzip"),
@@ -99,7 +118,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
         // Create format table using the same pattern as FormatTableTestBase
         sql(
           s"CREATE TABLE $tableName (id INT, name STRING, value DOUBLE) USING 
$format " +
-            s"TBLPROPERTIES ('file.compression'='$compression', 'seq'=',', 
'lineSep'='\n')")
+            s"TBLPROPERTIES ('file.compression'='$compression', 'seq'=',', 
'lineSep'='\n'," +
+            " 'format-table.implementation'='paimon')")
         val path =
           paimonCatalog.getTable(Identifier.create("test_db", tableName)).options().get("path")
         fileIO.mkdirs(new Path(path))
@@ -109,9 +129,6 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
         sql(s"INSERT INTO $tableName VALUES (3, 'Charlie', 30.9)")
 
         // Test reading all data
-        sql(
-          s"Alter table $tableName SET TBLPROPERTIES 
('${CoreOptions.FORMAT_TABLE_IMPLEMENTATION.key()}'" +
-            s"='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
         checkAnswer(
           sql(s"SELECT * FROM $tableName ORDER BY id"),
           Seq(
@@ -143,7 +160,7 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
     }
   }
 
-  test("PaimonFormatTableRead: read partitioned table") {
+  test("PaimonFormatTable: partitioned table") {
     for {
       (format, compression) <- Seq(
         ("csv", "gzip"),
@@ -155,8 +172,9 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
       withTable(tableName) {
         // Create partitioned format table
         sql(
-          s"CREATE TABLE $tableName (id INT, name STRING, value DOUBLE, dept 
STRING) USING $format " +
-            s"PARTITIONED BY (dept) TBLPROPERTIES 
('file.compression'='$compression')")
+          s"CREATE TABLE $tableName (id INT, name STRING, value DOUBLE) USING 
$format " +
+            s"PARTITIONED BY (dept STRING) TBLPROPERTIES 
('file.compression'='$compression'," +
+            " 'format-table.implementation'='paimon')")
         val paimonTable = paimonCatalog.getTable(Identifier.create("test_db", tableName))
         val path =
           paimonCatalog.getTable(Identifier.create("test_db", tableName)).options().get("path")
@@ -170,9 +188,6 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
             s" (5, 'Eve', 15.8, 'Marketing')")
 
         // Test reading all data
-        sql(
-          s"Alter table $tableName SET TBLPROPERTIES 
('${CoreOptions.FORMAT_TABLE_IMPLEMENTATION.key()}'" +
-            s"='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
         checkAnswer(
           sql(s"SELECT * FROM $tableName ORDER BY id"),
           Seq(
