This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5ce4e018b2 [core] introduce Placeholder for Blob File Format (#7889)
5ce4e018b2 is described below
commit 5ce4e018b2758040c46feb135e47f834fefd1bd5
Author: Faiz <[email protected]>
AuthorDate: Tue May 26 22:09:05 2026 +0800
[core] introduce Placeholder for Blob File Format (#7889)
---
.../org/apache/paimon/data/BlobPlaceholder.java | 59 +++
.../paimon/reader/DataEvolutionFileReader.java | 2 +-
.../paimon/operation/BlobFallbackRecordReader.java | 428 +++++++++++++++++
.../paimon/operation/DataEvolutionSplitRead.java | 270 ++++++++---
.../org/apache/paimon/append/BlobUpdateTest.java | 532 +++++++++++++++++++++
.../operation/BlobFallbackRecordReaderTest.java | 396 +++++++++++++++
.../paimon/operation/DataEvolutionReadTest.java | 340 ++++++++-----
.../paimon/table/AppendOnlySimpleTableTest.java | 21 +-
.../paimon/table/PrimaryKeySimpleTableTest.java | 20 +-
.../apache/paimon/format/blob/BlobFileMeta.java | 8 +-
.../paimon/format/blob/BlobFormatReader.java | 3 +
.../paimon/format/blob/BlobFormatWriter.java | 9 +-
.../paimon/format/blob/BlobFileFormatTest.java | 40 +-
.../pypaimon/read/reader/format_blob_reader.py | 18 +-
paimon-python/pypaimon/table/row/blob.py | 15 +
paimon-python/pypaimon/tests/blob_test.py | 75 ++-
paimon-python/pypaimon/write/blob_format_writer.py | 10 +-
17 files changed, 2002 insertions(+), 244 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/BlobPlaceholder.java
b/paimon-common/src/main/java/org/apache/paimon/data/BlobPlaceholder.java
new file mode 100644
index 0000000000..7a693962cb
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobPlaceholder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.fs.SeekableInputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The placeholder blob, mainly for blob update in data-evolution. It should never be exposed to
+ * users.
+ *
+ * <p>This class is a singleton: the constructor is private and the only instance is {@link
+ * #INSTANCE}. {@code readResolve} returns that same instance, so a deserialized placeholder is
+ * still the shared object and can be detected with an identity ({@code ==}) comparison.
+ *
+ * <p>A placeholder carries no content. Every {@link Blob} accessor ({@code toData}, {@code
+ * toDescriptor}, {@code newInputStream}) throws {@link UnsupportedOperationException}.
+ */
+public class BlobPlaceholder implements Blob, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final BlobPlaceholder INSTANCE = new BlobPlaceholder();
+
+ private BlobPlaceholder() {}
+
+ private Object readResolve() {
+ return INSTANCE;
+ }
+
+ @Override
+ public byte[] toData() {
+ throw new UnsupportedOperationException(
+ "Should never call this method for placeholder blob.");
+ }
+
+ @Override
+ public BlobDescriptor toDescriptor() {
+ throw new UnsupportedOperationException(
+ "Should never call this method for placeholder blob.");
+ }
+
+ @Override
+ public SeekableInputStream newInputStream() throws IOException {
+ throw new UnsupportedOperationException(
+ "Should never call this method for placeholder blob.");
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
index 1c50410fa1..53766440ff 100644
---
a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java
@@ -66,7 +66,7 @@ public class DataEvolutionFileReader implements
RecordReader<InternalRow> {
rowOffsets.length == fieldOffsets.length,
"Row offsets and field offsets must have the same length");
checkArgument(rowOffsets.length > 0, "Row offsets must not be empty");
- checkArgument(readers != null && readers.length > 1, "Readers should
be more than 1");
+ checkArgument(readers != null && readers.length >= 1, "should not pass
empty readers.");
this.rowOffsets = rowOffsets;
this.fieldOffsets = fieldOffsets;
this.readers = readers;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java
new file mode 100644
index 0000000000..86f555748f
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.append.ForceSingleBatchReader;
+import org.apache.paimon.data.BlobPlaceholder;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.util.Collections.reverseOrder;
+import static java.util.Comparator.comparingLong;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Resolves blob placeholder rows by falling back through older sequence groups. The read logic is
+ * as follows:
+ *
+ * <ol>
+ * <li>Group files by max-seq; files with a higher max-seq hold newer records.
+ * <li>Sort files by row range within each group and create sequential readers for them. Note that
+ * absent ranges are read as all-placeholder rows.
+ * <li>Order the groups by max-seq (descending) and merge-read all readers; the record emitted at
+ * each index is the first non-placeholder blob.
+ * </ol>
+ */
+public class BlobFallbackRecordReader implements RecordReader<InternalRow> {
+
+ private final List<RecordReader<InternalRow>> groupReaders = new
ArrayList<>();
+ private final int blobIndex;
+ private boolean returned;
+
+ /**
+ * Groups the blob files by max sequence number and creates one single-batch reader per group.
+ *
+ * @param files blob files of one bunch; must not be empty
+ * @param readerFactory creates the reader for a single blob file
+ * @param rowRanges pushed-down row ranges, forwarded to every group reader (may be null)
+ * @param readRowType row type of the rows produced by the file readers
+ * @param blobIndex position of the blob field inside {@code readRowType}
+ */
+ BlobFallbackRecordReader(
+ List<DataFileMeta> files,
+ BlobFileReaderFactory readerFactory,
+ List<Range> rowRanges,
+ RowType readRowType,
+ int blobIndex) {
+ this.blobIndex = blobIndex;
+
+ checkArgument(!files.isEmpty(), "Blob bunch should not be empty.");
+ // overall row id span across all files; every group reader is given the same span
+ long firstRowId = Long.MAX_VALUE;
+ long lastRowId = Long.MIN_VALUE;
+
+ // reverse-ordered TreeMap: groups iterate from the highest max sequence number down
+ Map<Long, List<DataFileMeta>> sequenceGroups = new
TreeMap<>(reverseOrder());
+ for (DataFileMeta file : files) {
+ Range fileRange = file.nonNullRowIdRange();
+ firstRowId = Math.min(firstRowId, fileRange.from);
+ lastRowId = Math.max(lastRowId, fileRange.to);
+
+ sequenceGroups
+ .computeIfAbsent(file.maxSequenceNumber(), ignored -> new
ArrayList<>())
+ .add(file);
+ }
+
+ for (Map.Entry<Long, List<DataFileMeta>> entry :
sequenceGroups.entrySet()) {
+ // within each group, sort by first row id
+ List<DataFileMeta> groupFiles = entry.getValue();
+ groupFiles.sort(comparingLong(DataFileMeta::nonNullFirstRowId));
+
+ // after sorting, checking adjacent files is enough to detect any overlap in the group
+ DataFileMeta current, next;
+ for (int i = 0; i < groupFiles.size() - 1; i++) {
+ current = groupFiles.get(i);
+ next = groupFiles.get(i + 1);
+
+ Preconditions.checkState(
+
!current.nonNullRowIdRange().hasIntersection(next.nonNullRowIdRange()),
+ "Blob files within a same max_seq_num should not
overlap. Find: %s, %s",
+ current,
+ next);
+ }
+
+ // groupReaders therefore ends up ordered newest group first
+ groupReaders.add(
+ new ForceSingleBatchReader(
+ new BlobSequenceGroupRecordReader(
+ groupFiles,
+ readerFactory,
+ rowRanges,
+ readRowType,
+ blobIndex,
+ firstRowId,
+ lastRowId)));
+ }
+ }
+
+ /**
+ * Returns the single merged batch on the first call and {@code null} on every later call.
+ *
+ * <p>One batch is read from each group reader. The returned iterator advances all group
+ * iterators in lock step and, for each row position, emits the row of the first group (in
+ * {@code groupReaders} order) whose blob field is not a placeholder.
+ *
+ * @throws IllegalStateException if only some of the group readers are exhausted, if the groups
+ * yield different numbers of rows, or if every group holds a placeholder at a row position
+ */
+ @Nullable
+ @Override
+ public RecordIterator<InternalRow> readBatch() throws IOException {
+ if (returned) {
+ return null;
+ }
+ returned = true;
+
+ // all readers are forced to return a single batch
+ RecordIterator<InternalRow>[] iterators = new
RecordIterator[groupReaders.size()];
+ for (int i = 0; i < groupReaders.size(); i++) {
+ RecordIterator<InternalRow> iterator =
groupReaders.get(i).readBatch();
+ if (iterator == null) {
+ // an empty group is only legal when all groups are empty: it must be seen on the
+ // first reader, and all remaining readers must be empty as well
+ if (i != 0) {
+ throw new IllegalStateException("All readers should be a
single batch reader.");
+ }
+ for (int j = i + 1; j < groupReaders.size(); j++) {
+ if (groupReaders.get(j).readBatch() != null) {
+ throw new IllegalStateException(
+ "All readers should be a single batch
reader.");
+ }
+ }
+ return null;
+ }
+ iterators[i] = iterator;
+ }
+
+ return new RecordIterator<InternalRow>() {
+ @Nullable
+ @Override
+ public InternalRow next() throws IOException {
+ InternalRow result = null;
+ // Every iterator must be advanced on each call so that all groups stay aligned on
+ // the same row. This may significantly increase memory usage and decrease read
+ // efficiency if `blob-as-descriptor` is disabled and many non-null blobs are updated.
+ // TODO: Do not read stale records if there's a newer non-placeholder record, e.g.
+ // introduce a discard method to directly discard the next record?
+ for (int i = 0; i < iterators.length; i++) {
+ RecordIterator<InternalRow> iterator = iterators[i];
+ InternalRow row = iterator.next();
+ if (row == null) {
+ // end of data: the first iterator must end first and all others with it
+ if (i != 0) {
+ throw new IllegalStateException(
+ "All readers of each max_seq group should
have the same number of records.");
+ }
+ for (int j = i + 1; j < iterators.length; j++) {
+ if (iterators[j].next() != null) {
+ throw new IllegalStateException(
+ "All readers of each max_seq group
should have the same number of records.");
+ }
+ }
+ return null;
+ }
+ // result is the first non-placeholder record
+ if (result == null && !isPlaceHolder(row)) {
+ result = row;
+ }
+ }
+ if (result == null) {
+ throw new IllegalStateException(
+ "Invalid state: all blob files at the same row id
store a placeholder, it's a bug.");
+ }
+ return result;
+ }
+
+ @Override
+ public void releaseBatch() {
+ for (RecordIterator<InternalRow> iterator : iterators) {
+ iterator.releaseBatch();
+ }
+ }
+ };
+ }
+
+ /**
+ * Returns whether the blob field of {@code row} is the shared placeholder. A null blob is not
+ * a placeholder; the check is by identity against {@link BlobPlaceholder#INSTANCE}.
+ */
+ private boolean isPlaceHolder(InternalRow row) {
+ return !row.isNullAt(blobIndex) && row.getBlob(blobIndex) ==
BlobPlaceholder.INSTANCE;
+ }
+
+ /**
+ * Closes every group reader. A failure does not stop the loop: the first {@link IOException}
+ * is rethrown once all readers were attempted, with later ones attached as suppressed.
+ */
+ @Override
+ public void close() throws IOException {
+ IOException exception = null;
+ for (RecordReader<InternalRow> reader : groupReaders) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ if (exception == null) {
+ exception = e;
+ } else {
+ exception.addSuppressed(e);
+ }
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ /**
+ * Reads one blob sequence group (all blob files with the same max_seq_num) and emits
+ * placeholder rows for row id gaps. For example, if the full row range is [0, 100] but there is
+ * only one blob file with row range [20, 80], then the rows with row ids [0, 19] and [81, 100]
+ * will be emitted as placeholder rows.
+ *
+ * <p>This reader should always be fully consumed, or the internal states may be broken.
+ *
+ * <p>Note that we cannot simply concatenate all data files and read them, even though we
+ * guarantee writing the full-range data during data-evolution. The complexity is introduced by
+ * row-level compaction. For example, if we execute the following operations:
+ *
+ * <ol>
+ * <li>Write [0, 100], generate files [0, 50], [51, 100]
+ * <li>Update [0, 100] files, generate files [0, 25], [26, 75], [76, 100]
+ * <li>Insert new blobs for range [101, 200], generate files [101, 200]
+ * <li>Update new blobs for range [101, 200], generate files [101, 150], [151, 200]
+ * <li>Compact, merge [0, 100], [101, 200] to a single range
+ * <li>Update the compacted files, generate files [0, 200]
+ * </ol>
+ *
+ * <p>The data files layout would be:
+ *
+ * <pre>
+ *       |<----------------------------- merged range: row 0 ~ 200 ----------------------------->|
+ *       |                                                                                       |
+ *       ┌─────────────────────────┐┌──────────────────────┐
+ * seq1: │       file1 (0~50)      ││    file2 (51~100)    │        (empty on 101~200)
+ *       └─────────────────────────┘└──────────────────────┘
+ *       ┌─────────────┐┌──────────────────────┐┌──────────┐
+ * seq2: │ file3(0~25) ││    file4 (26~75)     ││f5(76~100)│        (empty on 101~200)
+ *       └─────────────┘└──────────────────────┘└──────────┘
+ *                                                          ┌────────────────────────────────────┐
+ * seq3:                  (empty on 0~100)                  │          file6 (101~200)           │
+ *                                                          └────────────────────────────────────┘
+ *                                                          ┌─────────────────┐┌─────────────────┐
+ * seq4:                  (empty on 0~100)                  │ file7 (101~150) ││ file8 (151~200) │
+ *                                                          └─────────────────┘└─────────────────┘
+ *       ┌───────────────────────────────────────────────────────────────────────────────────────┐
+ * seq6: │                                     file9 (0~200)                                     │
+ *       └───────────────────────────────────────────────────────────────────────────────────────┘
+ * </pre>
+ *
+ * <p>We treat all gaps as full-placeholders, and correctly resolve pushed-ranges.
+ */
+ public static class BlobSequenceGroupRecordReader implements
RecordReader<InternalRow> {
+
+ private final List<DataFileMeta> files;
+ private final BlobFileReaderFactory readerFactory;
+ // pushed row ranges
+ private final List<Range> rowRanges;
+ private final RowType readRowType;
+ private final int blobIndex;
+ private final long lastRowId;
+
+ private RecordReader<InternalRow> currentReader;
+ private DataFileMeta currentFile;
+ private int nextFileIndex;
+ private int nextRowRangeIndex;
+ // expected next row id
+ private long nextRowId;
+
+ private InternalRow placeholderRow;
+
+ /**
+ * Creates a reader over the files of one sequence group.
+ *
+ * @param files blob files of the group; the reading logic walks them in list order and
+ * expects ascending, non-overlapping row ranges
+ * @param readerFactory creates the reader for a single blob file
+ * @param rowRanges pushed-down row ranges, or null when there is no pushdown
+ * @param readRowType row type of emitted rows, used to size placeholder rows
+ * @param blobIndex position of the blob field inside {@code readRowType}
+ * @param firstRowId first row id this reader has to cover
+ * @param lastRowId last row id (inclusive) this reader has to cover
+ */
+ BlobSequenceGroupRecordReader(
+ List<DataFileMeta> files,
+ BlobFileReaderFactory readerFactory,
+ List<Range> rowRanges,
+ RowType readRowType,
+ int blobIndex,
+ long firstRowId,
+ long lastRowId) {
+ this.files = files;
+ this.readerFactory = readerFactory;
+ // null is kept as "no pushdown"; otherwise the ranges are normalized so that
+ // tryMoveToSelectedRow can walk them front to back (presumably sorted and merged,
+ // judging by the helper's name - TODO confirm)
+ this.rowRanges = rowRanges == null ? null :
Range.sortAndMergeOverlap(rowRanges);
+ this.readRowType = readRowType;
+ this.blobIndex = blobIndex;
+ this.lastRowId = lastRowId;
+
+ this.nextFileIndex = 0;
+ this.nextRowRangeIndex = 0;
+ // must run after rowRanges/lastRowId are set: it may skip ahead to the first selected row
+ setNextRowId(firstRowId);
+
+ this.placeholderRow = null;
+ }
+
+ /**
+ * Returns the next batch of this group: a batch of the currently open file reader, a batch
+ * of placeholder rows covering a gap before the next file (or after the last one), or
+ * {@code null} once {@code nextRowId} has passed {@code lastRowId}.
+ */
+ @Nullable
+ @Override
+ public RecordIterator<InternalRow> readBatch() throws IOException {
+ while (true) {
+ if (currentReader != null) {
+ RecordIterator<InternalRow> batch =
currentReader.readBatch();
+ if (batch != null) {
+ return batch;
+ }
+ // row ranges have been pushed to readers
+ // directly set nextRowId as the lastRowId + 1
+ setNextRowId(lastRowId(currentFile) + 1);
+ closeCurrentFileReader();
+ continue;
+ }
+
+ if (nextRowId > lastRowId) {
+ return null;
+ }
+
+ // skip files whose ranges are before nextRowId
+ while (nextFileIndex < files.size()
+ && lastRowId(files.get(nextFileIndex)) < nextRowId) {
+ nextFileIndex++;
+ }
+ // no file left: the rest of the range is one trailing gap
+ if (nextFileIndex >= files.size()) {
+ return placeHolderBatch(lastRowId);
+ }
+
+ // gap before the next file: emit placeholders up to the row just before it
+ DataFileMeta nextFile = files.get(nextFileIndex);
+ if (nextFile.nonNullFirstRowId() > nextRowId) {
+ return placeHolderBatch(nextFile.nonNullFirstRowId() - 1);
+ }
+
+ // nextRowId falls inside nextFile: open it and read it on the next loop iteration
+ createReader(nextFile);
+ }
+ }
+
+ /**
+ * Sets nextRowId and then tries to move it forward to the next selected row id, so the final
+ * nextRowId may be greater than the input value.
+ */
+ private void setNextRowId(long nextRowId) {
+ this.nextRowId = nextRowId;
+ tryMoveToSelectedRow();
+ }
+
+ /**
+ * Advances {@code nextRowId} to the first row id at or after its current value that lies in
+ * one of the pushed row ranges. Does nothing when there is no pushdown or the reader is
+ * already past {@code lastRowId}. When no range is left, {@code nextRowId} is set to {@code
+ * lastRowId + 1}, which marks the reader as finished.
+ */
+ private void tryMoveToSelectedRow() {
+ if (nextRowId > lastRowId || rowRanges == null) {
+ return;
+ }
+
+ // nextRowRangeIndex only moves forward, so ranges already passed are never revisited
+ while (nextRowRangeIndex < rowRanges.size()) {
+ Range range = rowRanges.get(nextRowRangeIndex);
+ if (nextRowId >= range.from && nextRowId <= range.to) {
+ // if nextRowId is within the range, do not need to move
+ return;
+ } else if (nextRowId < range.from) {
+ // else if nextRowId < next range, move to next range's `from`
+ nextRowId = range.from;
+ return;
+ }
+ // else nextRowId > range.to, try next range
+ nextRowRangeIndex++;
+ }
+
+ // all ranges consumed, no need to read
+ nextRowId = lastRowId + 1;
+ }
+
+ /**
+ * Returns an iterator that emits the placeholder row for every selected row id from the
+ * current {@code nextRowId} up to {@code endRowId} (inclusive).
+ *
+ * <p>The iterator reads and advances the enclosing reader's {@code nextRowId} on each call,
+ * so row ids skipped by the pushed row ranges produce no row.
+ */
+ private RecordIterator<InternalRow> placeHolderBatch(long endRowId) {
+ return new RecordIterator<InternalRow>() {
+ long rowId;
+
+ @Nullable
+ @Override
+ public InternalRow next() {
+ rowId = nextRowId;
+ if (rowId > endRowId) {
+ return null;
+ }
+ setNextRowId(rowId + 1);
+ return placeHolderRow();
+ }
+
+ @Override
+ public void releaseBatch() {
+ // nothing to release
+ }
+ };
+ }
+
+ /**
+ * Returns the placeholder row, creating it on first use. The same row object is returned on
+ * every call; only the blob field is set, to {@link BlobPlaceholder#INSTANCE}.
+ */
+ private InternalRow placeHolderRow() {
+ if (placeholderRow == null) {
+ GenericRow row = new GenericRow(readRowType.getFieldCount());
+ row.setField(blobIndex, BlobPlaceholder.INSTANCE);
+ placeholderRow = row;
+ }
+ return placeholderRow;
+ }
+
+ /** Returns the last (inclusive) row id of {@code file}: first row id + row count - 1. */
+ private long lastRowId(DataFileMeta file) {
+ return file.nonNullFirstRowId() + file.rowCount() - 1;
+ }
+
+ /** Closes the open file reader, if any, and clears both the reader and the current file. */
+ private void closeCurrentFileReader() throws IOException {
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ currentFile = null;
+ }
+
+ /**
+ * Opens a reader for {@code nextFile} through the factory, makes it the current file, and
+ * advances {@code nextFileIndex} past it.
+ */
+ private void createReader(DataFileMeta nextFile) throws IOException {
+ currentFile = nextFile;
+ currentReader = readerFactory.create(nextFile);
+ nextFileIndex++;
+ }
+
+ /** Closes the file reader that is currently open, if any. */
+ @Override
+ public void close() throws IOException {
+ closeCurrentFileReader();
+ }
+ }
+
+ /** Factory that creates the record reader for a single blob file. */
+ interface BlobFileReaderFactory {
+ RecordReader<InternalRow> create(DataFileMeta file) throws IOException;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index 0df1b1428e..145fdc9ad4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -46,10 +46,12 @@ import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
import org.apache.paimon.utils.FormatReaderMapping.Builder;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RangeHelper;
import org.apache.paimon.utils.RoaringBitmap32;
@@ -228,7 +230,8 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
||
isVectorStoreFile(file.fileName()),
"Only blob/vector-store files need to call
this method.");
return
schemaFetcher.apply(file.schemaId()).logicalRowType();
- });
+ },
+ rowRanges != null);
long rowCount = fieldsFiles.get(0).rowCount();
long firstRowId =
fieldsFiles.get(0).files().get(0).nonNullFirstRowId();
@@ -302,15 +305,16 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
dataSchema,
readFields,
false));
+ RowType partialReadRowType = new RowType(readFields);
fileRecordReaders[i] =
new ForceSingleBatchReader(
- createFileReader(
+ createFieldBunchReader(
partition,
bunch,
dataFilePathFactory,
formatReaderMapping,
rowRanges,
- readRowType));
+ partialReadRowType));
}
}
@@ -327,31 +331,7 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
return new DataEvolutionFileReader(rowOffsets, fieldOffsets,
fileRecordReaders);
}
- private FileRecordReader<InternalRow> createFileReader(
- BinaryRow partition,
- DataFilePathFactory dataFilePathFactory,
- DataFileMeta file,
- Builder formatBuilder,
- List<Range> rowRanges,
- RowType readRowType)
- throws IOException {
- String formatIdentifier =
DataFilePathFactory.formatIdentifier(file.fileName());
- long schemaId = file.schemaId();
- FormatReaderMapping formatReaderMapping =
- formatReaderMappings.computeIfAbsent(
- new FormatKey(file.schemaId(), formatIdentifier),
- key ->
- formatBuilder.build(
- formatIdentifier,
- schema,
- schemaId == schema.id()
- ? schema
- :
schemaFetcher.apply(schemaId)));
- return createFileReader(
- partition, file, dataFilePathFactory, formatReaderMapping,
rowRanges, readRowType);
- }
-
- private RecordReader<InternalRow> createFileReader(
+ private RecordReader<InternalRow> createFieldBunchReader(
BinaryRow partition,
FieldBunch bunch,
DataFilePathFactory dataFilePathFactory,
@@ -359,7 +339,8 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
List<Range> rowRanges,
RowType readRowType)
throws IOException {
- if (bunch.files().size() == 1) {
+ if (bunch instanceof DataBunch) {
+ // for data bunch, directly read the single file
return createFileReader(
partition,
bunch.files().get(0),
@@ -367,9 +348,51 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
formatReaderMapping,
rowRanges,
readRowType);
+ } else if (bunch instanceof VectorFileBunch) {
+ // for vector bunch, sequential read all data files and concat them
+ return sequentialReadFiles(
+ bunch.files(), partition, dataFilePathFactory,
formatReaderMapping, rowRanges);
+ } else if (bunch instanceof BlobFileBunch) {
+ // for blob bunch, fallback on placeholders
+
+ // fast path: only contains one max_seq group
+ if (((BlobFileBunch) bunch).sequentialReadOptimize()) {
+ return sequentialReadFiles(
+ bunch.files(),
+ partition,
+ dataFilePathFactory,
+ formatReaderMapping,
+ rowRanges);
+ }
+ int blobIndex = findBlobFieldIndex(readRowType);
+ checkArgument(blobIndex >= 0, "Blob bunch read type should contain
a blob field.");
+ return new BlobFallbackRecordReader(
+ bunch.files(),
+ file ->
+ createFileReader(
+ partition,
+ file,
+ dataFilePathFactory,
+ formatReaderMapping,
+ rowRanges,
+ readRowType),
+ rowRanges,
+ readRowType,
+ blobIndex);
+ } else {
+ throw new UnsupportedOperationException("Unsupported bunch type: "
+ bunch);
}
+ }
+
+ private RecordReader<InternalRow> sequentialReadFiles(
+ List<DataFileMeta> files,
+ BinaryRow partition,
+ DataFilePathFactory dataFilePathFactory,
+ FormatReaderMapping formatReaderMapping,
+ List<Range> rowRanges)
+ throws IOException {
List<ReaderSupplier<InternalRow>> readerSuppliers = new ArrayList<>();
- for (DataFileMeta file : bunch.files()) {
+ for (DataFileMeta file : files) {
RoaringBitmap32 selection = file.toFileSelection(rowRanges);
FormatReaderContext formatReaderContext =
new FormatReaderContext(
@@ -394,6 +417,39 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
return ConcatRecordReader.create(readerSuppliers);
}
+ /**
+ * Returns the index of the first field in {@code rowType} whose type root is {@link
+ * DataTypeRoot#BLOB}, or {@code -1} if the row type has no blob field.
+ */
+ private static int findBlobFieldIndex(RowType rowType) {
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Creates the reader for {@code file}, resolving its {@link FormatReaderMapping} first.
+ *
+ * <p>The mapping is looked up in {@code formatReaderMappings} by (schema id, format
+ * identifier) and built with {@code formatBuilder} only when absent. The current {@code schema}
+ * is reused when the file was written with it; otherwise the file's schema comes from {@code
+ * schemaFetcher}. Reader creation is then delegated to the overload taking the mapping.
+ */
+ private FileRecordReader<InternalRow> createFileReader(
+ BinaryRow partition,
+ DataFilePathFactory dataFilePathFactory,
+ DataFileMeta file,
+ Builder formatBuilder,
+ List<Range> rowRanges,
+ RowType readRowType)
+ throws IOException {
+ String formatIdentifier =
DataFilePathFactory.formatIdentifier(file.fileName());
+ long schemaId = file.schemaId();
+ FormatReaderMapping formatReaderMapping =
+ formatReaderMappings.computeIfAbsent(
+ new FormatKey(file.schemaId(), formatIdentifier),
+ key ->
+ formatBuilder.build(
+ formatIdentifier,
+ schema,
+ schemaId == schema.id()
+ ? schema
+ :
schemaFetcher.apply(schemaId)));
+ return createFileReader(
+ partition, file, dataFilePathFactory, formatReaderMapping,
rowRanges, readRowType);
+ }
+
private FileRecordReader<InternalRow> createFileReader(
BinaryRow partition,
DataFileMeta file,
@@ -433,8 +489,8 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
Function<DataFileMeta, RowType> fileToRowType,
boolean rowIdPushDown) {
List<FieldBunch> fieldsFiles = new ArrayList<>();
- Map<Integer, SpecialFieldBunch> blobBunchMap = new HashMap<>();
- Map<VectorStoreBunchKey, SpecialFieldBunch> vectorStoreBunchMap = new
TreeMap<>();
+ Map<Integer, BlobFileBunch> blobBunchMap = new HashMap<>();
+ Map<VectorStoreBunchKey, VectorFileBunch> vectorStoreBunchMap = new
TreeMap<>();
long rowCount = -1;
for (DataFileMeta file : needMergeFiles) {
if (isBlobFile(file.fileName())) {
@@ -443,8 +499,7 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
final long expectedRowCount = rowCount;
blobBunchMap
.computeIfAbsent(
- fieldId,
- key -> new SpecialFieldBunch(expectedRowCount,
rowIdPushDown))
+ fieldId, key -> new
BlobFileBunch(expectedRowCount, rowIdPushDown))
.add(file);
} else if (isVectorStoreFile(file.fileName())) {
RowType rowType = fileToRowType.apply(file);
@@ -456,7 +511,7 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
vectorStoreBunchMap
.computeIfAbsent(
vectorStoreKey,
- key -> new SpecialFieldBunch(expectedRowCount,
rowIdPushDown))
+ key -> new VectorFileBunch(expectedRowCount,
rowIdPushDown))
.add(file);
} else {
// Normal file, just add it to the current merge split
@@ -496,8 +551,84 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
}
}
+ /**
+ * The {@link FieldBunch} for blobs. Unlike {@link VectorFileBunch}, which only keeps the data
+ * files with the highest max_seq number, {@link BlobFileBunch} contains all blob files.
+ */
@VisibleForTesting
- static class SpecialFieldBunch implements FieldBunch {
+ static class BlobFileBunch implements FieldBunch {
+
+ // every blob file added so far, in insertion order
+ final List<DataFileMeta> files;
+ // row id range of each added file, parallel to `files`
+ final List<Range> ranges;
+ // row count the bunch must match; only checked when >= 0 and row id pushdown is off
+ final long expectedRowCount;
+ // whether row id ranges were pushed down; relaxes the contiguity/row-count checks
+ final boolean rowIdPushdown;
+
+ BlobFileBunch(long expectedRowCount, boolean rowIdPushdown) {
+ this.files = new ArrayList<>();
+ this.expectedRowCount = expectedRowCount;
+ this.ranges = new ArrayList<>();
+ this.rowIdPushdown = rowIdPushdown;
+ }
+
+ /**
+ * Adds a blob file to the bunch and records its row id range.
+ *
+ * @throws IllegalArgumentException if the file is not a blob file, or if its write columns
+ * differ from those of the first file in the bunch
+ */
+ void add(DataFileMeta file) {
+ if (!isBlobFile(file.fileName())) {
+ throw new IllegalArgumentException("Only blob file can be
added to this bunch.");
+ }
+ if (!files.isEmpty()) {
+ checkArgument(
+ file.writeCols().equals(files.get(0).writeCols()),
+ "All files in this bunch should have the same write
columns.");
+ }
+
+ files.add(file);
+ ranges.add(file.nonNullRowIdRange());
+ }
+
+ /**
+ * Returns the number of rows covered by the merged row id ranges of all files.
+ *
+ * <p>Without row id pushdown the merged ranges must form exactly one contiguous range, and
+ * its size must equal {@code expectedRowCount} when that value is non-negative.
+ */
+ @Override
+ public long rowCount() {
+ // NOTE(review): the meaning of the boolean flag is not visible here - confirm in Range
+ List<Range> merged = Range.sortAndMergeOverlap(ranges, true);
+ if (!rowIdPushdown) {
+ Preconditions.checkState(
+ merged.size() == 1,
+ "Blob file bunch should always contain a contiguous
row range.");
+
+ long rowCount = merged.get(0).count();
+ if (expectedRowCount >= 0) {
+ Preconditions.checkState(
+ rowCount == expectedRowCount,
+ "The merged rowCount %s of blob file bunch should
be aligned with normal files %s.",
+ rowCount,
+ expectedRowCount);
+ }
+ }
+
+ return merged.stream().mapToLong(Range::count).sum();
+ }
+
+ /**
+ * Returns {@code true} when every file in the bunch has the same max sequence number, in
+ * which case the files can be read one after another without placeholder fallback.
+ */
+ public boolean sequentialReadOptimize() {
+ Preconditions.checkState(!files.isEmpty(), "Blob file bunch should
not be empty.");
+
+ // If blob files share the same max_seq_num, we could sequentially read them.
+ // Files have already been sorted by first_row_id
+ long maxSeq = files.get(0).maxSequenceNumber();
+ for (int i = 1; i < files.size(); i++) {
+ if (files.get(i).maxSequenceNumber() != maxSeq) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public List<DataFileMeta> files() {
+ return files;
+ }
+ }
+
+ /** {@link FieldBunch} for vector-store files. */
+ @VisibleForTesting
+ static class VectorFileBunch implements FieldBunch {
final List<DataFileMeta> files;
final long expectedRowCount;
@@ -508,72 +639,67 @@ public class DataEvolutionSplitRead implements
SplitRead<InternalRow> {
long latestMaxSequenceNumber = -1;
long rowCount;
- SpecialFieldBunch(long expectedRowCount, boolean rowIdPushDown) {
+ VectorFileBunch(long expectedRowCount, boolean rowIdPushDown) {
this.files = new ArrayList<>();
- this.rowCount = 0;
this.expectedRowCount = expectedRowCount;
this.rowIdPushDown = rowIdPushDown;
}
void add(DataFileMeta file) {
- if (!isBlobFile(file.fileName()) &&
!isVectorStoreFile(file.fileName())) {
+ if (!isVectorStoreFile(file.fileName())) {
throw new IllegalArgumentException(
- "Only blob/vector-store file can be added to this
bunch.");
+ "Only vector-store file can be added to this bunch.");
}
-
if (file.nonNullFirstRowId() == latestFistRowId) {
if (file.maxSequenceNumber() >= latestMaxSequenceNumber) {
throw new IllegalArgumentException(
- "Blob/vector-store file with same first row id
should have decreasing sequence number.");
+ "Vector file with same first row id should have
decreasing sequence number.");
}
return;
}
+
if (!files.isEmpty()) {
long firstRowId = file.nonNullFirstRowId();
- if (rowIdPushDown) {
- if (firstRowId < expectedNextFirstRowId) {
- if (file.maxSequenceNumber() >
latestMaxSequenceNumber) {
- DataFileMeta lastFile = files.remove(files.size()
- 1);
- rowCount -= lastFile.rowCount();
- } else {
- return;
- }
- }
- } else {
- if (firstRowId < expectedNextFirstRowId) {
- checkArgument(
- file.maxSequenceNumber() <
latestMaxSequenceNumber,
- "Blob/vector-store file with overlapping row
id should have decreasing sequence number.");
+ if (rowIdPushDown && firstRowId < expectedNextFirstRowId) {
+ if (file.maxSequenceNumber() > latestMaxSequenceNumber) {
+ DataFileMeta lastFile = files.remove(files.size() - 1);
+ rowCount -= lastFile.rowCount();
+ } else {
return;
- } else if (firstRowId > expectedNextFirstRowId) {
- throw new IllegalArgumentException(
- "Blob/vector-store file first row id should be
continuous, expect "
- + expectedNextFirstRowId
- + " but got "
- + firstRowId);
}
+ } else if (firstRowId < expectedNextFirstRowId) {
+ checkArgument(
+ file.maxSequenceNumber() < latestMaxSequenceNumber,
+ "Vector file with overlapping row id should have
decreasing sequence number.");
+ return;
+ } else if (!rowIdPushDown && firstRowId >
expectedNextFirstRowId) {
+ throw new IllegalArgumentException(
+ "Vector file first row id should be continuous,
expect "
+ + expectedNextFirstRowId
+ + " but got "
+ + firstRowId);
}
+
if (!files.isEmpty()) {
- if (!isBlobFile(file.fileName())) {
- checkArgument(
- file.schemaId() == files.get(0).schemaId(),
- "All files in this bunch should have the same
schema id.");
- }
+ checkArgument(
+ file.schemaId() == files.get(0).schemaId(),
+ "All files in this bunch should have the same
schema id.");
checkArgument(
file.writeCols().equals(files.get(0).writeCols()),
"All files in this bunch should have the same
write columns.");
}
}
+
files.add(file);
rowCount += file.rowCount();
- if (expectedRowCount >= 0) {
+ if (expectedRowCount > 0) {
checkArgument(
rowCount <= expectedRowCount,
- "Blob/vector-store files row count exceed the expect "
+ expectedRowCount);
+ "Vector files row count exceed the expect " +
expectedRowCount);
}
- this.latestMaxSequenceNumber = file.maxSequenceNumber();
- this.latestFistRowId = file.nonNullFirstRowId();
- this.expectedNextFirstRowId = latestFistRowId + file.rowCount();
+ latestMaxSequenceNumber = file.maxSequenceNumber();
+ latestFistRowId = file.nonNullFirstRowId();
+ expectedNextFirstRowId = latestFistRowId + file.rowCount();
}
@Override
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/BlobUpdateTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/BlobUpdateTest.java
new file mode 100644
index 0000000000..c8c2a7f90c
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobUpdateTest.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobPlaceholder;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.IndexedSplit;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for blob update scenarios. */
+public class BlobUpdateTest extends TableTestBase {
+
+ private final byte[] blobBytes = randomBytes();
+
+    /**
+     * Verifies that a placeholder entry in a newer blob file falls back to the older blob value.
+     *
+     * <p>Three rows are written with {@code blobBytes}. A second blob file with sequence number 2
+     * is then committed starting at row id 0; it holds a placeholder for row 0 and real data for
+     * row 1. The read is expected to return the original bytes for rows 0 and 2 and the updated
+     * bytes for row 1.
+     */
+    @Test
+    public void testReadBlobPlaceHolderFallback() throws Exception {
+        createTableDefault();
+        writeDataDefault(
+                Arrays.asList(
+                        GenericRow.of(1, BinaryString.fromString("first"), new BlobData(blobBytes)),
+                        GenericRow.of(
+                                2, BinaryString.fromString("second"), new BlobData(blobBytes)),
+                        GenericRow.of(
+                                3, BinaryString.fromString("third"), new BlobData(blobBytes))));
+
+        FileStoreTable table = getTableDefault();
+        // Pin the charset: the no-arg getBytes() depends on the platform default encoding.
+        byte[] updatedBytes = "updated-blob".getBytes(StandardCharsets.UTF_8);
+        DataFilePathFactory pathFactory =
+                table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+        DataFileMeta newBlobFile =
+                writeBlobFile(
+                        table.fileIO(),
+                        pathFactory.newBlobPath(),
+                        Arrays.asList(BlobPlaceholder.INSTANCE, new BlobData(updatedBytes)),
+                        0,
+                        2,
+                        table.schema().id(),
+                        Collections.singletonList("f2"));
+        commitBlobFiles(newBlobFile);
+
+        List<byte[]> actual = new ArrayList<>();
+        ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        reader.forEachRemaining(row -> actual.add(row.getBlob(2).toData()));
+
+        assertThat(actual.size()).isEqualTo(3);
+        assertThat(actual.get(0)).isEqualTo(blobBytes);
+        assertThat(actual.get(1)).isEqualTo(updatedBytes);
+        assertThat(actual.get(2)).isEqualTo(blobBytes);
+    }
+
+    /**
+     * Verifies placeholder fallback when the read is restricted to individual row ids.
+     *
+     * <p>Ten rows are written and the base blob files are asserted to cover row ids 0-2, 3-5, 6-8
+     * and 9. Two update files with sequence number 2 are then committed: one starting at row id 0
+     * whose only real value is for row 4, and one starting at row id 5 whose only real value is
+     * for row 9. Reading row ids 5 and 9 must therefore yield the original blob for row 5 and the
+     * updated blob for row 9.
+     */
+    @Test
+    public void testReadBlobPlaceHolderFallbackWithRowIdPushDown() throws Exception {
+        createTableDefault();
+
+        List<InternalRow> inputRows = new ArrayList<>();
+        List<byte[]> originalBlobs = new ArrayList<>();
+        for (int rowId = 0; rowId < 10; rowId++) {
+            byte[] payload = fixedBlobBytes(rowId);
+            originalBlobs.add(payload);
+            inputRows.add(
+                    GenericRow.of(
+                            rowId, BinaryString.fromString("row-" + rowId), new BlobData(payload)));
+        }
+        writeDataDefault(inputRows);
+
+        FileStoreTable table = getTableDefault();
+        List<Range> expectedBaseRanges =
+                Arrays.asList(
+                        new Range(0L, 2L), new Range(3L, 5L), new Range(6L, 8L), new Range(9L, 9L));
+        assertBlobFileRowIdRanges(table, expectedBaseRanges);
+
+        DataFilePathFactory pathFactory =
+                table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+        byte[] updated4 = fixedBlobBytes(44);
+        byte[] updated9 = fixedBlobBytes(99);
+
+        // Rows 0-3 are placeholders, row 4 carries new data.
+        List<Blob> firstUpdate =
+                Arrays.asList(
+                        BlobPlaceholder.INSTANCE,
+                        BlobPlaceholder.INSTANCE,
+                        BlobPlaceholder.INSTANCE,
+                        BlobPlaceholder.INSTANCE,
+                        new BlobData(updated4));
+        DataFileMeta updateFile0 =
+                writeBlobFile(
+                        table.fileIO(),
+                        pathFactory.newBlobPath(),
+                        firstUpdate,
+                        0,
+                        2,
+                        table.schema().id(),
+                        Collections.singletonList("f2"));
+
+        // Rows 5-8 are placeholders, row 9 carries new data.
+        List<Blob> secondUpdate =
+                Arrays.asList(
+                        BlobPlaceholder.INSTANCE,
+                        BlobPlaceholder.INSTANCE,
+                        BlobPlaceholder.INSTANCE,
+                        BlobPlaceholder.INSTANCE,
+                        new BlobData(updated9));
+        DataFileMeta updateFile1 =
+                writeBlobFile(
+                        table.fileIO(),
+                        pathFactory.newBlobPath(),
+                        secondUpdate,
+                        5,
+                        2,
+                        table.schema().id(),
+                        Collections.singletonList("f2"));
+        commitBlobFiles(updateFile0, updateFile1);
+
+        FileStoreTable readTable = getTableDefault();
+        List<Range> requestedRows = Arrays.asList(new Range(5L, 5L), new Range(9L, 9L));
+        ReadBuilder readBuilder =
+                readTable
+                        .newReadBuilder()
+                        .withReadType(readTable.rowType().project(Collections.singletonList("f2")))
+                        .withRowRanges(requestedRows);
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        assertThat(plan.splits().size()).isEqualTo(1);
+        IndexedSplit onlySplit = (IndexedSplit) plan.splits().get(0);
+        DataSplit dataSplit = onlySplit.dataSplit();
+        assertThat(dataSplit.dataFiles().size()).isEqualTo(3);
+
+        List<byte[]> actual = new ArrayList<>();
+        RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan);
+        // Only the blob column is projected, so it sits at position 0.
+        reader.forEachRemaining(row -> actual.add(row.getBlob(0).toData()));
+
+        assertThat(actual.size()).isEqualTo(2);
+        assertThat(actual.get(0)).isEqualTo(originalBlobs.get(5));
+        assertThat(actual.get(1)).isEqualTo(updated9);
+    }
+
+    /**
+     * Manually builds the compacted layout described in BlobSequenceGroupRecordReader, scaled
+     * down to row ids [0, 9], and checks that every row resolves to its newest real value.
+     *
+     * <p>{@code bN} is the base blob of row N, {@code uXY} an updated blob and {@code P} a
+     * placeholder. Brackets mark the row id range covered by one blob file.
+     *
+     * <pre>
+     * row id:   0    1    2    3    4    5    6    7    8    9
+     * seq1:   [b0   b1   b2] [b3   b4]   .    .    .    .    .
+     * seq2:   [u20  P ] [P   u23][u24]   .    .    .    .    .
+     * seq3:     .    .    .    .    .  [b5   b6   b7   b8   b9]
+     * seq4:     .    .    .    .    .  [P    u46  P ] [u48  P ]
+     * seq6:   [P    u61  P    P    P    P    P    P    P    u69]
+     *
+     * RESULT:  u20  u61  b2   u23  u24  b5   u46  b7   u48  u69
+     * </pre>
+     */
+    @Test
+    public void testReadCompactedBlobSequenceGroups() throws Exception {
+        createTableDefault();
+
+        List<InternalRow> inputRows = new ArrayList<>();
+        List<byte[]> baseBlobs = new ArrayList<>();
+        for (int rowId = 0; rowId < 10; rowId++) {
+            byte[] payload = fixedBlobBytes(rowId);
+            baseBlobs.add(payload);
+            inputRows.add(
+                    GenericRow.of(
+                            rowId, BinaryString.fromString("row-" + rowId), new BlobData(payload)));
+        }
+        writeDataDefault(inputRows);
+
+        FileStoreTable table = getTableDefault();
+        List<DataFileMeta> oldBlobFiles = blobFiles(table);
+        assertThat(oldBlobFiles.size()).isEqualTo(4);
+
+        byte[] seq2Row0 = fixedBlobBytes(20);
+        byte[] seq2Row3 = fixedBlobBytes(23);
+        byte[] seq2Row4 = fixedBlobBytes(24);
+        byte[] seq4Row6 = fixedBlobBytes(46);
+        byte[] seq4Row8 = fixedBlobBytes(48);
+        byte[] seq6Row1 = fixedBlobBytes(61);
+        byte[] seq6Row9 = fixedBlobBytes(69);
+
+        DataFilePathFactory pathFactory =
+                table.store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+        // Shared arguments for every hand-written blob file below.
+        FileIO fileIO = table.fileIO();
+        long schemaId = table.schema().id();
+        List<String> blobCols = Collections.singletonList("f2");
+
+        List<DataFileMeta> compactedLayout =
+                Arrays.asList(
+                        // seq1: rows 0-2
+                        writeBlobFile(
+                                fileIO,
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(
+                                        new BlobData(baseBlobs.get(0)),
+                                        new BlobData(baseBlobs.get(1)),
+                                        new BlobData(baseBlobs.get(2))),
+                                0,
+                                1,
+                                schemaId,
+                                blobCols),
+                        // seq1: rows 3-4
+                        writeBlobFile(
+                                fileIO,
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(
+                                        new BlobData(baseBlobs.get(3)),
+                                        new BlobData(baseBlobs.get(4))),
+                                3,
+                                1,
+                                schemaId,
+                                blobCols),
+                        // seq2: rows 0-1
+                        writeBlobFile(
+                                fileIO,
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(new BlobData(seq2Row0), BlobPlaceholder.INSTANCE),
+                                0,
+                                2,
+                                schemaId,
+                                blobCols),
+                        // seq2: rows 2-3
+                        writeBlobFile(
+                                fileIO,
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(BlobPlaceholder.INSTANCE, new BlobData(seq2Row3)),
+                                2,
+                                2,
+                                schemaId,
+                                blobCols),
+                        // seq2: row 4
+                        writeBlobFile(
+                                fileIO,
+                                pathFactory.newBlobPath(),
+                                Collections.singletonList(new BlobData(seq2Row4)),
+                                4,
+                                2,
+                                schemaId,
+                                blobCols),
+                        // seq3: rows 5-9
+                        writeBlobFile(
+                                fileIO,
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(
+                                        new BlobData(baseBlobs.get(5)),
+                                        new BlobData(baseBlobs.get(6)),
+                                        new BlobData(baseBlobs.get(7)),
+                                        new BlobData(baseBlobs.get(8)),
+                                        new BlobData(baseBlobs.get(9))),
+                                5,
+                                3,
+                                schemaId,
+                                blobCols),
+                        // seq4: rows 5-7
+                        writeBlobFile(
+                                fileIO,
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(
+                                        BlobPlaceholder.INSTANCE,
+                                        new BlobData(seq4Row6),
+                                        BlobPlaceholder.INSTANCE),
+                                5,
+                                4,
+                                schemaId,
+                                blobCols),
+                        // seq4: rows 8-9
+                        writeBlobFile(
+                                fileIO,
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(new BlobData(seq4Row8), BlobPlaceholder.INSTANCE),
+                                8,
+                                4,
+                                schemaId,
+                                blobCols),
+                        // seq6: rows 0-9
+                        writeBlobFile(
+                                fileIO,
+                                pathFactory.newBlobPath(),
+                                Arrays.asList(
+                                        BlobPlaceholder.INSTANCE,
+                                        new BlobData(seq6Row1),
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        BlobPlaceholder.INSTANCE,
+                                        new BlobData(seq6Row9)),
+                                0,
+                                6,
+                                schemaId,
+                                blobCols));
+
+        commitCompactFiles(oldBlobFiles, compactedLayout);
+
+        FileStoreTable readTable = getTableDefault();
+        List<String> expectedLayout =
+                Arrays.asList(
+                        "seq1:0-2",
+                        "seq1:3-4",
+                        "seq2:0-1",
+                        "seq2:2-3",
+                        "seq2:4-4",
+                        "seq3:5-9",
+                        "seq4:5-7",
+                        "seq4:8-9",
+                        "seq6:0-9");
+        assertBlobFileLayout(readTable, expectedLayout);
+
+        ReadBuilder readBuilder = readTable.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+        List<byte[]> actual = new ArrayList<>();
+        reader.forEachRemaining(row -> actual.add(row.getBlob(2).toData()));
+
+        List<byte[]> expected =
+                Arrays.asList(
+                        seq2Row0,
+                        seq6Row1,
+                        baseBlobs.get(2),
+                        seq2Row3,
+                        seq2Row4,
+                        baseBlobs.get(5),
+                        seq4Row6,
+                        baseBlobs.get(7),
+                        seq4Row8,
+                        seq6Row9);
+        assertThat(actual.size()).isEqualTo(expected.size());
+        for (int rowId = 0; rowId < expected.size(); rowId++) {
+            assertThat(actual.get(rowId)).isEqualTo(expected.get(rowId));
+        }
+
+        // additionally, assert row id pushdown
+        for (int rowId = 0; rowId < expected.size(); rowId++) {
+            assertRowRangeRead(readTable, rowId, expected.get(rowId));
+        }
+    }
+
+    /** Commits the given files as new data files without deleting any existing file. */
+    private void commitBlobFiles(DataFileMeta... dataFiles) throws Exception {
+        List<DataFileMeta> added = Arrays.asList(dataFiles);
+        List<DataFileMeta> nothingDeleted = Collections.emptyList();
+        commitDataFiles(added, nothingDeleted);
+    }
+
+    /**
+     * Commits a single message for the empty partition and bucket 0 whose data increment holds
+     * {@code newFiles} and {@code deletedFiles}; the compact increment is left empty.
+     */
+    private void commitDataFiles(List<DataFileMeta> newFiles, List<DataFileMeta> deletedFiles)
+            throws Exception {
+        DataIncrement appendChanges =
+                new DataIncrement(newFiles, deletedFiles, Collections.emptyList());
+        CommitMessageImpl message =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        appendChanges,
+                        CompactIncrement.emptyIncrement());
+        commitDefault(Collections.singletonList(message));
+    }
+
+    /**
+     * Commits a single message for the empty partition and bucket 0 whose compact increment
+     * replaces {@code compactBefore} with {@code compactAfter}; the data increment is left empty.
+     */
+    private void commitCompactFiles(
+            List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) throws Exception {
+        DataIncrement noAppend = DataIncrement.emptyIncrement();
+        CompactIncrement rewrite =
+                new CompactIncrement(compactBefore, compactAfter, Collections.emptyList());
+        CommitMessageImpl message =
+                new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, null, noAppend, rewrite);
+        commitDefault(Collections.singletonList(message));
+    }
+
+    /** Scans the table and returns the metadata of every file recognized as a blob file. */
+    private static List<DataFileMeta> blobFiles(FileStoreTable table) {
+        List<DataFileMeta> blobOnly = new ArrayList<>();
+        for (ManifestEntry entry : table.store().newScan().plan().files()) {
+            DataFileMeta candidate = entry.file();
+            if (!BlobFileFormat.isBlobFile(candidate.fileName())) {
+                continue;
+            }
+            blobOnly.add(candidate);
+        }
+        return blobOnly;
+    }
+
+    /**
+     * Asserts that the row id ranges of the table's blob files, ordered by range start, equal
+     * {@code expected}.
+     */
+    private static void assertBlobFileRowIdRanges(FileStoreTable table, List<Range> expected) {
+        List<Range> sortedRanges = new ArrayList<>();
+        blobFiles(table).forEach(file -> sortedRanges.add(file.nonNullRowIdRange()));
+        sortedRanges.sort((a, b) -> Long.compare(a.from, b.from));
+        assertThat(sortedRanges).isEqualTo(expected);
+    }
+
+    /**
+     * Asserts the blob file layout of the table.
+     *
+     * <p>Files are ordered by max sequence number, then first row id, then last row id, and each
+     * one is rendered as {@code seq<maxSequenceNumber>:<from>-<to>} before comparing against
+     * {@code expected}.
+     */
+    private static void assertBlobFileLayout(FileStoreTable table, List<String> expected) {
+        List<DataFileMeta> ordered = blobFiles(table);
+        ordered.sort(
+                (a, b) -> {
+                    int bySequence = Long.compare(a.maxSequenceNumber(), b.maxSequenceNumber());
+                    if (bySequence == 0) {
+                        int byFirstRowId =
+                                Long.compare(a.nonNullFirstRowId(), b.nonNullFirstRowId());
+                        if (byFirstRowId == 0) {
+                            return Long.compare(
+                                    a.nonNullRowIdRange().to, b.nonNullRowIdRange().to);
+                        }
+                        return byFirstRowId;
+                    }
+                    return bySequence;
+                });
+
+        List<String> rendered = new ArrayList<>();
+        for (DataFileMeta blobFile : ordered) {
+            Range rowIds = blobFile.nonNullRowIdRange();
+            String label =
+                    "seq" + blobFile.maxSequenceNumber() + ":" + rowIds.from + "-" + rowIds.to;
+            rendered.add(label);
+        }
+        assertThat(rendered).isEqualTo(expected);
+    }
+
+    /**
+     * Reads the table restricted to the single row id {@code rowId} and asserts that exactly one
+     * row comes back whose blob at field position 2 equals {@code expected}.
+     */
+    private static void assertRowRangeRead(FileStoreTable table, long rowId, byte[] expected)
+            throws Exception {
+        List<Range> singleRow = Collections.singletonList(new Range(rowId, rowId));
+        ReadBuilder readBuilder = table.newReadBuilder().withRowRanges(singleRow);
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+        List<byte[]> blobs = new ArrayList<>();
+        reader.forEachRemaining(row -> blobs.add(row.getBlob(2).toData()));
+
+        assertThat(blobs.size()).isEqualTo(1);
+        assertThat(blobs.get(0)).isEqualTo(expected);
+    }
+
+    /**
+     * Returns a {@code 2 * 1024 * 124} byte array with every byte set to {@code (byte) value}, so
+     * each payload is recognizable by its content.
+     */
+    private static byte[] fixedBlobBytes(int value) {
+        // NOTE(review): the size presumably interacts with the 700 KB blob target file size to
+        // give the three-rows-per-file layout the tests assert - confirm before changing it.
+        byte[] payload = new byte[2 * 1024 * 124];
+        byte filler = (byte) value;
+        Arrays.fill(payload, filler);
+        return payload;
+    }
+
+    /**
+     * Writes {@code blobs} as a single-column blob file at {@code path} and builds the matching
+     * file metadata.
+     *
+     * @param fileIO file system access used to create the file and read back its size
+     * @param path target path; an existing file is not overwritten
+     * @param blobs blob values written in order, one row each
+     * @param firstRowId first row id recorded in the metadata
+     * @param maxSequenceNumber value recorded for both sequence number arguments of the metadata
+     * @param schemaId schema id recorded in the metadata
+     * @param writeCols write columns recorded in the metadata
+     * @return metadata describing the written file, with {@code blobs.size()} rows
+     * @throws IOException if writing the file or reading its size fails
+     */
+    private static DataFileMeta writeBlobFile(
+            FileIO fileIO,
+            Path path,
+            List<Blob> blobs,
+            long firstRowId,
+            long maxSequenceNumber,
+            long schemaId,
+            List<String> writeCols)
+            throws IOException {
+        // Manage the writer as a resource as well: it is closed before the stream, and it is
+        // still closed when addElement fails part-way through instead of being leaked.
+        try (PositionOutputStream out = fileIO.newOutputStream(path, false);
+                FormatWriter writer =
+                        new BlobFileFormat()
+                                .createWriterFactory(RowType.of(DataTypes.BLOB()))
+                                .create(out, "none")) {
+            for (Blob blob : blobs) {
+                writer.addElement(GenericRow.of(blob));
+            }
+        }
+        return DataFileMeta.create(
+                path.getName(),
+                fileIO.getFileSize(path),
+                blobs.size(),
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                maxSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                0L,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                writeCols);
+    }
+
+    /**
+     * Schema with an INT, a STRING and a BLOB column, a 25 MB target file size, a 700 KB blob
+     * target file size, and row tracking plus data evolution enabled.
+     */
+    @Override
+    protected Schema schemaDefault() {
+        Schema.Builder builder = Schema.newBuilder();
+
+        // columns
+        builder.column("f0", DataTypes.INT());
+        builder.column("f1", DataTypes.STRING());
+        builder.column("f2", DataTypes.BLOB());
+
+        // table options
+        builder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        builder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "700 KB");
+        builder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        builder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+
+        return builder.build();
+    }
+
+    /**
+     * Builds a row with a random int, a random string payload and the shared {@code blobBytes}
+     * blob; the {@code time} and {@code size} arguments are not used.
+     */
+    @Override
+    protected InternalRow dataDefault(int time, int size) {
+        int id = RANDOM.nextInt();
+        BinaryString text = BinaryString.fromBytes(randomBytes());
+        return GenericRow.of(id, text, new BlobData(blobBytes));
+    }
+
+    /** Returns {@code 2 * 1024 * 124} bytes filled from the shared random source. */
+    @Override
+    protected byte[] randomBytes() {
+        byte[] payload = new byte[2 * 1024 * 124];
+        RANDOM.nextBytes(payload);
+        return payload;
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java
new file mode 100644
index 0000000000..dcea0a6ff1
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobPlaceholder;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import
org.apache.paimon.operation.BlobFallbackRecordReader.BlobSequenceGroupRecordReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link BlobFallbackRecordReader}. */
+public class BlobFallbackRecordReaderTest {
+
+ private static final int BLOB_INDEX = 0;
+ private static final String BLOB_FIELD = "blob_col";
+ private static final RowType READ_ROW_TYPE =
+ new RowType(
+ Arrays.asList(
+ new DataField(BLOB_INDEX, BLOB_FIELD,
DataTypes.BLOB()),
+ new DataField(1, SpecialFields.ROW_ID.name(),
DataTypes.BIGINT()),
+ new DataField(
+ 2, SpecialFields.SEQUENCE_NUMBER.name(),
DataTypes.BIGINT())));
+
+    /**
+     * Two files (row ids 0-2 and 5-6) are read over the group bounds [0, 6] with ranges [1, 1]
+     * and [3, 5]: rows 1 and 5 carry data, two placeholder rows are produced, and the batches
+     * have sizes 1, 2 and 1.
+     */
+    @Test
+    public void testBlobSequenceGroupReaderWithRowRanges() throws Exception {
+        DataFileMeta head = blobFile("blob1", 0, 3, 10);
+        DataFileMeta tail = blobFile("blob2", 5, 2, 10);
+        List<Range> requested = ranges(1, 1, 3, 5);
+
+        ReadResult result = readSequenceGroup(Arrays.asList(head, tail), requested, 0, 6, 10);
+
+        assertThat(result.rowIds).containsExactly(1L, 5L);
+        assertThat(result.placeholderRowCount).isEqualTo(2);
+        assertThat(result.batchSizes).containsExactly(1, 2, 1);
+    }
+
+ @Test
+ public void testBlobSequenceGroupReaderWithMultipleRangesInFileAndGap()
throws Exception {
+ DataFileMeta wideFile = blobFile("wide-file", 20, 31, 10);
+ List<Range> wideFileRanges = ranges(9, 10, 19, 20, 25, 30, 35, 40, 43,
45, 48, 52, 55, 56);
+
+ ReadResult wideFileRows =
+ readSequenceGroup(Collections.singletonList(wideFile),
wideFileRanges, 10, 60, 10);
+
+ List<Long> wideFileActualRowIds = rowIdsInRanges(20, 50,
wideFileRanges);
+
assertThat(wideFileRows.rowIds).containsExactlyElementsOf(wideFileActualRowIds);
+ assertThat(wideFileRows.placeholderRowCount)
+ .isEqualTo(
+ rowIdsInRanges(10, 19, wideFileRanges).size()
+ + rowIdsInRanges(51, 60,
wideFileRanges).size());
+ assertThat(wideFileRows.batchSizes)
+ .containsExactlyElementsOf(batchSizes(2,
wideFileActualRowIds.size(), 1, 4));
+
+ DataFileMeta firstFile = blobFile("first-file", 0, 11, 10);
+ DataFileMeta secondFile = blobFile("second-file", 50, 11, 10);
+ List<Range> gapRanges = ranges(0, 0, 9, 12, 25, 30, 35, 40, 43, 45,
48, 52, 58, 62);
+
+ ReadResult gapRows =
+ readSequenceGroup(Arrays.asList(firstFile, secondFile),
gapRanges, 0, 62, 10);
+
+ assertThat(gapRows.rowIds)
+ .containsExactlyElementsOf(
+ concat(
+ rowIdsInRanges(0, 10, gapRanges),
+ rowIdsInRanges(50, 60, gapRanges)));
+ assertThat(gapRows.placeholderRowCount)
+ .isEqualTo(
+ rowIdsInRanges(11, 49, gapRanges).size()
+ + rowIdsInRanges(61, 62, gapRanges).size());
+ assertThat(gapRows.batchSizes).containsExactly(1, 1, 1, 19, 1, 1, 1,
1, 1, 1, 2);
+ }
+
+    /**
+     * A newer file (row ids 0-2, sequence 2) with a placeholder at row 1 overlays an older file
+     * (row ids 0-4, sequence 1): row 1 and rows 3-4 must come from the older file.
+     */
+    @Test
+    public void testBlobFallbackRecordReader() throws Exception {
+        DataFileMeta newer = blobFile("new-file", 0, 3, 2);
+        DataFileMeta older = blobFile("old-file", 0, 5, 1);
+        Set<String> placeholders = placeholderRows(newer, 1);
+
+        ReadResult result = readFallback(Arrays.asList(newer, older), null, placeholders);
+
+        assertThat(result.rowIds).containsExactly(0L, 1L, 2L, 3L, 4L);
+        assertThat(result.sequenceNumbers).containsExactly(2L, 1L, 2L, 1L, 1L);
+    }
+
+    /**
+     * Without row ranges, the reader must cover the union of the files' row ids: the newer file
+     * spans row ids 10-11 (placeholder at 10) and the older one 8-13, so rows 8-13 are returned
+     * and only row 11 comes from the newer file.
+     */
+    @Test
+    public void testBlobFallbackRecordReaderDerivesRowIdBoundsFromFiles() throws Exception {
+        DataFileMeta newer = blobFile("new-file", 10, 2, 2);
+        DataFileMeta older = blobFile("old-file", 8, 6, 1);
+        Set<String> placeholders = placeholderRows(newer, 10);
+
+        ReadResult result = readFallback(Arrays.asList(newer, older), null, placeholders);
+
+        assertThat(result.rowIds).containsExactly(8L, 9L, 10L, 11L, 12L, 13L);
+        assertThat(result.sequenceNumbers).containsExactly(1L, 1L, 1L, 2L, 1L, 1L);
+    }
+
+    /**
+     * When every file stores a placeholder for the same row id there is nothing to fall back to,
+     * and reading must fail with an {@link IllegalStateException}.
+     */
+    @Test
+    public void testBlobFallbackRecordReaderThrowsIfAllRowsArePlaceholders() {
+        DataFileMeta newer = blobFile("new-placeholder-file", 0, 1, 2);
+        DataFileMeta older = blobFile("old-placeholder-file", 0, 1, 1);
+        List<DataFileMeta> bothFiles = Arrays.asList(newer, older);
+
+        assertThatThrownBy(
+                        () -> readFallback(bothFiles, null, placeholderRows(newer, 0, older, 0)))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("all blob files at the same row id store a placeholder");
+    }
+
+    /**
+     * One older file (row ids 20-45, sequence 1) is partially overlaid by two newer files (row
+     * ids 20-30 and 40-45, sequence 2). With row ranges pushed down, each selected row must come
+     * from the newer file where one covers it and from the older file otherwise.
+     */
+    @Test
+    public void testBlobFallbackRecordReaderWithRowRanges() throws Exception {
+        DataFileMeta oldFile = blobFile("old-file", 20, 26, 1);
+        DataFileMeta newFile1 = blobFile("new-file-1", 20, 11, 2);
+        DataFileMeta newFile2 = blobFile("new-file-2", 40, 6, 2);
+        List<Range> requested = ranges(15, 20, 25, 26, 29, 33, 35, 41);
+        // Files are deliberately passed out of row id / sequence order.
+        List<DataFileMeta> shuffledFiles = Arrays.asList(newFile2, oldFile, newFile1);
+        Set<String> noPlaceholders = Collections.emptySet();
+
+        ReadResult result = readFallback(shuffledFiles, requested, noPlaceholders);
+
+        assertThat(result.rowIds)
+                .containsExactly(
+                        20L, 25L, 26L, 29L, 30L, 31L, 32L, 33L, 35L, 36L, 37L, 38L, 39L, 40L, 41L);
+        assertThat(result.sequenceNumbers)
+                .containsExactly(2L, 2L, 2L, 2L, 2L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 2L, 2L);
+    }
+
+    /**
+     * Drains a {@link BlobFallbackRecordReader} over {@code files}.
+     *
+     * <p>Each file is backed by an in-memory reader that emits one row per batch, limited to
+     * {@code rowRanges} and using a placeholder blob for every row listed in {@code
+     * placeholderRows}.
+     */
+    private static ReadResult readFallback(
+            List<DataFileMeta> files, List<Range> rowRanges, Set<String> placeholderRows)
+            throws Exception {
+        RecordReader<InternalRow> fallbackReader =
+                new BlobFallbackRecordReader(
+                        files,
+                        file -> oneRowPerBatchReader(fileRows(file, rowRanges, placeholderRows)),
+                        rowRanges,
+                        READ_ROW_TYPE,
+                        BLOB_INDEX);
+        return ReadResult.read(fallbackReader);
+    }
+
+    /**
+     * Drains a {@link BlobSequenceGroupRecordReader} over {@code files} for the row id bounds
+     * {@code [firstRowId, lastRowId]}.
+     *
+     * <p>Each file is backed by an in-memory reader that emits one row per batch, limited to
+     * {@code rowRanges}. The {@code sequenceNumber} argument is accepted but not used here.
+     */
+    private static ReadResult readSequenceGroup(
+            List<DataFileMeta> files,
+            List<Range> rowRanges,
+            long firstRowId,
+            long lastRowId,
+            long sequenceNumber)
+            throws Exception {
+        RecordReader<InternalRow> groupReader =
+                new BlobSequenceGroupRecordReader(
+                        files,
+                        file -> oneRowPerBatchReader(fileRows(file, rowRanges)),
+                        rowRanges,
+                        READ_ROW_TYPE,
+                        BLOB_INDEX,
+                        firstRowId,
+                        lastRowId);
+        return ReadResult.read(groupReader);
+    }
+
+    /**
+     * Creates metadata for a fake blob file named {@code fileName + ".blob"} that writes only
+     * {@code BLOB_FIELD}, starts at {@code firstRowId} and carries {@code maxSequenceNumber}.
+     *
+     * <p>No file is written. {@code rowCount} is also passed for the second and fourteenth
+     * positional arguments of {@code DataFileMeta.create}.
+     */
+    private static DataFileMeta blobFile(
+            String fileName, long firstRowId, long rowCount, long maxSequenceNumber) {
+        String blobFileName = fileName + ".blob";
+        List<String> writeCols = Arrays.asList(BLOB_FIELD);
+        Timestamp creationTime = Timestamp.fromEpochMillis(System.currentTimeMillis());
+        return DataFileMeta.create(
+                blobFileName,
+                rowCount,
+                rowCount,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0,
+                maxSequenceNumber,
+                0L,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                creationTime,
+                rowCount,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                writeCols);
+    }
+
+    /**
+     * Builds ranges from consecutive {@code (from, to)} pairs.
+     *
+     * @throws IllegalArgumentException if an odd number of bounds is given
+     */
+    private static List<Range> ranges(long... bounds) {
+        boolean unpaired = bounds.length % 2 != 0;
+        if (unpaired) {
+            throw new IllegalArgumentException("Range bounds should be paired.");
+        }
+
+        List<Range> result = new ArrayList<>();
+        int cursor = 0;
+        while (cursor < bounds.length) {
+            long from = bounds[cursor];
+            long to = bounds[cursor + 1];
+            result.add(new Range(from, to));
+            cursor += 2;
+        }
+        return result;
+    }
+
+    /** Same as the three-argument overload, with no row stored as a placeholder. */
+    private static List<InternalRow> fileRows(DataFileMeta file, List<Range> rowRanges) {
+        Set<String> noPlaceholders = Collections.emptySet();
+        return fileRows(file, rowRanges, noPlaceholders);
+    }
+
+    /**
+     * Produces the rows a reader of {@code file} would emit: one row per row id of the file that
+     * is selected by {@code rowRanges}, in ascending row id order. A row carries a placeholder
+     * blob when its key is contained in {@code placeholderRows}.
+     */
+    private static List<InternalRow> fileRows(
+            DataFileMeta file, List<Range> rowRanges, Set<String> placeholderRows) {
+        long firstRowId = file.nonNullFirstRowId();
+        long endExclusive = firstRowId + file.rowCount();
+
+        List<InternalRow> emitted = new ArrayList<>();
+        for (long rowId = firstRowId; rowId < endExclusive; rowId++) {
+            if (!selected(rowId, rowRanges)) {
+                continue;
+            }
+            boolean asPlaceholder = placeholderRows.contains(rowKey(file, rowId));
+            emitted.add(blobRow(rowId, file.maxSequenceNumber(), asPlaceholder));
+        }
+        return emitted;
+    }
+
+    /**
+     * Tells whether {@code rowId} is selected by {@code rowRanges}; a {@code null} list selects
+     * everything.
+     *
+     * <p>The scan stops at the first range that starts after {@code rowId}, so the ranges are
+     * expected to be given in ascending order.
+     */
+    private static boolean selected(long rowId, List<Range> rowRanges) {
+        if (rowRanges == null) {
+            return true;
+        }
+        for (Range candidate : rowRanges) {
+            if (candidate.from > rowId) {
+                return false;
+            }
+            if (candidate.to >= rowId) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Lists, in ascending order, the row ids in {@code [firstRowId, lastRowId]} that are selected
+     * by {@code ranges}.
+     */
+    private static List<Long> rowIdsInRanges(long firstRowId, long lastRowId, List<Range> ranges) {
+        List<Long> matching = new ArrayList<>();
+        long rowId = firstRowId;
+        while (rowId <= lastRowId) {
+            if (selected(rowId, ranges)) {
+                matching.add(rowId);
+            }
+            rowId++;
+        }
+        return matching;
+    }
+
+    /** Returns a new list holding the elements of {@code first} followed by {@code second}. */
+    private static List<Long> concat(List<Long> first, List<Long> second) {
+        List<Long> joined = new ArrayList<>();
+        joined.addAll(first);
+        joined.addAll(second);
+        return joined;
+    }
+
+    /**
+     * Builds an expected batch size list: {@code firstBatchSize}, then {@code repeatedBatchSize}
+     * repeated {@code repeatedBatchCount} times, then {@code lastBatchSize}.
+     */
+    private static List<Integer> batchSizes(
+            int firstBatchSize, int repeatedBatchCount, int repeatedBatchSize, int lastBatchSize) {
+        List<Integer> sizes = new ArrayList<>();
+        sizes.add(firstBatchSize);
+        int remaining = repeatedBatchCount;
+        while (remaining > 0) {
+            sizes.add(repeatedBatchSize);
+            remaining--;
+        }
+        sizes.add(lastBatchSize);
+        return sizes;
+    }
+
+    /** Returns a mutable set marking {@code rowId} of {@code file} as a placeholder row. */
+    private static Set<String> placeholderRows(DataFileMeta file, long rowId) {
+        return new HashSet<>(Collections.singleton(rowKey(file, rowId)));
+    }
+
+    /** Returns a set marking one row of each of the two given files as a placeholder row. */
+    private static Set<String> placeholderRows(
+            DataFileMeta firstFile, long firstRowId, DataFileMeta secondFile, long secondRowId) {
+        Set<String> marked = placeholderRows(firstFile, firstRowId);
+        String secondKey = rowKey(secondFile, secondRowId);
+        marked.add(secondKey);
+        return marked;
+    }
+
+    /** Key identifying one row of one file, in the form {@code <fileName>#<rowId>}. */
+    private static String rowKey(DataFileMeta file, long rowId) {
+        return String.join("#", file.fileName(), Long.toString(rowId));
+    }
+
+    /**
+     * Builds a three-field row: the blob at {@code BLOB_INDEX}, the row id at position 1 and the
+     * sequence number at position 2. The blob is either the shared placeholder instance or a
+     * one-byte payload derived from the row id.
+     */
+    private static InternalRow blobRow(long rowId, long sequenceNumber, boolean placeholder) {
+        Object blob;
+        if (placeholder) {
+            blob = BlobPlaceholder.INSTANCE;
+        } else {
+            blob = new BlobData(new byte[] {(byte) rowId});
+        }
+
+        GenericRow result = new GenericRow(3);
+        result.setField(BLOB_INDEX, blob);
+        result.setField(1, rowId);
+        result.setField(2, sequenceNumber);
+        return result;
+    }
+
+    /**
+     * Wraps {@code rows} in a reader that hands out exactly one row per batch and returns {@code
+     * null} from {@code readBatch()} once all rows have been delivered. Releasing a batch and
+     * closing the reader are no-ops.
+     */
+    private static RecordReader<InternalRow> oneRowPerBatchReader(List<InternalRow> rows) {
+        return new RecordReader<InternalRow>() {
+
+            int cursor;
+
+            @Override
+            public RecordIterator<InternalRow> readBatch() {
+                if (cursor < rows.size()) {
+                    final InternalRow current = rows.get(cursor);
+                    cursor++;
+                    return new RecordIterator<InternalRow>() {
+
+                        boolean consumed;
+
+                        @Override
+                        public InternalRow next() {
+                            InternalRow result = consumed ? null : current;
+                            consumed = true;
+                            return result;
+                        }
+
+                        @Override
+                        public void releaseBatch() {}
+                    };
+                }
+                return null;
+            }
+
+            @Override
+            public void close() {}
+        };
+    }
+
+    /**
+     * Everything observed while draining a reader: row ids and sequence numbers of the rows that
+     * carry real blob data, the number of placeholder rows, and the size of every batch.
+     */
+    private static class ReadResult {
+        final List<Long> rowIds = new ArrayList<>();
+        final List<Long> sequenceNumbers = new ArrayList<>();
+        final List<Integer> batchSizes = new ArrayList<>();
+        int placeholderRowCount;
+
+        /** Reads {@code reader} to the end, releasing each batch, and always closes it. */
+        static ReadResult read(RecordReader<InternalRow> reader) throws Exception {
+            ReadResult collected = new ReadResult();
+            try {
+                for (RecordIterator<InternalRow> batch = reader.readBatch();
+                        batch != null;
+                        batch = reader.readBatch()) {
+                    int rowsInBatch = 0;
+                    for (InternalRow row = batch.next(); row != null; row = batch.next()) {
+                        collected.add(row);
+                        rowsInBatch++;
+                    }
+                    batch.releaseBatch();
+                    collected.batchSizes.add(rowsInBatch);
+                }
+            } finally {
+                reader.close();
+            }
+            return collected;
+        }
+
+        /** Counts a placeholder row, or records row id and sequence number of a data row. */
+        private void add(InternalRow row) {
+            // The placeholder is detected by identity with the shared instance.
+            boolean isPlaceholder = row.getBlob(BLOB_INDEX) == BlobPlaceholder.INSTANCE;
+            if (isPlaceholder) {
+                placeholderRowCount++;
+                return;
+            }
+            rowIds.add(row.getLong(1));
+            sequenceNumbers.add(row.getLong(2));
+        }
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index d6e6998e6c..51a0a93aa2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -21,8 +21,9 @@ package org.apache.paimon.operation;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.operation.DataEvolutionSplitRead.BlobFileBunch;
import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
-import org.apache.paimon.operation.DataEvolutionSplitRead.SpecialFieldBunch;
+import org.apache.paimon.operation.DataEvolutionSplitRead.VectorFileBunch;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -42,150 +43,149 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link SpecialFieldBunch}. */
+/** Tests for blob and vector field bunches. */
public class DataEvolutionReadTest {
- private SpecialFieldBunch blobBunch;
+ private VectorFileBunch vectorBunch;
@BeforeEach
public void setUp() {
- blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, false);
+ vectorBunch = new VectorFileBunch(Long.MAX_VALUE, false);
}
@Test
- public void testAddSingleBlobEntry() {
- DataFileMeta blobEntry = createBlobFile("blob1", 0L, 100L, 1L);
+ public void testAddSingleVectorEntry() {
+ DataFileMeta vectorEntry = createVectorFile("vector1", 0L, 100L, 1L);
- blobBunch.add(blobEntry);
+ vectorBunch.add(vectorEntry);
- assertThat(blobBunch.files).hasSize(1);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
- assertThat(blobBunch.rowCount()).isEqualTo(100);
- assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
-
assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ assertThat(vectorBunch.files).hasSize(1);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry);
+ assertThat(vectorBunch.rowCount()).isEqualTo(100);
+ assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+
assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
}
@Test
- public void testAddBlobEntryAndTail() {
- DataFileMeta blobEntry = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobTail = createBlobFile("blob2", 100, 200, 1);
-
- blobBunch.add(blobEntry);
- blobBunch.add(blobTail);
-
- assertThat(blobBunch.files).hasSize(2);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry);
- assertThat(blobBunch.files.get(1)).isEqualTo(blobTail);
- assertThat(blobBunch.rowCount()).isEqualTo(300);
- assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
-
assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
- assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L);
+ public void testAddVectorEntryAndTail() {
+ DataFileMeta vectorEntry = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorTail = createVectorFile("vector2", 100, 200, 1);
+
+ vectorBunch.add(vectorEntry);
+ vectorBunch.add(vectorTail);
+
+ assertThat(vectorBunch.files).hasSize(2);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry);
+ assertThat(vectorBunch.files.get(1)).isEqualTo(vectorTail);
+ assertThat(vectorBunch.rowCount()).isEqualTo(300);
+ assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+
assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
+ assertThat(vectorBunch.files.get(0).schemaId()).isEqualTo(0L);
}
@Test
- public void testAddNonBlobFileThrowsException() {
+ public void testAddNonVectorFileThrowsException() {
DataFileMeta normalFile = createNormalFile("normal1.parquet", 0, 100,
1, 0L);
- assertThatThrownBy(() -> blobBunch.add(normalFile))
+ assertThatThrownBy(() -> vectorBunch.add(normalFile))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Only blob/vector-store file can be added to this
bunch.");
+ .hasMessage("Only vector-store file can be added to this
bunch.");
}
@Test
- public void testAddBlobFileWithSameFirstRowId() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 2);
+ public void testAddVectorFileWithSameFirstRowId() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 2);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with same firstRowId but higher sequence number should
throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob/vector-store file with same first row id should
have decreasing sequence number.");
+ "Vector file with same first row id should have
decreasing sequence number.");
}
@Test
- public void testAddBlobFileWithSameFirstRowIdAndLowerSequenceNumber() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 0, 50, 1);
+ public void testAddVectorFileWithSameFirstRowIdAndLowerSequenceNumber() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 0, 50, 1);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with same firstRowId and lower sequence number should
be ignored
- blobBunch.add(blobEntry2);
+ vectorBunch.add(vectorEntry2);
- assertThat(blobBunch.files).hasSize(1);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+ assertThat(vectorBunch.files).hasSize(1);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1);
}
@Test
- public void testAddBlobFileWithOverlappingRowId() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 2);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 1);
+ public void testAddVectorFileWithOverlappingRowId() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 2);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 1);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with overlapping row id and lower sequence number
should be ignored
- blobBunch.add(blobEntry2);
+ vectorBunch.add(vectorEntry2);
- assertThat(blobBunch.files).hasSize(1);
- assertThat(blobBunch.files.get(0)).isEqualTo(blobEntry1);
+ assertThat(vectorBunch.files).hasSize(1);
+ assertThat(vectorBunch.files.get(0)).isEqualTo(vectorEntry1);
}
@Test
- public void testAddBlobFileWithOverlappingRowIdAndHigherSequenceNumber() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 50, 150, 2);
+ public void testAddVectorFileWithOverlappingRowIdAndHigherSequenceNumber()
{
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 50, 150, 2);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with overlapping row id and higher sequence number
should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob/vector-store file with overlapping row id should
have decreasing sequence number.");
+ "Vector file with overlapping row id should have
decreasing sequence number.");
}
@Test
- public void testAddBlobFileWithNonContinuousRowId() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
+ public void testAddVectorFileWithNonContinuousRowId() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1);
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with non-continuous row id should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
- "Blob/vector-store file first row id should be
continuous, expect 100 but got 200");
+ "Vector file first row id should be continuous, expect
100 but got 200");
}
@Test
- public void testAddBlobFileWithDifferentWriteCols() {
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 =
- createBlobFileWithCols("blob2", 100, 200, 1,
Arrays.asList("different_col"));
+ public void testAddVectorFileWithDifferentWriteCols() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 =
+ createVectorFileWithCols("vector2", 100, 200, 1,
Arrays.asList("different_col"));
- blobBunch.add(blobEntry1);
+ vectorBunch.add(vectorEntry1);
// Adding file with different write columns should throw exception
- assertThatThrownBy(() -> blobBunch.add(blobEntry2))
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("All files in this bunch should have the same
write columns.");
}
@Test
- public void testComplexBlobBunchScenario() {
- // Create a complex scenario with multiple blob entries and a tail
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 100, 200, 1);
- DataFileMeta blobEntry3 = createBlobFile("blob3", 300, 300, 1);
- DataFileMeta blobTail = createBlobFile("blob4", 600, 400, 1);
-
- blobBunch.add(blobEntry1);
- blobBunch.add(blobEntry2);
- blobBunch.add(blobEntry3);
- blobBunch.add(blobTail);
-
- assertThat(blobBunch.files).hasSize(4);
- assertThat(blobBunch.rowCount()).isEqualTo(1000);
- assertThat(blobBunch.files.get(0).firstRowId()).isEqualTo(0);
-
assertThat(blobBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("blob_col"));
+ public void testComplexVectorBunchScenario() {
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 100, 200, 1);
+ DataFileMeta vectorEntry3 = createVectorFile("vector3", 300, 300, 1);
+ DataFileMeta vectorTail = createVectorFile("vector4", 600, 400, 1);
+
+ vectorBunch.add(vectorEntry1);
+ vectorBunch.add(vectorEntry2);
+ vectorBunch.add(vectorEntry3);
+ vectorBunch.add(vectorTail);
+
+ assertThat(vectorBunch.files).hasSize(4);
+ assertThat(vectorBunch.rowCount()).isEqualTo(1000);
+ assertThat(vectorBunch.files.get(0).firstRowId()).isEqualTo(0);
+
assertThat(vectorBunch.files.get(0).writeCols()).isEqualTo(Arrays.asList("vector_col"));
}
@Test
@@ -209,26 +209,31 @@ public class DataEvolutionReadTest {
List<DataFileMeta> batch = batches.get(0);
- assertThat(batch.get(1).fileName()).contains("blob5"); // pick
- assertThat(batch.get(2).fileName()).contains("blob2"); // skip
- assertThat(batch.get(3).fileName()).contains("blob1"); // skip
- assertThat(batch.get(4).fileName()).contains("blob9"); // pick
- assertThat(batch.get(5).fileName()).contains("blob6"); // skip
- assertThat(batch.get(6).fileName()).contains("blob3"); // skip
- assertThat(batch.get(7).fileName()).contains("blob7"); // pick
- assertThat(batch.get(8).fileName()).contains("blob4"); // skip
- assertThat(batch.get(9).fileName()).contains("blob8"); // pick
+ assertThat(batch.get(1).fileName()).contains("blob5");
+ assertThat(batch.get(2).fileName()).contains("blob2");
+ assertThat(batch.get(3).fileName()).contains("blob1");
+ assertThat(batch.get(4).fileName()).contains("blob9");
+ assertThat(batch.get(5).fileName()).contains("blob6");
+ assertThat(batch.get(6).fileName()).contains("blob3");
+ assertThat(batch.get(7).fileName()).contains("blob7");
+ assertThat(batch.get(8).fileName()).contains("blob4");
+ assertThat(batch.get(9).fileName()).contains("blob8");
List<FieldBunch> fieldBunches =
splitFieldBunches(batch, file ->
makeBlobRowType(file.writeCols(), f -> 0));
assertThat(fieldBunches.size()).isEqualTo(2);
- SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1);
- assertThat(blobBunch.files).hasSize(4);
+ BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1);
+ assertThat(blobBunch.files).hasSize(9);
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
- assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
- assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
- assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob2");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob1");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob9");
+ assertThat(blobBunch.files.get(4).fileName()).contains("blob6");
+ assertThat(blobBunch.files.get(5).fileName()).contains("blob3");
+ assertThat(blobBunch.files.get(6).fileName()).contains("blob7");
+ assertThat(blobBunch.files.get(7).fileName()).contains("blob4");
+ assertThat(blobBunch.files.get(8).fileName()).contains("blob8");
}
@Test
@@ -275,19 +280,29 @@ public class DataEvolutionReadTest {
batch, file -> makeBlobRowType(file.writeCols(),
String::hashCode));
assertThat(fieldBunches.size()).isEqualTo(3);
- SpecialFieldBunch blobBunch = (SpecialFieldBunch) fieldBunches.get(1);
- assertThat(blobBunch.files).hasSize(4);
+ BlobFileBunch blobBunch = (BlobFileBunch) fieldBunches.get(1);
+ assertThat(blobBunch.files).hasSize(9);
assertThat(blobBunch.files.get(0).fileName()).contains("blob5");
- assertThat(blobBunch.files.get(1).fileName()).contains("blob9");
- assertThat(blobBunch.files.get(2).fileName()).contains("blob7");
- assertThat(blobBunch.files.get(3).fileName()).contains("blob8");
-
- blobBunch = (SpecialFieldBunch) fieldBunches.get(2);
- assertThat(blobBunch.files).hasSize(4);
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob2");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob1");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob9");
+ assertThat(blobBunch.files.get(4).fileName()).contains("blob6");
+ assertThat(blobBunch.files.get(5).fileName()).contains("blob3");
+ assertThat(blobBunch.files.get(6).fileName()).contains("blob7");
+ assertThat(blobBunch.files.get(7).fileName()).contains("blob4");
+ assertThat(blobBunch.files.get(8).fileName()).contains("blob8");
+
+ blobBunch = (BlobFileBunch) fieldBunches.get(2);
+ assertThat(blobBunch.files).hasSize(9);
assertThat(blobBunch.files.get(0).fileName()).contains("blob15");
- assertThat(blobBunch.files.get(1).fileName()).contains("blob19");
- assertThat(blobBunch.files.get(2).fileName()).contains("blob17");
- assertThat(blobBunch.files.get(3).fileName()).contains("blob18");
+ assertThat(blobBunch.files.get(1).fileName()).contains("blob12");
+ assertThat(blobBunch.files.get(2).fileName()).contains("blob11");
+ assertThat(blobBunch.files.get(3).fileName()).contains("blob19");
+ assertThat(blobBunch.files.get(4).fileName()).contains("blob16");
+ assertThat(blobBunch.files.get(5).fileName()).contains("blob13");
+ assertThat(blobBunch.files.get(6).fileName()).contains("blob17");
+ assertThat(blobBunch.files.get(7).fileName()).contains("blob14");
+ assertThat(blobBunch.files.get(8).fileName()).contains("blob18");
}
@Test
@@ -372,8 +387,81 @@ public class DataEvolutionReadTest {
writeCols);
}
+ private DataFileMeta createVectorFile(
+ String fileName, long firstRowId, long rowCount, long
maxSequenceNumber) {
+ return createVectorFileWithCols(
+ fileName, firstRowId, rowCount, maxSequenceNumber,
Arrays.asList("vector_col"));
+ }
+
+ private DataFileMeta createVectorFileWithSchema(
+ String fileName,
+ long firstRowId,
+ long rowCount,
+ long maxSequenceNumber,
+ long schemaId) {
+ return createFile(
+ fileName + ".vector.avro",
+ firstRowId,
+ rowCount,
+ maxSequenceNumber,
+ schemaId,
+ Arrays.asList("vector_col"));
+ }
+
+ private DataFileMeta createVectorFileWithCols(
+ String fileName,
+ long firstRowId,
+ long rowCount,
+ long maxSequenceNumber,
+ List<String> writeCols) {
+ return createFile(
+ fileName + ".vector.avro", firstRowId, rowCount,
maxSequenceNumber, 0L, writeCols);
+ }
+
+ private DataFileMeta createFile(
+ String fileName,
+ long firstRowId,
+ long rowCount,
+ long maxSequenceNumber,
+ long schemaId,
+ List<String> writeCols) {
+ return DataFileMeta.create(
+ fileName,
+ rowCount,
+ rowCount,
+ DataFileMeta.EMPTY_MIN_KEY,
+ DataFileMeta.EMPTY_MAX_KEY,
+ SimpleStats.EMPTY_STATS,
+ SimpleStats.EMPTY_STATS,
+ 0,
+ maxSequenceNumber,
+ schemaId,
+ DataFileMeta.DUMMY_LEVEL,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(System.currentTimeMillis()),
+ rowCount,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ firstRowId,
+ writeCols);
+ }
+
+ @Test
+ void testAddVectorFilesWithDifferentSchemaId() {
+ DataFileMeta vectorEntry1 = createVectorFileWithSchema("vector1", 0,
100, 1, 0L);
+ DataFileMeta vectorEntry2 = createVectorFileWithSchema("vector2", 100,
200, 1, 1L);
+
+ vectorBunch.add(vectorEntry1);
+ assertThatThrownBy(() -> vectorBunch.add(vectorEntry2))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("All files in this bunch should have the same
schema id.");
+ }
+
@Test
void testAddBlobFilesWithDifferentSchemaId() {
+ BlobFileBunch blobBunch = new BlobFileBunch(300, false);
DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1,
0L);
DataFileMeta blobEntry2 = createBlobFileWithSchema("blob2", 100, 200,
1, 1L);
@@ -388,24 +476,24 @@ public class DataEvolutionReadTest {
@Test
public void testRowIdPushDown() {
- SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE,
true);
- DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
- blobBunch.add(blobEntry1);
- SpecialFieldBunch finalBlobBunch = blobBunch;
- DataFileMeta finalBlobEntry = blobEntry2;
- assertThatCode(() ->
finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException();
-
- blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true);
- blobEntry1 = createBlobFile("blob1", 0, 100, 1);
- blobEntry2 = createBlobFile("blob2", 50, 200, 2);
- blobBunch.add(blobEntry1);
- blobBunch.add(blobEntry2);
- assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2);
-
- SpecialFieldBunch finalBlobBunch2 = blobBunch;
- DataFileMeta blobEntry3 = createBlobFile("blob2", 250, 100, 2);
- assertThatCode(() ->
finalBlobBunch2.add(blobEntry3)).doesNotThrowAnyException();
+ VectorFileBunch vectorBunch = new VectorFileBunch(Long.MAX_VALUE,
true);
+ DataFileMeta vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ DataFileMeta vectorEntry2 = createVectorFile("vector2", 200, 300, 1);
+ vectorBunch.add(vectorEntry1);
+ VectorFileBunch finalVectorBunch = vectorBunch;
+ DataFileMeta finalVectorEntry = vectorEntry2;
+ assertThatCode(() ->
finalVectorBunch.add(finalVectorEntry)).doesNotThrowAnyException();
+
+ vectorBunch = new VectorFileBunch(Long.MAX_VALUE, true);
+ vectorEntry1 = createVectorFile("vector1", 0, 100, 1);
+ vectorEntry2 = createVectorFile("vector2", 50, 200, 2);
+ vectorBunch.add(vectorEntry1);
+ vectorBunch.add(vectorEntry2);
+ assertThat(vectorBunch.files).containsExactlyInAnyOrder(vectorEntry2);
+
+ VectorFileBunch finalVectorBunch2 = vectorBunch;
+ DataFileMeta vectorEntry3 = createVectorFile("vector2", 250, 100, 2);
+ assertThatCode(() ->
finalVectorBunch2.add(vectorEntry3)).doesNotThrowAnyException();
}
/** Creates a normal (non-blob) file for testing. */
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index c91dbb4c8e..e1bd365b23 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -79,8 +79,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchMergeHandler;
import org.apache.paimon.utils.RoaringBitmap32;
-import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
-
import org.apache.commons.math3.random.RandomDataGenerator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -1031,10 +1029,9 @@ public class AppendOnlySimpleTableTest extends
SimpleTableTestBase {
+ "."
+ CoreOptions.COLUMNS,
"price");
- options.set(ParquetOutputFormat.BLOCK_SIZE,
"1048576");
- options.set(
-
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
-
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
+ options.set("parquet.block.size", "1048576");
+ options.set("parquet.page.size.row.check.min",
"100");
+ options.set("parquet.page.row.count.limit", "300");
});
int bound = 300000;
@@ -1116,9 +1113,9 @@ public class AppendOnlySimpleTableTest extends
SimpleTableTestBase {
+ "."
+ CoreOptions.COLUMNS,
"price");
- options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
-
options.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
- options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
"300");
+ options.set("parquet.block.size", "1048576");
+ options.set("parquet.page.size.row.check.min", "100");
+ options.set("parquet.page.row.count.limit", "300");
};
// in unaware-bucket mode, we split files into splits all the time
FileStoreTable table = createUnawareBucketFileStoreTable(rowType,
configure);
@@ -1215,9 +1212,9 @@ public class AppendOnlySimpleTableTest extends
SimpleTableTestBase {
options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
options.set(WRITE_ONLY, true);
options.set(SOURCE_SPLIT_TARGET_SIZE,
MemorySize.ofBytes(1));
- options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
-
options.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
- options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
"300");
+ options.set("parquet.block.size", "1048576");
+ options.set("parquet.page.size.row.check.min", "100");
+ options.set("parquet.page.row.count.limit", "300");
};
// in unaware-bucket mode, we split files into splits all the time
FileStoreTable table = createUnawareBucketFileStoreTable(rowType,
configure);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 80cfb975bd..82b097d894 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -82,8 +82,6 @@ import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.RoaringBitmap32;
-import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
-
import org.apache.commons.math3.random.RandomDataGenerator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -1139,9 +1137,9 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
conf.set(BUCKET, 1);
conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
conf.set(DELETION_VECTORS_ENABLED, true);
- conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288");
-
conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
- conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
"300");
+ conf.set("parquet.block.size", "524288");
+ conf.set("parquet.page.size.row.check.min", "100");
+ conf.set("parquet.page.row.count.limit", "300");
conf.set("file-index.bitmap.columns", "b");
});
@@ -1248,9 +1246,9 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
conf.set(BUCKET, 1);
conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
conf.set(DELETION_VECTORS_ENABLED, true);
- conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288");
-
conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
- conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
"300");
+ conf.set("parquet.block.size", "524288");
+ conf.set("parquet.page.size.row.check.min", "100");
+ conf.set("parquet.page.row.count.limit", "300");
conf.set("file-index.range-bitmap.columns",
indexColumnName);
});
@@ -1335,9 +1333,9 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
conf.set(DELETION_VECTORS_ENABLED, true);
conf.set(SOURCE_SPLIT_TARGET_SIZE,
MemorySize.ofBytes(1));
- conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288");
-
conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
- conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
"300");
+ conf.set("parquet.block.size", "524288");
+ conf.set("parquet.page.size.row.check.min", "100");
+ conf.set("parquet.page.row.count.limit", "300");
});
int rowCount = 10000;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
index 27a1ed5617..344e1325a3 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileMeta.java
@@ -55,7 +55,7 @@ public class BlobFileMeta {
long[] blobOffsets = new long[blobLengths.length];
long offset = 0;
for (int i = 0; i < blobLengths.length; i++) {
- if (blobLengths[i] == -1) {
+ if (blobLengths[i] < 0) {
blobOffsets[i] = -1;
} else {
blobOffsets[i] = offset;
@@ -86,7 +86,11 @@ public class BlobFileMeta {
}
public boolean isNull(int i) {
- return blobLengths[i] == -1;
+ return blobLengths[i] == BlobFormatWriter.NULL_LENGTH;
+ }
+
+ public boolean isPlaceHolder(int i) {
+ return blobLengths[i] == BlobFormatWriter.PLACE_HOLDER_LENGTH;
}
public long blobLength(int i) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
index 801964b862..73222ea575 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.blob;
import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobPlaceholder;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
@@ -94,6 +95,8 @@ public class BlobFormatReader implements
FileRecordReader<InternalRow> {
Blob blob;
if (fileMeta.isNull(currentPosition)) {
blob = null;
+ } else if (fileMeta.isPlaceHolder(currentPosition)) {
+ blob = BlobPlaceholder.INSTANCE;
} else {
long offset = fileMeta.blobOffset(currentPosition) + 4;
long length = fileMeta.blobLength(currentPosition) - 16;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
index 83d6c0eb91..f97158c171 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format.blob;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.BlobDescriptor;
+import org.apache.paimon.data.BlobPlaceholder;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileAwareFormatWriter;
import org.apache.paimon.format.FormatWriter;
@@ -46,6 +47,8 @@ public class BlobFormatWriter implements
FileAwareFormatWriter {
public static final byte VERSION = 1;
public static final int MAGIC_NUMBER = 1481511375;
public static final byte[] MAGIC_NUMBER_BYTES =
intToLittleEndian(MAGIC_NUMBER);
+ public static final long NULL_LENGTH = -1L;
+ public static final long PLACE_HOLDER_LENGTH = -2L;
private final PositionOutputStream out;
@Nullable private final BlobConsumer writeConsumer;
@@ -81,13 +84,17 @@ public class BlobFormatWriter implements
FileAwareFormatWriter {
public void addElement(InternalRow element) throws IOException {
checkArgument(element.getFieldCount() == 1, "BlobFormatWriter only
support one field.");
if (element.isNullAt(0)) {
- lengths.add(-1L);
+ lengths.add(NULL_LENGTH);
if (writeConsumer != null) {
writeConsumer.accept(blobFieldName, null);
}
return;
}
Blob blob = element.getBlob(0);
+ if (blob == BlobPlaceholder.INSTANCE) {
+ lengths.add(PLACE_HOLDER_LENGTH);
+ return;
+ }
long previousPos = out.getPos();
crc32.reset();
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
index 0b66e15c2b..6c7542bfcb 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format.blob;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobPlaceholder;
import org.apache.paimon.data.BlobRef;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -79,16 +80,22 @@ public class BlobFileFormatTest {
// write
FormatWriterFactory writerFactory =
format.createWriterFactory(rowType);
- List<byte[]> blobs =
- Arrays.asList("hello".getBytes(), null, "world".getBytes(),
new byte[0]);
+ List<Object> blobs =
+ Arrays.asList(
+ "hello".getBytes(),
+ null,
+ BlobPlaceholder.INSTANCE,
+ "world".getBytes(),
+ new byte[0]);
try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
FormatWriter formatWriter = writerFactory.create(out, null);
- for (byte[] bytes : blobs) {
- if (bytes == null) {
+ for (Object blob : blobs) {
+ if (blob == null) {
formatWriter.addElement(GenericRow.of((Object) null));
- continue;
+ } else if (blob == BlobPlaceholder.INSTANCE) {
+
formatWriter.addElement(GenericRow.of(BlobPlaceholder.INSTANCE));
} else {
- formatWriter.addElement(GenericRow.of(new
BlobData(bytes)));
+ formatWriter.addElement(GenericRow.of(new
BlobData((byte[]) blob)));
}
}
formatWriter.close();
@@ -98,7 +105,7 @@ public class BlobFileFormatTest {
FormatReaderFactory readerFactory = format.createReaderFactory(null,
rowType, null);
FormatReaderContext context =
new FormatReaderContext(fileIO, file,
fileIO.getFileSize(file));
- List<byte[]> result = new ArrayList<>();
+ List<Object> result = new ArrayList<>();
readerFactory
.createReader(context)
.forEachRemaining(
@@ -107,7 +114,10 @@ public class BlobFileFormatTest {
result.add(null);
} else {
Blob blob = row.getBlob(0);
- if (blobAsDescriptor) {
+ if (blob == BlobPlaceholder.INSTANCE) {
+ result.add(BlobPlaceholder.INSTANCE);
+ return;
+ } else if (blobAsDescriptor) {
assertThat(blob).isInstanceOf(BlobRef.class);
} else {
assertThat(blob).isInstanceOf(BlobData.class);
@@ -117,19 +127,23 @@ public class BlobFileFormatTest {
});
// assert
- assertThat(result).containsExactlyElementsOf(blobs);
+ assertThat(result).hasSize(blobs.size());
+ assertThat((byte[]) result.get(0)).isEqualTo((byte[]) blobs.get(0));
+ assertThat(result.get(1)).isNull();
+ assertThat(result.get(2)).isSameAs(BlobPlaceholder.INSTANCE);
+ assertThat((byte[]) result.get(3)).isEqualTo((byte[]) blobs.get(3));
+ assertThat((byte[]) result.get(4)).isEqualTo((byte[]) blobs.get(4));
// read with selection
RoaringBitmap32 selection = new RoaringBitmap32();
selection.add(2);
context = new FormatReaderContext(fileIO, file,
fileIO.getFileSize(file), selection);
result.clear();
- readerFactory
- .createReader(context)
- .forEachRemaining(row -> result.add(row.getBlob(0).toData()));
+ readerFactory.createReader(context).forEachRemaining(row ->
result.add(row.getBlob(0)));
// assert
- assertThat(result).containsOnly(blobs.get(2));
+ assertThat(result).hasSize(1);
+ assertThat(result.get(0)).isSameAs(BlobPlaceholder.INSTANCE);
}
@Test
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py
b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index e7e65394aa..355fb36dc4 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -32,6 +32,8 @@ from pypaimon.table.row.row_kind import RowKind
class FormatBlobReader(RecordBatchReader):
+ NULL_LENGTH = -1
+ PLACE_HOLDER_LENGTH = -2
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
full_fields: List[DataField], push_down_predicate: Any,
blob_as_descriptor: bool,
@@ -97,6 +99,10 @@ class FormatBlobReader(RecordBatchReader):
for field_name in self._fields:
if blob is None:
pydict_data[field_name].append(None)
+ elif blob is Blob.PLACE_HOLDER:
+ raise RuntimeError(
+ "Blob placeholder is not supported by
FormatBlobReader yet."
+ )
elif self._blob_as_descriptor:
pydict_data[field_name].append(blob.to_descriptor().serialize())
else:
@@ -163,7 +169,7 @@ class FormatBlobReader(RecordBatchReader):
blob_offsets = []
offset = 0
for length in blob_lengths:
- if length == -1:
+ if length < 0:
blob_offsets.append(-1)
else:
blob_offsets.append(offset)
@@ -175,6 +181,8 @@ class FormatBlobReader(RecordBatchReader):
class BlobRecordIterator:
MAGIC_NUMBER_SIZE = 4
METADATA_OVERHEAD = 16
+ NULL_LENGTH = -1
+ PLACE_HOLDER_LENGTH = -2
def __init__(self, file_io: FileIO, file_path: str, blob_lengths:
List[int],
blob_offsets: List[int], field_name: str):
@@ -192,13 +200,17 @@ class BlobRecordIterator:
if self.current_position >= len(self.blob_lengths):
raise StopIteration
fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
- if self.blob_lengths[self.current_position] == -1:
+ length = self.blob_lengths[self.current_position]
+ if length == self.NULL_LENGTH:
self.current_position += 1
return GenericRow([None], fields, RowKind.INSERT)
+ if length == self.PLACE_HOLDER_LENGTH:
+ self.current_position += 1
+ return GenericRow([Blob.PLACE_HOLDER], fields, RowKind.INSERT)
# Create blob reference for the current blob
# Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4
bytes) = 12 bytes
blob_offset = self.blob_offsets[self.current_position] +
self.MAGIC_NUMBER_SIZE # Skip magic number
- blob_length = self.blob_lengths[self.current_position] -
self.METADATA_OVERHEAD
+ blob_length = length - self.METADATA_OVERHEAD
blob = Blob.from_file(self.file_io, self.file_path, blob_offset,
blob_length)
self.current_position += 1
return GenericRow([blob], fields, RowKind.INSERT)
diff --git a/paimon-python/pypaimon/table/row/blob.py
b/paimon-python/pypaimon/table/row/blob.py
index 19a932a9f9..43391775bd 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -297,6 +297,21 @@ class Blob(ABC):
return BlobData(data)
+class _PlaceholderBlob(Blob):
+
+ def to_data(self) -> bytes:
+ raise RuntimeError("Should never call this method for placeholder
blob.")
+
+ def to_descriptor(self) -> BlobDescriptor:
+ raise RuntimeError("Should never call this method for placeholder
blob.")
+
+ def new_input_stream(self) -> BinaryIO:
+ raise RuntimeError("Should never call this method for placeholder
blob.")
+
+
+Blob.PLACE_HOLDER = _PlaceholderBlob()
+
+
class BlobData(Blob):
def __init__(self, data: Optional[Union[bytes, bytearray]] = None):
diff --git a/paimon-python/pypaimon/tests/blob_test.py
b/paimon-python/pypaimon/tests/blob_test.py
index db0c639888..e6b856432b 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -29,7 +29,7 @@ from pypaimon import CatalogFactory
from pypaimon.common.file_io import FileIO
from pypaimon.filesystem.local_file_io import LocalFileIO
from pypaimon.common.options import Options
-from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+from pypaimon.read.reader.format_blob_reader import BlobRecordIterator,
FormatBlobReader
from pypaimon.schema.data_types import AtomicType, DataField
from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor
from pypaimon.table.row.generic_row import GenericRowDeserializer,
GenericRowSerializer, GenericRow
@@ -1296,6 +1296,79 @@ class BlobEndToEndTest(unittest.TestCase):
self.assertEqual(desc2.uri, blob_file_path)
reader.close()
+ def test_placeholder_blob_write_read(self):
+ from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+ file_io = LocalFileIO(self.temp_dir, Options({}))
+ blob_file_path = os.path.join(self.temp_dir, "placeholder_blob.blob")
+
+ output = open(blob_file_path, 'wb')
+ writer = BlobFormatWriter(output)
+ fields = [DataField(0, "blob_field", AtomicType("BLOB"))]
+ writer.add_element(GenericRow([BlobData(b"hello")], fields,
RowKind.INSERT))
+ writer.add_element(GenericRow([Blob.PLACE_HOLDER], fields,
RowKind.INSERT))
+ writer.add_element(GenericRow([None], fields, RowKind.INSERT))
+ writer.add_element(GenericRow([BlobData(b"world")], fields,
RowKind.INSERT))
+ self.assertEqual(
+ writer.lengths[1:3],
+ [BlobFormatWriter.PLACE_HOLDER_LENGTH,
BlobFormatWriter.NULL_LENGTH])
+ writer.close()
+
+ with open(blob_file_path, 'rb') as blob_file:
+ blob_file.seek(-1, os.SEEK_END)
+ self.assertEqual(blob_file.read(1), struct.pack('<B',
BlobFormatWriter.VERSION))
+
+ blob_field_name = "blob_field"
+ read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+ reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=blob_file_path,
+ read_fields=[blob_field_name],
+ full_fields=read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=False
+ )
+
+ self.assertEqual(reader.blob_lengths[1],
BlobFormatWriter.PLACE_HOLDER_LENGTH)
+ iterator = BlobRecordIterator(
+ file_io, blob_file_path, reader.blob_lengths, reader.blob_offsets,
blob_field_name)
+ self.assertEqual(next(iterator).values[0].to_data(), b"hello")
+ self.assertIs(next(iterator).values[0], Blob.PLACE_HOLDER)
+ self.assertIsNone(next(iterator).values[0])
+ self.assertEqual(next(iterator).values[0].to_data(), b"world")
+
+ with self.assertRaisesRegex(RuntimeError, "Blob placeholder is not
supported"):
+ reader.read_arrow_batch()
+ reader.close()
+
+ def test_placeholder_blob_read_as_descriptor(self):
+ from pypaimon.write.blob_format_writer import BlobFormatWriter
+
+ file_io = LocalFileIO(self.temp_dir, Options({}))
+ blob_file_path = os.path.join(self.temp_dir, "placeholder_desc.blob")
+
+ output = open(blob_file_path, 'wb')
+ writer = BlobFormatWriter(output)
+ fields = [DataField(0, "blob_field", AtomicType("BLOB"))]
+ writer.add_element(GenericRow([Blob.PLACE_HOLDER], fields,
RowKind.INSERT))
+ writer.add_element(GenericRow([BlobData(b"world")], fields,
RowKind.INSERT))
+ writer.close()
+
+ blob_field_name = "blob_field"
+ read_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+ reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=blob_file_path,
+ read_fields=[blob_field_name],
+ full_fields=read_fields,
+ push_down_predicate=None,
+ blob_as_descriptor=True
+ )
+
+ with self.assertRaisesRegex(RuntimeError, "Blob placeholder is not
supported"):
+ reader.read_arrow_batch()
+ reader.close()
+
class OffsetInputStreamTest(unittest.TestCase):
diff --git a/paimon-python/pypaimon/write/blob_format_writer.py
b/paimon-python/pypaimon/write/blob_format_writer.py
index f019655c84..92257f4ca9 100644
--- a/paimon-python/pypaimon/write/blob_format_writer.py
+++ b/paimon-python/pypaimon/write/blob_format_writer.py
@@ -26,6 +26,8 @@ from pypaimon.common.delta_varint_compressor import
DeltaVarintCompressor
class BlobFormatWriter:
VERSION = 1
MAGIC_NUMBER = 1481511375
+ NULL_LENGTH = -1
+ PLACE_HOLDER_LENGTH = -2
BUFFER_SIZE = 4096
METADATA_SIZE = 12 # 8-byte length + 4-byte CRC
@@ -40,12 +42,16 @@ class BlobFormatWriter:
blob_value = row.values[0]
if blob_value is None:
- self.lengths.append(-1)
+ self.lengths.append(self.NULL_LENGTH)
return
if not isinstance(blob_value, Blob):
raise ValueError("Field must be Blob/BlobData instance")
+ if blob_value is Blob.PLACE_HOLDER:
+ self.lengths.append(self.PLACE_HOLDER_LENGTH)
+ return
+
previous_pos = self.position
crc32 = 0 # Initialize CRC32
@@ -97,7 +103,7 @@ class BlobFormatWriter:
if col_data is None:
if not is_blob:
raise RuntimeError("Null values are only supported for BLOB
type fields")
- self.lengths.append(-1)
+ self.lengths.append(self.NULL_LENGTH)
return
if is_blob: