This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1b55922dc0 [core][fix] Blob with rolling file failed (#6518)
1b55922dc0 is described below
commit 1b55922dc0cff2a9b1b957d5710fb718c659be32
Author: YeJunHao <[email protected]>
AuthorDate: Mon Nov 3 19:58:45 2025 +0800
[core][fix] Blob with rolling file failed (#6518)
---
.../paimon/append/RollingBlobFileWriter.java | 51 ++++++++++++++--------
.../org/apache/paimon/append/BlobTableTest.java | 32 ++++++++++----
2 files changed, 58 insertions(+), 25 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index ba51f8d3bc..17928dd6fa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
@@ -85,9 +86,10 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
private final Supplier<
PeojectedFileWriter<SingleFileWriter<InternalRow,
DataFileMeta>, DataFileMeta>>
writerFactory;
- private final PeojectedFileWriter<
- RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
- blobWriter;
+ private final Supplier<
+ PeojectedFileWriter<
+ RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>>
+ blobWriterFactory;
private final long targetFileSize;
private final long blobTargetFileSize;
@@ -96,6 +98,9 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
private final List<DataFileMeta> results;
private PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>,
DataFileMeta>
currentWriter;
+ private PeojectedFileWriter<
+ RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
+ blobWriter;
private long recordCount = 0;
private boolean closed = false;
@@ -144,18 +149,19 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
statsDenseStore);
// Initialize blob writer
- this.blobWriter =
- createBlobWriter(
- fileIO,
- schemaId,
- blobType,
- writeSchema,
- pathFactory,
- seqNumCounter,
- fileSource,
- asyncFileWrite,
- statsDenseStore,
- blobTargetFileSize);
+ this.blobWriterFactory =
+ () ->
+ createBlobWriter(
+ fileIO,
+ schemaId,
+ blobType,
+ writeSchema,
+ pathFactory,
+ seqNumCounter,
+ fileSource,
+ asyncFileWrite,
+ statsDenseStore,
+ blobTargetFileSize);
}
/** Creates a factory for normal data writers. */
@@ -265,6 +271,9 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
if (currentWriter == null) {
currentWriter = writerFactory.get();
}
+ if (blobWriter == null) {
+ blobWriter = blobWriterFactory.get();
+ }
currentWriter.write(row);
blobWriter.write(row);
recordCount++;
@@ -322,7 +331,10 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
for (FileWriterAbortExecutor abortExecutor : closedWriters) {
abortExecutor.abort();
}
- blobWriter.abort();
+ if (blobWriter != null) {
+ blobWriter.abort();
+ blobWriter = null;
+ }
}
/** Checks if the current file should be rolled based on size and record
count. */
@@ -369,8 +381,13 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
/** Closes the blob writer and processes blob metadata with appropriate
tags. */
private List<DataFileMeta> closeBlobWriter() throws IOException {
+ if (blobWriter == null) {
+ return Collections.emptyList();
+ }
blobWriter.close();
- return blobWriter.result();
+ List<DataFileMeta> results = blobWriter.result();
+ blobWriter = null;
+ return results;
}
/** Validates that the row counts match between main and blob files. */
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index ab37159648..334fe8d015 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -57,7 +57,7 @@ public class BlobTableTest extends TableTestBase {
public void testBasic() throws Exception {
createTableDefault();
- commitDefault(writeDataDefault(100, 1));
+ commitDefault(writeDataDefault(1000, 1));
AtomicInteger integer = new AtomicInteger(0);
@@ -71,7 +71,7 @@ public class BlobTableTest extends TableTestBase {
assertThat(fieldGroups.size()).isEqualTo(2);
assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
- assertThat(fieldGroups.get(1).files().size()).isEqualTo(8);
+ assertThat(fieldGroups.get(1).files().size()).isEqualTo(10);
readDefault(
row -> {
@@ -81,7 +81,7 @@ public class BlobTableTest extends TableTestBase {
}
});
- assertThat(integer.get()).isEqualTo(100);
+ assertThat(integer.get()).isEqualTo(1000);
}
@Test
@@ -104,7 +104,7 @@ public class BlobTableTest extends TableTestBase {
public void testMultiBatch() throws Exception {
createTableDefault();
- commitDefault(writeDataDefault(100, 2));
+ commitDefault(writeDataDefault(1000, 2));
AtomicInteger integer = new AtomicInteger(0);
@@ -120,7 +120,7 @@ public class BlobTableTest extends TableTestBase {
DataEvolutionSplitRead.splitFieldBunches(batch, file -> 0);
assertThat(fieldGroups.size()).isEqualTo(2);
assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
- assertThat(fieldGroups.get(1).files().size()).isEqualTo(8);
+ assertThat(fieldGroups.get(1).files().size()).isEqualTo(10);
}
readDefault(
@@ -129,7 +129,7 @@ public class BlobTableTest extends TableTestBase {
assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
}
});
- assertThat(integer.get()).isEqualTo(200);
+ assertThat(integer.get()).isEqualTo(2000);
}
@Test
@@ -179,6 +179,21 @@ public class BlobTableTest extends TableTestBase {
assertThat(i.get()).isEqualTo(1);
}
+ @Test
+ public void testRolling() throws Exception {
+ createTableDefault();
+ commitDefault(writeDataDefault(1025, 1));
+ AtomicInteger integer = new AtomicInteger(0);
+ readDefault(
+ row -> {
+ integer.incrementAndGet();
+ if (integer.get() % 50 == 0) {
+ assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+ }
+ });
+ assertThat(integer.get()).isEqualTo(1025);
+ }
+
protected Schema schemaDefault() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
@@ -191,12 +206,13 @@ public class BlobTableTest extends TableTestBase {
}
protected InternalRow dataDefault(int time, int size) {
- return GenericRow.of(RANDOM.nextInt(), randomString(), new BlobData(blobBytes));
+ return GenericRow.of(
+ RANDOM.nextInt(), BinaryString.fromBytes(randomBytes()), new BlobData(blobBytes));
}
@Override
protected byte[] randomBytes() {
- byte[] binary = new byte[2 * 1024 * 1024];
+ byte[] binary = new byte[2 * 1024 * 124];
RANDOM.nextBytes(binary);
return binary;
}