This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new d9597f7  feat(datafusion): Add $table_indexes system table (#327)
d9597f7 is described below

commit d9597f78bcf297b38181a9914245670203b95e2d
Author: jerry <[email protected]>
AuthorDate: Tue May 19 15:51:00 2026 +0800

    feat(datafusion): Add $table_indexes system table (#327)
---
 .../datafusion/src/system_tables/mod.rs            |   6 +
 .../datafusion/src/system_tables/table_indexes.rs  | 311 +++++++++++++++++++++
 .../integrations/datafusion/tests/system_tables.rs | 176 +++++++++++-
 3 files changed, 491 insertions(+), 2 deletions(-)

diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs 
b/crates/integrations/datafusion/src/system_tables/mod.rs
index 34587f8..af3b2fe 100644
--- a/crates/integrations/datafusion/src/system_tables/mod.rs
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -38,6 +38,7 @@ mod referenced_files_size;
 mod row_string_cast;
 mod schemas;
 mod snapshots;
+mod table_indexes;
 mod tags;
 
 type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
@@ -53,6 +54,7 @@ const TABLES: &[(&str, Builder)] = &[
     ("referenced_files_size", referenced_files_size::build),
     ("schemas", schemas::build),
     ("snapshots", snapshots::build),
+    ("table_indexes", table_indexes::build),
     ("tags", tags::build),
 ];
 
@@ -65,6 +67,7 @@ const SYSTEM_TABLE_NAMES: &[&str] = &[
     "referenced_files_size",
     "schemas",
     "snapshots",
+    "table_indexes",
     "tags",
 ];
 
@@ -190,6 +193,9 @@ mod tests {
         assert!(is_registered("manifests"));
         assert!(is_registered("Manifests"));
         assert!(is_registered("MANIFESTS"));
+        assert!(is_registered("table_indexes"));
+        assert!(is_registered("Table_Indexes"));
+        assert!(is_registered("TABLE_INDEXES"));
         assert!(is_registered("partitions"));
         assert!(is_registered("Partitions"));
         assert!(is_registered("PARTITIONS"));
diff --git a/crates/integrations/datafusion/src/system_tables/table_indexes.rs 
b/crates/integrations/datafusion/src/system_tables/table_indexes.rs
new file mode 100644
index 0000000..25bdd3b
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/table_indexes.rs
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java 
[TableIndexesTable](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java).
+
+use std::any::Any;
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::builder::{
+    ArrayBuilder, Int32Builder, Int64Builder, ListBuilder, StringBuilder, 
StructBuilder,
+};
+use datafusion::arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch, 
StringArray};
+use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use datafusion::catalog::Session;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::spec::{
+    BinaryRow, DataField, DeletionVectorMeta, FileKind, IndexManifest, 
IndexManifestEntry,
+};
+use paimon::table::{SnapshotManager, Table};
+
+use super::row_string_cast::format_row_as_java_cast_string;
+use crate::error::to_datafusion_error;
+
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+    Ok(Arc::new(TableIndexesTable { table }))
+}
+
+fn table_indexes_schema() -> SchemaRef {
+    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+    SCHEMA
+        .get_or_init(|| {
+            Arc::new(Schema::new(vec![
+                Field::new("partition", DataType::Utf8, true),
+                Field::new("bucket", DataType::Int32, false),
+                Field::new("index_type", DataType::Utf8, false),
+                Field::new("file_name", DataType::Utf8, false),
+                Field::new("file_size", DataType::Int64, false),
+                Field::new("row_count", DataType::Int64, false),
+                Field::new("dv_ranges", dv_ranges_data_type(), true),
+                Field::new("row_range_start", DataType::Int64, true),
+                Field::new("row_range_end", DataType::Int64, true),
+                Field::new("index_field_id", DataType::Int32, true),
+                Field::new("index_field_name", DataType::Utf8, true),
+            ]))
+        })
+        .clone()
+}
+
+fn dv_ranges_data_type() -> DataType {
+    DataType::List(Arc::new(Field::new(
+        "item",
+        DataType::Struct(dv_meta_fields()),
+        true,
+    )))
+}
+
+fn dv_meta_fields() -> Fields {
+    vec![
+        Arc::new(Field::new("f0", DataType::Utf8, false)),
+        Arc::new(Field::new("f1", DataType::Int32, false)),
+        Arc::new(Field::new("f2", DataType::Int32, false)),
+        Arc::new(Field::new("_CARDINALITY", DataType::Int64, true)),
+    ]
+    .into()
+}
+
+#[derive(Debug)]
+struct TableIndexesTable {
+    table: Table,
+}
+
+#[async_trait]
+impl TableProvider for TableIndexesTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        table_indexes_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let table = self.table.clone();
+        let entries =
+            crate::runtime::await_with_runtime(async move { 
collect_index_entries(&table).await })
+                .await
+                .map_err(to_datafusion_error)?;
+
+        let partition_fields = self.table.schema().partition_fields();
+        let fields = self.table.schema().fields();
+        let n = entries.len();
+        let mut partitions: Vec<Option<String>> = Vec::with_capacity(n);
+        let mut buckets = Vec::with_capacity(n);
+        let mut index_types = Vec::with_capacity(n);
+        let mut file_names = Vec::with_capacity(n);
+        let mut file_sizes = Vec::with_capacity(n);
+        let mut row_counts = Vec::with_capacity(n);
+        let mut dv_ranges = dv_ranges_builder();
+        let mut row_range_starts: Vec<Option<i64>> = Vec::with_capacity(n);
+        let mut row_range_ends: Vec<Option<i64>> = Vec::with_capacity(n);
+        let mut index_field_ids: Vec<Option<i32>> = Vec::with_capacity(n);
+        let mut index_field_names: Vec<Option<String>> = Vec::with_capacity(n);
+
+        for entry in &entries {
+            let index_file = &entry.index_file;
+            partitions.push(Some(format_partition(&entry.partition, 
&partition_fields)?));
+            buckets.push(entry.bucket);
+            index_types.push(index_file.index_type.as_str());
+            file_names.push(index_file.file_name.as_str());
+            file_sizes.push(i64::from(index_file.file_size));
+            row_counts.push(i64::from(index_file.row_count));
+            append_dv_ranges(
+                &mut dv_ranges,
+                index_file
+                    .deletion_vectors_ranges
+                    .as_ref()
+                    .map(|ranges| ranges.iter()),
+            );
+
+            if let Some(global_meta) = &index_file.global_index_meta {
+                row_range_starts.push(Some(global_meta.row_range_start));
+                row_range_ends.push(Some(global_meta.row_range_end));
+                index_field_ids.push(Some(global_meta.index_field_id));
+                index_field_names.push(
+                    fields
+                        .iter()
+                        .find(|field| field.id() == global_meta.index_field_id)
+                        .map(|field| field.name().to_string()),
+                );
+            } else {
+                row_range_starts.push(None);
+                row_range_ends.push(None);
+                index_field_ids.push(None);
+                index_field_names.push(None);
+            }
+        }
+
+        let schema = table_indexes_schema();
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(StringArray::from(partitions)),
+                Arc::new(Int32Array::from(buckets)),
+                Arc::new(StringArray::from(index_types)),
+                Arc::new(StringArray::from(file_names)),
+                Arc::new(Int64Array::from(file_sizes)),
+                Arc::new(Int64Array::from(row_counts)),
+                Arc::new(dv_ranges.finish()) as ArrayRef,
+                Arc::new(Int64Array::from(row_range_starts)),
+                Arc::new(Int64Array::from(row_range_ends)),
+                Arc::new(Int32Array::from(index_field_ids)),
+                Arc::new(StringArray::from(index_field_names)),
+            ],
+        )?;
+
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![batch]],
+            schema,
+            projection.cloned(),
+        )?)
+    }
+}
+
+async fn collect_index_entries(table: &Table) -> 
paimon::Result<Vec<IndexManifestEntry>> {
+    let file_io = table.file_io();
+    let sm = SnapshotManager::new(file_io.clone(), 
table.location().to_string());
+    let snapshot = match sm.get_latest_snapshot().await? {
+        Some(s) => s,
+        None => return Ok(Vec::new()),
+    };
+    let Some(index_manifest_name) = snapshot.index_manifest() else {
+        return Ok(Vec::new());
+    };
+
+    let path = sm.manifest_path(index_manifest_name);
+    if !file_io.exists(&path).await? {
+        return Ok(Vec::new());
+    }
+
+    let entries = IndexManifest::read(file_io, &path).await?;
+    Ok(visible_index_entries(entries))
+}
+
+fn visible_index_entries(entries: Vec<IndexManifestEntry>) -> 
Vec<IndexManifestEntry> {
+    entries
+        .into_iter()
+        .filter(|entry| entry.kind == FileKind::Add)
+        .collect()
+}
+
+fn format_partition(partition: &[u8], partition_fields: &[DataField]) -> 
DFResult<String> {
+    let row = 
BinaryRow::from_serialized_bytes(partition).map_err(to_datafusion_error)?;
+    format_row_as_java_cast_string(&row, 
partition_fields).map_err(to_datafusion_error)
+}
+
+fn dv_ranges_builder() -> ListBuilder<StructBuilder> {
+    let fields = dv_meta_fields();
+    let element_field = Arc::new(Field::new("item", 
DataType::Struct(fields.clone()), true));
+    let struct_builder = StructBuilder::new(
+        fields,
+        vec![
+            Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
+            Box::new(Int32Builder::new()) as Box<dyn ArrayBuilder>,
+            Box::new(Int32Builder::new()) as Box<dyn ArrayBuilder>,
+            Box::new(Int64Builder::new()) as Box<dyn ArrayBuilder>,
+        ],
+    );
+    ListBuilder::new(struct_builder).with_field(element_field)
+}
+
+fn append_dv_ranges<'a, I>(builder: &mut ListBuilder<StructBuilder>, ranges: 
Option<I>)
+where
+    I: IntoIterator<Item = (&'a String, &'a DeletionVectorMeta)>,
+{
+    let Some(ranges) = ranges else {
+        builder.append(false);
+        return;
+    };
+
+    for (data_file_name, meta) in ranges {
+        let values = builder.values();
+        values
+            .field_builder::<StringBuilder>(0)
+            .expect("dv f0 builder")
+            .append_value(data_file_name);
+        values
+            .field_builder::<Int32Builder>(1)
+            .expect("dv f1 builder")
+            .append_value(meta.offset);
+        values
+            .field_builder::<Int32Builder>(2)
+            .expect("dv f2 builder")
+            .append_value(meta.length);
+        let cardinality_builder = values
+            .field_builder::<Int64Builder>(3)
+            .expect("dv _CARDINALITY builder");
+        if let Some(cardinality) = meta.cardinality {
+            cardinality_builder.append_value(cardinality);
+        } else {
+            cardinality_builder.append_null();
+        }
+        values.append(true);
+    }
+    builder.append(true);
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use paimon::spec::IndexFileMeta;
+
+    fn index_entry(kind: FileKind, file_name: &str) -> IndexManifestEntry {
+        IndexManifestEntry {
+            kind,
+            partition: vec![],
+            bucket: 0,
+            index_file: IndexFileMeta {
+                index_type: "DELETION_VECTORS".to_string(),
+                file_name: file_name.to_string(),
+                file_size: 1,
+                row_count: 1,
+                deletion_vectors_ranges: None,
+                global_index_meta: None,
+            },
+            version: 1,
+        }
+    }
+
+    #[test]
+    fn visible_index_entries_skips_delete_entries() {
+        let entries = vec![
+            index_entry(FileKind::Add, "live.idx"),
+            index_entry(FileKind::Delete, "deleted.idx"),
+        ];
+
+        let visible = visible_index_entries(entries);
+
+        assert_eq!(visible.len(), 1);
+        assert_eq!(visible[0].kind, FileKind::Add);
+        assert_eq!(visible[0].index_file.file_name, "live.idx");
+    }
+}
diff --git a/crates/integrations/datafusion/tests/system_tables.rs 
b/crates/integrations/datafusion/tests/system_tables.rs
index 078f18b..e6d1717 100644
--- a/crates/integrations/datafusion/tests/system_tables.rs
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -21,8 +21,10 @@ mod common;
 
 use std::sync::Arc;
 
-use datafusion::arrow::array::{Array, BooleanArray, Int32Array, Int64Array, 
StringArray};
-use datafusion::arrow::datatypes::{DataType, TimeUnit};
+use datafusion::arrow::array::{
+    Array, BooleanArray, Int32Array, Int64Array, ListArray, StringArray,
+};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
 use datafusion::arrow::record_batch::RecordBatch;
 use paimon::catalog::Identifier;
 use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
@@ -115,6 +117,176 @@ async fn test_options_system_table() {
     assert_eq!(actual, expected, "$options rows should match table options");
 }
 
+#[tokio::test]
+async fn test_table_indexes_system_table() {
+    let (ctx, catalog, _tmp) = create_context().await;
+    let sql = format!("SELECT * FROM 
paimon.default.{FIXTURE_TABLE}$table_indexes");
+    let batches = run_sql(&ctx, &sql).await;
+
+    assert!(!batches.is_empty(), "$table_indexes should return ≥1 batch");
+
+    let dv_meta_fields = vec![
+        Arc::new(Field::new("f0", DataType::Utf8, false)),
+        Arc::new(Field::new("f1", DataType::Int32, false)),
+        Arc::new(Field::new("f2", DataType::Int32, false)),
+        Arc::new(Field::new("_CARDINALITY", DataType::Int64, true)),
+    ]
+    .into();
+    let expected_columns = [
+        ("partition", DataType::Utf8),
+        ("bucket", DataType::Int32),
+        ("index_type", DataType::Utf8),
+        ("file_name", DataType::Utf8),
+        ("file_size", DataType::Int64),
+        ("row_count", DataType::Int64),
+        (
+            "dv_ranges",
+            DataType::List(Arc::new(Field::new(
+                "item",
+                DataType::Struct(dv_meta_fields),
+                true,
+            ))),
+        ),
+        ("row_range_start", DataType::Int64),
+        ("row_range_end", DataType::Int64),
+        ("index_field_id", DataType::Int32),
+        ("index_field_name", DataType::Utf8),
+    ];
+    let arrow_schema = batches[0].schema();
+    for (i, (name, dtype)) in expected_columns.iter().enumerate() {
+        let field = arrow_schema.field(i);
+        assert_eq!(field.name(), name, "column {i} name");
+        assert_eq!(field.data_type(), dtype, "column {i} type");
+    }
+
+    let identifier = Identifier::new("default".to_string(), 
FIXTURE_TABLE.to_string());
+    let table = catalog
+        .get_table(&identifier)
+        .await
+        .expect("fixture table should load");
+    let sm =
+        paimon::table::SnapshotManager::new(table.file_io().clone(), 
table.location().to_string());
+    let latest = sm
+        .get_latest_snapshot()
+        .await
+        .unwrap()
+        .expect("fixture has snapshots");
+    let index_manifest = latest
+        .index_manifest()
+        .expect("fixture should have an index manifest");
+    let expected_entries =
+        paimon::spec::IndexManifest::read(table.file_io(), 
&sm.manifest_path(index_manifest))
+            .await
+            .unwrap()
+            .into_iter()
+            .filter(|entry| entry.kind == paimon::spec::FileKind::Add)
+            .collect::<Vec<_>>();
+
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(
+        total_rows,
+        expected_entries.len(),
+        "$table_indexes rows should match the latest index manifest entries"
+    );
+
+    let batch = &batches[0];
+    let partitions = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .expect("partition is Utf8");
+    let buckets = batch
+        .column(1)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .expect("bucket is Int32");
+    let index_types = batch
+        .column(2)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .expect("index_type is Utf8");
+    let file_names = batch
+        .column(3)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .expect("file_name is Utf8");
+    let file_sizes = batch
+        .column(4)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .expect("file_size is Int64");
+    let row_counts = batch
+        .column(5)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .expect("row_count is Int64");
+    let dv_ranges = batch
+        .column(6)
+        .as_any()
+        .downcast_ref::<ListArray>()
+        .expect("dv_ranges is ListArray");
+    let row_range_starts = batch
+        .column(7)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .expect("row_range_start is Int64");
+    let row_range_ends = batch
+        .column(8)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .expect("row_range_end is Int64");
+    let index_field_ids = batch
+        .column(9)
+        .as_any()
+        .downcast_ref::<Int32Array>()
+        .expect("index_field_id is Int32");
+    let index_field_names = batch
+        .column(10)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .expect("index_field_name is Utf8");
+
+    for (row, expected) in expected_entries.iter().enumerate() {
+        assert!(!partitions.is_null(row), "partition should be non-null");
+        assert_eq!(buckets.value(row), expected.bucket);
+        assert_eq!(index_types.value(row), expected.index_file.index_type);
+        assert_eq!(file_names.value(row), expected.index_file.file_name);
+        assert_eq!(
+            file_sizes.value(row),
+            i64::from(expected.index_file.file_size)
+        );
+        assert_eq!(
+            row_counts.value(row),
+            i64::from(expected.index_file.row_count)
+        );
+        assert_eq!(
+            dv_ranges.is_null(row),
+            expected.index_file.deletion_vectors_ranges.is_none()
+        );
+
+        if let Some(global_meta) = &expected.index_file.global_index_meta {
+            assert_eq!(row_range_starts.value(row), 
global_meta.row_range_start);
+            assert_eq!(row_range_ends.value(row), global_meta.row_range_end);
+            assert_eq!(index_field_ids.value(row), global_meta.index_field_id);
+            let expected_field_name = table
+                .schema()
+                .fields()
+                .iter()
+                .find(|field| field.id() == global_meta.index_field_id)
+                .map(|field| field.name());
+            assert_eq!(
+                (!index_field_names.is_null(row)).then(|| 
index_field_names.value(row)),
+                expected_field_name
+            );
+        } else {
+            assert!(row_range_starts.is_null(row));
+            assert!(row_range_ends.is_null(row));
+            assert!(index_field_ids.is_null(row));
+            assert!(index_field_names.is_null(row));
+        }
+    }
+}
+
 #[tokio::test]
 async fn test_unknown_system_table_name_returns_not_found() {
     let (ctx, _catalog, _tmp) = create_context().await;

Reply via email to