This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e35962bb6f [core] Disable blob type force spill yet (#6427)
e35962bb6f is described below

commit e35962bb6fb718710a4972f309f75ff4da158d30
Author: YeJunHao <[email protected]>
AuthorDate: Tue Oct 21 15:06:03 2025 +0800

    [core] Disable blob type force spill yet (#6427)
---
 .../paimon/operation/BaseAppendFileStoreWrite.java |   7 ++
 .../apache/paimon/append/AppendOnlyWriterTest.java | 124 +++++++++++++++++++++
 2 files changed, 131 insertions(+)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 98d699cb9d..ff04b2d1ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -59,6 +59,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
 import static org.apache.paimon.format.FileFormat.fileFormat;
+import static org.apache.paimon.types.DataTypeRoot.BLOB;
 import static org.apache.paimon.utils.StatsCollectorFactories.createStatsFactories;
 
 /** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
@@ -78,6 +79,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
     private RowType writeType;
     private @Nullable List<String> writeCols;
     private boolean forceBufferSpill = false;
+    private boolean withBlob;
 
     public BaseAppendFileStoreWrite(
             FileIO fileIO,
@@ -100,6 +102,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
         this.writeCols = null;
         this.fileFormat = fileFormat(options);
         this.pathFactory = pathFactory;
+        this.withBlob = rowType.getFieldTypes().stream().anyMatch(t -> t.is(BLOB));
 
         this.fileIndexOptions = options.indexColumnsOptions();
     }
@@ -142,6 +145,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
     @Override
     public void withWriteType(RowType writeType) {
         this.writeType = writeType;
+        this.withBlob = writeType.getFieldTypes().stream().anyMatch(t -> t.is(BLOB));
         int fullCount = rowType.getFieldCount();
         List<String> fullNames = rowType.getFieldNames();
         this.writeCols = writeType.getFieldNames();
@@ -235,6 +239,9 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
         if (ioManager == null) {
             return;
         }
+        if (withBlob) {
+            return;
+        }
         if (forceBufferSpill) {
             return;
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index ce3a2d4e85..62a50699b3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -21,7 +21,9 @@ package org.apache.paimon.append;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.ChannelWithMeta;
@@ -38,9 +40,15 @@ import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
@@ -62,6 +70,7 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -394,6 +403,69 @@ public class AppendOnlyWriterTest {
                 });
     }
 
+    @Test
+    public void testNoSpillWhenMeetBlobType() throws Exception {
+        // Create a schema with BLOB type
+        RowType blobSchema =
+                RowType.builder()
+                        .fields(
+                                new DataType[] {new IntType(), new VarCharType(), new BlobType()},
+                                new String[] {"id", "name", "data"})
+                        .build();
+
+        AppendOnlyFileStoreTable table =
+                (AppendOnlyFileStoreTable)
+                        FileStoreTableFactory.create(
+                                LocalFileIO.create(),
+                                pathFactory.newPath(),
+                                TableSchema.create(
+                                        0,
+                                        new Schema(
+                                                blobSchema.getFields(),
+                                                Collections.singletonList("id"),
+                                                Collections.emptyList(),
+                                                new HashMap<String, String>() {
+                                                    {
+                                                        put(
+                                                                CoreOptions.DATA_EVOLUTION_ENABLED
+                                                                        .key(),
+                                                                "true");
+                                                        put(
+                                                                CoreOptions.ROW_TRACKING_ENABLED
+                                                                        .key(),
+                                                                "true");
+                                                    }
+                                                },
+                                                "")));
+        BaseAppendFileStoreWrite writer = table.store().newWrite("test");
+        writer.withIOManager(IOManager.create(tempDir.toString()));
+        writer.withMemoryPoolFactory(
+                new MemoryPoolFactory(new HeapMemorySegmentPool(16384L, 1024)));
+
+        char[] largeString = new char[990];
+        Arrays.fill(largeString, 'a');
+        byte[] largeBlobData = new byte[1024];
+        Arrays.fill(largeBlobData, (byte) 'b');
+
+        BinaryRow binaryRow = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+        for (int j = 0; j < 100; j++) {
+            binaryRowWriter.reset();
+            binaryRowWriter.writeInt(0, j);
+            binaryRowWriter.complete();
+            writer.write(
+                    binaryRow, 0, createBlobRow(j, String.valueOf(largeString), largeBlobData));
+        }
+
+        binaryRowWriter.reset();
+        binaryRowWriter.writeInt(0, 1000);
+        binaryRowWriter.complete();
+        AppendOnlyWriter appendOnlyWriter = (AppendOnlyWriter) writer.createWriter(binaryRow, 0);
+        RowBuffer buffer = appendOnlyWriter.getWriteBuffer();
+        assertThat(buffer).isNull();
+        writer.close();
+    }
+
     @Test
     public void testNoBuffer() throws Exception {
         AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE);
@@ -686,4 +758,56 @@ public class AppendOnlyWriterTest {
                 null,
                 null);
     }
+
+    private InternalRow createBlobRow(int id, String name, byte[] blobData) {
+        return GenericRow.of(id, BinaryString.fromString(name), new BlobData(blobData));
+    }
+
+    private AppendOnlyWriter createWriterWithBlobSchema(
+            RowType schema, long targetFileSize, boolean spillable) {
+        FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options());
+        LinkedList<DataFileMeta> toCompact = new LinkedList<>();
+        BucketedAppendCompactManager compactManager =
+                new BucketedAppendCompactManager(
+                        Executors.newSingleThreadScheduledExecutor(
+                                new ExecutorThreadFactory("compaction-thread")),
+                        toCompact,
+                        null,
+                        MIN_FILE_NUM,
+                        targetFileSize,
+                        false,
+                        compactBefore -> Collections.emptyList(),
+                        null);
+        CoreOptions options =
+                new CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
+        AppendOnlyWriter writer =
+                new AppendOnlyWriter(
+                        LocalFileIO.create(),
+                        IOManager.create(tempDir.toString()),
+                        SCHEMA_ID,
+                        fileFormat,
+                        targetFileSize,
+                        schema,
+                        null,
+                        getMaxSequenceNumber(toCompact),
+                        compactManager,
+                        files -> {
+                            throw new RuntimeException("Can't read back in blob mode");
+                        },
+                        false,
+                        pathFactory,
+                        null,
+                        false,
+                        spillable,
+                        CoreOptions.FILE_COMPRESSION.defaultValue(),
+                        CompressOptions.defaultOptions(),
+                        new StatsCollectorFactories(options),
+                        MemorySize.MAX_VALUE,
+                        new FileIndexOptions(),
+                        true,
+                        false);
+        writer.setMemoryPool(
+                new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
+        return writer;
+    }
 }

Reply via email to