This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b4f0e7430 [core] Introduce format table write (#6288)
7b4f0e7430 is described below

commit 7b4f0e74309407ccac9f2efdd803a2abb101cb7d
Author: jerry <[email protected]>
AuthorDate: Thu Oct 9 10:41:59 2025 +0800

    [core] Introduce format table write (#6288)
---
 .../paimon/append/RollingBlobFileWriter.java       |   6 +-
 .../apache/paimon/io/FileWriterAbortExecutor.java  |  37 +++++
 .../paimon/io/FormatTableRollingFileWriter.java    | 144 +++++++++++++++++++
 ...riter.java => FormatTableSingleFileWriter.java} | 123 +++++-----------
 .../apache/paimon/io/RollingFileWriterImpl.java    |   5 +-
 .../org/apache/paimon/io/SingleFileWriter.java     |  20 +--
 .../java/org/apache/paimon/table/FormatTable.java  |   3 +-
 .../table/format/FormatBatchWriteBuilder.java      |  81 +++++++++++
 .../paimon/table/format/FormatTableFileWriter.java | 118 ++++++++++++++++
 .../table/format/FormatTableRecordWriter.java      |  87 ++++++++++++
 .../paimon/table/format/FormatTableWrite.java      | 154 +++++++++++++++++++++
 .../paimon/table/format/TwoPhaseCommitMessage.java |  53 +++++++
 .../sink/FormatTableRowPartitionKeyExtractor.java  |  47 +++++++
 .../apache/paimon/utils/FileStorePathFactory.java  |  30 +++-
 .../org/apache/paimon/catalog/CatalogTestBase.java | 140 ++++---------------
 15 files changed, 817 insertions(+), 231 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index b4b15f2ef9..17d4be7c76 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -26,11 +26,11 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.FileWriterAbortExecutor;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.io.RollingFileWriterImpl;
 import org.apache.paimon.io.RowDataFileWriter;
 import org.apache.paimon.io.SingleFileWriter;
-import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
@@ -91,7 +91,7 @@ public class RollingBlobFileWriter implements 
RollingFileWriter<InternalRow, Dat
     private final long targetFileSize;
 
     // State management
-    private final List<AbortExecutor> closedWriters;
+    private final List<FileWriterAbortExecutor> closedWriters;
     private final List<DataFileMeta> results;
     private PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, 
DataFileMeta>
             currentWriter;
@@ -316,7 +316,7 @@ public class RollingBlobFileWriter implements 
RollingFileWriter<InternalRow, Dat
             currentWriter.abort();
             currentWriter = null;
         }
-        for (AbortExecutor abortExecutor : closedWriters) {
+        for (FileWriterAbortExecutor abortExecutor : closedWriters) {
             abortExecutor.abort();
         }
         blobWriter.abort();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileWriterAbortExecutor.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileWriterAbortExecutor.java
new file mode 100644
index 0000000000..4c10608c11
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/FileWriterAbortExecutor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+/** Abort executor that keeps only a reference to the path instead of the whole writer. */
+public class FileWriterAbortExecutor {
+    private final FileIO fileIO;
+    private final Path path;
+
+    public FileWriterAbortExecutor(FileIO fileIO, Path path) {
+        this.fileIO = fileIO;
+        this.path = path;
+    }
+
+    public void abort() {
+        fileIO.deleteQuietly(path);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FormatTableRollingFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/FormatTableRollingFileWriter.java
new file mode 100644
index 0000000000..f081abf310
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/FormatTableRollingFileWriter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+import org.apache.paimon.types.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Format table's writer to roll over to a new file if the current size exceeds 
the target file size.
+ */
+public class FormatTableRollingFileWriter implements AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FormatTableRollingFileWriter.class);
+
+    private static final int CHECK_ROLLING_RECORD_CNT = 1000;
+
+    private final Supplier<FormatTableSingleFileWriter> writerFactory;
+    private final long targetFileSize;
+    private final List<FileWriterAbortExecutor> closedWriters;
+    private final List<TwoPhaseOutputStream.Committer> committers;
+
+    private FormatTableSingleFileWriter currentWriter = null;
+    private long recordCount = 0;
+    private boolean closed = false;
+
+    public FormatTableRollingFileWriter(
+            FileIO fileIO,
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            DataFilePathFactory pathFactory,
+            String fileCompression) {
+        this.writerFactory =
+                () ->
+                        new FormatTableSingleFileWriter(
+                                fileIO,
+                                fileFormat.createWriterFactory(writeSchema),
+                                pathFactory.newPath(),
+                                fileCompression);
+        this.targetFileSize = targetFileSize;
+        this.closedWriters = new ArrayList<>();
+        this.committers = new ArrayList<>();
+    }
+
+    public long targetFileSize() {
+        return targetFileSize;
+    }
+
+    public void write(InternalRow row) throws IOException {
+        try {
+            if (currentWriter == null) {
+                currentWriter = writerFactory.get();
+            }
+
+            currentWriter.write(row);
+            recordCount += 1;
+            boolean needRolling =
+                    currentWriter.reachTargetSize(
+                            recordCount % CHECK_ROLLING_RECORD_CNT == 0, 
targetFileSize);
+            if (needRolling) {
+                closeCurrentWriter();
+            }
+        } catch (Throwable e) {
+            LOG.warn(
+                    "Exception occurs when writing file {}. Cleaning up.",
+                    currentWriter == null ? null : currentWriter.path(),
+                    e);
+            abort();
+            throw e;
+        }
+    }
+
+    private void closeCurrentWriter() throws IOException {
+        if (currentWriter == null) {
+            return;
+        }
+
+        currentWriter.close();
+        closedWriters.add(currentWriter.abortExecutor());
+        if (currentWriter.committers() != null) {
+            committers.addAll(currentWriter.committers());
+        }
+
+        currentWriter = null;
+    }
+
+    public void abort() {
+        if (currentWriter != null) {
+            currentWriter.abort();
+        }
+        for (FileWriterAbortExecutor abortExecutor : closedWriters) {
+            abortExecutor.abort();
+        }
+    }
+
+    public List<TwoPhaseOutputStream.Committer> committers() {
+        return committers;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        try {
+            closeCurrentWriter();
+        } catch (IOException e) {
+            LOG.warn(
+                    "Exception occurs when writing file {}. Cleaning up.", 
currentWriter.path(), e);
+            abort();
+            throw e;
+        } finally {
+            closed = true;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FormatTableSingleFileWriter.java
similarity index 55%
copy from paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
copy to 
paimon-core/src/main/java/org/apache/paimon/io/FormatTableSingleFileWriter.java
index c9bf104249..1e204f1f22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/FormatTableSingleFileWriter.java
@@ -19,63 +19,49 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.BundleFormatWriter;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SupportsDirectWrite;
-import org.apache.paimon.fs.AsyncPositionOutputStream;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
 import org.apache.paimon.utils.IOUtils;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.function.Function;
+import java.util.List;
 
-/**
- * A {@link FileWriter} to produce a single file.
- *
- * @param <T> type of records to write.
- * @param <R> type of result to produce after writing a file.
- */
-public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
+/** Format table's writer to produce a single file. */
+public class FormatTableSingleFileWriter {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(SingleFileWriter.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(FormatTableSingleFileWriter.class);
 
     protected final FileIO fileIO;
     protected final Path path;
-    private final Function<T, InternalRow> converter;
 
     private FormatWriter writer;
     private PositionOutputStream out;
+    private TwoPhaseOutputStream.Committer committer;
 
     protected long outputBytes;
-    private long recordCount;
     protected boolean closed;
 
-    public SingleFileWriter(
-            FileIO fileIO,
-            FormatWriterFactory factory,
-            Path path,
-            Function<T, InternalRow> converter,
-            String compression,
-            boolean asyncWrite) {
+    public FormatTableSingleFileWriter(
+            FileIO fileIO, FormatWriterFactory factory, Path path, String 
compression) {
         this.fileIO = fileIO;
         this.path = path;
-        this.converter = converter;
 
         try {
             if (factory instanceof SupportsDirectWrite) {
-                writer = ((SupportsDirectWrite) factory).create(fileIO, path, 
compression);
+                throw new UnsupportedOperationException("Does not support 
SupportsDirectWrite.");
             } else {
-                out = fileIO.newOutputStream(path, false);
-                if (asyncWrite) {
-                    out = new AsyncPositionOutputStream(out);
-                }
+                out = fileIO.newTwoPhaseOutputStream(path, false);
                 writer = factory.create(out, compression);
             }
         } catch (IOException e) {
@@ -88,7 +74,6 @@ public abstract class SingleFileWriter<T, R> implements 
FileWriter<T, R> {
             throw new UncheckedIOException(e);
         }
 
-        this.recordCount = 0;
         this.closed = false;
     }
 
@@ -96,80 +81,62 @@ public abstract class SingleFileWriter<T, R> implements 
FileWriter<T, R> {
         return path;
     }
 
-    @Override
-    public void write(T record) throws IOException {
-        writeImpl(record);
-    }
-
-    public void writeBundle(BundleRecords bundle) throws IOException {
-        if (closed) {
-            throw new RuntimeException("Writer has already closed!");
-        }
-
-        try {
-            if (writer instanceof BundleFormatWriter) {
-                ((BundleFormatWriter) writer).writeBundle(bundle);
-            } else {
-                for (InternalRow row : bundle) {
-                    writer.addElement(row);
-                }
-            }
-            recordCount += bundle.rowCount();
-        } catch (Throwable e) {
-            LOG.warn("Exception occurs when writing file " + path + ". 
Cleaning up.", e);
-            abort();
-            throw e;
-        }
-    }
-
-    protected InternalRow writeImpl(T record) throws IOException {
+    public void write(InternalRow record) throws IOException {
         if (closed) {
             throw new RuntimeException("Writer has already closed!");
         }
 
         try {
-            InternalRow rowData = converter.apply(record);
-            writer.addElement(rowData);
-            recordCount++;
-            return rowData;
+            writer.addElement(record);
         } catch (Throwable e) {
-            LOG.warn("Exception occurs when writing file " + path + ". 
Cleaning up.", e);
+            LOG.warn("Exception occurs when writing file {}. Cleaning up.", 
path, e);
             abort();
             throw e;
         }
     }
 
-    @Override
-    public long recordCount() {
-        return recordCount;
-    }
-
     public boolean reachTargetSize(boolean suggestedCheck, long targetSize) 
throws IOException {
         return writer.reachTargetSize(suggestedCheck, targetSize);
     }
 
-    @Override
     public void abort() {
         if (writer != null) {
             IOUtils.closeQuietly(writer);
             writer = null;
         }
         if (out != null) {
-            IOUtils.closeQuietly(out);
+            try {
+                committer = ((TwoPhaseOutputStream) out).closeForCommit();
+            } catch (Throwable e) {
+                LOG.warn("Exception occurs when close for commit out {}", 
committer, e);
+            }
             out = null;
         }
+        if (committer != null) {
+            try {
+                committer.discard();
+            } catch (Throwable e) {
+                LOG.warn("Exception occurs when close out {}", committer, e);
+            }
+        }
         fileIO.deleteQuietly(path);
     }
 
-    public AbortExecutor abortExecutor() {
+    public List<TwoPhaseOutputStream.Committer> committers() {
+        if (!closed) {
+            throw new RuntimeException("Writer should be closed before getting 
committer!");
+        }
+        return Lists.newArrayList(committer);
+    }
+
+    public FileWriterAbortExecutor abortExecutor() {
         if (!closed) {
             throw new RuntimeException("Writer should be closed!");
         }
 
-        return new AbortExecutor(fileIO, path);
+        return new FileWriterAbortExecutor(fileIO, path);
     }
 
-    @Override
     public void close() throws IOException {
         if (closed) {
             return;
@@ -187,7 +154,7 @@ public abstract class SingleFileWriter<T, R> implements 
FileWriter<T, R> {
             if (out != null) {
                 out.flush();
                 outputBytes = out.getPos();
-                out.close();
+                committer = ((TwoPhaseOutputStream) out).closeForCommit();
                 out = null;
             }
         } catch (IOException e) {
@@ -198,20 +165,4 @@ public abstract class SingleFileWriter<T, R> implements 
FileWriter<T, R> {
             closed = true;
         }
     }
-
-    /** Abort executor to just have reference of path instead of whole writer. 
*/
-    public static class AbortExecutor {
-
-        private final FileIO fileIO;
-        private final Path path;
-
-        private AbortExecutor(FileIO fileIO, Path path) {
-            this.fileIO = fileIO;
-            this.path = path;
-        }
-
-        public void abort() {
-            fileIO.deleteQuietly(path);
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
index 76ee44b731..7353ea1971 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
 import org.apache.paimon.utils.Preconditions;
 
 import org.slf4j.Logger;
@@ -42,7 +41,7 @@ public class RollingFileWriterImpl<T, R> implements 
RollingFileWriter<T, R> {
 
     private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
     private final long targetFileSize;
-    private final List<AbortExecutor> closedWriters;
+    private final List<FileWriterAbortExecutor> closedWriters;
     protected final List<R> results;
 
     private SingleFileWriter<T, R> currentWriter = null;
@@ -145,7 +144,7 @@ public class RollingFileWriterImpl<T, R> implements 
RollingFileWriter<T, R> {
         if (currentWriter != null) {
             currentWriter.abort();
         }
-        for (AbortExecutor abortExecutor : closedWriters) {
+        for (FileWriterAbortExecutor abortExecutor : closedWriters) {
             abortExecutor.abort();
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index c9bf104249..3c3b11d8f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -161,12 +161,12 @@ public abstract class SingleFileWriter<T, R> implements 
FileWriter<T, R> {
         fileIO.deleteQuietly(path);
     }
 
-    public AbortExecutor abortExecutor() {
+    public FileWriterAbortExecutor abortExecutor() {
         if (!closed) {
             throw new RuntimeException("Writer should be closed!");
         }
 
-        return new AbortExecutor(fileIO, path);
+        return new FileWriterAbortExecutor(fileIO, path);
     }
 
     @Override
@@ -198,20 +198,4 @@ public abstract class SingleFileWriter<T, R> implements 
FileWriter<T, R> {
             closed = true;
         }
     }
-
-    /** Abort executor to just have reference of path instead of whole writer. 
*/
-    public static class AbortExecutor {
-
-        private final FileIO fileIO;
-        private final Path path;
-
-        private AbortExecutor(FileIO fileIO, Path path) {
-            this.fileIO = fileIO;
-            this.path = path;
-        }
-
-        public void abort() {
-            fileIO.deleteQuietly(path);
-        }
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index d3b6881246..2963dbb04d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.format.FormatBatchWriteBuilder;
 import org.apache.paimon.table.format.FormatReadBuilder;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
@@ -257,7 +258,7 @@ public interface FormatTable extends Table {
 
     @Override
     default BatchWriteBuilder newBatchWriteBuilder() {
-        throw new UnsupportedOperationException();
+        return new FormatBatchWriteBuilder(this);
     }
 
     default RowType partitionType() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
new file mode 100644
index 0000000000..abb10f0535
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.WriteSelector;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** A builder to build {@link FormatTableWrite} for format tables. */
+public class FormatBatchWriteBuilder implements BatchWriteBuilder {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FormatTable table;
+    protected final CoreOptions options;
+
+    public FormatBatchWriteBuilder(FormatTable table) {
+        this.table = table;
+        this.options = new CoreOptions(table.options());
+    }
+
+    @Override
+    public String tableName() {
+        return table.name();
+    }
+
+    @Override
+    public RowType rowType() {
+        return table.rowType();
+    }
+
+    @Override
+    public Optional<WriteSelector> newWriteSelector() {
+        return table.newBatchWriteBuilder().newWriteSelector();
+    }
+
+    @Override
+    public BatchTableWrite newWrite() {
+        return new FormatTableWrite(
+                table.fileIO(),
+                rowType(),
+                this.options,
+                table.partitionType(),
+                table.partitionKeys());
+    }
+
+    @Override
+    public BatchTableCommit newCommit() {
+        throw new UnsupportedOperationException("FormatTable does not support 
commit");
+    }
+
+    @Override
+    public BatchWriteBuilder withOverwrite(@Nullable Map<String, String> 
staticPartition) {
+        throw new UnsupportedOperationException("FormatTable does not support 
commit");
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
new file mode 100644
index 0000000000..3d6af1b227
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.format.FileFormat.fileFormat;
+
+/** File writer for format table. */
+public class FormatTableFileWriter {
+
+    private final FileIO fileIO;
+    private RowType rowType;
+    private RowType partitionType;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    protected final Map<BinaryRow, FormatTableRecordWriter> writers;
+    protected final CoreOptions options;
+
+    public FormatTableFileWriter(
+            FileIO fileIO, RowType rowType, CoreOptions options, RowType 
partitionType) {
+        this.fileIO = fileIO;
+        this.rowType = rowType;
+        this.partitionType = partitionType;
+        this.fileFormat = fileFormat(options);
+        this.writers = new HashMap<>();
+        this.options = options;
+        this.pathFactory =
+                new FileStorePathFactory(
+                        options.path(),
+                        partitionType,
+                        options.partitionDefaultName(),
+                        options.fileFormatString(),
+                        options.dataFilePrefix(),
+                        options.changelogFilePrefix(),
+                        options.legacyPartitionName(),
+                        options.fileSuffixIncludeCompression(),
+                        options.fileCompression(),
+                        options.dataFilePathDirectory(),
+                        null,
+                        false);
+    }
+
+    public void withWriteType(RowType writeType) {
+        this.rowType = writeType;
+    }
+
+    public void write(BinaryRow partition, InternalRow data) throws Exception {
+        FormatTableRecordWriter writer = writers.get(partition);
+        if (writer == null) {
+            writer = createWriter(partition.copy());
+            writers.put(partition.copy(), writer);
+        }
+        writer.write(data);
+    }
+
+    public void close() throws Exception {
+        writers.clear();
+    }
+
+    public List<CommitMessage> prepareCommit() throws Exception {
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        for (FormatTableRecordWriter writer : writers.values()) {
+            List<TwoPhaseOutputStream.Committer> commiters = 
writer.closeAndGetCommitters();
+            for (TwoPhaseOutputStream.Committer committer : commiters) {
+                TwoPhaseCommitMessage twoPhaseCommitMessage = new 
TwoPhaseCommitMessage(committer);
+                commitMessages.add(twoPhaseCommitMessage);
+            }
+        }
+        return commitMessages;
+    }
+
+    private FormatTableRecordWriter createWriter(BinaryRow partition) {
+        RowType writeRowType =
+                rowType.project(
+                        rowType.getFieldNames().stream()
+                                .filter(name -> 
!partitionType.getFieldNames().contains(name))
+                                .collect(Collectors.toList()));
+        return new FormatTableRecordWriter(
+                fileIO,
+                fileFormat,
+                options.targetFileSize(false),
+                pathFactory.createFormatTableDataFilePathFactory(
+                        partition, 
options.formatTablePartitionOnlyValueInPath()),
+                writeRowType,
+                options.fileCompression());
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRecordWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRecordWriter.java
new file mode 100644
index 0000000000..b611f2805d
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRecordWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.FormatTableRollingFileWriter;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** record writer for format table. */
+public class FormatTableRecordWriter implements AutoCloseable {
+
+    private final FileIO fileIO;
+    private final DataFilePathFactory pathFactory;
+    private final RowType writeSchema;
+    private final String fileCompression;
+    private final FileFormat fileFormat;
+    private final long targetFileSize;
+    private FormatTableRollingFileWriter writer;
+
+    public FormatTableRecordWriter(
+            FileIO fileIO,
+            FileFormat fileFormat,
+            long targetFileSize,
+            DataFilePathFactory pathFactory,
+            RowType writeSchema,
+            String fileCompression) {
+        this.fileIO = fileIO;
+        this.pathFactory = pathFactory;
+        this.fileCompression = fileCompression;
+        this.writeSchema = writeSchema;
+        this.fileFormat = fileFormat;
+        this.targetFileSize = targetFileSize;
+    }
+
+    public void write(InternalRow data) throws Exception {
+        if (writer == null) {
+            writer = createRollingRowWriter();
+        }
+        writer.write(data);
+    }
+
+    public List<TwoPhaseOutputStream.Committer> closeAndGetCommitters() throws 
Exception {
+        List<TwoPhaseOutputStream.Committer> commits = new ArrayList<>();
+        if (writer != null) {
+            writer.close();
+            commits.addAll(writer.committers());
+            writer = null;
+        }
+        return commits;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (writer != null) {
+            writer.abort();
+            writer = null;
+        }
+    }
+
+    private FormatTableRollingFileWriter createRollingRowWriter() {
+        return new FormatTableRollingFileWriter(
+                fileIO, fileFormat, targetFileSize, writeSchema, pathFactory, 
fileCompression);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
new file mode 100644
index 0000000000..ff88c52d66
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.casting.DefaultValueRow;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.FormatTableRowPartitionKeyExtractor;
+import org.apache.paimon.table.sink.TableWrite;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** {@link TableWrite} implementation for format table. */
+public class FormatTableWrite implements BatchTableWrite {
+
+    private RowType rowType;
+    private final FormatTableFileWriter write;
+    private final FormatTableRowPartitionKeyExtractor partitionKeyExtractor;
+
+    private final int[] notNullFieldIndex;
+    private final @Nullable DefaultValueRow defaultValueRow;
+
+    public FormatTableWrite(
+            FileIO fileIO,
+            RowType rowType,
+            CoreOptions options,
+            RowType partitionType,
+            List<String> partitionKeys) {
+        this.rowType = rowType;
+        this.write = new FormatTableFileWriter(fileIO, rowType, options, 
partitionType);
+        this.partitionKeyExtractor =
+                new FormatTableRowPartitionKeyExtractor(rowType, 
partitionKeys);
+        List<String> notNullColumnNames =
+                rowType.getFields().stream()
+                        .filter(field -> !field.type().isNullable())
+                        .map(DataField::name)
+                        .collect(Collectors.toList());
+        this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
+        this.defaultValueRow = DefaultValueRow.create(rowType);
+    }
+
+    @Override
+    public BatchTableWrite withWriteType(RowType writeType) {
+        write.withWriteType(writeType);
+        return this;
+    }
+
+    @Override
+    public BinaryRow getPartition(InternalRow row) {
+        return partitionKeyExtractor.partition(row);
+    }
+
+    @Override
+    public void write(InternalRow row) throws Exception {
+        // checkNullability
+        for (int idx : notNullFieldIndex) {
+            if (row.isNullAt(idx)) {
+                String columnName = rowType.getFields().get(idx).name();
+                throw new RuntimeException(
+                        String.format("Cannot write null to non-null 
column(%s)", columnName));
+            }
+        }
+        row = defaultValueRow == null ? row : defaultValueRow.replaceRow(row);
+        BinaryRow partition = partitionKeyExtractor.partition(row);
+        write.write(partition, row);
+    }
+
+    @Override
+    public List<CommitMessage> prepareCommit() throws Exception {
+        return write.prepareCommit();
+    }
+
+    public void commit(List<CommitMessage> commitMessages) throws Exception {
+        for (CommitMessage commitMessage : commitMessages) {
+            if (commitMessage instanceof TwoPhaseCommitMessage) {
+                TwoPhaseCommitMessage twoPhaseCommitMessage = 
(TwoPhaseCommitMessage) commitMessage;
+                twoPhaseCommitMessage.getCommitter().commit();
+            } else {
+                throw new RuntimeException(
+                        "Unsupported commit message type: " + 
commitMessage.getClass().getName());
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        write.close();
+    }
+
+    @Override
+    public int getBucket(InternalRow row) {
+        return 0;
+    }
+
+    @Override
+    public TableWrite withMemoryPoolFactory(MemoryPoolFactory 
memoryPoolFactory) {
+        return this;
+    }
+
+    @Override
+    public TableWrite withIOManager(IOManager ioManager) {
+        return this;
+    }
+
+    @Override
+    public void write(InternalRow row, int bucket) throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void writeBundle(BinaryRow partition, int bucket, BundleRecords 
bundle)
+            throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void compact(BinaryRow partition, int bucket, boolean 
fullCompaction) throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TableWrite withMetricRegistry(MetricRegistry registry) {
+        return this;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/TwoPhaseCommitMessage.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/TwoPhaseCommitMessage.java
new file mode 100644
index 0000000000..cc43636d2f
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/TwoPhaseCommitMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import javax.annotation.Nullable;
+
+/** {@link CommitMessage} implementation for format table. */
+public class TwoPhaseCommitMessage implements CommitMessage {
+    private TwoPhaseOutputStream.Committer committer;
+
+    public TwoPhaseCommitMessage(TwoPhaseOutputStream.Committer committer) {
+        this.committer = committer;
+    }
+
+    @Override
+    public BinaryRow partition() {
+        return null;
+    }
+
+    @Override
+    public int bucket() {
+        return 0;
+    }
+
+    @Override
+    public @Nullable Integer totalBuckets() {
+        return 0;
+    }
+
+    public TwoPhaseOutputStream.Committer getCommitter() {
+        return committer;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/FormatTableRowPartitionKeyExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/FormatTableRowPartitionKeyExtractor.java
new file mode 100644
index 0000000000..45a4198a3f
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/FormatTableRowPartitionKeyExtractor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+
+/** A {@link PartitionKeyExtractor} to {@link InternalRow} for format table. */
+public class FormatTableRowPartitionKeyExtractor implements 
PartitionKeyExtractor<InternalRow> {
+
+    private final Projection partitionProjection;
+
+    public FormatTableRowPartitionKeyExtractor(RowType rowType, List<String> 
partitionKeys) {
+        partitionProjection = CodeGenUtils.newProjection(rowType, 
partitionKeys);
+    }
+
+    @Override
+    public BinaryRow partition(InternalRow record) {
+        return partitionProjection.apply(record);
+    }
+
+    @Override
+    public BinaryRow trimmedPrimaryKey(InternalRow record) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index cb718fc73a..6deb55ef0b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -173,9 +173,10 @@ public class FileStorePathFactory {
                 createExternalPathProvider(partition, bucket));
     }
 
-    public DataFilePathFactory createFormatTableDataFilePathFactory(BinaryRow 
partition) {
+    public DataFilePathFactory createFormatTableDataFilePathFactory(
+            BinaryRow partition, boolean onlyValue) {
         return new DataFilePathFactory(
-                partitionPath(partition),
+                partitionPath(partition, onlyValue),
                 formatIdentifier,
                 dataFilePrefix,
                 changelogFilePrefix,
@@ -184,6 +185,15 @@ public class FileStorePathFactory {
                 createExternalPartitionPathProvider(partition));
     }
 
+    private ExternalPathProvider createExternalPartitionPathProvider(
+            BinaryRow partition, boolean onlyValue) {
+        if (externalPaths == null || externalPaths.isEmpty()) {
+            return null;
+        }
+
+        return new ExternalPathProvider(externalPaths, 
partitionPath(partition, onlyValue));
+    }
+
     private ExternalPathProvider createExternalPartitionPathProvider(BinaryRow 
partition) {
         if (externalPaths == null || externalPaths.isEmpty()) {
             return null;
@@ -192,9 +202,9 @@ public class FileStorePathFactory {
         return new ExternalPathProvider(externalPaths, 
partitionPath(partition));
     }
 
-    public Path partitionPath(BinaryRow partition) {
+    private Path partitionPath(BinaryRow partition, boolean onlyValue) {
         Path relativeBucketPath = null;
-        String partitionPath = getPartitionString(partition);
+        String partitionPath = getPartitionString(partition, onlyValue);
         if (!partitionPath.isEmpty()) {
             relativeBucketPath = new Path(partitionPath);
         }
@@ -207,6 +217,10 @@ public class FileStorePathFactory {
         return relativeBucketPath != null ? new Path(root, relativeBucketPath) 
: root;
     }
 
/** Returns the partition directory path using the default key=value directory naming. */
public Path partitionPath(BinaryRow partition) {
    return partitionPath(partition, false);
}
+
     @Nullable
     private ExternalPathProvider createExternalPathProvider(BinaryRow 
partition, int bucket) {
         if (externalPaths == null || externalPaths.isEmpty()) {
@@ -248,6 +262,14 @@ public class FileStorePathFactory {
                                 partition, "Partition row data is null. This 
is unexpected.")));
     }
 
/**
 * Generates the relative partition path string for the given partition row.
 *
 * @param onlyValue when true, directory names presumably contain only the partition values
 *     instead of key=value pairs (mirrors the format-table "partition only value in path"
 *     option) — semantics delegated to {@code PartitionPathUtils.generatePartitionPathUtil}
 */
public String getPartitionString(BinaryRow partition, boolean onlyValue) {
    return PartitionPathUtils.generatePartitionPathUtil(
            partitionComputer.generatePartValues(
                    Preconditions.checkNotNull(
                            partition, "Partition row data is null. This is unexpected.")),
            onlyValue);
}
+
     // @TODO, need to be changed
     public List<Path> getHierarchicalPartitionPath(BinaryRow partition) {
         return PartitionPathUtils.generateHierarchicalPartitionPaths(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 9c9e46e98e..c8a540319d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -25,19 +25,9 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.format.FileFormatFactory;
-import org.apache.paimon.format.FormatWriter;
-import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.HadoopCompressionType;
-import org.apache.paimon.format.SupportsDirectWrite;
-import org.apache.paimon.format.csv.CsvFileFormatFactory;
-import org.apache.paimon.format.json.JsonFileFormatFactory;
-import org.apache.paimon.format.parquet.ParquetFileFormatFactory;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.ResolvingFileIO;
-import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
@@ -51,9 +41,11 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.format.FormatTableWrite;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.table.system.AllTableOptionsTable;
@@ -79,7 +71,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -620,27 +611,7 @@ public abstract class CatalogTestBase {
                                 dtPartitionValue,
                                 BinaryString.fromString(dt2PartitionValue));
             }
-            FormatWriterFactory factory =
-                    (buildFileFormatFactory(format)
-                                    .create(
-                                            new 
FileFormatFactory.FormatContext(
-                                                    new 
Options(table.options()), 1024, 1024)))
-                            
.createWriterFactory(getFormatTableWriteRowType(table));
-            Path partitionPath =
-                    new Path(
-                            String.format(
-                                    "%s/%s/%s",
-                                    table.location(), dtPartitionValue, 
dt2PartitionValue));
-            DataFilePathFactory dataFilePathFactory =
-                    new DataFilePathFactory(
-                            partitionPath,
-                            format,
-                            "data",
-                            "change",
-                            true,
-                            compressionType.value(),
-                            null);
-            write(factory, dataFilePathFactory.newPath(), 
compressionType.value(), datas);
+            writeAndCheckCommitFormatTable(table, datas, null);
             List<InternalRow> readAllData = read(table, null, null, null, 
null);
             assertThat(readAllData).containsExactlyInAnyOrder(datas);
             Map<String, String> partitionSpec = new HashMap<>();
@@ -654,7 +625,7 @@ public abstract class CatalogTestBase {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testFormatTableRead(boolean partitioned) throws Exception {
+    public void testFormatTableReadAndWrite(boolean partitioned) throws 
Exception {
         if (!supportsFormatTable()) {
             return;
         }
@@ -682,7 +653,7 @@ public abstract class CatalogTestBase {
             int[] projection = new int[] {1, 2};
             PredicateBuilder builder = new 
PredicateBuilder(table.rowType().project(projection));
             Predicate predicate = builder.greaterOrEqual(0, 10);
-            int size = 5;
+            int size = 2000;
             int checkSize = 3;
             InternalRow[] datas = new InternalRow[size];
             InternalRow[] checkDatas = new InternalRow[checkSize];
@@ -696,59 +667,21 @@ public abstract class CatalogTestBase {
             }
             InternalRow dataWithDiffPartition =
                     GenericRow.of(random.nextInt(), random.nextInt(), 11);
-            FormatWriterFactory factory =
-                    (buildFileFormatFactory(format)
-                                    .create(
-                                            new 
FileFormatFactory.FormatContext(
-                                                    new 
Options(table.options()), 1024, 1024)))
-                            
.createWriterFactory(getFormatTableWriteRowType(table));
             Map<String, String> partitionSpec = null;
+            int dataSize = size;
             if (partitioned) {
-                Path partitionPath =
-                        new Path(String.format("%s/%s", table.location(), 
"dt=" + partitionValue));
-                DataFilePathFactory dataFilePathFactory =
-                        new DataFilePathFactory(
-                                partitionPath,
-                                format,
-                                "data",
-                                "change",
-                                true,
-                                compressionType.value(),
-                                null);
-                Path diffPartitionPath =
-                        new Path(String.format("%s/%s", table.location(), 
"dt=" + 11));
-                DataFilePathFactory diffPartitionPathFactory =
-                        new DataFilePathFactory(
-                                diffPartitionPath,
-                                format,
-                                "data",
-                                "change",
-                                true,
-                                compressionType.value(),
-                                null);
-                write(factory, dataFilePathFactory.newPath(), 
compressionType.value(), datas);
-                write(
-                        factory,
-                        diffPartitionPathFactory.newPath(),
-                        compressionType.value(),
-                        dataWithDiffPartition);
+                writeAndCheckCommitFormatTable(table, datas, 
dataWithDiffPartition);
+                dataSize = size + 1;
                 partitionSpec = new HashMap<>();
                 partitionSpec.put("dt", "" + partitionValue);
             } else {
-                DataFilePathFactory dataFilePathFactory =
-                        new DataFilePathFactory(
-                                new Path(table.location()),
-                                format,
-                                "data",
-                                "change",
-                                true,
-                                compressionType.value(),
-                                null);
-                write(factory, dataFilePathFactory.newPath(), 
compressionType.value(), datas);
+                writeAndCheckCommitFormatTable(table, datas, null);
             }
             List<InternalRow> readFilterData =
                     read(table, predicate, projection, partitionSpec, null);
             assertThat(readFilterData).containsExactlyInAnyOrder(checkDatas);
+            List<InternalRow> readallData = read(table, null, null, null, 
null);
+            assertThat(readallData).hasSize(dataSize);
             int limit = checkSize - 1;
             List<InternalRow> readLimitData =
                     read(table, predicate, projection, partitionSpec, limit);
@@ -767,44 +700,19 @@ public abstract class CatalogTestBase {
         }
     }
 
-    protected RowType getFormatTableWriteRowType(Table table) {
-        return table.rowType()
-                .project(
-                        table.rowType().getFieldNames().stream()
-                                .filter(name -> 
!table.partitionKeys().contains(name))
-                                .collect(Collectors.toList()));
-    }
-
-    protected FileFormatFactory buildFileFormatFactory(String format) {
-        switch (format) {
-            case "csv":
-                return new CsvFileFormatFactory();
-            case "parquet":
-                return new ParquetFileFormatFactory();
-            case "json":
-                return new JsonFileFormatFactory();
-            default:
-                throw new IllegalArgumentException("Unsupported format: " + 
format);
-        }
-    }
-
-    protected void write(
-            FormatWriterFactory factory, Path file, String compression, 
InternalRow... rows)
-            throws IOException {
-        FormatWriter writer;
-        PositionOutputStream out = null;
-        if (factory instanceof SupportsDirectWrite) {
-            writer = ((SupportsDirectWrite) factory).create(fileIO, file, 
compression);
-        } else {
-            out = fileIO.newOutputStream(file, true);
-            writer = factory.create(out, compression);
-        }
-        for (InternalRow row : rows) {
-            writer.addElement(row);
-        }
-        writer.close();
-        if (out != null) {
-            out.close();
/**
 * Writes the given rows to the format table through {@link FormatTableWrite}, verifies the
 * two-phase commit contract (nothing is readable before commit), then commits.
 *
 * @param dataWithDiffPartition an optional extra row belonging to a different partition, or
 *     null to write only {@code datas}
 */
private void writeAndCheckCommitFormatTable(
        Table table, InternalRow[] datas, InternalRow dataWithDiffPartition) throws Exception {
    try (FormatTableWrite write = (FormatTableWrite) table.newBatchWriteBuilder().newWrite()) {
        for (InternalRow row : datas) {
            write.write(row);
        }
        if (dataWithDiffPartition != null) {
            write.write(dataWithDiffPartition);
        }
        List<CommitMessage> committers = write.prepareCommit();
        // Files exist on disk but are not yet published: reads must see nothing.
        List<InternalRow> readData = read(table, null, null, null, null);
        assertThat(readData).isEmpty();
        write.commit(committers);
    }
}
 

Reply via email to