This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bc64466ec6 [core] format table: jindo io use upload part to support
two-phase commit (#6470)
bc64466ec6 is described below
commit bc64466ec657a76608b864d3740e5f3f0eac0d22
Author: jerry <[email protected]>
AuthorDate: Mon Oct 27 15:41:11 2025 +0800
[core] format table: jindo io use upload part to support two-phase commit
(#6470)
---
.../org/apache/paimon/fs/MultiPartUploadStore.java | 2 +-
.../fs/MultiPartUploadTwoPhaseOutputStream.java | 53 ++++++++---
.../MultiPartUploadTwoPhaseOutputStreamTest.java | 56 ++++++++---
.../apache/paimon/jindo/HadoopCompliantFileIO.java | 2 +-
.../java/org/apache/paimon/jindo/JindoFileIO.java | 14 +++
.../apache/paimon/jindo/JindoMultiPartUpload.java | 104 +++++++++++++++++++++
.../jindo/JindoMultiPartUploadCommitter.java | 50 ++++++++++
.../paimon/jindo/JindoTwoPhaseOutputStream.java} | 24 ++---
.../org/apache/paimon/oss/OSSMultiPartUpload.java | 2 +-
.../apache/paimon/oss/OssTwoPhaseOutputStream.java | 5 -
.../org/apache/paimon/s3/S3MultiPartUpload.java | 19 +---
.../apache/paimon/s3/S3TwoPhaseOutputStream.java | 5 -
12 files changed, 265 insertions(+), 71 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
index 2a18bed8a2..83a9f03348 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadStore.java
@@ -43,7 +43,7 @@ public interface MultiPartUploadStore<T, C> {
String objectName, String uploadId, List<T> partETags, long
numBytesInParts)
throws IOException;
- T uploadPart(String objectName, String uploadId, int partNumber, File
file, long byteLength)
+ T uploadPart(String objectName, String uploadId, int partNumber, File
file, int byteLength)
throws IOException;
void abortMultipartUpload(String objectName, String uploadId) throws
IOException;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
index ab20048f36..ed4cf3809e 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
@@ -57,7 +57,12 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T,
C> extends TwoPhase
this.position = 0;
}
- public abstract long partSizeThreshold();
+    /**
+     * Returns the buffered size, in bytes, at which the pending data is uploaded as one part.
+     *
+     * <p>Part size limits: OSS 100KB ~ 5GB, S3 5MiB ~ 5GiB. The default of 10MiB is chosen with
+     * memory usage in mind and follows Flink's setting.
+     */
+    public int partSizeThreshold() {
+        final int mebibyte = 1 << 20;
+        return 10 * mebibyte;
+    }
public abstract Committer committer(
String uploadId, List<T> uploadedParts, String objectName, long
position);
@@ -89,10 +94,23 @@ public abstract class
MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
if (closed) {
throw new IOException("Stream is closed");
}
- buffer.write(b, off, len);
- position += len;
- if (buffer.size() >= partSizeThreshold()) {
- uploadPart();
+ int remaining = len;
+ int offset = off;
+ while (remaining > 0) {
+ if (buffer.size() >= partSizeThreshold()) {
+ uploadPart();
+ }
+ int currentSize = buffer.size();
+ int space = partSizeThreshold() - currentSize;
+ int count = Math.min(remaining, space);
+ buffer.write(b, offset, count);
+ offset += count;
+ remaining -= count;
+ position += count;
+ // consume buffer if it is full
+ if (buffer.size() >= partSizeThreshold()) {
+ uploadPart();
+ }
}
}
@@ -133,25 +151,25 @@ public abstract class
MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
}
File tempFile = null;
+ int partNumber = uploadedParts.size() + 1;
try {
- byte[] data = buffer.toByteArray();
tempFile = Files.createTempFile("multi-part-" + UUID.randomUUID(),
".tmp").toFile();
try (FileOutputStream fos = new FileOutputStream(tempFile)) {
- fos.write(data);
+ buffer.writeTo(fos);
fos.flush();
}
T partETag =
multiPartUploadStore.uploadPart(
- objectName, uploadId, uploadedParts.size() + 1,
tempFile, data.length);
+ objectName,
+ uploadId,
+ partNumber,
+ tempFile,
+ checkedDownCast(tempFile.length()));
uploadedParts.add(partETag);
buffer.reset();
} catch (Exception e) {
throw new IOException(
- "Failed to upload part "
- + (uploadedParts.size() + 1)
- + " for upload ID: "
- + uploadId,
- e);
+ "Failed to upload part " + partNumber + " for upload ID: "
+ uploadId, e);
} finally {
if (tempFile != null && tempFile.exists()) {
if (!tempFile.delete()) {
@@ -160,4 +178,13 @@ public abstract class
MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
}
}
}
+
+    /**
+     * Narrows a {@code long} to an {@code int}, rejecting values that do not fit.
+     *
+     * @param value the value to narrow
+     * @return {@code value} as an {@code int}
+     * @throws IllegalArgumentException if {@code value} is outside the {@code int} range
+     */
+    private static int checkedDownCast(long value) {
+        if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException(
+                    "Cannot downcast long value " + value + " to integer.");
+        }
+        return (int) value;
+    }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
index dec343e49a..1cf28389b3 100644
---
a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
@@ -30,6 +30,7 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -66,7 +67,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
.isEqualTo("hello
world!".getBytes(StandardCharsets.UTF_8).length);
TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
- assertThat(store.getUploadedParts()).hasSize(2);
+ assertThat(store.getUploadedParts()).hasSize(3);
assertThat(committer.targetFilePath().toString()).isEqualTo(store.getStartedObjectName());
committer.commit(fileIO);
@@ -122,11 +123,44 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
first.commit(fileIO);
}
+    @Test
+    void testBigWriteSplitByThreshold() throws IOException {
+        // Part size threshold of 5 bytes, so a single write larger than that must be split.
+        TestMultiPartUploadTwoPhaseOutputStream stream =
+                new TestMultiPartUploadTwoPhaseOutputStream(store, objectPath, 5);
+
+        byte[] shortChunk = "abc".getBytes(StandardCharsets.UTF_8);
+        byte[] longChunk = "abcdefghij".getBytes(StandardCharsets.UTF_8);
+        stream.write(shortChunk);
+        stream.write(longChunk);
+
+        // 13 bytes written: two 5-byte parts are expected so far, numbered from 1.
+        assertThat(store.getUploadedParts())
+                .hasSize(2)
+                .extracting(TestPart::getPartNumber)
+                .containsExactly(1, 2);
+        assertThat(store.getUploadedParts())
+                .extracting(TestPart::getContent)
+                .containsExactly("abcab", "cdefg");
+        assertThat(stream.getPos()).isEqualTo(shortChunk.length + longChunk.length);
+
+        // After flush() the trailing "hij" is expected as a third, shorter part.
+        stream.flush();
+        assertThat(store.getUploadedParts())
+                .extracting(TestPart::getContent)
+                .containsExactly("abcab", "cdefg", "hij");
+
+        // Closing must not add another part.
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+        assertThat(store.getUploadedParts()).hasSize(3);
+
+        committer.commit(fileIO);
+
+        assertThat(store.getCompletedUploadId()).isEqualTo(store.getStartedUploadId());
+        assertThat(store.getCompletedObjectName()).isEqualTo(store.getStartedObjectName());
+        assertThat(store.getCompletedParts()).containsExactlyElementsOf(store.getUploadedParts());
+        assertThat(store.getCompletedBytes()).isEqualTo(stream.getPos());
+        assertThat(store.getAbortedUploadId()).isNull();
+    }
+
/** Fake store implementation for testing. */
private static class FakeMultiPartUploadStore
implements MultiPartUploadStore<TestPart, String> {
- private int uploadCounter = 0;
private final List<TestPart> uploadedParts = new ArrayList<>();
private String startedUploadId;
private String startedObjectName;
@@ -144,7 +178,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
@Override
public String startMultiPartUpload(String objectName) {
this.startedObjectName = objectName;
- this.startedUploadId = "upload-" + (++uploadCounter);
+ this.startedUploadId = UUID.randomUUID().toString();
this.uploadedParts.clear();
this.completedUploadId = null;
this.completedObjectName = null;
@@ -169,7 +203,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
@Override
public TestPart uploadPart(
- String objectName, String uploadId, int partNumber, File file,
long byteLength)
+ String objectName, String uploadId, int partNumber, File file,
int byteLength)
throws IOException {
byte[] bytes = Files.readAllBytes(file.toPath());
String content = new String(bytes, StandardCharsets.UTF_8);
@@ -220,10 +254,10 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
extends MultiPartUploadTwoPhaseOutputStream<TestPart, String> {
private final FakeMultiPartUploadStore store;
- private final long threshold;
+ private final int threshold;
private TestMultiPartUploadTwoPhaseOutputStream(
- FakeMultiPartUploadStore store, org.apache.hadoop.fs.Path
path, long threshold)
+ FakeMultiPartUploadStore store, org.apache.hadoop.fs.Path
path, int threshold)
throws IOException {
super(store, path);
this.store = store;
@@ -231,7 +265,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
}
@Override
- public long partSizeThreshold() {
+ public int partSizeThreshold() {
return threshold;
}
@@ -257,12 +291,12 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
String uploadId,
List<TestPart> parts,
String objectName,
- long byteLength) {
+ long position) {
this.store = store;
this.uploadId = uploadId;
this.parts = new ArrayList<>(parts);
this.objectName = objectName;
- this.byteLength = byteLength;
+ this.byteLength = position;
}
@Override
@@ -308,9 +342,5 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
String getContent() {
return content;
}
-
- long getByteLength() {
- return byteLength;
- }
}
}
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
index 99c80b912a..98628d9338 100644
---
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
@@ -124,7 +124,7 @@ public abstract class HadoopCompliantFileIO implements
FileIO {
return getFileSystemPair(path).getKey();
}
- private Pair<JindoHadoopSystem, String>
getFileSystemPair(org.apache.hadoop.fs.Path path)
+ protected Pair<JindoHadoopSystem, String>
getFileSystemPair(org.apache.hadoop.fs.Path path)
throws IOException {
if (fsMap == null) {
synchronized (this) {
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
index 58a3a0dfd4..614ca36b79 100644
---
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
@@ -20,6 +20,8 @@ package org.apache.paimon.jindo;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
@@ -122,6 +124,18 @@ public class JindoFileIO extends HadoopCompliantFileIO {
return hadoopOptions;
}
+    /**
+     * Opens a two-phase output stream for {@code path} backed by a Jindo multipart upload.
+     *
+     * @param path target file
+     * @param overwrite when {@code false}, an already existing {@code path} is rejected
+     * @return a {@link JindoTwoPhaseOutputStream} writing to {@code path}
+     * @throws IOException if {@code path} exists and {@code overwrite} is {@code false}, or if a
+     *     callee fails
+     */
+    @Override
+    public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite)
+            throws IOException {
+        boolean mustNotExist = !overwrite;
+        if (mustNotExist && exists(path)) {
+            throw new IOException("File " + path + " already exists.");
+        }
+        org.apache.hadoop.fs.Path target = path(path);
+        JindoHadoopSystem fileSystem = getFileSystemPair(target).getKey();
+        JindoMultiPartUpload uploadStore = new JindoMultiPartUpload(fileSystem, target);
+        return new JindoTwoPhaseOutputStream(uploadStore, target);
+    }
+
@Override
protected Pair<JindoHadoopSystem, String>
createFileSystem(org.apache.hadoop.fs.Path path) {
final String scheme = path.toUri().getScheme();
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
new file mode 100644
index 0000000000..ce5aab4122
--- /dev/null
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+
+import com.aliyun.jindodata.api.spec.protos.JdoMpuUploadPartReply;
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPartList;
+import com.aliyun.jindodata.common.JindoHadoopSystem;
+import com.aliyun.jindodata.store.JindoMpuStore;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+/**
+ * {@link MultiPartUploadStore} implementation that delegates every multipart upload operation to
+ * a {@link JindoMpuStore}.
+ */
+public class JindoMultiPartUpload implements MultiPartUploadStore<JdoObjectPart, String> {
+
+    private final JindoHadoopSystem fs;
+    private final JindoMpuStore mpuStore;
+
+    /**
+     * @param fs Jindo file system; supplies the working directory and the MPU store
+     * @param filePath path handed to {@code fs.getMpuStore} to obtain the MPU store
+     */
+    public JindoMultiPartUpload(JindoHadoopSystem fs, Path filePath) {
+        this.fs = fs;
+        this.mpuStore = fs.getMpuStore(filePath);
+    }
+
+    @Override
+    public Path workingDirectory() {
+        return fs.getWorkingDirectory();
+    }
+
+    /** Starts a multipart upload for {@code objectName} and returns the store's identifier. */
+    @Override
+    public String startMultiPartUpload(String objectName) throws IOException {
+        return mpuStore.initMultiPartUpload(new Path(objectName));
+    }
+
+    /**
+     * Commits the multipart upload made of {@code partETags}.
+     *
+     * @param numBytesInParts not used by this implementation
+     * @return {@code uploadId}
+     * @throws RuntimeException wrapping any failure reported by the underlying store
+     */
+    @Override
+    public String completeMultipartUpload(
+            String objectName,
+            String uploadId,
+            List<JdoObjectPart> partETags,
+            long numBytesInParts) {
+        try {
+            JdoObjectPartList partList = new JdoObjectPartList();
+            partList.setParts(partETags.toArray(new JdoObjectPart[0]));
+            mpuStore.commitMultiPartUpload(new Path(objectName), uploadId, partList);
+            return uploadId;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to complete multipart upload for: " + objectName, e);
+        }
+    }
+
+    /**
+     * Reads up to {@code byteLength} bytes from the start of {@code file} into memory and uploads
+     * them as part {@code partNumber}.
+     *
+     * @return the part info reported by the store for the uploaded part
+     * @throws IOException wrapping any failure while reading the file or uploading the part
+     */
+    @Override
+    public JdoObjectPart uploadPart(
+            String objectName, String uploadId, int partNumber, File file, int byteLength)
+            throws IOException {
+        try {
+            ByteBuffer buffer;
+            try (FileInputStream fis = new FileInputStream(file);
+                    FileChannel channel = fis.getChannel()) {
+                buffer = ByteBuffer.allocate(byteLength);
+                // A single read() is allowed to return fewer bytes than the buffer can hold, so
+                // keep reading until the buffer is full or the end of the file is reached.
+                while (buffer.hasRemaining()) {
+                    if (channel.read(buffer) < 0) {
+                        break;
+                    }
+                }
+                buffer.flip();
+            }
+
+            JdoMpuUploadPartReply result =
+                    mpuStore.uploadPart(new Path(objectName), uploadId, partNumber, buffer);
+            return result.getPartInfo();
+        } catch (Exception e) {
+            throw new IOException("Failed to upload part " + partNumber + " for: " + objectName, e);
+        }
+    }
+
+    /**
+     * Aborts the multipart upload identified by {@code uploadId}.
+     *
+     * @throws RuntimeException wrapping any failure reported by the underlying store
+     */
+    @Override
+    public void abortMultipartUpload(String objectName, String uploadId) {
+        try {
+            mpuStore.abortMultipartUpload(new Path(objectName), uploadId);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to abort multipart upload for: " + objectName, e);
+        }
+    }
+}
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
new file mode 100644
index 0000000000..43448431b1
--- /dev/null
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.BaseMultiPartUploadCommitter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.MultiPartUploadStore;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.Pair;
+
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
+import com.aliyun.jindodata.common.JindoHadoopSystem;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link BaseMultiPartUploadCommitter} for Jindo that builds its upload store from a {@link
+ * JindoFileIO}.
+ */
+public class JindoMultiPartUploadCommitter
+        extends BaseMultiPartUploadCommitter<JdoObjectPart, String> {
+
+    public JindoMultiPartUploadCommitter(
+            String uploadId, List<JdoObjectPart> uploadedParts, String objectName, long position) {
+        super(uploadId, uploadedParts, objectName, position);
+    }
+
+    /**
+     * Creates a {@link JindoMultiPartUpload} for {@code targetPath}.
+     *
+     * <p>{@code fileIO} is cast to {@link JindoFileIO} without a prior type check.
+     */
+    @Override
+    protected MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore(
+            FileIO fileIO, Path targetPath) throws IOException {
+        JindoFileIO jindo = (JindoFileIO) fileIO;
+        org.apache.hadoop.fs.Path target = jindo.path(targetPath);
+        Pair<JindoHadoopSystem, String> fileSystemPair = jindo.getFileSystemPair(target);
+        return new JindoMultiPartUpload(fileSystemPair.getKey(), target);
+    }
+}
diff --git
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
similarity index 61%
copy from
paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
copy to
paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
index f90b87c467..fb85cbd3c3 100644
---
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
@@ -16,36 +16,30 @@
* limitations under the License.
*/
-package org.apache.paimon.oss;
+package org.apache.paimon.jindo;
import org.apache.paimon.fs.MultiPartUploadStore;
import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
-import com.aliyun.oss.model.CompleteMultipartUploadResult;
-import com.aliyun.oss.model.PartETag;
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
import java.io.IOException;
import java.util.List;
-/** OSS implementation of TwoPhaseOutputStream using multipart upload. */
-public class OssTwoPhaseOutputStream
- extends MultiPartUploadTwoPhaseOutputStream<PartETag,
CompleteMultipartUploadResult> {
+/** Jindo implementation of TwoPhaseOutputStream using multipart upload. */
+public class JindoTwoPhaseOutputStream
+ extends MultiPartUploadTwoPhaseOutputStream<JdoObjectPart, String> {
- public OssTwoPhaseOutputStream(
- MultiPartUploadStore<PartETag, CompleteMultipartUploadResult>
multiPartUploadStore,
+ public JindoTwoPhaseOutputStream(
+ MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore,
org.apache.hadoop.fs.Path hadoopPath)
throws IOException {
super(multiPartUploadStore, hadoopPath);
}
- @Override
- public long partSizeThreshold() {
- return 10L << 20;
- }
-
@Override
public Committer committer(
- String uploadId, List<PartETag> uploadedParts, String objectName,
long position) {
- return new OSSMultiPartUploadCommitter(uploadId, uploadedParts,
objectName, position);
+ String uploadId, List<JdoObjectPart> uploadedParts, String
objectName, long position) {
+ return new JindoMultiPartUploadCommitter(uploadId, uploadedParts,
objectName, position);
}
}
diff --git
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
index 6ea486f683..031ee1f848 100644
---
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
+++
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSMultiPartUpload.java
@@ -60,7 +60,7 @@ public class OSSMultiPartUpload
@Override
public PartETag uploadPart(
- String objectName, String uploadId, int partNumber, File file,
long byteLength)
+ String objectName, String uploadId, int partNumber, File file, int
byteLength)
throws IOException {
return store.uploadPart(file, objectName, uploadId, partNumber);
}
diff --git
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
index f90b87c467..edc15ac864 100644
---
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
+++
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java
@@ -38,11 +38,6 @@ public class OssTwoPhaseOutputStream
super(multiPartUploadStore, hadoopPath);
}
- @Override
- public long partSizeThreshold() {
- return 10L << 20;
- }
-
@Override
public Committer committer(
String uploadId, List<PartETag> uploadedParts, String objectName,
long position) {
diff --git
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
index 4fd3590027..ca112bac4e 100644
---
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
+++
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java
@@ -78,17 +78,11 @@ public class S3MultiPartUpload
@Override
public PartETag uploadPart(
- String objectName, String uploadId, int partNumber, File file,
long byteLength)
+ String objectName, String uploadId, int partNumber, File file, int
byteLength)
throws IOException {
final UploadPartRequest uploadRequest =
s3accessHelper.newUploadPartRequest(
- objectName,
- uploadId,
- partNumber,
- checkedDownCast(byteLength),
- null,
- file,
- 0L);
+ objectName, uploadId, partNumber, byteLength, null,
file, 0L);
return s3accessHelper.uploadPart(uploadRequest).getPartETag();
}
@@ -108,13 +102,4 @@ public class S3MultiPartUpload
super(owner, conf, statisticsContext, auditSpanSource, auditSpan);
}
}
-
- private static int checkedDownCast(long value) {
- int downCast = (int) value;
- if (downCast != value) {
- throw new IllegalArgumentException(
- "Cannot downcast long value " + value + " to integer.");
- }
- return downCast;
- }
}
diff --git
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
index 49a81ca6e6..76a424ecd3 100644
---
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
+++
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java
@@ -38,11 +38,6 @@ public class S3TwoPhaseOutputStream
super(multiPartUploadStore, hadoopPath);
}
- @Override
- public long partSizeThreshold() {
- return 5L << 20;
- }
-
@Override
public Committer committer(
String uploadId, List<PartETag> uploadedParts, String objectName,
long position) {