This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e35962bb6f [core] Disable blob type force spill yet (#6427)
e35962bb6f is described below
commit e35962bb6fb718710a4972f309f75ff4da158d30
Author: YeJunHao <[email protected]>
AuthorDate: Tue Oct 21 15:06:03 2025 +0800
[core] Disable blob type force spill yet (#6427)
---
.../paimon/operation/BaseAppendFileStoreWrite.java | 7 ++
.../apache/paimon/append/AppendOnlyWriterTest.java | 124 +++++++++++++++++++++
2 files changed, 131 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 98d699cb9d..ff04b2d1ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -59,6 +59,7 @@ import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import static org.apache.paimon.format.FileFormat.fileFormat;
+import static org.apache.paimon.types.DataTypeRoot.BLOB;
import static org.apache.paimon.utils.StatsCollectorFactories.createStatsFactories;
/** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
@@ -78,6 +79,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
private RowType writeType;
private @Nullable List<String> writeCols;
private boolean forceBufferSpill = false;
+ private boolean withBlob;
public BaseAppendFileStoreWrite(
FileIO fileIO,
@@ -100,6 +102,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
this.writeCols = null;
this.fileFormat = fileFormat(options);
this.pathFactory = pathFactory;
+ this.withBlob = rowType.getFieldTypes().stream().anyMatch(t -> t.is(BLOB));
this.fileIndexOptions = options.indexColumnsOptions();
}
@@ -142,6 +145,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
@Override
public void withWriteType(RowType writeType) {
this.writeType = writeType;
+ this.withBlob = writeType.getFieldTypes().stream().anyMatch(t -> t.is(BLOB));
int fullCount = rowType.getFieldCount();
List<String> fullNames = rowType.getFieldNames();
this.writeCols = writeType.getFieldNames();
@@ -235,6 +239,9 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
if (ioManager == null) {
return;
}
+ if (withBlob) {
+ return;
+ }
if (forceBufferSpill) {
return;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index ce3a2d4e85..62a50699b3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -21,7 +21,9 @@ package org.apache.paimon.append;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.ChannelWithMeta;
@@ -38,9 +40,15 @@ import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStatsConverter;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
@@ -62,6 +70,7 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@@ -394,6 +403,69 @@ public class AppendOnlyWriterTest {
});
}
+ @Test
+ public void testNoSpillWhenMeetBlobType() throws Exception {
+ // Create a schema with BLOB type
+ RowType blobSchema =
+ RowType.builder()
+ .fields(
new DataType[] {new IntType(), new VarCharType(), new BlobType()},
+ new String[] {"id", "name", "data"})
+ .build();
+
+ AppendOnlyFileStoreTable table =
+ (AppendOnlyFileStoreTable)
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ pathFactory.newPath(),
+ TableSchema.create(
+ 0,
+ new Schema(
+ blobSchema.getFields(),
+ Collections.singletonList("id"),
+ Collections.emptyList(),
+ new HashMap<String, String>() {
+ {
+ put(
+ CoreOptions.DATA_EVOLUTION_ENABLED
+ .key(),
+ "true");
+ put(
+ CoreOptions.ROW_TRACKING_ENABLED
+ .key(),
+ "true");
+ }
+ },
+ "")));
+ BaseAppendFileStoreWrite writer = table.store().newWrite("test");
+ writer.withIOManager(IOManager.create(tempDir.toString()));
+ writer.withMemoryPoolFactory(
+ new MemoryPoolFactory(new HeapMemorySegmentPool(16384L, 1024)));
+
+ char[] largeString = new char[990];
+ Arrays.fill(largeString, 'a');
+ byte[] largeBlobData = new byte[1024];
+ Arrays.fill(largeBlobData, (byte) 'b');
+
+ BinaryRow binaryRow = new BinaryRow(1);
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+ for (int j = 0; j < 100; j++) {
+ binaryRowWriter.reset();
+ binaryRowWriter.writeInt(0, j);
+ binaryRowWriter.complete();
+ writer.write(
+ binaryRow, 0, createBlobRow(j, String.valueOf(largeString), largeBlobData));
+ }
+
+ binaryRowWriter.reset();
+ binaryRowWriter.writeInt(0, 1000);
+ binaryRowWriter.complete();
+ AppendOnlyWriter appendOnlyWriter = (AppendOnlyWriter) writer.createWriter(binaryRow, 0);
+ RowBuffer buffer = appendOnlyWriter.getWriteBuffer();
+ assertThat(buffer).isNull();
+ writer.close();
+ }
+
@Test
public void testNoBuffer() throws Exception {
AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE);
@@ -686,4 +758,56 @@ public class AppendOnlyWriterTest {
null,
null);
}
+
+ private InternalRow createBlobRow(int id, String name, byte[] blobData) {
+ return GenericRow.of(id, BinaryString.fromString(name), new BlobData(blobData));
+ }
+
+ private AppendOnlyWriter createWriterWithBlobSchema(
+ RowType schema, long targetFileSize, boolean spillable) {
+ FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options());
+ LinkedList<DataFileMeta> toCompact = new LinkedList<>();
+ BucketedAppendCompactManager compactManager =
+ new BucketedAppendCompactManager(
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory("compaction-thread")),
+ toCompact,
+ null,
+ MIN_FILE_NUM,
+ targetFileSize,
+ false,
+ compactBefore -> Collections.emptyList(),
+ null);
+ CoreOptions options =
+ new CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
+ AppendOnlyWriter writer =
+ new AppendOnlyWriter(
+ LocalFileIO.create(),
+ IOManager.create(tempDir.toString()),
+ SCHEMA_ID,
+ fileFormat,
+ targetFileSize,
+ schema,
+ null,
+ getMaxSequenceNumber(toCompact),
+ compactManager,
+ files -> {
+ throw new RuntimeException("Can't read back in blob mode");
+ },
+ false,
+ pathFactory,
+ null,
+ false,
+ spillable,
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ CompressOptions.defaultOptions(),
+ new StatsCollectorFactories(options),
+ MemorySize.MAX_VALUE,
+ new FileIndexOptions(),
+ true,
+ false);
+ writer.setMemoryPool(
+ new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
+ return writer;
+ }
}