This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bc64466ec6 [core] format table: jindo io use upload part to support two-phase commit (#6470)
bc64466ec6 is described below

commit bc64466ec657a76608b864d3740e5f3f0eac0d22
Author: jerry <[email protected]>
AuthorDate: Mon Oct 27 15:41:11 2025 +0800

    [core] format table: jindo io use upload part to support two-phase commit (#6470)
---
 .../org/apache/paimon/fs/MultiPartUploadStore.java |   2 +-
 .../fs/MultiPartUploadTwoPhaseOutputStream.java    |  53 ++++++++---
 .../MultiPartUploadTwoPhaseOutputStreamTest.java   |  56 ++++++++---
 .../apache/paimon/jindo/HadoopCompliantFileIO.java |   2 +-
 .../java/org/apache/paimon/jindo/JindoFileIO.java  |  14 +++
 .../apache/paimon/jindo/JindoMultiPartUpload.java  | 104 +++++++++++++++++++++
 .../jindo/JindoMultiPartUploadCommitter.java       |  50 ++++++++++
 .../paimon/jindo/JindoTwoPhaseOutputStream.java}   |  24 ++---
 .../org/apache/paimon/oss/OSSMultiPartUpload.java  |   2 +-
 .../apache/paimon/oss/OssTwoPhaseOutputStream.java |   5 -
 .../org/apache/paimon/s3/S3MultiPartUpload.java    |  19 +---
 .../apache/paimon/s3/S3TwoPhaseOutputStream.java   |   5 -
 12 files changed, 265 insertions(+), 71 deletions(-)
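
In short: the 10 MiB part-size threshold moves from the per-filesystem subclasses (OSS, S3) into the shared MultiPartUploadTwoPhaseOutputStream; large writes are now split at that threshold instead of buffering without bound; part lengths become int (with a checked downcast from the temp-file length); and a Jindo implementation of the multipart-upload two-phase commit is added (JindoMultiPartUpload, JindoMultiPartUploadCommitter, JindoTwoPhaseOutputStream).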

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
index 2a18bed8a2..83a9f03348 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
@@ -43,7 +43,7 @@ public interface MultiPartUploadStore<T, C> {
             String objectName, String uploadId, List<T> partETags, long numBytesInParts)
             throws IOException;
 
-    T uploadPart(String objectName, String uploadId, int partNumber, File file, long byteLength)
+    T uploadPart(String objectName, String uploadId, int partNumber, File file, int byteLength)
             throws IOException;
 
     void abortMultipartUpload(String objectName, String uploadId) throws IOException;
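
A note on the long-to-int narrowing above: a part is uploaded as soon as the in-memory buffer reaches the part-size threshold (10 MiB, see the next file), so a part length always fits in an int. A quick sanity check of the arithmetic (illustrative snippet, not part of the commit):

    int threshold = 10 << 20;               // 10,485,760 bytes per part, at most
    assert threshold < Integer.MAX_VALUE;   // 2,147,483,647, so int is safe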
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
index ab20048f36..ed4cf3809e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
@@ -57,7 +57,12 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
         this.position = 0;
     }
 
-    public abstract long partSizeThreshold();
+    // OSS limit: 100 KB ~ 5 GB
+    // S3 limit: 5 MiB ~ 5 GiB
+    // Considering memory usage, and following Flink's choice, this uses 10 MiB.
+    public int partSizeThreshold() {
+        return 10 << 20;
+    }
 
     public abstract Committer committer(
             String uploadId, List<T> uploadedParts, String objectName, long position);
@@ -89,10 +94,23 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
         if (closed) {
             throw new IOException("Stream is closed");
         }
-        buffer.write(b, off, len);
-        position += len;
-        if (buffer.size() >= partSizeThreshold()) {
-            uploadPart();
+        int remaining = len;
+        int offset = off;
+        while (remaining > 0) {
+            if (buffer.size() >= partSizeThreshold()) {
+                uploadPart();
+            }
+            int currentSize = buffer.size();
+            int space = partSizeThreshold() - currentSize;
+            int count = Math.min(remaining, space);
+            buffer.write(b, offset, count);
+            offset += count;
+            remaining -= count;
+            position += count;
+            // consume buffer if it is full
+            if (buffer.size() >= partSizeThreshold()) {
+                uploadPart();
+            }
         }
     }
 
@@ -133,25 +151,25 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
         }
 
         File tempFile = null;
+        int partNumber = uploadedParts.size() + 1;
         try {
-            byte[] data = buffer.toByteArray();
+            tempFile = Files.createTempFile("multi-part-" + UUID.randomUUID(), ".tmp").toFile();
             try (FileOutputStream fos = new FileOutputStream(tempFile)) {
-                fos.write(data);
+                buffer.writeTo(fos);
                 fos.flush();
             }
             T partETag =
                     multiPartUploadStore.uploadPart(
-                            objectName, uploadId, uploadedParts.size() + 1, tempFile, data.length);
+                            objectName,
+                            uploadId,
+                            partNumber,
+                            tempFile,
+                            checkedDownCast(tempFile.length()));
             uploadedParts.add(partETag);
             buffer.reset();
         } catch (Exception e) {
             throw new IOException(
-                    "Failed to upload part "
-                            + (uploadedParts.size() + 1)
-                            + " for upload ID: "
-                            + uploadId,
-                    e);
+                    "Failed to upload part " + partNumber + " for upload ID: " 
+ uploadId, e);
         } finally {
             if (tempFile != null && tempFile.exists()) {
                 if (!tempFile.delete()) {
@@ -160,4 +178,13 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
             }
         }
     }
+
+    private static int checkedDownCast(long value) {
+        int downCast = (int) value;
+        if (downCast != value) {
+            throw new IllegalArgumentException(
+                    "Cannot downcast long value " + value + " to integer.");
+        }
+        return downCast;
+    }
 }
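
The reworked write(byte[], int, int) above caps the in-memory buffer at partSizeThreshold() by slicing each incoming write and flushing a part whenever the buffer fills. A minimal self-contained sketch of the same chunking idea (names are illustrative, not Paimon API):

    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.List;

    final class ChunkingWriter {
        private final int threshold;
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        final List<byte[]> parts = new ArrayList<>(); // stands in for uploaded parts

        ChunkingWriter(int threshold) {
            this.threshold = threshold;
        }

        void write(byte[] b, int off, int len) {
            int offset = off;
            int remaining = len;
            while (remaining > 0) {
                // copy only as much as fits before the buffer reaches the threshold
                int count = Math.min(remaining, threshold - buffer.size());
                buffer.write(b, offset, count);
                offset += count;
                remaining -= count;
                if (buffer.size() >= threshold) { // flush a full part
                    parts.add(buffer.toByteArray());
                    buffer.reset();
                }
            }
        }
    }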
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
index dec343e49a..1cf28389b3 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
@@ -30,6 +30,7 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -66,7 +67,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
                 .isEqualTo("hello 
world!".getBytes(StandardCharsets.UTF_8).length);
 
         TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
-        assertThat(store.getUploadedParts()).hasSize(2);
+        assertThat(store.getUploadedParts()).hasSize(3);
         assertThat(committer.targetFilePath().toString()).isEqualTo(store.getStartedObjectName());
 
         committer.commit(fileIO);
@@ -122,11 +123,44 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
         first.commit(fileIO);
     }
 
+    @Test
+    void testBigWriteSplitByThreshold() throws IOException {
+        TestMultiPartUploadTwoPhaseOutputStream stream =
+                new TestMultiPartUploadTwoPhaseOutputStream(store, objectPath, 5);
+
+        byte[] data1 = "abc".getBytes(StandardCharsets.UTF_8);
+        stream.write(data1);
+        byte[] data2 = "abcdefghij".getBytes(StandardCharsets.UTF_8);
+        stream.write(data2);
+
+        assertThat(store.getUploadedParts()).hasSize(2);
+        assertThat(store.getUploadedParts())
+                .extracting(TestPart::getPartNumber)
+                .containsExactly(1, 2);
+        assertThat(store.getUploadedParts())
+                .extracting(TestPart::getContent)
+                .containsExactly("abcab", "cdefg");
+        assertThat(stream.getPos()).isEqualTo(data1.length + data2.length);
+        stream.flush();
+        assertThat(store.getUploadedParts())
+                .extracting(TestPart::getContent)
+                .containsExactly("abcab", "cdefg", "hij");
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+        assertThat(store.getUploadedParts()).hasSize(3);
+
+        committer.commit(fileIO);
+
+        assertThat(store.getCompletedUploadId()).isEqualTo(store.getStartedUploadId());
+        assertThat(store.getCompletedObjectName()).isEqualTo(store.getStartedObjectName());
+        assertThat(store.getCompletedParts()).containsExactlyElementsOf(store.getUploadedParts());
+        assertThat(store.getCompletedBytes()).isEqualTo(stream.getPos());
+        assertThat(store.getAbortedUploadId()).isNull();
+    }
+
     /** Fake store implementation for testing. */
     private static class FakeMultiPartUploadStore
             implements MultiPartUploadStore<TestPart, String> {
 
-        private int uploadCounter = 0;
         private final List<TestPart> uploadedParts = new ArrayList<>();
         private String startedUploadId;
         private String startedObjectName;
@@ -144,7 +178,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
         @Override
         public String startMultiPartUpload(String objectName) {
             this.startedObjectName = objectName;
-            this.startedUploadId = "upload-" + (++uploadCounter);
+            this.startedUploadId = UUID.randomUUID().toString();
             this.uploadedParts.clear();
             this.completedUploadId = null;
             this.completedObjectName = null;
@@ -169,7 +203,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
 
         @Override
         public TestPart uploadPart(
-                String objectName, String uploadId, int partNumber, File file, long byteLength)
+                String objectName, String uploadId, int partNumber, File file, int byteLength)
                 throws IOException {
             byte[] bytes = Files.readAllBytes(file.toPath());
             String content = new String(bytes, StandardCharsets.UTF_8);
@@ -220,10 +254,10 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
             extends MultiPartUploadTwoPhaseOutputStream<TestPart, String> {
 
         private final FakeMultiPartUploadStore store;
-        private final long threshold;
+        private final int threshold;
 
         private TestMultiPartUploadTwoPhaseOutputStream(
-                FakeMultiPartUploadStore store, org.apache.hadoop.fs.Path path, long threshold)
+                FakeMultiPartUploadStore store, org.apache.hadoop.fs.Path path, int threshold)
                 throws IOException {
             super(store, path);
             this.store = store;
@@ -231,7 +265,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
         }
 
         @Override
-        public long partSizeThreshold() {
+        public int partSizeThreshold() {
             return threshold;
         }
 
@@ -257,12 +291,12 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
                 String uploadId,
                 List<TestPart> parts,
                 String objectName,
-                long byteLength) {
+                long position) {
             this.store = store;
             this.uploadId = uploadId;
             this.parts = new ArrayList<>(parts);
             this.objectName = objectName;
-            this.byteLength = byteLength;
+            this.byteLength = position;
         }
 
         @Override
@@ -308,9 +342,5 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
         String getContent() {
             return content;
         }
-
-        long getByteLength() {
-            return byteLength;
-        }
     }
 }
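
Tracing testBigWriteSplitByThreshold with threshold 5: write("abc") buffers 3 bytes; write("abcdefghij") first tops the buffer up to "abcab" (uploaded as part 1), then fills it again with "cdefg" (part 2), leaving "hij" buffered; flush() uploads "hij" as part 3, matching the assertions above.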
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
index 99c80b912a..98628d9338 100644
--- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
@@ -124,7 +124,7 @@ public abstract class HadoopCompliantFileIO implements FileIO {
         return getFileSystemPair(path).getKey();
     }
 
-    private Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs.Path path)
+    protected Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs.Path path)
             throws IOException {
         if (fsMap == null) {
             synchronized (this) {
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
index 58a3a0dfd4..614ca36b79 100644
--- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
@@ -20,6 +20,8 @@ package org.apache.paimon.jindo;
 
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.Pair;
@@ -122,6 +124,18 @@ public class JindoFileIO extends HadoopCompliantFileIO {
         return hadoopOptions;
     }
 
+    @Override
+    public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite)
+            throws IOException {
+        if (!overwrite && this.exists(path)) {
+            throw new IOException("File " + path + " already exists.");
+        }
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath);
+        JindoHadoopSystem fs = pair.getKey();
+        return new JindoTwoPhaseOutputStream(new JindoMultiPartUpload(fs, hadoopPath), hadoopPath);
+    }
+
     @Override
     protected Pair<JindoHadoopSystem, String> createFileSystem(org.apache.hadoop.fs.Path path) {
         final String scheme = path.toUri().getScheme();
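
For context, the two-phase flow that newTwoPhaseOutputStream enables, as exercised by the tests above: parts are uploaded while writing, and the file only becomes visible once the committer completes the multipart upload. A hedged usage sketch (the target path and a configured fileIO instance are assumptions; error handling elided):

    // assumes: JindoFileIO fileIO, byte[] data; the path is illustrative
    Path target = new Path("oss://bucket/warehouse/t/data.orc");
    TwoPhaseOutputStream out = fileIO.newTwoPhaseOutputStream(target, false);
    out.write(data);                          // buffered, uploaded in 10 MiB parts
    TwoPhaseOutputStream.Committer committer = out.closeForCommit(); // phase 1
    committer.commit(fileIO);                 // phase 2: complete the upload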
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
new file mode 100644
index 0000000000..ce5aab4122
--- /dev/null
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+
+import com.aliyun.jindodata.api.spec.protos.JdoMpuUploadPartReply;
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPartList;
+import com.aliyun.jindodata.common.JindoHadoopSystem;
+import com.aliyun.jindodata.store.JindoMpuStore;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+/** Provides the multipart upload by Jindo. */
+public class JindoMultiPartUpload implements MultiPartUploadStore<JdoObjectPart, String> {
+
+    private final JindoHadoopSystem fs;
+    private final JindoMpuStore mpuStore;
+
+    public JindoMultiPartUpload(JindoHadoopSystem fs, Path filePath) {
+        this.fs = fs;
+        this.mpuStore = fs.getMpuStore(filePath);
+    }
+
+    @Override
+    public Path workingDirectory() {
+        return fs.getWorkingDirectory();
+    }
+
+    @Override
+    public String startMultiPartUpload(String objectName) throws IOException {
+        return mpuStore.initMultiPartUpload(new Path(objectName));
+    }
+
+    @Override
+    public String completeMultipartUpload(
+            String objectName,
+            String uploadId,
+            List<JdoObjectPart> partETags,
+            long numBytesInParts) {
+        try {
+            JdoObjectPartList partList =
+                    new com.aliyun.jindodata.api.spec.protos.JdoObjectPartList();
+            partList.setParts(partETags.toArray(new JdoObjectPart[0]));
+            mpuStore.commitMultiPartUpload(new Path(objectName), uploadId, partList);
+            return uploadId;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to complete multipart upload 
for: " + objectName, e);
+        }
+    }
+
+    @Override
+    public JdoObjectPart uploadPart(
+            String objectName, String uploadId, int partNumber, File file, int byteLength)
+            throws IOException {
+        try {
+            ByteBuffer buffer;
+            try (FileInputStream fis = new FileInputStream(file);
+                    FileChannel channel = fis.getChannel()) {
+                buffer = ByteBuffer.allocate(byteLength);
+                channel.read(buffer);
+                buffer.flip();
+            }
+
+            JdoMpuUploadPartReply result =
+                    mpuStore.uploadPart(new Path(objectName), uploadId, partNumber, buffer);
+            return result.getPartInfo();
+        } catch (Exception e) {
+            throw new IOException("Failed to upload part " + partNumber + " 
for: " + objectName, e);
+        }
+    }
+
+    @Override
+    public void abortMultipartUpload(String objectName, String uploadId) {
+        try {
+            mpuStore.abortMultipartUpload(new Path(objectName), uploadId);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to abort multipart upload for: 
" + objectName, e);
+        }
+    }
+}
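
One caveat in uploadPart above: FileChannel.read(ByteBuffer) is not guaranteed to fill the buffer in a single call. For a local temp file it usually does, but a defensive read loop is the safer pattern; a sketch, not part of this commit:

    // read until the buffer holds byteLength bytes or the file ends
    ByteBuffer buffer = ByteBuffer.allocate(byteLength);
    try (FileChannel channel = FileChannel.open(file.toPath())) {
        while (buffer.hasRemaining() && channel.read(buffer) >= 0) {
            // keep reading; read(...) returns -1 at end-of-file
        }
    }
    buffer.flip();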
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
new file mode 100644
index 0000000000..43448431b1
--- /dev/null
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.BaseMultiPartUploadCommitter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.MultiPartUploadStore;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.Pair;
+
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
+import com.aliyun.jindodata.common.JindoHadoopSystem;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Jindo implementation of MultiPartUploadCommitter. */
+public class JindoMultiPartUploadCommitter
+        extends BaseMultiPartUploadCommitter<JdoObjectPart, String> {
+    public JindoMultiPartUploadCommitter(
+            String uploadId, List<JdoObjectPart> uploadedParts, String objectName, long position) {
+        super(uploadId, uploadedParts, objectName, position);
+    }
+
+    @Override
+    protected MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore(
+            FileIO fileIO, Path targetPath) throws IOException {
+        JindoFileIO jindoFileIO = (JindoFileIO) fileIO;
+        org.apache.hadoop.fs.Path hadoopPath = jindoFileIO.path(targetPath);
+        Pair<JindoHadoopSystem, String> pair = jindoFileIO.getFileSystemPair(hadoopPath);
+        JindoHadoopSystem fs = pair.getKey();
+        return new JindoMultiPartUpload(fs, hadoopPath);
+    }
+}
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
similarity index 61%
copy from paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
copy to paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
index f90b87c467..fb85cbd3c3 100644
--- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
@@ -16,36 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.oss;
+package org.apache.paimon.jindo;
 
 import org.apache.paimon.fs.MultiPartUploadStore;
 import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
 
-import com.aliyun.oss.model.CompleteMultipartUploadResult;
-import com.aliyun.oss.model.PartETag;
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
 
 import java.io.IOException;
 import java.util.List;
 
-/** OSS implementation of TwoPhaseOutputStream using multipart upload. */
-public class OssTwoPhaseOutputStream
-        extends MultiPartUploadTwoPhaseOutputStream<PartETag, CompleteMultipartUploadResult> {
+/** Jindo implementation of TwoPhaseOutputStream using multipart upload. */
+public class JindoTwoPhaseOutputStream
+        extends MultiPartUploadTwoPhaseOutputStream<JdoObjectPart, String> {
 
-    public OssTwoPhaseOutputStream(
-            MultiPartUploadStore<PartETag, CompleteMultipartUploadResult> multiPartUploadStore,
+    public JindoTwoPhaseOutputStream(
+            MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore,
             org.apache.hadoop.fs.Path hadoopPath)
             throws IOException {
         super(multiPartUploadStore, hadoopPath);
     }
 
-    @Override
-    public long partSizeThreshold() {
-        return 10L << 20;
-    }
-
     @Override
     public Committer committer(
-            String uploadId, List<PartETag> uploadedParts, String objectName, long position) {
-        return new OSSMultiPartUploadCommitter(uploadId, uploadedParts, objectName, position);
+            String uploadId, List<JdoObjectPart> uploadedParts, String objectName, long position) {
+        return new JindoMultiPartUploadCommitter(uploadId, uploadedParts, objectName, position);
     }
 }
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
index 6ea486f683..031ee1f848 100644
--- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
@@ -60,7 +60,7 @@ public class OSSMultiPartUpload
 
     @Override
     public PartETag uploadPart(
-            String objectName, String uploadId, int partNumber, File file, long byteLength)
+            String objectName, String uploadId, int partNumber, File file, int byteLength)
             throws IOException {
         return store.uploadPart(file, objectName, uploadId, partNumber);
     }
diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
index f90b87c467..edc15ac864 100644
--- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
+++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
@@ -38,11 +38,6 @@ public class OssTwoPhaseOutputStream
         super(multiPartUploadStore, hadoopPath);
     }
 
-    @Override
-    public long partSizeThreshold() {
-        return 10L << 20;
-    }
-
     @Override
     public Committer committer(
             String uploadId, List<PartETag> uploadedParts, String objectName, long position) {
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
index 4fd3590027..ca112bac4e 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
@@ -78,17 +78,11 @@ public class S3MultiPartUpload
 
     @Override
     public PartETag uploadPart(
-            String objectName, String uploadId, int partNumber, File file, long byteLength)
+            String objectName, String uploadId, int partNumber, File file, int byteLength)
             throws IOException {
         final UploadPartRequest uploadRequest =
                 s3accessHelper.newUploadPartRequest(
-                        objectName,
-                        uploadId,
-                        partNumber,
-                        checkedDownCast(byteLength),
-                        null,
-                        file,
-                        0L);
+                        objectName, uploadId, partNumber, byteLength, null, file, 0L);
         return s3accessHelper.uploadPart(uploadRequest).getPartETag();
     }
 
@@ -108,13 +102,4 @@ public class S3MultiPartUpload
             super(owner, conf, statisticsContext, auditSpanSource, auditSpan);
         }
     }
-
-    private static int checkedDownCast(long value) {
-        int downCast = (int) value;
-        if (downCast != value) {
-            throw new IllegalArgumentException(
-                    "Cannot downcast long value " + value + " to integer.");
-        }
-        return downCast;
-    }
 }
diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
index 49a81ca6e6..76a424ecd3 100644
--- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
+++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
@@ -38,11 +38,6 @@ public class S3TwoPhaseOutputStream
         super(multiPartUploadStore, hadoopPath);
     }
 
-    @Override
-    public long partSizeThreshold() {
-        return 5L << 20;
-    }
-
     @Override
     public Committer committer(
             String uploadId, List<PartETag> uploadedParts, String objectName, long position) {
