This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d61b363a2 [core] file io support TwoPhaseOutputStream (#6287)
0d61b363a2 is described below

commit 0d61b363a2598b883e5ed3a18bf9d7ebd8d9691c
Author: jerry <lining....@alibaba-inc.com>
AuthorDate: Fri Sep 19 15:48:16 2025 +0800

    [core] file io support TwoPhaseOutputStream (#6287)
---
 .../src/main/java/org/apache/paimon/fs/FileIO.java |  20 ++
 .../org/apache/paimon/fs/MultiPartUploadStore.java |  50 +++
 .../fs/MultiPartUploadTwoPhaseOutputStream.java    | 222 ++++++++++++
 .../paimon/fs/RenamingTwoPhaseOutputStream.java    | 145 ++++++++
 .../org/apache/paimon/fs/TwoPhaseOutputStream.java |  54 +++
 .../fs/RenamingTwoPhaseOutputStreamTest.java       | 141 ++++++++
 .../main/java/org/apache/paimon/oss/OSSFileIO.java |  15 +
 .../org/apache/paimon/oss/OSSMultiPartUpload.java  |  72 ++++
 .../apache/paimon/oss/OssTwoPhaseOutputStream.java |  44 +++
 .../paimon/oss/OssTwoPhaseOutputStreamTest.java    | 382 +++++++++++++++++++++
 .../apache/paimon/s3/HadoopCompliantFileIO.java    |   4 +-
 .../main/java/org/apache/paimon/s3/S3FileIO.java   |  13 +
 .../org/apache/paimon/s3/S3MultiPartUpload.java    | 120 +++++++
 .../apache/paimon/s3/S3TwoPhaseOutputStream.java   |  44 +++
 .../paimon/s3/S3TwoPhaseOutputStreamTest.java      | 368 ++++++++++++++++++++
 15 files changed, 1692 insertions(+), 2 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index b7037693ee..5e5fe9fbfe 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -93,6 +93,26 @@ public interface FileIO extends Serializable, Closeable {
      */
     PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException;
 
+    /**
+     * Opens a TwoPhaseOutputStream at the indicated Path for transactional writing.
+     *
+     * <p>This method creates a stream that supports transactional writing operations. The written
+     * data becomes visible only after calling commit on the committer returned from the
+     * closeForCommit method.
+     *
+     * @param path the file target path
+     * @param overwrite if a file with this name already exists, then if true, the file will be
+     *     overwritten, and if false an error will be thrown.
+     * @return a TwoPhaseOutputStream that supports transactional writes
+     * @throws IOException Thrown if the stream could not be opened because of an I/O error, or
+     *     because a file already exists at that path and the write mode indicates to not
+     *     overwrite the file.
+     * @throws UnsupportedOperationException if the filesystem does not support transactional
+     *     writes
+     */
+    default TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite)
+            throws IOException {
+        return new RenamingTwoPhaseOutputStream(this, path, overwrite);
+    }
+
     /**
      * Return a file status object that represents the path.
      *
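
For context, a minimal sketch of how the new API is meant to be used (assuming a FileIO instance, a target Path, and a byte[] payload are available; illustrative only, not part of the commit):

    // Data written through the stream stays invisible until commit().
    TwoPhaseOutputStream out = fileIO.newTwoPhaseOutputStream(path, false);
    out.write(payload);
    TwoPhaseOutputStream.Committer committer = out.closeForCommit();
    committer.commit(); // publishes the file at `path`
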
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
new file mode 100644
index 0000000000..2a18bed8a2
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/** Store abstraction for multipart uploads, with part tag type T and completion result type C. */
+public interface MultiPartUploadStore<T, C> {
+
+    default String pathToObject(Path hadoopPath) {
+        if (!hadoopPath.isAbsolute()) {
+            hadoopPath = new Path(workingDirectory(), hadoopPath);
+        }
+
+        return hadoopPath.toUri().getPath().substring(1);
+    }
+
+    Path workingDirectory();
+
+    String startMultiPartUpload(String objectName) throws IOException;
+
+    C completeMultipartUpload(
+            String objectName, String uploadId, List<T> partETags, long numBytesInParts)
+            throws IOException;
+
+    T uploadPart(String objectName, String uploadId, int partNumber, File file, long byteLength)
+            throws IOException;
+
+    void abortMultipartUpload(String objectName, String uploadId) throws IOException;
+}
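
The expected lifecycle for an implementation is startMultiPartUpload, then uploadPart once per part (part numbers start at 1), then either completeMultipartUpload or abortMultipartUpload. A minimal in-memory sketch (hypothetical class, for illustration only):

    /** Hypothetical store that keeps parts in memory to show the call order. */
    class InMemoryMultiPartUploadStore implements MultiPartUploadStore<Integer, Void> {

        private final java.util.Map<String, List<byte[]>> uploads = new java.util.HashMap<>();

        @Override
        public Path workingDirectory() {
            return new Path("/");
        }

        @Override
        public String startMultiPartUpload(String objectName) {
            uploads.put(objectName, new java.util.ArrayList<>());
            return objectName; // reuse the object name as the upload id
        }

        @Override
        public Integer uploadPart(
                String objectName, String uploadId, int partNumber, File file, long byteLength)
                throws IOException {
            uploads.get(uploadId).add(java.nio.file.Files.readAllBytes(file.toPath()));
            return partNumber; // stand-in for a real part ETag
        }

        @Override
        public Void completeMultipartUpload(
                String objectName, String uploadId, List<Integer> partETags, long numBytesInParts) {
            // a real store would concatenate the parts and publish the object here
            return null;
        }

        @Override
        public void abortMultipartUpload(String objectName, String uploadId) {
            uploads.remove(uploadId); // drop any buffered parts
        }
    }
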
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
new file mode 100644
index 0000000000..362635b4df
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/** Implements two-phase commit on top of multipart upload. */
+public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhaseOutputStream {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MultiPartUploadTwoPhaseOutputStream.class);
+
+    private final ByteArrayOutputStream buffer;
+    private final List<T> uploadedParts;
+    private final MultiPartUploadStore<T, C> multiPartUploadStore;
+    private final String objectName;
+
+    private String uploadId;
+    private long position;
+    private boolean closed = false;
+
+    public MultiPartUploadTwoPhaseOutputStream(
+            MultiPartUploadStore<T, C> multiPartUploadStore, org.apache.hadoop.fs.Path hadoopPath)
+            throws IOException {
+        this.multiPartUploadStore = multiPartUploadStore;
+        this.buffer = new ByteArrayOutputStream();
+        this.uploadedParts = new ArrayList<>();
+        this.objectName = multiPartUploadStore.pathToObject(hadoopPath);
+        this.uploadId = multiPartUploadStore.startMultiPartUpload(objectName);
+        this.position = 0;
+    }
+
+    public abstract long partSizeThreshold();
+
+    @Override
+    public long getPos() throws IOException {
+        return position;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        if (closed) {
+            throw new IOException("Stream is closed");
+        }
+        buffer.write(b);
+        position++;
+        if (buffer.size() >= partSizeThreshold()) {
+            uploadPart();
+        }
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (closed) {
+            throw new IOException("Stream is closed");
+        }
+        buffer.write(b, off, len);
+        position += len;
+        if (buffer.size() >= partSizeThreshold()) {
+            uploadPart();
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (closed) {
+            throw new IOException("Stream is closed");
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!closed) {
+            Committer committer = closeForCommit();
+            committer.commit();
+        }
+    }
+
+    @Override
+    public Committer closeForCommit() throws IOException {
+        if (closed) {
+            throw new IOException("Stream is already closed");
+        }
+        closed = true;
+
+        if (buffer.size() > 0) {
+            uploadPart();
+        }
+
+        return new MultiPartUploadCommitter(
+                multiPartUploadStore, uploadId, uploadedParts, objectName, position);
+    }
+
+    private void uploadPart() throws IOException {
+        if (buffer.size() == 0) {
+            return;
+        }
+
+        File tempFile = null;
+        try {
+            byte[] data = buffer.toByteArray();
+            tempFile = Files.createTempFile("multi-part-" + UUID.randomUUID(), ".tmp").toFile();
+            try (FileOutputStream fos = new FileOutputStream(tempFile)) {
+                fos.write(data);
+                fos.flush();
+            }
+            T partETag =
+                    multiPartUploadStore.uploadPart(
+                            objectName, uploadId, uploadedParts.size() + 1, tempFile, data.length);
+            uploadedParts.add(partETag);
+            buffer.reset();
+        } catch (Exception e) {
+            throw new IOException(
+                    "Failed to upload part "
+                            + (uploadedParts.size() + 1)
+                            + " for upload ID: "
+                            + uploadId,
+                    e);
+        } finally {
+            if (tempFile != null && tempFile.exists()) {
+                if (!tempFile.delete()) {
+                    LOG.warn("Failed to delete temporary file: {}", tempFile.getAbsolutePath());
+                }
+            }
+        }
+    }
+
+    private static class MultiPartUploadCommitter<T, C> implements Committer {
+
+        private final MultiPartUploadStore<T, C> multiPartUploadStore;
+        private final String uploadId;
+        private final String objectName;
+        private final List<T> uploadedParts;
+        private final long byteLength;
+        private boolean committed = false;
+        private boolean discarded = false;
+
+        public MultiPartUploadCommitter(
+                MultiPartUploadStore<T, C> multiPartUploadStore,
+                String uploadId,
+                List<T> uploadedParts,
+                String objectName,
+                long byteLength) {
+            this.multiPartUploadStore = multiPartUploadStore;
+            this.uploadId = uploadId;
+            this.objectName = objectName;
+            this.uploadedParts = new ArrayList<>(uploadedParts);
+            this.byteLength = byteLength;
+        }
+
+        @Override
+        public void commit() throws IOException {
+            if (committed) {
+                return;
+            }
+            if (discarded) {
+                throw new IOException("Cannot commit: committer has been 
discarded");
+            }
+
+            try {
+                multiPartUploadStore.completeMultipartUpload(
+                        objectName, uploadId, uploadedParts, byteLength);
+                committed = true;
+                LOG.info(
+                        "Successfully committed multipart upload with ID: {} 
for objectName: {}",
+                        uploadId,
+                        objectName);
+            } catch (Exception e) {
+                throw new IOException("Failed to commit multipart upload with 
ID: " + uploadId, e);
+            }
+        }
+
+        @Override
+        public void discard() throws IOException {
+            if (discarded) {
+                return;
+            }
+
+            try {
+                multiPartUploadStore.abortMultipartUpload(objectName, uploadId);
+                discarded = true;
+                LOG.info(
+                        "Successfully discarded multipart upload with ID: {} for objectName: {}",
+                        uploadId,
+                        objectName);
+            } catch (Exception e) {
+                LOG.warn("Failed to discard multipart upload with ID: {}", uploadId, e);
+            }
+        }
+    }
+}
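
Note on part granularity: the buffer is flushed as a single part whenever a write leaves it at or above partSizeThreshold(), so part sizes depend on write granularity. A sketch, assuming a stream whose threshold is 10 MiB and a caller writing 1 MiB chunks:

    byte[] chunk = new byte[1 << 20]; // 1 MiB per write
    for (int i = 0; i < 25; i++) {
        stream.write(chunk); // a 10 MiB part is uploaded right after writes 10 and 20
    }
    // the remaining 5 MiB is uploaded as the final part here
    TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
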
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
new file mode 100644
index 0000000000..0da827d920
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.paimon.annotation.Public;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * A {@link TwoPhaseOutputStream} implementation that writes to a temporary file and commits by
+ * renaming to the target path. This follows HDFS-style commit semantics.
+ */
+@Public
+public class RenamingTwoPhaseOutputStream extends TwoPhaseOutputStream {
+
+    private final FileIO fileIO;
+    private final Path targetPath;
+    private final Path tempPath;
+    private final PositionOutputStream tempOutputStream;
+
+    public RenamingTwoPhaseOutputStream(FileIO fileIO, Path targetPath, boolean overwrite)
+            throws IOException {
+        if (!overwrite && fileIO.exists(targetPath)) {
+            throw new IOException("File " + targetPath + " already exists.");
+        }
+        this.fileIO = fileIO;
+        this.targetPath = targetPath;
+        this.tempPath = generateTempPath(targetPath);
+
+        // Create temporary file
+        this.tempOutputStream = fileIO.newOutputStream(tempPath, overwrite);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        tempOutputStream.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        tempOutputStream.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        tempOutputStream.write(b, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        tempOutputStream.flush();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return tempOutputStream.getPos();
+    }
+
+    @Override
+    public void close() throws IOException {
+        tempOutputStream.close();
+    }
+
+    @Override
+    public Committer closeForCommit() throws IOException {
+        close();
+        return new TempFileCommitter(fileIO, tempPath, targetPath);
+    }
+
+    /**
+     * Generate a temporary file path based on the target path. The temp file will be in the same
+     * directory as the target with a unique suffix.
+     */
+    private Path generateTempPath(Path targetPath) {
+        String tempFileName = ".tmp." + UUID.randomUUID();
+        return new Path(targetPath.getParent(), tempFileName);
+    }
+
+    /** Committer implementation that renames temporary file to target path. */
+    private static class TempFileCommitter implements Committer {
+
+        private final FileIO fileIO;
+        private final Path tempPath;
+        private final Path targetPath;
+        private boolean committed = false;
+        private boolean discarded = false;
+
+        public TempFileCommitter(FileIO fileIO, Path tempPath, Path targetPath) {
+            this.fileIO = fileIO;
+            this.tempPath = tempPath;
+            this.targetPath = targetPath;
+        }
+
+        @Override
+        public void commit() throws IOException {
+            if (committed || discarded) {
+                throw new IOException("Committer has already been used");
+            }
+
+            try {
+                Path parentDir = targetPath.getParent();
+                if (parentDir != null && !fileIO.exists(parentDir)) {
+                    fileIO.mkdirs(parentDir);
+                }
+
+                if (!fileIO.rename(tempPath, targetPath)) {
+                    throw new IOException("Failed to rename " + tempPath + " 
to " + targetPath);
+                }
+
+                committed = true;
+
+            } catch (IOException e) {
+                // Clean up temp file on failure
+                fileIO.deleteQuietly(tempPath);
+                throw new IOException(
+                        "Failed to commit temporary file " + tempPath + " to " 
+ targetPath, e);
+            }
+        }
+
+        @Override
+        public void discard() {
+            if (!committed && !discarded) {
+                fileIO.deleteQuietly(tempPath);
+                discarded = true;
+            }
+        }
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
new file mode 100644
index 0000000000..401486e91b
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.IOException;
+
+/** TwoPhaseOutputStream writes to a file and returns a committer that makes the data visible. */
+public abstract class TwoPhaseOutputStream extends PositionOutputStream {
+    /**
+     * Closes the stream for writing and returns a committer that can be used to make the written
+     * data visible.
+     *
+     * <p>After calling this method, the stream should not be used for writing anymore. The
+     * returned committer can be used to commit the data or discard it.
+     *
+     * @return A committer that can be used to commit the data
+     * @throws IOException if an I/O error occurs during closing
+     */
+    public abstract Committer closeForCommit() throws IOException;
+
+    /** A committer interface that can commit or discard the written data. */
+    public interface Committer {
+
+        /**
+         * Commits the written data, making it visible.
+         *
+         * @throws IOException if an I/O error occurs during commit
+         */
+        void commit() throws IOException;
+
+        /**
+         * Discards the written data, cleaning up any temporary files or resources.
+         *
+         * @throws IOException if an I/O error occurs during discard
+         */
+        void discard() throws IOException;
+    }
+}
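
The discipline this contract implies for callers: commit on success, discard on any failure, so no partially written data becomes visible. A sketch of that pattern (assuming fileIO, path, and payload from the surrounding context):

    TwoPhaseOutputStream out = fileIO.newTwoPhaseOutputStream(path, false);
    TwoPhaseOutputStream.Committer committer = null;
    try {
        out.write(payload);
        committer = out.closeForCommit();
        committer.commit();
    } catch (IOException e) {
        if (committer != null) {
            committer.discard(); // aborts the upload or deletes the temp file
        }
        throw e;
    }
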
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStreamTest.java
new file mode 100644
index 0000000000..a68ccb8700
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStreamTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link RenamingTwoPhaseOutputStream}. */
+public class RenamingTwoPhaseOutputStreamTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private FileIO fileIO;
+    private Path targetPath;
+
+    @BeforeEach
+    void setup() {
+        fileIO = new LocalFileIO();
+        targetPath = new Path(tempDir.resolve("target-file.txt").toString());
+    }
+
+    @Test
+    void testSuccessfulCommit() throws IOException {
+        RenamingTwoPhaseOutputStream stream =
+                new RenamingTwoPhaseOutputStream(fileIO, targetPath, false);
+
+        // Write some data
+        String testData = "Hello, World!";
+        stream.write(testData.getBytes());
+
+        // Close for commit
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        // Target file should not exist yet
+        assertThat(fileIO.exists(targetPath)).isFalse();
+
+        // Commit the file
+        committer.commit();
+
+        // Now target file should exist with correct content
+        assertThat(fileIO.exists(targetPath)).isTrue();
+
+        // Read and verify content
+        byte[] content = Files.readAllBytes(Paths.get(targetPath.toString()));
+        assertThat(new String(content)).isEqualTo(testData);
+    }
+
+    @Test
+    void testDiscard() throws IOException {
+        RenamingTwoPhaseOutputStream stream =
+                new RenamingTwoPhaseOutputStream(fileIO, targetPath, false);
+
+        // Write some data
+        stream.write("Some data".getBytes());
+
+        // Close for commit
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        // Discard instead of commit
+        committer.discard();
+
+        // Target file should not exist
+        assertThat(fileIO.exists(targetPath)).isFalse();
+    }
+
+    @Test
+    void testCloseWithoutCommit() throws IOException {
+        RenamingTwoPhaseOutputStream stream =
+                new RenamingTwoPhaseOutputStream(fileIO, targetPath, false);
+        // Write some data
+        stream.write("Some data".getBytes());
+
+        // Just close (not closeForCommit)
+        stream.close();
+
+        // Target file should not exist without a commit
+        assertThat(fileIO.exists(targetPath)).isFalse();
+    }
+
+    @Test
+    void testDoubleCommitThrows() throws IOException {
+        RenamingTwoPhaseOutputStream stream =
+                new RenamingTwoPhaseOutputStream(fileIO, targetPath, false);
+
+        stream.write("data".getBytes());
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        // First commit should succeed
+        committer.commit();
+
+        // Second commit should throw
+        assertThatThrownBy(committer::commit).isInstanceOf(IOException.class);
+    }
+
+    @Test
+    void testPositionTracking() throws IOException {
+        RenamingTwoPhaseOutputStream stream =
+                new RenamingTwoPhaseOutputStream(fileIO, targetPath, false);
+
+        assertThat(stream.getPos()).isEqualTo(0);
+
+        stream.write("Hello".getBytes());
+        assertThat(stream.getPos()).isEqualTo(5);
+
+        stream.write(" World!".getBytes());
+        assertThat(stream.getPos()).isEqualTo(12);
+
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+        committer.commit();
+
+        // Verify final content
+        byte[] content = Files.readAllBytes(Paths.get(targetPath.toString()));
+        assertThat(new String(content)).isEqualTo("Hello World!");
+    }
+}
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
index 4c457598e7..7e9d8fc68a 100644
--- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
@@ -20,6 +20,8 @@ package org.apache.paimon.oss;
 
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.utils.IOUtils;
 
@@ -106,6 +108,19 @@ public class OSSFileIO extends HadoopCompliantFileIO {
         }
     }
 
+    @Override
+    public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite)
+            throws IOException {
+        if (!overwrite && this.exists(path)) {
+            throw new IOException("File " + path + " already exists.");
+        }
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        FileSystem fs = getFileSystem(hadoopPath);
+        return new OssTwoPhaseOutputStream(
+                new OSSMultiPartUpload((org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem) fs),
+                hadoopPath);
+    }
+
     public Options hadoopOptions() {
         return hadoopOptions;
     }
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
new file mode 100644
index 0000000000..6ea486f683
--- /dev/null
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.oss;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.PartETag;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
+import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/** Provides multipart upload backed by Aliyun OSS. */
+public class OSSMultiPartUpload
+        implements MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> {
+
+    private AliyunOSSFileSystem fs;
+    private AliyunOSSFileSystemStore store;
+
+    public OSSMultiPartUpload(AliyunOSSFileSystem fs) {
+        this.fs = fs;
+        this.store = fs.getStore();
+    }
+
+    @Override
+    public Path workingDirectory() {
+        return fs.getWorkingDirectory();
+    }
+
+    @Override
+    public String startMultiPartUpload(String objectName) throws IOException {
+        return store.getUploadId(objectName);
+    }
+
+    @Override
+    public CompleteMultipartUploadResult completeMultipartUpload(
+            String objectName, String uploadId, List<PartETag> partETags, long numBytesInParts) {
+        return store.completeMultipartUpload(objectName, uploadId, partETags);
+    }
+
+    @Override
+    public PartETag uploadPart(
+            String objectName, String uploadId, int partNumber, File file, long byteLength)
+            throws IOException {
+        return store.uploadPart(file, objectName, uploadId, partNumber);
+    }
+
+    @Override
+    public void abortMultipartUpload(String objectName, String uploadId) {
+        store.abortMultipartUpload(objectName, uploadId);
+    }
+}
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
new file mode 100644
index 0000000000..cce331d8d0
--- /dev/null
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.oss;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.PartETag;
+
+import java.io.IOException;
+
+/** OSS implementation of TwoPhaseOutputStream using multipart upload. */
+public class OssTwoPhaseOutputStream
+        extends MultiPartUploadTwoPhaseOutputStream<PartETag, CompleteMultipartUploadResult> {
+
+    public OssTwoPhaseOutputStream(
+            MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore,
+            org.apache.hadoop.fs.Path hadoopPath)
+            throws IOException {
+        super(multiPartUploadStore, hadoopPath);
+    }
+
+    @Override
+    public long partSizeThreshold() {
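+        // Assumption: 10 MiB buffer threshold; each flush of the buffer is uploaded as one part.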
+        return 10L << 20;
+    }
+}
diff --git a/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OssTwoPhaseOutputStreamTest.java b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OssTwoPhaseOutputStreamTest.java
new file mode 100644
index 0000000000..ff71b571dc
--- /dev/null
+++ b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OssTwoPhaseOutputStreamTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.oss;
+
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.PartETag;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link OssTwoPhaseOutputStream}. */
+public class OssTwoPhaseOutputStreamTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private OssTwoPhaseOutputStream stream;
+    private MockOSSMultiPartUpload mockAccessor;
+    private org.apache.hadoop.fs.Path hadoopPath;
+    private File targetFile;
+
+    @BeforeEach
+    void setup() throws IOException {
+        hadoopPath = new org.apache.hadoop.fs.Path("/test/file.parquet");
+        targetFile = tempDir.resolve("target-file.parquet").toFile();
+        targetFile.getParentFile().mkdirs();
+
+        mockAccessor = new MockOSSMultiPartUpload(targetFile);
+    }
+
+    private OssTwoPhaseOutputStream createStream() throws IOException {
+        return new OssTwoPhaseOutputStream(mockAccessor, hadoopPath);
+    }
+
+    @Test
+    void testLargeDataMultipleParts() throws IOException {
+        stream = createStream();
+
+        // Write data larger than MIN_PART_SIZE to trigger automatic part upload,
+        // plus some extra data to ensure there's remaining data for final upload
+        byte[] largeData = new byte[120 * 1024 * 1024]; // 120MB - will trigger first upload
+        for (int i = 0; i < largeData.length; i++) {
+            largeData[i] = (byte) (i % 256);
+        }
+
+        stream.write(largeData);
+
+        // Write additional data that will remain in buffer for final upload
+        byte[] extraData = "Additional data for final part".getBytes();
+        stream.write(extraData);
+
+        assertThat(stream.getPos()).isEqualTo(largeData.length + extraData.length);
+
+        // Should have triggered automatic part upload
+        assertThat(mockAccessor.startMultipartUploadCalled).isTrue();
+        assertThat(mockAccessor.uploadPartCalls).isEqualTo(1); // One part uploaded automatically
+
+        // Close for commit (uploads remaining data)
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        // Should have uploaded the remaining data as a final part
+        assertThat(mockAccessor.uploadPartCalls).isEqualTo(2); // Initial + final part
+
+        // Commit
+        committer.commit();
+
+        assertThat(mockAccessor.completeMultipartUploadCalled).isTrue();
+
+        // Verify target file contains all the data
+        assertThat(targetFile.exists()).isTrue();
+        byte[] writtenContent = Files.readAllBytes(targetFile.toPath());
+
+        // Combine expected data
+        byte[] expectedData = new byte[largeData.length + extraData.length];
+        System.arraycopy(largeData, 0, expectedData, 0, largeData.length);
+        System.arraycopy(extraData, 0, expectedData, largeData.length, 
extraData.length);
+
+        assertThat(writtenContent).isEqualTo(expectedData);
+    }
+
+    @Test
+    void testDiscard() throws IOException {
+        stream = createStream();
+
+        stream.write("Some data".getBytes());
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        // Discard instead of commit
+        committer.discard();
+
+        // Verify abort was called, not complete
+        assertThat(mockAccessor.abortMultipartUploadCalled).isTrue();
+        assertThat(mockAccessor.completeMultipartUploadCalled).isFalse();
+
+        // Target file should not exist
+        assertThat(targetFile.exists()).isFalse();
+    }
+
+    @Test
+    void testCommitFailure() throws IOException {
+        stream = createStream();
+        mockAccessor.completeMultipartUploadShouldFail = true;
+
+        stream.write("data".getBytes());
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        assertThatThrownBy(committer::commit)
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Failed to commit multipart upload");
+
+        // Target file should not exist on failed commit
+        assertThat(targetFile.exists()).isFalse();
+    }
+
+    @Test
+    void testUploadPartFailure() throws IOException {
+        stream = createStream();
+        mockAccessor.uploadPartShouldFail = true;
+
+        // Write data and then close to trigger uploadPart during closeForCommit
+        stream.write("test data".getBytes());
+
+        assertThatThrownBy(() -> stream.closeForCommit())
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Failed to upload part");
+    }
+
+    @Test
+    void testPositionTracking() throws IOException {
+        stream = createStream();
+
+        assertThat(stream.getPos()).isEqualTo(0);
+
+        stream.write("Hello".getBytes());
+        assertThat(stream.getPos()).isEqualTo(5);
+
+        stream.write(" OSS".getBytes());
+        assertThat(stream.getPos()).isEqualTo(9);
+
+        stream.write(" World!".getBytes());
+        assertThat(stream.getPos()).isEqualTo(16);
+
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+        committer.commit();
+
+        assertThat(mockAccessor.completeMultipartUploadCalled).isTrue();
+
+        // Verify final content
+        String writtenContent = new String(Files.readAllBytes(targetFile.toPath()));
+        assertThat(writtenContent).isEqualTo("Hello OSS World!");
+    }
+
+    @Test
+    void testCommitAfterDiscard() throws IOException {
+        stream = createStream();
+        stream.write("data".getBytes());
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        committer.discard();
+
+        assertThatThrownBy(committer::commit)
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Cannot commit: committer has been 
discarded");
+
+        // Target file should not exist
+        assertThat(targetFile.exists()).isFalse();
+    }
+
+    /**
+     * Mock implementation that actually uses local files to simulate OSS multipart upload
+     * behavior. Extends OSSMultiPartUpload but overrides all methods to avoid initialization
+     * issues.
+     */
+    private static class MockOSSMultiPartUpload extends OSSMultiPartUpload {
+
+        boolean startMultipartUploadCalled = false;
+        int uploadPartCalls = 0;
+        boolean completeMultipartUploadCalled = false;
+        int completeMultipartUploadCallCount = 0;
+        boolean abortMultipartUploadCalled = false;
+
+        boolean uploadPartShouldFail = false;
+        boolean completeMultipartUploadShouldFail = false;
+
+        private final String mockUploadId = "mock-upload-" + UUID.randomUUID();
+        private final List<File> tempPartFiles = new ArrayList<>();
+        private final File targetFile;
+
+        @SuppressWarnings("unused")
+        public MockOSSMultiPartUpload(File targetFile) {
+            super(createStubFileSystem()); // Create minimal stub to avoid null pointer
+            this.targetFile = targetFile;
+        }
+
+        private static org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem createStubFileSystem() {
+            // Create a minimal stub to avoid NullPointerException during initialization
+            return new StubAliyunOSSFileSystem();
+        }
+
+        @Override
+        public String pathToObject(org.apache.hadoop.fs.Path hadoopPath) {
+            return hadoopPath.toUri().getPath().substring(1);
+        }
+
+        @Override
+        public String startMultiPartUpload(String objectName) {
+            startMultipartUploadCalled = true;
+            return mockUploadId;
+        }
+
+        @Override
+        public PartETag uploadPart(
+                String objectName, String uploadId, int partNumber, File file, long byteLength)
+                throws IOException {
+            uploadPartCalls++;
+
+            if (uploadPartShouldFail) {
+                throw new IOException("Mock upload part failure");
+            }
+
+            // Verify file exists and has content
+            if (!file.exists() || file.length() == 0) {
+                throw new IOException("Invalid file for upload: " + file);
+            }
+
+            // Store the part file in a temporary location (simulating storing in OSS)
+            File partFile =
+                    Files.createTempFile("mock-oss-part-" + partNumber + "-", 
".tmp").toFile();
+            Files.copy(file.toPath(), partFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
+            tempPartFiles.add(partFile);
+
+            MockPartETag mockPartETag = new MockPartETag(partNumber, "mock-etag-" + partNumber);
+            return mockPartETag;
+        }
+
+        @Override
+        public CompleteMultipartUploadResult completeMultipartUpload(
+                String objectName,
+                String uploadId,
+                List<PartETag> partETags,
+                long numBytesInParts) {
+            completeMultipartUploadCalled = true;
+            completeMultipartUploadCallCount++;
+
+            if (completeMultipartUploadShouldFail) {
+                throw new RuntimeException("Mock complete multipart upload 
failure");
+            }
+
+            // Simulate combining all parts into the final target file
+            try {
+                try (FileOutputStream fos = new FileOutputStream(targetFile)) {
+                    for (File partFile : tempPartFiles) {
+                        try (FileInputStream fis = new FileInputStream(partFile)) {
+                            byte[] buffer = new byte[8192];
+                            int bytesRead;
+                            while ((bytesRead = fis.read(buffer)) != -1) {
+                                fos.write(buffer, 0, bytesRead);
+                            }
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to complete multipart 
upload", e);
+            }
+
+            // Clean up temp part files
+            for (File partFile : tempPartFiles) {
+                partFile.delete();
+            }
+            tempPartFiles.clear();
+
+            return new MockCompleteMultipartUploadResult(objectName, "mock-final-etag");
+        }
+
+        @Override
+        public void abortMultipartUpload(String objectName, String uploadId) {
+            abortMultipartUploadCalled = true;
+
+            // Clean up temp part files on abort
+            for (File partFile : tempPartFiles) {
+                partFile.delete();
+            }
+            tempPartFiles.clear();
+
+            // Ensure target file doesn't exist
+            if (targetFile.exists()) {
+                targetFile.delete();
+            }
+        }
+    }
+
+    /** Mock implementation of PartETag. */
+    private static class MockPartETag extends PartETag {
+        private final int partNumber;
+        private final String eTag;
+
+        public MockPartETag(int partNumber, String eTag) {
+            super(partNumber, eTag);
+            this.partNumber = partNumber;
+            this.eTag = eTag;
+        }
+
+        @Override
+        public int getPartNumber() {
+            return partNumber;
+        }
+
+        @Override
+        public String getETag() {
+            return eTag;
+        }
+    }
+
+    /** Mock implementation of CompleteMultipartUploadResult. */
+    private static class MockCompleteMultipartUploadResult extends CompleteMultipartUploadResult {
+        private final String key;
+        private final String eTag;
+
+        public MockCompleteMultipartUploadResult(String key, String eTag) {
+            this.key = key;
+            this.eTag = eTag;
+        }
+
+        @Override
+        public String getKey() {
+            return key;
+        }
+
+        @Override
+        public String getETag() {
+            return eTag;
+        }
+    }
+
+    /**
+     * Minimal stub implementation to avoid NullPointerException during OSSMultiPartUpload
+     * initialization.
+     */
+    private static class StubAliyunOSSFileSystem
+            extends org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem {
+        private final StubAliyunOSSFileSystemStore stubStore = new StubAliyunOSSFileSystemStore();
+
+        @Override
+        public org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore 
getStore() {
+            return stubStore;
+        }
+    }
+
+    /** Minimal stub implementation for the store. */
+    private static class StubAliyunOSSFileSystemStore
+            extends org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore {
+        // Empty stub - we override all methods in MockOSSMultiPartUpload anyway
+    }
+}
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
index 9238251033..a662e8a075 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
@@ -123,11 +123,11 @@ public abstract class HadoopCompliantFileIO implements FileIO {
         return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
     }
 
-    private org.apache.hadoop.fs.Path path(Path path) {
+    protected org.apache.hadoop.fs.Path path(Path path) {
         return new org.apache.hadoop.fs.Path(path.toUri());
     }
 
-    private FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
+    protected FileSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
         if (fsMap == null) {
             synchronized (this) {
                 if (fsMap == null) {
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
index a58aa18eac..65017da573 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java
@@ -20,6 +20,8 @@ package org.apache.paimon.s3;
 
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
 import org.apache.paimon.options.Options;
 
 import org.apache.hadoop.conf.Configuration;
@@ -71,6 +73,17 @@ public class S3FileIO extends HadoopCompliantFileIO {
         this.hadoopOptions = mirrorCertainHadoopConfig(loadHadoopConfigFromContext(context));
     }
 
+    @Override
+    public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite)
+            throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        S3AFileSystem fs = (S3AFileSystem) getFileSystem(hadoopPath);
+        if (!overwrite && this.exists(path)) {
+            throw new IOException("File " + path + " already exists.");
+        }
+        return new S3TwoPhaseOutputStream(new S3MultiPartUpload(fs, fs.getConf()), hadoopPath);
+    }
+
     // add additional config entries from the IO config to the Hadoop config
     private Options loadHadoopConfigFromContext(CatalogContext context) {
         Options hadoopConfig = new Options();
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
new file mode 100644
index 0000000000..4fd3590027
--- /dev/null
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.s3;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Provides multipart upload backed by Amazon S3. */
+public class S3MultiPartUpload
+        implements MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> {
+
+    private final S3AFileSystem s3a;
+
+    private final InternalWriteOperationHelper s3accessHelper;
+
+    public S3MultiPartUpload(S3AFileSystem s3a, Configuration conf) {
+        checkNotNull(s3a);
+        this.s3accessHelper =
+                new InternalWriteOperationHelper(
+                        s3a,
+                        checkNotNull(conf),
+                        s3a.createStoreContext().getInstrumentation(),
+                        s3a.getAuditSpanSource(),
+                        s3a.getActiveAuditSpan());
+        this.s3a = s3a;
+    }
+
+    @Override
+    public Path workingDirectory() {
+        return s3a.getWorkingDirectory();
+    }
+
+    @Override
+    public String startMultiPartUpload(String objectName) throws IOException {
+        return s3accessHelper.initiateMultiPartUpload(objectName);
+    }
+
+    @Override
+    public CompleteMultipartUploadResult completeMultipartUpload(
+            String objectName, String uploadId, List<PartETag> partETags, long numBytesInParts)
+            throws IOException {
+        return s3accessHelper.completeMPUwithRetries(
+                objectName, uploadId, partETags, numBytesInParts, new AtomicInteger(0));
+    }
+
+    @Override
+    public PartETag uploadPart(
+            String objectName, String uploadId, int partNumber, File file, long byteLength)
+            throws IOException {
+        final UploadPartRequest uploadRequest =
+                s3accessHelper.newUploadPartRequest(
+                        objectName,
+                        uploadId,
+                        partNumber,
+                        checkedDownCast(byteLength),
+                        null,
+                        file,
+                        0L);
+        return s3accessHelper.uploadPart(uploadRequest).getPartETag();
+    }
+
+    @Override
+    public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
+        s3accessHelper.abortMultipartUpload(destKey, uploadId, false, null);
+    }
+
+    private static final class InternalWriteOperationHelper extends WriteOperationHelper {
+
+        InternalWriteOperationHelper(
+                S3AFileSystem owner,
+                Configuration conf,
+                S3AStatisticsContext statisticsContext,
+                AuditSpanSource auditSpanSource,
+                AuditSpan auditSpan) {
+            super(owner, conf, statisticsContext, auditSpanSource, auditSpan);
+        }
+    }
+
+    private static int checkedDownCast(long value) {
+        int downCast = (int) value;
+        if (downCast != value) {
+            throw new IllegalArgumentException(
+                    "Cannot downcast long value " + value + " to integer.");
+        }
+        return downCast;
+    }
+}
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
new file mode 100644
index 0000000000..d66b134d5c
--- /dev/null
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.s3;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+
+import java.io.IOException;
+
+/** S3 implementation of TwoPhaseOutputStream using multipart upload. */
+public class S3TwoPhaseOutputStream
+        extends MultiPartUploadTwoPhaseOutputStream<PartETag, CompleteMultipartUploadResult> {
+
+    public S3TwoPhaseOutputStream(
+            MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore,
+            org.apache.hadoop.fs.Path hadoopPath)
+            throws IOException {
+        super(multiPartUploadStore, hadoopPath);
+    }
+
+    @Override
+    public long partSizeThreshold() {
+        // 5 MiB: the minimum part size S3 accepts for all but the last part
+        return 5L << 20;
+    }
+}
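
The 5L << 20 threshold equals 5 MiB, the minimum size S3 accepts for every part of a multipart upload except the last. A minimal usage sketch of the resulting two-phase contract, assuming an S3-backed FileIO ("fileIO", "path", and "payload" are illustrative, not part of this commit):

    TwoPhaseOutputStream out = fileIO.newTwoPhaseOutputStream(path, false);
    TwoPhaseOutputStream.Committer committer = null;
    try {
        out.write(payload); // buffered locally; a part uploads once 5 MiB accumulates
        committer = out.closeForCommit(); // data staged in S3, target not yet visible
        committer.commit(); // completes the multipart upload; object becomes visible
    } catch (IOException e) {
        if (committer != null) {
            committer.discard(); // aborts the staged upload; safe to call repeatedly
        }
        throw e;
    }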
diff --git a/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3TwoPhaseOutputStreamTest.java b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3TwoPhaseOutputStreamTest.java
new file mode 100644
index 0000000000..3712916e00
--- /dev/null
+++ b/paimon-filesystems/paimon-s3-impl/src/test/java/org/apache/paimon/s3/S3TwoPhaseOutputStreamTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.s3;
+
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link S3TwoPhaseOutputStream}. */
+class S3TwoPhaseOutputStreamTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private MockS3MultiPartUpload mockAccessor;
+    private S3TwoPhaseOutputStream stream;
+    private File targetFile;
+    private org.apache.hadoop.fs.Path hadoopPath;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        hadoopPath = new org.apache.hadoop.fs.Path("/test/file.parquet");
+        targetFile = tempDir.resolve("target-file.parquet").toFile();
+        targetFile.getParentFile().mkdirs();
+
+        mockAccessor = new MockS3MultiPartUpload(targetFile);
+    }
+
+    private S3TwoPhaseOutputStream createStream() throws IOException {
+        return new S3TwoPhaseOutputStream(mockAccessor, hadoopPath);
+    }
+
+    @Test
+    void testLargeDataMultipleParts() throws IOException {
+        stream = createStream();
+
+        // Create data larger than MIN_PART_SIZE (5MB) to trigger multiple part uploads
+        byte[] bigData = new byte[6 * 1024 * 1024]; // 6MB
+        for (int i = 0; i < bigData.length; i++) {
+            bigData[i] = (byte) (i % 256);
+        }
+
+        // Write the large data
+        stream.write(bigData);
+
+        // This should trigger automatic part uploads during write
+        assertThat(mockAccessor.startMultipartUploadCalled).isTrue();
+        assertThat(mockAccessor.uploadPartCalls).isGreaterThan(0);
+
+        // Add more data to ensure closeForCommit creates another part
+        String additionalData = "Additional data for final part";
+        stream.write(additionalData.getBytes());
+
+        assertThat(stream.getPos()).isEqualTo(bigData.length + additionalData.length());
+
+        // Close for commit
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        // Should have uploaded multiple parts
+        assertThat(mockAccessor.uploadPartCalls).isGreaterThan(1);
+        assertThat(mockAccessor.completeMultipartUploadCalled).isFalse();
+
+        // Target file should not exist yet
+        assertThat(targetFile.exists()).isFalse();
+
+        // Commit
+        committer.commit();
+
+        // Verify complete multipart upload was called
+        assertThat(mockAccessor.completeMultipartUploadCalled).isTrue();
+        assertThat(mockAccessor.abortMultipartUploadCalled).isFalse();
+
+        // Target file should now exist with correct content
+        assertThat(targetFile.exists()).isTrue();
+
+        // Verify the content is correct by reading it back
+        byte[] writtenContent = Files.readAllBytes(targetFile.toPath());
+        assertThat(writtenContent).hasSize(bigData.length + additionalData.length());
+
+        // Check first part (bigData)
+        for (int i = 0; i < bigData.length; i++) {
+            assertThat(writtenContent[i]).isEqualTo(bigData[i]);
+        }
+
+        // Check additional data at the end
+        byte[] additionalBytes = additionalData.getBytes();
+        for (int i = 0; i < additionalBytes.length; i++) {
+            assertThat(writtenContent[bigData.length + i]).isEqualTo(additionalBytes[i]);
+        }
+    }
+
+    @Test
+    void testDiscard() throws IOException {
+        stream = createStream();
+        stream.write("Hello S3 World!".getBytes());
+
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        // Verify initial state
+        assertThat(mockAccessor.startMultipartUploadCalled).isTrue();
+        assertThat(mockAccessor.uploadPartCalls).isEqualTo(1);
+        assertThat(targetFile.exists()).isFalse();
+
+        // Discard instead of commit
+        committer.discard();
+
+        // Verify abort was called
+        assertThat(mockAccessor.abortMultipartUploadCalled).isTrue();
+        assertThat(mockAccessor.completeMultipartUploadCalled).isFalse();
+
+        // Target file should not exist
+        assertThat(targetFile.exists()).isFalse();
+    }
+
+    @Test
+    void testCommitAfterDiscard() throws IOException {
+        stream = createStream();
+        stream.write("data".getBytes());
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        committer.discard();
+
+        assertThatThrownBy(() -> committer.commit())
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Cannot commit: committer has been discarded");
+    }
+
+    @Test
+    void testSimpleCommit() throws IOException {
+        stream = createStream();
+
+        String testData = "Hello S3 World!";
+        stream.write(testData.getBytes());
+
+        assertThat(stream.getPos()).isEqualTo(testData.length());
+
+        // Close for commit
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        // Target file should not exist yet
+        assertThat(targetFile.exists()).isFalse();
+
+        // Commit
+        committer.commit();
+
+        // Verify upload completed
+        assertThat(mockAccessor.startMultipartUploadCalled).isTrue();
+        assertThat(mockAccessor.uploadPartCalls).isEqualTo(1);
+        assertThat(mockAccessor.completeMultipartUploadCalled).isTrue();
+
+        // Target file should exist with correct content
+        assertThat(targetFile.exists()).isTrue();
+        String writtenContent = new String(Files.readAllBytes(targetFile.toPath()));
+        assertThat(writtenContent).isEqualTo(testData);
+    }
+
+    @Test
+    void testDoubleDiscard() throws IOException {
+        stream = createStream();
+        stream.write("data".getBytes());
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+
+        committer.discard();
+        // Second discard should be safe (no-op)
+        committer.discard();
+
+        // Abort should only be called once
+        assertThat(mockAccessor.abortMultipartUploadCallCount).isEqualTo(1);
+
+        // Target file should not exist
+        assertThat(targetFile.exists()).isFalse();
+    }
+
+    /**
+     * Mock implementation that uses local files to simulate S3 multipart upload behavior. Extends
+     * S3MultiPartUpload but overrides all methods to avoid initialization issues.
+     */
+    private static class MockS3MultiPartUpload extends S3MultiPartUpload {
+        private final List<File> tempPartFiles = new ArrayList<>();
+        private final File targetFile;
+        private final String mockUploadId = "mock-upload-id-12345";
+
+        // Test tracking variables
+        boolean startMultipartUploadCalled = false;
+        int uploadPartCalls = 0;
+        boolean completeMultipartUploadCalled = false;
+        boolean abortMultipartUploadCalled = false;
+        int abortMultipartUploadCallCount = 0;
+
+        @SuppressWarnings("unused")
+        public MockS3MultiPartUpload(File targetFile) {
+            super(createStubFileSystem(), new Configuration());
+            this.targetFile = targetFile;
+        }
+
+        private static S3AFileSystem createStubFileSystem() {
+            // Create minimal stub to avoid NullPointerException during initialization
+            return new StubS3AFileSystem();
+        }
+
+        @Override
+        public String pathToObject(org.apache.hadoop.fs.Path hadoopPath) {
+            return hadoopPath.toUri().getPath().substring(1);
+        }
+
+        @Override
+        public String startMultiPartUpload(String key) {
+            startMultipartUploadCalled = true;
+            return mockUploadId;
+        }
+
+        @Override
+        public PartETag uploadPart(
+                String key, String uploadId, int partNumber, File inputFile, long byteLength)
+                throws IOException {
+            uploadPartCalls++;
+
+            // Create a temporary copy of the part file
+            File tempPartFile = Files.createTempFile("s3-part-" + partNumber, ".tmp").toFile();
+            try (FileInputStream fis = new FileInputStream(inputFile);
+                    FileOutputStream fos = new FileOutputStream(tempPartFile)) {
+                byte[] buffer = new byte[8192];
+                int bytesRead;
+                while ((bytesRead = fis.read(buffer)) != -1) {
+                    fos.write(buffer, 0, bytesRead);
+                }
+            }
+            tempPartFiles.add(tempPartFile);
+
+            // Return a mock PartETag for this part
+            return new PartETag(partNumber, "etag-" + partNumber);
+        }
+
+        @Override
+        public CompleteMultipartUploadResult completeMultipartUpload(
+                String destKey, String uploadId, List<PartETag> partETags, long length) {
+            completeMultipartUploadCalled = true;
+
+            // Simulate combining all parts into the final target file
+            try {
+                try (FileOutputStream fos = new FileOutputStream(targetFile)) {
+                    for (File partFile : tempPartFiles) {
+                        try (FileInputStream fis = new FileInputStream(partFile)) {
+                            byte[] buffer = new byte[8192];
+                            int bytesRead;
+                            while ((bytesRead = fis.read(buffer)) != -1) {
+                                fos.write(buffer, 0, bytesRead);
+                            }
+                        }
+                    }
+                }
+
+                // Clean up temp files
+                for (File partFile : tempPartFiles) {
+                    if (partFile.exists()) {
+                        partFile.delete();
+                    }
+                }
+                tempPartFiles.clear();
+
+                return new MockCompleteMultipartUploadResult("mock-bucket", destKey, "mock-etag");
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to complete multipart upload", e);
+            }
+        }
+
+        @Override
+        public void abortMultipartUpload(String destKey, String uploadId) {
+            abortMultipartUploadCalled = true;
+            abortMultipartUploadCallCount++;
+
+            // Clean up temp files
+            for (File partFile : tempPartFiles) {
+                if (partFile.exists()) {
+                    partFile.delete();
+                }
+            }
+            tempPartFiles.clear();
+
+            // Clean up target file if it exists
+            if (targetFile.exists()) {
+                targetFile.delete();
+            }
+        }
+    }
+
+    /** Mock implementation of PartETag. */
+    private static class MockPartETag extends PartETag {
+        private final String eTag;
+
+        public MockPartETag(String eTag, int partNumber) {
+            super(partNumber, eTag);
+            this.eTag = eTag;
+        }
+
+        @Override
+        public String getETag() {
+            return eTag;
+        }
+    }
+
+    /** Mock implementation of CompleteMultipartUploadResult. */
+    private static class MockCompleteMultipartUploadResult extends CompleteMultipartUploadResult {
+        private final String bucketName;
+        private final String key;
+        private final String eTag;
+
+        public MockCompleteMultipartUploadResult(String bucketName, String key, String eTag) {
+            this.bucketName = bucketName;
+            this.key = key;
+            this.eTag = eTag;
+        }
+
+        @Override
+        public String getBucketName() {
+            return bucketName;
+        }
+
+        @Override
+        public String getKey() {
+            return key;
+        }
+
+        @Override
+        public String getETag() {
+            return eTag;
+        }
+    }
+
+    /**
+     * Minimal stub implementation to avoid NullPointerException during S3MultiPartUpload
+     * initialization.
+     */
+    private static class StubS3AFileSystem extends S3AFileSystem {
+        // Minimal stub - no implementation needed for our mock
+    }
+}
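
The discard and commit tests above pin down an idempotency guard inside the committer. A sketch of the implied state machine (the "discarded" flag and everything beyond the asserted message string are assumptions, not committed code):

    private boolean discarded = false;

    public void commit() throws IOException {
        if (discarded) {
            throw new IOException("Cannot commit: committer has been discarded");
        }
        // complete the multipart upload with the collected part ETags
    }

    public void discard() throws IOException {
        if (discarded) {
            return; // double discard is a safe no-op; abort runs at most once
        }
        discarded = true;
        // abort the multipart upload and delete any buffered temp files
    }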
