This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ea394c9ec6 [core] Add file_key_ranges system table (#7500)
ea394c9ec6 is described below
commit ea394c9ec6c363781c8490db5b03a31ba5cf108c
Author: sanshi <[email protected]>
AuthorDate: Sun Mar 22 08:26:00 2026 +0800
[core] Add file_key_ranges system table (#7500)
Add a new system table `$file_key_ranges` that exposes the key range
(min_key, max_key), file_path, and first_row_id of each data file. This
enables users and DBAs to diagnose data distribution and Global Index
(PIP-41) coverage via SQL queries.
Schema (11 fields): partition, bucket, file_path, file_format,
schema_id, level, record_count, file_size_in_bytes, min_key, max_key,
first_row_id.
---
.../{FilesTable.java => FileKeyRangesTable.java} | 232 +++------------------
.../org/apache/paimon/table/system/FilesTable.java | 4 +-
.../paimon/table/system/SystemTableLoader.java | 2 +
.../table/system/FileKeyRangesTableTest.java | 204 ++++++++++++++++++
4 files changed, 233 insertions(+), 209 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
similarity index 63%
copy from
paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
copy to
paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
index fcfaafaea5..feeedbc7b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
@@ -20,18 +20,13 @@ package org.apache.paimon.table.system;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericArray;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.LazyGenericRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.In;
import org.apache.paimon.predicate.LeafBinaryFunction;
@@ -42,8 +37,6 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.stats.SimpleStatsEvolution;
-import org.apache.paimon.stats.SimpleStatsEvolutions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
@@ -51,18 +44,13 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
-import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.InternalRowUtils;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
@@ -81,21 +69,21 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.OptionalLong;
-import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
-/** A {@link Table} for showing files of a snapshot in specific table. */
-public class FilesTable implements ReadonlyTable {
+/**
+ * A {@link Table} for showing key ranges and file path information for each
data file, supporting
+ * diagnosis of data distribution and Global Index (PIP-41) coverage.
+ */
+public class FileKeyRangesTable implements ReadonlyTable {
private static final long serialVersionUID = 1L;
- public static final String FILES = "files";
+ public static final String FILE_KEY_RANGES = "file_key_ranges";
public static final RowType TABLE_TYPE =
new RowType(
@@ -111,31 +99,17 @@ public class FilesTable implements ReadonlyTable {
new DataField(7, "file_size_in_bytes", new
BigIntType(false)),
new DataField(8, "min_key",
SerializationUtils.newStringType(true)),
new DataField(9, "max_key",
SerializationUtils.newStringType(true)),
- new DataField(
- 10,
- "null_value_counts",
- SerializationUtils.newStringType(false)),
- new DataField(
- 11, "min_value_stats",
SerializationUtils.newStringType(false)),
- new DataField(
- 12, "max_value_stats",
SerializationUtils.newStringType(false)),
- new DataField(13, "min_sequence_number", new
BigIntType(true)),
- new DataField(14, "max_sequence_number", new
BigIntType(true)),
- new DataField(15, "creation_time",
DataTypes.TIMESTAMP_MILLIS()),
- new DataField(16, "deleteRowCount",
DataTypes.BIGINT()),
- new DataField(17, "file_source",
DataTypes.STRING()),
- new DataField(18, "first_row_id",
DataTypes.BIGINT()),
- new DataField(19, "write_cols",
DataTypes.ARRAY(DataTypes.STRING()))));
+ new DataField(10, "first_row_id", new
BigIntType(true))));
private final FileStoreTable storeTable;
- public FilesTable(FileStoreTable storeTable) {
+ public FileKeyRangesTable(FileStoreTable storeTable) {
this.storeTable = storeTable;
}
@Override
public String name() {
- return storeTable.name() + SYSTEM_TABLE_SPLITTER + FILES;
+ return storeTable.name() + SYSTEM_TABLE_SPLITTER + FILE_KEY_RANGES;
}
@Override
@@ -155,20 +129,20 @@ public class FilesTable implements ReadonlyTable {
@Override
public InnerTableScan newScan() {
- return new FilesScan(storeTable);
+ return new FileKeyRangesScan(storeTable);
}
@Override
public InnerTableRead newRead() {
- return new FilesRead(storeTable.schemaManager(), storeTable);
+ return new FileKeyRangesRead(storeTable.schemaManager(), storeTable);
}
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new FilesTable(storeTable.copy(dynamicOptions));
+ return new FileKeyRangesTable(storeTable.copy(dynamicOptions));
}
- private static class FilesScan extends ReadOnceTableScan {
+ private static class FileKeyRangesScan extends ReadOnceTableScan {
@Nullable private LeafPredicate partitionPredicate;
@Nullable private LeafPredicate bucketPredicate;
@@ -176,7 +150,7 @@ public class FilesTable implements ReadonlyTable {
private final FileStoreTable fileStoreTable;
- public FilesScan(FileStoreTable fileStoreTable) {
+ public FileKeyRangesScan(FileStoreTable fileStoreTable) {
this.fileStoreTable = fileStoreTable;
}
@@ -257,7 +231,7 @@ public class FilesTable implements ReadonlyTable {
return () ->
snapshotReader.partitions().stream()
- .map(p -> new FilesSplit(p, bucketPredicate,
levelPredicate))
+ .map(p -> new FilesTable.FilesSplit(p,
bucketPredicate, levelPredicate))
.collect(Collectors.toList());
}
@@ -282,77 +256,7 @@ public class FilesTable implements ReadonlyTable {
}
}
- private static class FilesSplit extends SingletonSplit {
-
- @Nullable private final BinaryRow partition;
- @Nullable private final LeafPredicate bucketPredicate;
- @Nullable private final LeafPredicate levelPredicate;
-
- private FilesSplit(
- @Nullable BinaryRow partition,
- @Nullable LeafPredicate bucketPredicate,
- @Nullable LeafPredicate levelPredicate) {
- this.partition = partition;
- this.bucketPredicate = bucketPredicate;
- this.levelPredicate = levelPredicate;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- FilesSplit that = (FilesSplit) o;
- return Objects.equals(partition, that.partition)
- && Objects.equals(bucketPredicate, that.bucketPredicate)
- && Objects.equals(this.levelPredicate,
that.levelPredicate);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(partition, bucketPredicate, levelPredicate);
- }
-
- public List<Split> splits(FileStoreTable storeTable) {
- return tablePlan(storeTable).splits();
- }
-
- private TableScan.Plan tablePlan(FileStoreTable storeTable) {
- InnerTableScan scan = storeTable.newScan();
- if (partition != null) {
- scan.withPartitionFilter(Collections.singletonList(partition));
- }
- if (bucketPredicate != null) {
- scan.withBucketFilter(
- bucket -> {
- // bucket index: 1
- return bucketPredicate.test(GenericRow.of(null,
bucket));
- });
- }
- if (levelPredicate != null) {
- scan.withLevelFilter(
- level -> {
- // level index: 5
- return levelPredicate.test(
- GenericRow.of(null, null, null, null,
null, level));
- });
- } else {
- // avoid that batchScanSkipLevel0 is true
- scan.withLevelFilter(level -> true);
- }
- return scan.plan();
- }
-
- @Override
- public OptionalLong mergedRowCount() {
- return OptionalLong.empty();
- }
- }
-
- private static class FilesRead implements InnerTableRead {
+ private static class FileKeyRangesRead implements InnerTableRead {
private final SchemaManager schemaManager;
@@ -360,7 +264,7 @@ public class FilesTable implements ReadonlyTable {
private RowType readType;
- private FilesRead(SchemaManager schemaManager, FileStoreTable
fileStoreTable) {
+ private FileKeyRangesRead(SchemaManager schemaManager, FileStoreTable
fileStoreTable) {
this.schemaManager = schemaManager;
this.storeTable = fileStoreTable;
}
@@ -384,22 +288,15 @@ public class FilesTable implements ReadonlyTable {
@Override
public RecordReader<InternalRow> createReader(Split split) {
- if (!(split instanceof FilesSplit)) {
+ if (!(split instanceof FilesTable.FilesSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
- FilesSplit filesSplit = (FilesSplit) split;
+ FilesTable.FilesSplit filesSplit = (FilesTable.FilesSplit) split;
List<Split> splits = filesSplit.splits(storeTable);
if (splits.isEmpty()) {
return new IteratorRecordReader<>(Collections.emptyIterator());
}
- List<Iterator<InternalRow>> iteratorList = new ArrayList<>();
- // dataFilePlan.snapshotId indicates there's no files in the
table, use the newest
- // schema id directly
- SimpleStatsEvolutions simpleStatsEvolutions =
- new SimpleStatsEvolutions(
- sid -> schemaManager.schema(sid).fields(),
storeTable.schema().id());
-
@SuppressWarnings("unchecked")
CastExecutor<InternalRow, BinaryString> partitionCastExecutor =
(CastExecutor<InternalRow, BinaryString>)
@@ -427,6 +324,8 @@ public class FilesTable implements ReadonlyTable {
});
}
};
+
+ List<Iterator<InternalRow>> iteratorList = new ArrayList<>();
for (Split dataSplit : splits) {
iteratorList.add(
Iterators.transform(
@@ -436,8 +335,7 @@ public class FilesTable implements ReadonlyTable {
(DataSplit) dataSplit,
partitionCastExecutor,
keyConverters,
- file,
- simpleStatsEvolutions)));
+ file)));
}
Iterator<InternalRow> rows =
Iterators.concat(iteratorList.iterator());
if (readType != null) {
@@ -445,7 +343,7 @@ public class FilesTable implements ReadonlyTable {
Iterators.transform(
rows,
row ->
- ProjectedRow.from(readType,
FilesTable.TABLE_TYPE)
+ ProjectedRow.from(readType,
FileKeyRangesTable.TABLE_TYPE)
.replaceRow(row));
}
return new IteratorRecordReader<>(rows);
@@ -455,9 +353,7 @@ public class FilesTable implements ReadonlyTable {
DataSplit dataSplit,
CastExecutor<InternalRow, BinaryString> partitionCastExecutor,
Function<Long, RowDataToObjectArrayConverter> keyConverters,
- DataFileMeta file,
- SimpleStatsEvolutions simpleStatsEvolutions) {
- StatsLazyGetter statsGetter = new StatsLazyGetter(file,
simpleStatsEvolutions);
+ DataFileMeta file) {
@SuppressWarnings("unchecked")
Supplier<Object>[] fields =
new Supplier[] {
@@ -496,88 +392,10 @@ public class FilesTable implements ReadonlyTable {
keyConverters
.apply(file.schemaId())
.convert(file.maxKey()))),
- () ->
BinaryString.fromString(statsGetter.nullValueCounts().toString()),
- () ->
BinaryString.fromString(statsGetter.lowerValueBounds().toString()),
- () ->
BinaryString.fromString(statsGetter.upperValueBounds().toString()),
- file::minSequenceNumber,
- file::maxSequenceNumber,
- file::creationTime,
- () -> file.deleteRowCount().orElse(null),
- () ->
- BinaryString.fromString(
-
file.fileSource().map(FileSource::toString).orElse(null)),
- file::firstRowId,
- () -> {
- List<String> writeCols = file.writeCols();
- if (writeCols == null) {
- return null;
- }
- return new GenericArray(
-
writeCols.stream().map(BinaryString::fromString).toArray());
- },
+ file::firstRowId
};
return new LazyGenericRow(fields);
}
}
-
- private static class StatsLazyGetter {
-
- private final DataFileMeta file;
- private final SimpleStatsEvolutions simpleStatsEvolutions;
-
- private Map<String, Long> lazyNullValueCounts;
- private Map<String, Object> lazyLowerValueBounds;
- private Map<String, Object> lazyUpperValueBounds;
-
- private StatsLazyGetter(DataFileMeta file, SimpleStatsEvolutions
simpleStatsEvolutions) {
- this.file = file;
- this.simpleStatsEvolutions = simpleStatsEvolutions;
- }
-
- private void initialize() {
- SimpleStatsEvolution evolution =
simpleStatsEvolutions.getOrCreate(file.schemaId());
- // Create value stats
- SimpleStatsEvolution.Result result =
- evolution.evolution(file.valueStats(), file.rowCount(),
file.valueStatsCols());
- InternalRow min = result.minValues();
- InternalRow max = result.maxValues();
- InternalArray nullCounts = result.nullCounts();
- lazyNullValueCounts = new TreeMap<>();
- lazyLowerValueBounds = new TreeMap<>();
- lazyUpperValueBounds = new TreeMap<>();
- int length =
- Math.min(min.getFieldCount(),
simpleStatsEvolutions.tableDataFields().size());
- for (int i = 0; i < length; i++) {
- DataField field =
simpleStatsEvolutions.tableDataFields().get(i);
- String name = field.name();
- DataType type = field.type();
- lazyNullValueCounts.put(
- name, nullCounts.isNullAt(i) ? null :
nullCounts.getLong(i));
- lazyLowerValueBounds.put(name, InternalRowUtils.get(min, i,
type));
- lazyUpperValueBounds.put(name, InternalRowUtils.get(max, i,
type));
- }
- }
-
- private Map<String, Long> nullValueCounts() {
- if (lazyNullValueCounts == null) {
- initialize();
- }
- return lazyNullValueCounts;
- }
-
- private Map<String, Object> lowerValueBounds() {
- if (lazyLowerValueBounds == null) {
- initialize();
- }
- return lazyLowerValueBounds;
- }
-
- private Map<String, Object> upperValueBounds() {
- if (lazyUpperValueBounds == null) {
- initialize();
- }
- return lazyUpperValueBounds;
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index fcfaafaea5..e0a89d92bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -282,13 +282,13 @@ public class FilesTable implements ReadonlyTable {
}
}
- private static class FilesSplit extends SingletonSplit {
+ static class FilesSplit extends SingletonSplit {
@Nullable private final BinaryRow partition;
@Nullable private final LeafPredicate bucketPredicate;
@Nullable private final LeafPredicate levelPredicate;
- private FilesSplit(
+ FilesSplit(
@Nullable BinaryRow partition,
@Nullable LeafPredicate bucketPredicate,
@Nullable LeafPredicate levelPredicate) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index e1411d290f..93e0453ab7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -42,6 +42,7 @@ import static
org.apache.paimon.table.system.BranchesTable.BRANCHES;
import static org.apache.paimon.table.system.BucketsTable.BUCKETS;
import static
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
+import static
org.apache.paimon.table.system.FileKeyRangesTable.FILE_KEY_RANGES;
import static org.apache.paimon.table.system.FilesTable.FILES;
import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
@@ -67,6 +68,7 @@ public class SystemTableLoader {
.put(BUCKETS, BucketsTable::new)
.put(AUDIT_LOG, AuditLogTable::new)
.put(FILES, FilesTable::new)
+ .put(FILE_KEY_RANGES, FileKeyRangesTable::new)
.put(TAGS, TagsTable::new)
.put(BRANCHES, BranchesTable::new)
.put(CONSUMERS, ConsumersTable::new)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
new file mode 100644
index 0000000000..ce675e2aae
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.predicate.In;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileKeyRangesTable}. */
+public class FileKeyRangesTableTest extends TableTestBase {
+
+ private static final String TABLE_NAME = "MyTable";
+
+ private FileStoreTable table;
+ private FileKeyRangesTable fileKeyRangesTable;
+
+ @BeforeEach
+ public void before() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .column("col", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .option("bucket", "2")
+ .build();
+ catalog.createTable(identifier(TABLE_NAME), schema, false);
+ table = (FileStoreTable) catalog.getTable(identifier(TABLE_NAME));
+
+ Identifier fileKeyRangesId =
+ identifier(TABLE_NAME + SYSTEM_TABLE_SPLITTER +
FileKeyRangesTable.FILE_KEY_RANGES);
+ fileKeyRangesTable = (FileKeyRangesTable)
catalog.getTable(fileKeyRangesId);
+
+ // snapshot 1: write two partitions
+ write(table, GenericRow.of(1, 1, 10), GenericRow.of(1, 2, 20));
+ // snapshot 2: write more rows
+ write(table, GenericRow.of(2, 1, 30), GenericRow.of(2, 2, 40));
+ }
+
+ @Test
+ public void testReadBasic() throws Exception {
+ List<String> rows = readPartBucketLevel(null);
+ assertThat(rows).isNotEmpty();
+
+ // verify that file_path, min_key and max_key fields are readable
+ ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
+ rb.newRead()
+ .createReader(rb.newScan().plan())
+ .forEachRemaining(
+ row -> {
+ // file_path (index 2) should be non-null
+ assertThat(row.getString(2)).isNotNull();
+ // min_key (index 8) and max_key (index 9) should
be non-null for
+ // primary key tables
+ assertThat(row.getString(8)).isNotNull();
+ assertThat(row.getString(9)).isNotNull();
+ });
+ }
+
+ @Test
+ public void testPartitionFilter() throws Exception {
+ PredicateBuilder builder = new
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+ List<String> rows = readPartBucketLevel(builder.equal(0,
BinaryString.fromString("{1}")));
+ assertThat(rows).isNotEmpty();
+ for (String row : rows) {
+ assertThat(row).startsWith("{1}-");
+ }
+ }
+
+ @Test
+ public void testBucketFilter() throws Exception {
+ PredicateBuilder builder = new
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+ List<String> rows = readPartBucketLevel(builder.equal(1, 0));
+ assertThat(rows).isNotEmpty();
+ for (String row : rows) {
+ String[] parts = row.split("-");
+ assertThat(parts[1]).isEqualTo("0");
+ }
+ }
+
+ @Test
+ public void testLevelFilter() throws Exception {
+ // compact to produce level-5 files
+ compact(table, row(1), 0);
+ compact(table, row(2), 0);
+
+ PredicateBuilder builder = new
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+ List<String> rows = readPartBucketLevel(builder.equal(5, 5));
+ assertThat(rows).isNotEmpty();
+ for (String row : rows) {
+ String[] parts = row.split("-");
+ assertThat(parts[2]).isEqualTo("5");
+ }
+ }
+
+ @Test
+ public void testFirstRowId() throws Exception {
+ // first_row_id (index 10) is nullable BigInt; for a primary-key table
without
+ // first_row_id configured it will be null - just verify the field is
accessible
+ ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
+ rb.newRead()
+ .createReader(rb.newScan().plan())
+ .forEachRemaining(
+ row -> {
+ // accessing index 10 should not throw even if null
+ if (!row.isNullAt(10)) {
+
assertThat(row.getLong(10)).isGreaterThanOrEqualTo(0L);
+ }
+ });
+ }
+
+ @Test
+ public void testSystemTableName() throws Exception {
+ Identifier id =
+ identifier(TABLE_NAME + SYSTEM_TABLE_SPLITTER +
FileKeyRangesTable.FILE_KEY_RANGES);
+ FileKeyRangesTable t = (FileKeyRangesTable) catalog.getTable(id);
+ assertThat(t.rowType().getFieldCount()).isEqualTo(11);
+ }
+
+ @Test
+ public void testPartitionInFilter() throws Exception {
+ DataField partitionField =
FileKeyRangesTable.TABLE_TYPE.getFields().get(0);
+ Predicate inPredicate =
+ new LeafPredicate(
+ In.INSTANCE,
+ partitionField.type(),
+ 0,
+ partitionField.name(),
+ Arrays.asList(
+ BinaryString.fromString("{1}"),
BinaryString.fromString("{2}")));
+ List<String> rows = readPartBucketLevel(inPredicate);
+ assertThat(rows).isNotEmpty();
+ boolean hasPt1 = false;
+ boolean hasPt2 = false;
+ for (String row : rows) {
+ if (row.startsWith("{1}-")) {
+ hasPt1 = true;
+ }
+ if (row.startsWith("{2}-")) {
+ hasPt2 = true;
+ }
+ }
+ assertThat(hasPt1).isTrue();
+ assertThat(hasPt2).isTrue();
+ }
+
+ private List<String> readPartBucketLevel(Predicate predicate) throws
IOException {
+ ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
+ if (predicate != null) {
+ rb = rb.withFilter(predicate);
+ }
+ List<String> rows = new ArrayList<>();
+ rb.newRead()
+ .createReader(rb.newScan().plan())
+ .forEachRemaining(
+ row ->
+ rows.add(
+ row.getString(0)
+ + "-"
+ + row.getInt(1)
+ + "-"
+ + row.getInt(5)));
+ return rows;
+ }
+}