This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b55922dc0 [core][fix] Blob with rolling file failed (#6518)
1b55922dc0 is described below

commit 1b55922dc0cff2a9b1b957d5710fb718c659be32
Author: YeJunHao <[email protected]>
AuthorDate: Mon Nov 3 19:58:45 2025 +0800

    [core][fix] Blob with rolling file failed (#6518)
---
 .../paimon/append/RollingBlobFileWriter.java       | 51 ++++++++++++++--------
 .../org/apache/paimon/append/BlobTableTest.java    | 32 ++++++++++----
 2 files changed, 58 insertions(+), 25 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index ba51f8d3bc..17928dd6fa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Supplier;
 
@@ -85,9 +86,10 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
     private final Supplier<
                     PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>>
             writerFactory;
-    private final PeojectedFileWriter<
-                    RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
-            blobWriter;
+    private final Supplier<
+                    PeojectedFileWriter<
+                            RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>>
+            blobWriterFactory;
     private final long targetFileSize;
     private final long blobTargetFileSize;
 
@@ -96,6 +98,9 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
     private final List<DataFileMeta> results;
     private PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>, DataFileMeta>
             currentWriter;
+    private PeojectedFileWriter<
+                    RollingFileWriterImpl<InternalRow, DataFileMeta>, List<DataFileMeta>>
+            blobWriter;
     private long recordCount = 0;
     private boolean closed = false;
 
@@ -144,18 +149,19 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
                         statsDenseStore);
 
         // Initialize blob writer
-        this.blobWriter =
-                createBlobWriter(
-                        fileIO,
-                        schemaId,
-                        blobType,
-                        writeSchema,
-                        pathFactory,
-                        seqNumCounter,
-                        fileSource,
-                        asyncFileWrite,
-                        statsDenseStore,
-                        blobTargetFileSize);
+        this.blobWriterFactory =
+                () ->
+                        createBlobWriter(
+                                fileIO,
+                                schemaId,
+                                blobType,
+                                writeSchema,
+                                pathFactory,
+                                seqNumCounter,
+                                fileSource,
+                                asyncFileWrite,
+                                statsDenseStore,
+                                blobTargetFileSize);
     }
 
     /** Creates a factory for normal data writers. */
@@ -265,6 +271,9 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
             if (currentWriter == null) {
                 currentWriter = writerFactory.get();
             }
+            if (blobWriter == null) {
+                blobWriter = blobWriterFactory.get();
+            }
             currentWriter.write(row);
             blobWriter.write(row);
             recordCount++;
@@ -322,7 +331,10 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
         for (FileWriterAbortExecutor abortExecutor : closedWriters) {
             abortExecutor.abort();
         }
-        blobWriter.abort();
+        if (blobWriter != null) {
+            blobWriter.abort();
+            blobWriter = null;
+        }
     }
 
     /** Checks if the current file should be rolled based on size and record count. */
@@ -369,8 +381,13 @@ public class RollingBlobFileWriter implements RollingFileWriter<InternalRow, Dat
 
     /** Closes the blob writer and processes blob metadata with appropriate tags. */
     private List<DataFileMeta> closeBlobWriter() throws IOException {
+        if (blobWriter == null) {
+            return Collections.emptyList();
+        }
         blobWriter.close();
-        return blobWriter.result();
+        List<DataFileMeta> results = blobWriter.result();
+        blobWriter = null;
+        return results;
     }
 
     /** Validates that the row counts match between main and blob files. */
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index ab37159648..334fe8d015 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -57,7 +57,7 @@ public class BlobTableTest extends TableTestBase {
     public void testBasic() throws Exception {
         createTableDefault();
 
-        commitDefault(writeDataDefault(100, 1));
+        commitDefault(writeDataDefault(1000, 1));
 
         AtomicInteger integer = new AtomicInteger(0);
 
@@ -71,7 +71,7 @@ public class BlobTableTest extends TableTestBase {
 
         assertThat(fieldGroups.size()).isEqualTo(2);
         assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
-        assertThat(fieldGroups.get(1).files().size()).isEqualTo(8);
+        assertThat(fieldGroups.get(1).files().size()).isEqualTo(10);
 
         readDefault(
                 row -> {
@@ -81,7 +81,7 @@ public class BlobTableTest extends TableTestBase {
                     }
                 });
 
-        assertThat(integer.get()).isEqualTo(100);
+        assertThat(integer.get()).isEqualTo(1000);
     }
 
     @Test
@@ -104,7 +104,7 @@ public class BlobTableTest extends TableTestBase {
     public void testMultiBatch() throws Exception {
         createTableDefault();
 
-        commitDefault(writeDataDefault(100, 2));
+        commitDefault(writeDataDefault(1000, 2));
 
         AtomicInteger integer = new AtomicInteger(0);
 
@@ -120,7 +120,7 @@ public class BlobTableTest extends TableTestBase {
                     DataEvolutionSplitRead.splitFieldBunches(batch, file -> 0);
             assertThat(fieldGroups.size()).isEqualTo(2);
             assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
-            assertThat(fieldGroups.get(1).files().size()).isEqualTo(8);
+            assertThat(fieldGroups.get(1).files().size()).isEqualTo(10);
         }
 
         readDefault(
@@ -129,7 +129,7 @@ public class BlobTableTest extends TableTestBase {
                         assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
                     }
                 });
-        assertThat(integer.get()).isEqualTo(200);
+        assertThat(integer.get()).isEqualTo(2000);
     }
 
     @Test
@@ -179,6 +179,21 @@ public class BlobTableTest extends TableTestBase {
         assertThat(i.get()).isEqualTo(1);
     }
 
+    @Test
+    public void testRolling() throws Exception {
+        createTableDefault();
+        commitDefault(writeDataDefault(1025, 1));
+        AtomicInteger integer = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    integer.incrementAndGet();
+                    if (integer.get() % 50 == 0) {
+                        assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+                    }
+                });
+        assertThat(integer.get()).isEqualTo(1025);
+    }
+
     protected Schema schemaDefault() {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
@@ -191,12 +206,13 @@ public class BlobTableTest extends TableTestBase {
     }
 
     protected InternalRow dataDefault(int time, int size) {
-        return GenericRow.of(RANDOM.nextInt(), randomString(), new BlobData(blobBytes));
+        return GenericRow.of(
+                RANDOM.nextInt(), BinaryString.fromBytes(randomBytes()), new BlobData(blobBytes));
     }
 
     @Override
     protected byte[] randomBytes() {
-        byte[] binary = new byte[2 * 1024 * 1024];
+        byte[] binary = new byte[2 * 1024 * 124];
         RANDOM.nextBytes(binary);
         return binary;
     }

Reply via email to