This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 716a9cff90 [core] Introduce minRowId and maxRowId to ManifestList file
(#6661)
716a9cff90 is described below
commit 716a9cff90fc57310ab8a71c3120b67deea8d961
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 24 10:54:52 2025 +0800
[core] Introduce minRowId and maxRowId to ManifestList file (#6661)
---
.../org/apache/paimon/manifest/ManifestFile.java | 24 ++++-
.../apache/paimon/manifest/ManifestFileMeta.java | 34 +++++--
.../manifest/ManifestFileMetaSerializer.java | 8 +-
.../paimon/operation/AbstractFileStoreScan.java | 16 +++-
.../paimon/operation/AppendOnlyFileStoreScan.java | 6 --
.../operation/DataEvolutionFileStoreScan.java | 44 +++++++--
.../paimon/operation/KeyValueFileStoreScan.java | 5 -
.../LegacyManifestFileMetaSerializerPaimon10.java | 2 +
.../apache/paimon/manifest/ManifestListTest.java | 2 +
.../paimon/manifest/ManifestTestDataGenerator.java | 4 +-
.../paimon/table/AppendOnlySimpleTableTest.java | 17 ++++
.../paimon/table/DataEvolutionTableTest.java | 105 ++++++++++++++++++++-
12 files changed, 230 insertions(+), 37 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index e90a38bf6a..0ce78f37fc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -176,6 +176,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
private int maxBucket = Integer.MIN_VALUE;
private int minLevel = Integer.MAX_VALUE;
private int maxLevel = Integer.MIN_VALUE;
+ private @Nullable RowIdStats rowIdStats = new RowIdStats();
ManifestEntryWriter(FormatWriterFactory factory, Path path, String
fileCompression) {
super(
@@ -208,6 +209,14 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
maxBucket = Math.max(maxBucket, entry.bucket());
minLevel = Math.min(minLevel, entry.level());
maxLevel = Math.max(maxLevel, entry.level());
+ if (rowIdStats != null) {
+ Long firstRowId = entry.file().firstRowId();
+ if (firstRowId == null) {
+ rowIdStats = null;
+ } else {
+ rowIdStats.collect(firstRowId, entry.file().rowCount());
+ }
+ }
partitionStatsCollector.collect(entry.partition());
}
@@ -226,7 +235,20 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
minBucket,
maxBucket,
minLevel,
- maxLevel);
+ maxLevel,
+ rowIdStats == null ? null : rowIdStats.minRowId,
+ rowIdStats == null ? null : rowIdStats.maxRowId);
+ }
+ }
+
+ private static class RowIdStats {
+
+ private long minRowId = Long.MAX_VALUE;
+ private long maxRowId = Long.MIN_VALUE;
+
+ private void collect(long firstRowId, long rowCount) {
+ minRowId = Math.min(minRowId, firstRowId);
+ maxRowId = Math.max(maxRowId, firstRowId + rowCount - 1);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index aa0156360a..613541adc8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -54,7 +54,9 @@ public class ManifestFileMeta {
new DataField(6, "_MIN_BUCKET", new IntType(true)),
new DataField(7, "_MAX_BUCKET", new IntType(true)),
new DataField(8, "_MIN_LEVEL", new IntType(true)),
- new DataField(9, "_MAX_LEVEL", new
IntType(true))));
+ new DataField(9, "_MAX_LEVEL", new IntType(true)),
+ new DataField(10, "_MIN_ROW_ID", new
BigIntType(true)),
+ new DataField(11, "_MAX_ROW_ID", new
BigIntType(true))));
private final String fileName;
private final long fileSize;
@@ -66,6 +68,8 @@ public class ManifestFileMeta {
private final @Nullable Integer maxBucket;
private final @Nullable Integer minLevel;
private final @Nullable Integer maxLevel;
+ private final @Nullable Long minRowId;
+ private final @Nullable Long maxRowId;
public ManifestFileMeta(
String fileName,
@@ -77,7 +81,9 @@ public class ManifestFileMeta {
@Nullable Integer minBucket,
@Nullable Integer maxBucket,
@Nullable Integer minLevel,
- @Nullable Integer maxLevel) {
+ @Nullable Integer maxLevel,
+ @Nullable Long minRowId,
+ @Nullable Long maxRowId) {
this.fileName = fileName;
this.fileSize = fileSize;
this.numAddedFiles = numAddedFiles;
@@ -88,6 +94,8 @@ public class ManifestFileMeta {
this.maxBucket = maxBucket;
this.minLevel = minLevel;
this.maxLevel = maxLevel;
+ this.minRowId = minRowId;
+ this.maxRowId = maxRowId;
}
public String fileName() {
@@ -130,6 +138,14 @@ public class ManifestFileMeta {
return maxLevel;
}
+ public @Nullable Long minRowId() {
+ return minRowId;
+ }
+
+ public @Nullable Long maxRowId() {
+ return maxRowId;
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof ManifestFileMeta)) {
@@ -145,7 +161,9 @@ public class ManifestFileMeta {
&& Objects.equals(minBucket, that.minBucket)
&& Objects.equals(maxBucket, that.maxBucket)
&& Objects.equals(minLevel, that.minLevel)
- && Objects.equals(maxLevel, that.maxLevel);
+ && Objects.equals(maxLevel, that.maxLevel)
+ && Objects.equals(minRowId, that.minRowId)
+ && Objects.equals(maxRowId, that.maxRowId);
}
@Override
@@ -160,13 +178,15 @@ public class ManifestFileMeta {
minBucket,
maxBucket,
minLevel,
- maxLevel);
+ maxLevel,
+ minRowId,
+ maxRowId);
}
@Override
public String toString() {
return String.format(
- "{%s, %d, %d, %d, %s, %d, %s, %s, %s, %s}",
+ "{%s, %d, %d, %d, %s, %d, %s, %s, %s, %s, %s, %s}",
fileName,
fileSize,
numAddedFiles,
@@ -176,7 +196,9 @@ public class ManifestFileMeta {
minBucket,
maxBucket,
minLevel,
- maxLevel);
+ maxLevel,
+ minRowId,
+ maxRowId);
}
// ----------------------- Serialization -----------------------------
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
index b3819e1277..95e30a8856 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
@@ -50,7 +50,9 @@ public class ManifestFileMetaSerializer extends
VersionedObjectSerializer<Manife
meta.minBucket(),
meta.maxBucket(),
meta.minLevel(),
- meta.maxLevel());
+ meta.maxLevel(),
+ meta.minRowId(),
+ meta.maxRowId());
}
@Override
@@ -75,6 +77,8 @@ public class ManifestFileMetaSerializer extends
VersionedObjectSerializer<Manife
row.isNullAt(6) ? null : row.getInt(6),
row.isNullAt(7) ? null : row.getInt(7),
row.isNullAt(8) ? null : row.getInt(8),
- row.isNullAt(9) ? null : row.getInt(9));
+ row.isNullAt(9) ? null : row.getInt(9),
+ row.isNullAt(10) ? null : row.getLong(10),
+ row.isNullAt(11) ? null : row.getLong(11));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 1d92816b07..cd1b86c740 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -89,6 +89,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private ScanMetrics scanMetrics = null;
private boolean dropStats;
+ @Nullable protected List<Long> rowIdList;
public AbstractFileStoreScan(
ManifestsReader manifestsReader,
@@ -238,8 +239,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
@Override
- public FileStoreScan withRowIds(List<Long> indices) {
- // do nothing by default
+ public FileStoreScan withRowIds(List<Long> rowIdList) {
+ this.rowIdList = rowIdList;
return this;
}
@@ -260,6 +261,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
ManifestsReader.Result manifestsResult = readManifests();
Snapshot snapshot = manifestsResult.snapshot;
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
+ manifests = postFilterManifests(manifests);
Iterator<ManifestEntry> iterator = readManifestEntries(manifests,
false);
List<ManifestEntry> files = new ArrayList<>();
@@ -267,7 +269,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
files.add(iterator.next());
}
- files = postFilter(files);
+ files = postFilterManifestEntries(files);
if (wholeBucketFilterEnabled()) {
// We group files by bucket here, and filter them by the whole
bucket filter.
@@ -437,7 +439,13 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
/** Note: Keep this thread-safe. */
protected abstract boolean filterByStats(ManifestEntry entry);
- protected abstract List<ManifestEntry> postFilter(List<ManifestEntry>
entries);
+ protected List<ManifestFileMeta>
postFilterManifests(List<ManifestFileMeta> manifests) {
+ return manifests;
+ }
+
+ protected List<ManifestEntry>
postFilterManifestEntries(List<ManifestEntry> entries) {
+ return entries;
+ }
protected boolean wholeBucketFilterEnabled() {
return false;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index be5c48539d..991e25ea15 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -33,7 +33,6 @@ import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -122,11 +121,6 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
return testFileIndex(entry.file().embeddedIndex(), entry);
}
- @Override
- protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
- return entries;
- }
-
private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes,
ManifestEntry entry) {
if (embeddedIndexBytes == null) {
return true;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 152b820ea6..0e2ab2ab19 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.DataEvolutionArray;
import org.apache.paimon.reader.DataEvolutionRow;
@@ -35,8 +36,6 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.RangeHelper;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
@@ -51,7 +50,6 @@ import static
org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
private boolean dropStats = false;
- @Nullable private List<Long> indices;
public DataEvolutionFileStoreScan(
ManifestsReader manifestsReader,
@@ -74,29 +72,57 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
@Override
public FileStoreScan dropStats() {
+ // overwrite to keep stats here
+ // TODO refactor this hacky
this.dropStats = true;
return this;
}
@Override
public FileStoreScan keepStats() {
+ // overwrite to keep stats here
+ // TODO refactor this hacky
this.dropStats = false;
return this;
}
+ @Override
public DataEvolutionFileStoreScan withFilter(Predicate predicate) {
+ // overwrite to keep all filter here
+ // TODO refactor this hacky
this.inputFilter = predicate;
return this;
}
@Override
- public FileStoreScan withRowIds(List<Long> indices) {
- this.indices = indices;
- return this;
+ protected List<ManifestFileMeta>
postFilterManifests(List<ManifestFileMeta> manifests) {
+ if (rowIdList == null || rowIdList.isEmpty()) {
+ return manifests;
+ }
+ return
manifests.stream().filter(this::filterManifestByRowIds).collect(Collectors.toList());
+ }
+
+ private boolean filterManifestByRowIds(ManifestFileMeta manifest) {
+ if (rowIdList == null || rowIdList.isEmpty()) {
+ return true;
+ }
+
+ Long min = manifest.minRowId();
+ Long max = manifest.maxRowId();
+ if (min == null || max == null) {
+ return true;
+ }
+
+ for (long rowId : rowIdList) {
+ if (rowId >= min && rowId <= max) {
+ return true;
+ }
+ }
+ return false;
}
@Override
- protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
+ protected List<ManifestEntry>
postFilterManifestEntries(List<ManifestEntry> entries) {
if (inputFilter == null) {
return entries;
}
@@ -214,7 +240,7 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
@Override
protected boolean filterByStats(ManifestEntry entry) {
// If indices is null, all entries should be kept
- if (this.indices == null) {
+ if (this.rowIdList == null) {
return true;
}
@@ -228,7 +254,7 @@ public class DataEvolutionFileStoreScan extends
AppendOnlyFileStoreScan {
long rowCount = entry.file().rowCount();
long endRowId = firstRowId + rowCount;
- for (Long index : this.indices) {
+ for (Long index : this.rowIdList) {
if (index >= firstRowId && index < endRowId) {
return true;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 38bb6cdeef..1de22d4997 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -153,11 +153,6 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
file.rowCount(), stats.minValues(), stats.maxValues(),
stats.nullCounts());
}
- @Override
- protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
- return entries;
- }
-
@Override
protected ManifestEntry dropStats(ManifestEntry entry) {
if (!isValueFilterEnabled() && wholeBucketFilterEnabled()) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
index 10a31bd67b..a371f15526 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
@@ -90,6 +90,8 @@ public class LegacyManifestFileMetaSerializerPaimon10
null,
null,
null,
+ null,
+ null,
null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index 4ab5d924f5..e3c63f13eb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -140,6 +140,8 @@ public class ManifestListTest {
null,
null,
null,
+ null,
+ null,
null));
}
return result;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
index 8e09c1bb91..4b576c6bd6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
@@ -126,7 +126,9 @@ public class ManifestTestDataGenerator {
minBucket,
maxBucket,
minLevel,
- maxLevel);
+ maxLevel,
+ null,
+ null);
}
private void mergeLevelsIfNeeded(BinaryRow partition, int bucket) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 2ea21b2085..dd356e77fd 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Equal;
@@ -195,6 +196,22 @@ public class AppendOnlySimpleTableTest extends
SimpleTableTestBase {
.isEmpty();
}
+ @Test
+ public void testMinMaxRowIdNull() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+ List<ManifestFileMeta> manifests =
+ table.store()
+ .manifestListFactory()
+ .create()
+ .readDataManifests(table.latestSnapshot().get());
+ assertThat(manifests.size()).isGreaterThan(0);
+ for (ManifestFileMeta manifest : manifests) {
+ assertThat(manifest.minRowId()).isNull();
+ assertThat(manifest.maxRowId()).isNull();
+ }
+ }
+
@Test
public void testReadDeletedFiles() throws Exception {
writeData();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 9b2655f340..6759530cb5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.DataEvolutionFileReader;
@@ -429,10 +430,20 @@ public class DataEvolutionTableTest extends TableTestBase
{
}
@Test
- public void testWithRowIds() throws Exception {
+ public void testWithRowIdsFilterManifestEntries() throws Exception {
+ innerTestWithRowIds(true);
+ }
+
+ @Test
+ public void testWithRowIdsFilterManifests() throws Exception {
+ innerTestWithRowIds(false);
+ }
+
+ public void innerTestWithRowIds(boolean compactManifests) throws Exception
{
createTableDefault();
Schema schema = schemaDefault();
- BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+ FileStoreTable table = getTableDefault();
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
// Write first batch of data with firstRowId = 0
RowType writeType0 = schema.rowType().project(Arrays.asList("f0",
"f1"));
@@ -468,7 +479,22 @@ public class DataEvolutionTableTest extends TableTestBase {
commit.commit(commitables);
}
- ReadBuilder readBuilder = getTableDefault().newReadBuilder();
+ if (compactManifests) {
+ try (BatchTableCommit commit = builder.newCommit()) {
+ commit.compactManifests();
+ }
+
+ List<ManifestFileMeta> manifests =
+ table.store()
+ .manifestListFactory()
+ .create()
+ .readDataManifests(table.latestSnapshot().get());
+ assertThat(manifests.size()).isEqualTo(1);
+ assertThat(manifests.get(0).minRowId()).isEqualTo(0);
+ assertThat(manifests.get(0).maxRowId()).isEqualTo(5);
+ }
+
+ ReadBuilder readBuilder = table.newReadBuilder();
// Test 1: Filter by row IDs that exist in the first file (0, 1)
List<Long> rowIds1 = Arrays.asList(0L, 1L);
@@ -600,6 +626,79 @@ public class DataEvolutionTableTest extends TableTestBase {
assertThat(i.get()).isEqualTo(2);
}
+ @Test
+ public void testWithRowIdsFilterManifestsNonExistFile() throws Exception {
+ createTableDefault();
+ Schema schema = schemaDefault();
+ FileStoreTable table = getTableDefault();
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+
+ // Write first batch of data with firstRowId = 0
+ RowType writeType0 = schema.rowType().project(Arrays.asList("f0",
"f1"));
+ try (BatchTableWrite write0 =
builder.newWrite().withWriteType(writeType0)) {
+ write0.write(GenericRow.of(1, BinaryString.fromString("a")));
+ write0.write(GenericRow.of(2, BinaryString.fromString("b")));
+
+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> commitables = write0.prepareCommit();
+ setFirstRowId(commitables, 0L);
+ commit.commit(commitables);
+ }
+
+ // Write second batch of data with firstRowId = 2
+ try (BatchTableWrite write0 =
builder.newWrite().withWriteType(writeType0)) {
+ write0.write(GenericRow.of(3, BinaryString.fromString("c")));
+ write0.write(GenericRow.of(4, BinaryString.fromString("d")));
+
+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> commitables = write0.prepareCommit();
+ setFirstRowId(commitables, 2L);
+ commit.commit(commitables);
+ }
+
+ // Write third batch of data with firstRowId = 4
+ try (BatchTableWrite write0 =
builder.newWrite().withWriteType(writeType0)) {
+ write0.write(GenericRow.of(5, BinaryString.fromString("e")));
+ write0.write(GenericRow.of(6, BinaryString.fromString("f")));
+
+ BatchTableCommit commit = builder.newCommit();
+ List<CommitMessage> commitables = write0.prepareCommit();
+ setFirstRowId(commitables, 4L);
+ commit.commit(commitables);
+ }
+
+ // assert manifest row id min max
+ List<ManifestFileMeta> manifests =
+ table.store()
+ .manifestListFactory()
+ .create()
+ .readDataManifests(table.latestSnapshot().get());
+ assertThat(manifests.size()).isEqualTo(3);
+ assertThat(manifests.get(0).minRowId()).isEqualTo(0);
+ assertThat(manifests.get(0).maxRowId()).isEqualTo(1);
+ assertThat(manifests.get(1).minRowId()).isEqualTo(2);
+ assertThat(manifests.get(1).maxRowId()).isEqualTo(3);
+ assertThat(manifests.get(2).minRowId()).isEqualTo(4);
+ assertThat(manifests.get(2).maxRowId()).isEqualTo(5);
+
+ // delete last manifest file, should never read it
+
table.store().manifestFileFactory().create().delete(manifests.get(2).fileName());
+
+ // assert file
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<Long> rowIds = Arrays.asList(0L, 3L);
+ List<Split> splits =
readBuilder.withRowIds(rowIds).newScan().plan().splits();
+ assertThat(splits.size()).isEqualTo(1);
+ DataSplit dataSplit = (DataSplit) splits.get(0);
+ assertThat(dataSplit.dataFiles().size()).isEqualTo(2);
+ DataFileMeta file1 = dataSplit.dataFiles().get(0);
+ assertThat(file1.firstRowId()).isEqualTo(0L);
+ assertThat(file1.rowCount()).isEqualTo(2L);
+ DataFileMeta file2 = dataSplit.dataFiles().get(1);
+ assertThat(file2.firstRowId()).isEqualTo(2L);
+ assertThat(file2.rowCount()).isEqualTo(2L);
+ }
+
@Test
public void testNonNullColumn() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();