This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ea394c9ec6 [core] Add file_key_ranges system table (#7500)
ea394c9ec6 is described below

commit ea394c9ec6c363781c8490db5b03a31ba5cf108c
Author: sanshi <[email protected]>
AuthorDate: Sun Mar 22 08:26:00 2026 +0800

    [core] Add file_key_ranges system table (#7500)
    
      Add a new system table `$file_key_ranges` that exposes per-file key
      ranges (min_key, max_key), file path, and first_row_id for each data
      file. This enables users and DBAs to diagnose data distribution and
      Global Index (PIP-41) coverage via SQL queries.
    
      Schema (11 fields): partition, bucket, file_path, file_format,
      schema_id, level, record_count, file_size_in_bytes, min_key, max_key,
      first_row_id.
---
 .../{FilesTable.java => FileKeyRangesTable.java}   | 232 +++------------------
 .../org/apache/paimon/table/system/FilesTable.java |   4 +-
 .../paimon/table/system/SystemTableLoader.java     |   2 +
 .../table/system/FileKeyRangesTableTest.java       | 204 ++++++++++++++++++
 4 files changed, 233 insertions(+), 209 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
similarity index 63%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
copy to 
paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
index fcfaafaea5..feeedbc7b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
@@ -20,18 +20,13 @@ package org.apache.paimon.table.system;
 
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericArray;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.LazyGenericRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.predicate.Equal;
 import org.apache.paimon.predicate.In;
 import org.apache.paimon.predicate.LeafBinaryFunction;
@@ -42,8 +37,6 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.stats.SimpleStatsEvolution;
-import org.apache.paimon.stats.SimpleStatsEvolutions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
@@ -51,18 +44,13 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
-import org.apache.paimon.table.source.SingletonSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.InternalRowUtils;
 import org.apache.paimon.utils.IteratorRecordReader;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
@@ -81,21 +69,21 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.OptionalLong;
-import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
 
-/** A {@link Table} for showing files of a snapshot in specific table. */
-public class FilesTable implements ReadonlyTable {
+/**
+ * A {@link Table} for showing key ranges and file path information for each 
data file, supporting
+ * diagnosis of data distribution and Global Index (PIP-41) coverage.
+ */
+public class FileKeyRangesTable implements ReadonlyTable {
 
     private static final long serialVersionUID = 1L;
 
-    public static final String FILES = "files";
+    public static final String FILE_KEY_RANGES = "file_key_ranges";
 
     public static final RowType TABLE_TYPE =
             new RowType(
@@ -111,31 +99,17 @@ public class FilesTable implements ReadonlyTable {
                             new DataField(7, "file_size_in_bytes", new 
BigIntType(false)),
                             new DataField(8, "min_key", 
SerializationUtils.newStringType(true)),
                             new DataField(9, "max_key", 
SerializationUtils.newStringType(true)),
-                            new DataField(
-                                    10,
-                                    "null_value_counts",
-                                    SerializationUtils.newStringType(false)),
-                            new DataField(
-                                    11, "min_value_stats", 
SerializationUtils.newStringType(false)),
-                            new DataField(
-                                    12, "max_value_stats", 
SerializationUtils.newStringType(false)),
-                            new DataField(13, "min_sequence_number", new 
BigIntType(true)),
-                            new DataField(14, "max_sequence_number", new 
BigIntType(true)),
-                            new DataField(15, "creation_time", 
DataTypes.TIMESTAMP_MILLIS()),
-                            new DataField(16, "deleteRowCount", 
DataTypes.BIGINT()),
-                            new DataField(17, "file_source", 
DataTypes.STRING()),
-                            new DataField(18, "first_row_id", 
DataTypes.BIGINT()),
-                            new DataField(19, "write_cols", 
DataTypes.ARRAY(DataTypes.STRING()))));
+                            new DataField(10, "first_row_id", new 
BigIntType(true))));
 
     private final FileStoreTable storeTable;
 
-    public FilesTable(FileStoreTable storeTable) {
+    public FileKeyRangesTable(FileStoreTable storeTable) {
         this.storeTable = storeTable;
     }
 
     @Override
     public String name() {
-        return storeTable.name() + SYSTEM_TABLE_SPLITTER + FILES;
+        return storeTable.name() + SYSTEM_TABLE_SPLITTER + FILE_KEY_RANGES;
     }
 
     @Override
@@ -155,20 +129,20 @@ public class FilesTable implements ReadonlyTable {
 
     @Override
     public InnerTableScan newScan() {
-        return new FilesScan(storeTable);
+        return new FileKeyRangesScan(storeTable);
     }
 
     @Override
     public InnerTableRead newRead() {
-        return new FilesRead(storeTable.schemaManager(), storeTable);
+        return new FileKeyRangesRead(storeTable.schemaManager(), storeTable);
     }
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new FilesTable(storeTable.copy(dynamicOptions));
+        return new FileKeyRangesTable(storeTable.copy(dynamicOptions));
     }
 
-    private static class FilesScan extends ReadOnceTableScan {
+    private static class FileKeyRangesScan extends ReadOnceTableScan {
 
         @Nullable private LeafPredicate partitionPredicate;
         @Nullable private LeafPredicate bucketPredicate;
@@ -176,7 +150,7 @@ public class FilesTable implements ReadonlyTable {
 
         private final FileStoreTable fileStoreTable;
 
-        public FilesScan(FileStoreTable fileStoreTable) {
+        public FileKeyRangesScan(FileStoreTable fileStoreTable) {
             this.fileStoreTable = fileStoreTable;
         }
 
@@ -257,7 +231,7 @@ public class FilesTable implements ReadonlyTable {
 
             return () ->
                     snapshotReader.partitions().stream()
-                            .map(p -> new FilesSplit(p, bucketPredicate, 
levelPredicate))
+                            .map(p -> new FilesTable.FilesSplit(p, 
bucketPredicate, levelPredicate))
                             .collect(Collectors.toList());
         }
 
@@ -282,77 +256,7 @@ public class FilesTable implements ReadonlyTable {
         }
     }
 
-    private static class FilesSplit extends SingletonSplit {
-
-        @Nullable private final BinaryRow partition;
-        @Nullable private final LeafPredicate bucketPredicate;
-        @Nullable private final LeafPredicate levelPredicate;
-
-        private FilesSplit(
-                @Nullable BinaryRow partition,
-                @Nullable LeafPredicate bucketPredicate,
-                @Nullable LeafPredicate levelPredicate) {
-            this.partition = partition;
-            this.bucketPredicate = bucketPredicate;
-            this.levelPredicate = levelPredicate;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            FilesSplit that = (FilesSplit) o;
-            return Objects.equals(partition, that.partition)
-                    && Objects.equals(bucketPredicate, that.bucketPredicate)
-                    && Objects.equals(this.levelPredicate, 
that.levelPredicate);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(partition, bucketPredicate, levelPredicate);
-        }
-
-        public List<Split> splits(FileStoreTable storeTable) {
-            return tablePlan(storeTable).splits();
-        }
-
-        private TableScan.Plan tablePlan(FileStoreTable storeTable) {
-            InnerTableScan scan = storeTable.newScan();
-            if (partition != null) {
-                scan.withPartitionFilter(Collections.singletonList(partition));
-            }
-            if (bucketPredicate != null) {
-                scan.withBucketFilter(
-                        bucket -> {
-                            // bucket index: 1
-                            return bucketPredicate.test(GenericRow.of(null, 
bucket));
-                        });
-            }
-            if (levelPredicate != null) {
-                scan.withLevelFilter(
-                        level -> {
-                            // level index: 5
-                            return levelPredicate.test(
-                                    GenericRow.of(null, null, null, null, 
null, level));
-                        });
-            } else {
-                // avoid that batchScanSkipLevel0 is true
-                scan.withLevelFilter(level -> true);
-            }
-            return scan.plan();
-        }
-
-        @Override
-        public OptionalLong mergedRowCount() {
-            return OptionalLong.empty();
-        }
-    }
-
-    private static class FilesRead implements InnerTableRead {
+    private static class FileKeyRangesRead implements InnerTableRead {
 
         private final SchemaManager schemaManager;
 
@@ -360,7 +264,7 @@ public class FilesTable implements ReadonlyTable {
 
         private RowType readType;
 
-        private FilesRead(SchemaManager schemaManager, FileStoreTable 
fileStoreTable) {
+        private FileKeyRangesRead(SchemaManager schemaManager, FileStoreTable 
fileStoreTable) {
             this.schemaManager = schemaManager;
             this.storeTable = fileStoreTable;
         }
@@ -384,22 +288,15 @@ public class FilesTable implements ReadonlyTable {
 
         @Override
         public RecordReader<InternalRow> createReader(Split split) {
-            if (!(split instanceof FilesSplit)) {
+            if (!(split instanceof FilesTable.FilesSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
-            FilesSplit filesSplit = (FilesSplit) split;
+            FilesTable.FilesSplit filesSplit = (FilesTable.FilesSplit) split;
             List<Split> splits = filesSplit.splits(storeTable);
             if (splits.isEmpty()) {
                 return new IteratorRecordReader<>(Collections.emptyIterator());
             }
 
-            List<Iterator<InternalRow>> iteratorList = new ArrayList<>();
-            // dataFilePlan.snapshotId indicates there's no files in the 
table, use the newest
-            // schema id directly
-            SimpleStatsEvolutions simpleStatsEvolutions =
-                    new SimpleStatsEvolutions(
-                            sid -> schemaManager.schema(sid).fields(), 
storeTable.schema().id());
-
             @SuppressWarnings("unchecked")
             CastExecutor<InternalRow, BinaryString> partitionCastExecutor =
                     (CastExecutor<InternalRow, BinaryString>)
@@ -427,6 +324,8 @@ public class FilesTable implements ReadonlyTable {
                                     });
                         }
                     };
+
+            List<Iterator<InternalRow>> iteratorList = new ArrayList<>();
             for (Split dataSplit : splits) {
                 iteratorList.add(
                         Iterators.transform(
@@ -436,8 +335,7 @@ public class FilesTable implements ReadonlyTable {
                                                 (DataSplit) dataSplit,
                                                 partitionCastExecutor,
                                                 keyConverters,
-                                                file,
-                                                simpleStatsEvolutions)));
+                                                file)));
             }
             Iterator<InternalRow> rows = 
Iterators.concat(iteratorList.iterator());
             if (readType != null) {
@@ -445,7 +343,7 @@ public class FilesTable implements ReadonlyTable {
                         Iterators.transform(
                                 rows,
                                 row ->
-                                        ProjectedRow.from(readType, 
FilesTable.TABLE_TYPE)
+                                        ProjectedRow.from(readType, 
FileKeyRangesTable.TABLE_TYPE)
                                                 .replaceRow(row));
             }
             return new IteratorRecordReader<>(rows);
@@ -455,9 +353,7 @@ public class FilesTable implements ReadonlyTable {
                 DataSplit dataSplit,
                 CastExecutor<InternalRow, BinaryString> partitionCastExecutor,
                 Function<Long, RowDataToObjectArrayConverter> keyConverters,
-                DataFileMeta file,
-                SimpleStatsEvolutions simpleStatsEvolutions) {
-            StatsLazyGetter statsGetter = new StatsLazyGetter(file, 
simpleStatsEvolutions);
+                DataFileMeta file) {
             @SuppressWarnings("unchecked")
             Supplier<Object>[] fields =
                     new Supplier[] {
@@ -496,88 +392,10 @@ public class FilesTable implements ReadonlyTable {
                                                         keyConverters
                                                                 
.apply(file.schemaId())
                                                                 
.convert(file.maxKey()))),
-                        () -> 
BinaryString.fromString(statsGetter.nullValueCounts().toString()),
-                        () -> 
BinaryString.fromString(statsGetter.lowerValueBounds().toString()),
-                        () -> 
BinaryString.fromString(statsGetter.upperValueBounds().toString()),
-                        file::minSequenceNumber,
-                        file::maxSequenceNumber,
-                        file::creationTime,
-                        () -> file.deleteRowCount().orElse(null),
-                        () ->
-                                BinaryString.fromString(
-                                        
file.fileSource().map(FileSource::toString).orElse(null)),
-                        file::firstRowId,
-                        () -> {
-                            List<String> writeCols = file.writeCols();
-                            if (writeCols == null) {
-                                return null;
-                            }
-                            return new GenericArray(
-                                    
writeCols.stream().map(BinaryString::fromString).toArray());
-                        },
+                        file::firstRowId
                     };
 
             return new LazyGenericRow(fields);
         }
     }
-
-    private static class StatsLazyGetter {
-
-        private final DataFileMeta file;
-        private final SimpleStatsEvolutions simpleStatsEvolutions;
-
-        private Map<String, Long> lazyNullValueCounts;
-        private Map<String, Object> lazyLowerValueBounds;
-        private Map<String, Object> lazyUpperValueBounds;
-
-        private StatsLazyGetter(DataFileMeta file, SimpleStatsEvolutions 
simpleStatsEvolutions) {
-            this.file = file;
-            this.simpleStatsEvolutions = simpleStatsEvolutions;
-        }
-
-        private void initialize() {
-            SimpleStatsEvolution evolution = 
simpleStatsEvolutions.getOrCreate(file.schemaId());
-            // Create value stats
-            SimpleStatsEvolution.Result result =
-                    evolution.evolution(file.valueStats(), file.rowCount(), 
file.valueStatsCols());
-            InternalRow min = result.minValues();
-            InternalRow max = result.maxValues();
-            InternalArray nullCounts = result.nullCounts();
-            lazyNullValueCounts = new TreeMap<>();
-            lazyLowerValueBounds = new TreeMap<>();
-            lazyUpperValueBounds = new TreeMap<>();
-            int length =
-                    Math.min(min.getFieldCount(), 
simpleStatsEvolutions.tableDataFields().size());
-            for (int i = 0; i < length; i++) {
-                DataField field = 
simpleStatsEvolutions.tableDataFields().get(i);
-                String name = field.name();
-                DataType type = field.type();
-                lazyNullValueCounts.put(
-                        name, nullCounts.isNullAt(i) ? null : 
nullCounts.getLong(i));
-                lazyLowerValueBounds.put(name, InternalRowUtils.get(min, i, 
type));
-                lazyUpperValueBounds.put(name, InternalRowUtils.get(max, i, 
type));
-            }
-        }
-
-        private Map<String, Long> nullValueCounts() {
-            if (lazyNullValueCounts == null) {
-                initialize();
-            }
-            return lazyNullValueCounts;
-        }
-
-        private Map<String, Object> lowerValueBounds() {
-            if (lazyLowerValueBounds == null) {
-                initialize();
-            }
-            return lazyLowerValueBounds;
-        }
-
-        private Map<String, Object> upperValueBounds() {
-            if (lazyUpperValueBounds == null) {
-                initialize();
-            }
-            return lazyUpperValueBounds;
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index fcfaafaea5..e0a89d92bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -282,13 +282,13 @@ public class FilesTable implements ReadonlyTable {
         }
     }
 
-    private static class FilesSplit extends SingletonSplit {
+    static class FilesSplit extends SingletonSplit {
 
         @Nullable private final BinaryRow partition;
         @Nullable private final LeafPredicate bucketPredicate;
         @Nullable private final LeafPredicate levelPredicate;
 
-        private FilesSplit(
+        FilesSplit(
                 @Nullable BinaryRow partition,
                 @Nullable LeafPredicate bucketPredicate,
                 @Nullable LeafPredicate levelPredicate) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index e1411d290f..93e0453ab7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -42,6 +42,7 @@ import static 
org.apache.paimon.table.system.BranchesTable.BRANCHES;
 import static org.apache.paimon.table.system.BucketsTable.BUCKETS;
 import static 
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
 import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
+import static 
org.apache.paimon.table.system.FileKeyRangesTable.FILE_KEY_RANGES;
 import static org.apache.paimon.table.system.FilesTable.FILES;
 import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
 import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
@@ -67,6 +68,7 @@ public class SystemTableLoader {
                     .put(BUCKETS, BucketsTable::new)
                     .put(AUDIT_LOG, AuditLogTable::new)
                     .put(FILES, FilesTable::new)
+                    .put(FILE_KEY_RANGES, FileKeyRangesTable::new)
                     .put(TAGS, TagsTable::new)
                     .put(BRANCHES, BranchesTable::new)
                     .put(CONSUMERS, ConsumersTable::new)
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
new file mode 100644
index 0000000000..ce675e2aae
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.predicate.In;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileKeyRangesTable}. */
+public class FileKeyRangesTableTest extends TableTestBase {
+
+    private static final String TABLE_NAME = "MyTable";
+
+    private FileStoreTable table;
+    private FileKeyRangesTable fileKeyRangesTable;
+
+    @BeforeEach
+    public void before() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("pt", DataTypes.INT())
+                        .column("col", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .option("bucket", "2")
+                        .build();
+        catalog.createTable(identifier(TABLE_NAME), schema, false);
+        table = (FileStoreTable) catalog.getTable(identifier(TABLE_NAME));
+
+        Identifier fileKeyRangesId =
+                identifier(TABLE_NAME + SYSTEM_TABLE_SPLITTER + 
FileKeyRangesTable.FILE_KEY_RANGES);
+        fileKeyRangesTable = (FileKeyRangesTable) 
catalog.getTable(fileKeyRangesId);
+
+        // snapshot 1: write two partitions
+        write(table, GenericRow.of(1, 1, 10), GenericRow.of(1, 2, 20));
+        // snapshot 2: write more rows
+        write(table, GenericRow.of(2, 1, 30), GenericRow.of(2, 2, 40));
+    }
+
+    @Test
+    public void testReadBasic() throws Exception {
+        List<String> rows = readPartBucketLevel(null);
+        assertThat(rows).isNotEmpty();
+
+        // verify that file_path, min_key and max_key fields are readable
+        ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
+        rb.newRead()
+                .createReader(rb.newScan().plan())
+                .forEachRemaining(
+                        row -> {
+                            // file_path (index 2) should be non-null
+                            assertThat(row.getString(2)).isNotNull();
+                            // min_key (index 8) and max_key (index 9) should 
be non-null for
+                            // primary key tables
+                            assertThat(row.getString(8)).isNotNull();
+                            assertThat(row.getString(9)).isNotNull();
+                        });
+    }
+
+    @Test
+    public void testPartitionFilter() throws Exception {
+        PredicateBuilder builder = new 
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+        List<String> rows = readPartBucketLevel(builder.equal(0, 
BinaryString.fromString("{1}")));
+        assertThat(rows).isNotEmpty();
+        for (String row : rows) {
+            assertThat(row).startsWith("{1}-");
+        }
+    }
+
+    @Test
+    public void testBucketFilter() throws Exception {
+        PredicateBuilder builder = new 
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+        List<String> rows = readPartBucketLevel(builder.equal(1, 0));
+        assertThat(rows).isNotEmpty();
+        for (String row : rows) {
+            String[] parts = row.split("-");
+            assertThat(parts[1]).isEqualTo("0");
+        }
+    }
+
+    @Test
+    public void testLevelFilter() throws Exception {
+        // compact to produce level-5 files
+        compact(table, row(1), 0);
+        compact(table, row(2), 0);
+
+        PredicateBuilder builder = new 
PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+        List<String> rows = readPartBucketLevel(builder.equal(5, 5));
+        assertThat(rows).isNotEmpty();
+        for (String row : rows) {
+            String[] parts = row.split("-");
+            assertThat(parts[2]).isEqualTo("5");
+        }
+    }
+
+    @Test
+    public void testFirstRowId() throws Exception {
+        // first_row_id (index 10) is nullable BigInt; for a primary-key table 
without
+        // first_row_id configured it will be null - just verify the field is 
accessible
+        ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
+        rb.newRead()
+                .createReader(rb.newScan().plan())
+                .forEachRemaining(
+                        row -> {
+                            // accessing index 10 should not throw even if null
+                            if (!row.isNullAt(10)) {
+                                
assertThat(row.getLong(10)).isGreaterThanOrEqualTo(0L);
+                            }
+                        });
+    }
+
+    @Test
+    public void testSystemTableName() throws Exception {
+        Identifier id =
+                identifier(TABLE_NAME + SYSTEM_TABLE_SPLITTER + 
FileKeyRangesTable.FILE_KEY_RANGES);
+        FileKeyRangesTable t = (FileKeyRangesTable) catalog.getTable(id);
+        assertThat(t.rowType().getFieldCount()).isEqualTo(11);
+    }
+
+    @Test
+    public void testPartitionInFilter() throws Exception {
+        DataField partitionField = 
FileKeyRangesTable.TABLE_TYPE.getFields().get(0);
+        Predicate inPredicate =
+                new LeafPredicate(
+                        In.INSTANCE,
+                        partitionField.type(),
+                        0,
+                        partitionField.name(),
+                        Arrays.asList(
+                                BinaryString.fromString("{1}"), 
BinaryString.fromString("{2}")));
+        List<String> rows = readPartBucketLevel(inPredicate);
+        assertThat(rows).isNotEmpty();
+        boolean hasPt1 = false;
+        boolean hasPt2 = false;
+        for (String row : rows) {
+            if (row.startsWith("{1}-")) {
+                hasPt1 = true;
+            }
+            if (row.startsWith("{2}-")) {
+                hasPt2 = true;
+            }
+        }
+        assertThat(hasPt1).isTrue();
+        assertThat(hasPt2).isTrue();
+    }
+
+    private List<String> readPartBucketLevel(Predicate predicate) throws 
IOException {
+        ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
+        if (predicate != null) {
+            rb = rb.withFilter(predicate);
+        }
+        List<String> rows = new ArrayList<>();
+        rb.newRead()
+                .createReader(rb.newScan().plan())
+                .forEachRemaining(
+                        row ->
+                                rows.add(
+                                        row.getString(0)
+                                                + "-"
+                                                + row.getInt(1)
+                                                + "-"
+                                                + row.getInt(5)));
+        return rows;
+    }
+}

Reply via email to