This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 785e4c77f0 [core] format table: fix two phase stream flush bug (#6620)
785e4c77f0 is described below
commit 785e4c77f06706cd918690e7d45c712af1d5d979
Author: jerry <[email protected]>
AuthorDate: Mon Nov 17 18:54:43 2025 +0800
[core] format table: fix two phase stream flush bug (#6620)
---
.../fs/MultiPartUploadTwoPhaseOutputStream.java | 28 ++++++++---------
.../MultiPartUploadTwoPhaseOutputStreamTest.java | 35 +++++++++++++++++++++-
2 files changed, 46 insertions(+), 17 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
index 3d1b5b0ce4..60f0bc68dd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
@@ -84,9 +84,7 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
}
buffer.write(b);
position++;
- if (buffer.size() >= partSizeThreshold()) {
- uploadPart();
- }
+ uploadPartIfLargerThanThreshold();
}
@Override
@@ -102,9 +100,7 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
int remaining = len;
int offset = off;
while (remaining > 0) {
- if (buffer.size() >= partSizeThreshold()) {
- uploadPart();
- }
+ uploadPartIfLargerThanThreshold();
int currentSize = buffer.size();
int space = partSizeThreshold() - currentSize;
int count = Math.min(remaining, space);
@@ -112,10 +108,7 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
offset += count;
remaining -= count;
position += count;
- // consume buffer if it is full
- if (buffer.size() >= partSizeThreshold()) {
- uploadPart();
- }
+ uploadPartIfLargerThanThreshold();
}
}
@@ -124,7 +117,7 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
if (closed) {
throw new IOException("Stream is closed");
}
- uploadPart();
+ uploadPartIfLargerThanThreshold();
}
@Override
@@ -142,15 +135,18 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
throw new IOException("Stream is already closed but committer is
null");
}
closed = true;
+ // Only the last uploaded part may be smaller than the part size threshold
+ uploadPartUtil();
+ return committer();
+ }
- if (buffer.size() > 0) {
- uploadPart();
+ private void uploadPartIfLargerThanThreshold() throws IOException {
+ if (buffer.size() >= partSizeThreshold()) {
+ uploadPartUtil();
}
-
- return committer();
}
- private void uploadPart() throws IOException {
+ private void uploadPartUtil() throws IOException {
if (buffer.size() == 0) {
return;
}
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
index 0a002fd783..9d3f759737 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java
@@ -144,7 +144,7 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
stream.flush();
assertThat(store.getUploadedParts())
.extracting(TestPart::getContent)
- .containsExactly("abcab", "cdefg", "hij");
+ .containsExactly("abcab", "cdefg");
TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
assertThat(store.getUploadedParts()).hasSize(3);
@@ -157,6 +157,39 @@ class MultiPartUploadTwoPhaseOutputStreamTest {
assertThat(store.getAbortedUploadId()).isNull();
}
+ @Test
+ void testFlushWhenBufferSizeIsSmallerThanThresholdDoesNotUpload() throws
IOException {
+ TestMultiPartUploadTwoPhaseOutputStream stream =
+ new TestMultiPartUploadTwoPhaseOutputStream(store, objectPath,
10);
+
+ // Write 3 bytes, which is less than the threshold of 10
+ stream.write("abc".getBytes(StandardCharsets.UTF_8));
+ assertThat(store.getUploadedParts()).isEmpty();
+
+ // Flush should not trigger an upload since the buffer size (3) < threshold (10)
+ stream.flush();
+ assertThat(store.getUploadedParts()).isEmpty();
+ assertThat(stream.getPos()).isEqualTo(3);
+
+ // Write another 4 bytes; the total buffer size is 7, still less than the threshold
+ stream.write("defg".getBytes(StandardCharsets.UTF_8));
+ assertThat(store.getUploadedParts()).isEmpty();
+
+ // Flush again; this should still not upload since the buffer size (7) < threshold (10)
+ stream.flush();
+ assertThat(store.getUploadedParts()).isEmpty();
+ assertThat(stream.getPos()).isEqualTo(7);
+
+ // The remaining buffer should be uploaded only when closeForCommit is called
+ TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+ assertThat(store.getUploadedParts()).hasSize(1);
+
assertThat(store.getUploadedParts().get(0).getContent()).isEqualTo("abcdefg");
+
assertThat(store.getUploadedParts().get(0).getPartNumber()).isEqualTo(1);
+
+ committer.commit(fileIO);
+ assertThat(store.getCompletedBytes()).isEqualTo(7);
+ }
+
/** Fake store implementation for testing. */
private static class FakeMultiPartUploadStore
implements MultiPartUploadStore<TestPart, String> {