This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d9597f7 feat(datafusion): Add $table_indexes system table (#327)
d9597f7 is described below
commit d9597f78bcf297b38181a9914245670203b95e2d
Author: jerry <[email protected]>
AuthorDate: Tue May 19 15:51:00 2026 +0800
feat(datafusion): Add $table_indexes system table (#327)
---
.../datafusion/src/system_tables/mod.rs | 6 +
.../datafusion/src/system_tables/table_indexes.rs | 311 +++++++++++++++++++++
.../integrations/datafusion/tests/system_tables.rs | 176 +++++++++++-
3 files changed, 491 insertions(+), 2 deletions(-)
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs
b/crates/integrations/datafusion/src/system_tables/mod.rs
index 34587f8..af3b2fe 100644
--- a/crates/integrations/datafusion/src/system_tables/mod.rs
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -38,6 +38,7 @@ mod referenced_files_size;
mod row_string_cast;
mod schemas;
mod snapshots;
+mod table_indexes;
mod tags;
type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
@@ -53,6 +54,7 @@ const TABLES: &[(&str, Builder)] = &[
("referenced_files_size", referenced_files_size::build),
("schemas", schemas::build),
("snapshots", snapshots::build),
+ ("table_indexes", table_indexes::build),
("tags", tags::build),
];
@@ -65,6 +67,7 @@ const SYSTEM_TABLE_NAMES: &[&str] = &[
"referenced_files_size",
"schemas",
"snapshots",
+ "table_indexes",
"tags",
];
@@ -190,6 +193,9 @@ mod tests {
assert!(is_registered("manifests"));
assert!(is_registered("Manifests"));
assert!(is_registered("MANIFESTS"));
+ assert!(is_registered("table_indexes"));
+ assert!(is_registered("Table_Indexes"));
+ assert!(is_registered("TABLE_INDEXES"));
assert!(is_registered("partitions"));
assert!(is_registered("Partitions"));
assert!(is_registered("PARTITIONS"));
diff --git a/crates/integrations/datafusion/src/system_tables/table_indexes.rs
b/crates/integrations/datafusion/src/system_tables/table_indexes.rs
new file mode 100644
index 0000000..25bdd3b
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/table_indexes.rs
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java [TableIndexesTable](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java).
+
+use std::any::Any;
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::builder::{
+ ArrayBuilder, Int32Builder, Int64Builder, ListBuilder, StringBuilder,
StructBuilder,
+};
+use datafusion::arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch,
StringArray};
+use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use datafusion::catalog::Session;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::spec::{
+ BinaryRow, DataField, DeletionVectorMeta, FileKind, IndexManifest,
IndexManifestEntry,
+};
+use paimon::table::{SnapshotManager, Table};
+
+use super::row_string_cast::format_row_as_java_cast_string;
+use crate::error::to_datafusion_error;
+
+/// Builds the provider behind the `$table_indexes` system table for `table`.
+///
+/// Has the `Builder` shape expected by the system-table registry. Wrapping the
+/// table cannot fail, so the result is always `Ok`.
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+    let provider: Arc<dyn TableProvider> = Arc::new(TableIndexesTable { table });
+    Ok(provider)
+}
+
+/// Returns the Arrow schema of the `$table_indexes` system table.
+///
+/// The schema is assembled once on first use and cached in a `OnceLock`;
+/// every call returns a clone of the same `SchemaRef`.
+fn table_indexes_schema() -> SchemaRef {
+    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+
+    let cached = SCHEMA.get_or_init(|| {
+        let columns = vec![
+            Field::new("partition", DataType::Utf8, true),
+            Field::new("bucket", DataType::Int32, false),
+            Field::new("index_type", DataType::Utf8, false),
+            Field::new("file_name", DataType::Utf8, false),
+            Field::new("file_size", DataType::Int64, false),
+            Field::new("row_count", DataType::Int64, false),
+            Field::new("dv_ranges", dv_ranges_data_type(), true),
+            Field::new("row_range_start", DataType::Int64, true),
+            Field::new("row_range_end", DataType::Int64, true),
+            Field::new("index_field_id", DataType::Int32, true),
+            Field::new("index_field_name", DataType::Utf8, true),
+        ];
+        Arc::new(Schema::new(columns))
+    });
+    Arc::clone(cached)
+}
+
+/// Arrow type of the `dv_ranges` column: a list whose nullable `item`
+/// elements are structs made of the fields from `dv_meta_fields`.
+fn dv_ranges_data_type() -> DataType {
+    let element_type = DataType::Struct(dv_meta_fields());
+    let element_field = Field::new("item", element_type, true);
+    DataType::List(Arc::new(element_field))
+}
+
+/// Fields of a single `dv_ranges` struct element.
+///
+/// As populated by `append_dv_ranges`: `f0` carries the data file name, `f1`
+/// the deletion vector's offset, `f2` its length, and `_CARDINALITY` its
+/// optional cardinality (the only nullable member).
+fn dv_meta_fields() -> Fields {
+    let members = vec![
+        Arc::new(Field::new("f0", DataType::Utf8, false)),
+        Arc::new(Field::new("f1", DataType::Int32, false)),
+        Arc::new(Field::new("f2", DataType::Int32, false)),
+        Arc::new(Field::new("_CARDINALITY", DataType::Int64, true)),
+    ];
+    Fields::from(members)
+}
+
+/// DataFusion table provider that exposes the index manifest entries of a
+/// Paimon table as the `$table_indexes` system table.
+#[derive(Debug)]
+struct TableIndexesTable {
+ /// Table whose latest snapshot is read on every scan.
+ table: Table,
+}
+
+#[async_trait]
+impl TableProvider for TableIndexesTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Fixed schema of the system table; independent of the wrapped table.
+    fn schema(&self) -> SchemaRef {
+        table_indexes_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    /// Materialises one row per visible index manifest entry into a single
+    /// in-memory record batch and returns a plan over it.
+    ///
+    /// Filters and the limit are not pushed down (both arguments are
+    /// ignored); only `projection` is forwarded to the memory source.
+    ///
+    /// # Errors
+    /// Fails when the index manifest cannot be loaded, when a partition
+    /// cannot be decoded or formatted, or when the record batch or the
+    /// execution plan cannot be constructed.
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        // The loading future owns its own handle to the table.
+        let owned_table = self.table.clone();
+        let load = async move { collect_index_entries(&owned_table).await };
+        let entries = crate::runtime::await_with_runtime(load)
+            .await
+            .map_err(to_datafusion_error)?;
+
+        let partition_fields = self.table.schema().partition_fields();
+        let all_fields = self.table.schema().fields();
+
+        let capacity = entries.len();
+        let mut partition_col: Vec<Option<String>> = Vec::with_capacity(capacity);
+        let mut bucket_col = Vec::with_capacity(capacity);
+        let mut index_type_col = Vec::with_capacity(capacity);
+        let mut file_name_col = Vec::with_capacity(capacity);
+        let mut file_size_col = Vec::with_capacity(capacity);
+        let mut row_count_col = Vec::with_capacity(capacity);
+        let mut dv_ranges_col = dv_ranges_builder();
+        let mut range_start_col: Vec<Option<i64>> = Vec::with_capacity(capacity);
+        let mut range_end_col: Vec<Option<i64>> = Vec::with_capacity(capacity);
+        let mut field_id_col: Vec<Option<i32>> = Vec::with_capacity(capacity);
+        let mut field_name_col: Vec<Option<String>> = Vec::with_capacity(capacity);
+
+        for entry in &entries {
+            let file = &entry.index_file;
+
+            let partition_text = format_partition(&entry.partition, &partition_fields)?;
+            partition_col.push(Some(partition_text));
+            bucket_col.push(entry.bucket);
+            index_type_col.push(file.index_type.as_str());
+            file_name_col.push(file.file_name.as_str());
+            file_size_col.push(i64::from(file.file_size));
+            row_count_col.push(i64::from(file.row_count));
+
+            let dv_iter = file
+                .deletion_vectors_ranges
+                .as_ref()
+                .map(|ranges| ranges.iter());
+            append_dv_ranges(&mut dv_ranges_col, dv_iter);
+
+            // The four global-index columns are either all derived from the
+            // entry's global index metadata or all null.
+            let (range_start, range_end, field_id, field_name) = match &file.global_index_meta {
+                Some(global) => {
+                    // The name stays null when no schema field carries the id.
+                    let name = all_fields
+                        .iter()
+                        .find(|field| field.id() == global.index_field_id)
+                        .map(|field| field.name().to_string());
+                    (
+                        Some(global.row_range_start),
+                        Some(global.row_range_end),
+                        Some(global.index_field_id),
+                        name,
+                    )
+                }
+                None => (None, None, None, None),
+            };
+            range_start_col.push(range_start);
+            range_end_col.push(range_end);
+            field_id_col.push(field_id);
+            field_name_col.push(field_name);
+        }
+
+        let schema = table_indexes_schema();
+        // Column order must follow `table_indexes_schema`.
+        let columns: Vec<ArrayRef> = vec![
+            Arc::new(StringArray::from(partition_col)),
+            Arc::new(Int32Array::from(bucket_col)),
+            Arc::new(StringArray::from(index_type_col)),
+            Arc::new(StringArray::from(file_name_col)),
+            Arc::new(Int64Array::from(file_size_col)),
+            Arc::new(Int64Array::from(row_count_col)),
+            Arc::new(dv_ranges_col.finish()) as ArrayRef,
+            Arc::new(Int64Array::from(range_start_col)),
+            Arc::new(Int64Array::from(range_end_col)),
+            Arc::new(Int32Array::from(field_id_col)),
+            Arc::new(StringArray::from(field_name_col)),
+        ];
+        let batch = RecordBatch::try_new(schema.clone(), columns)?;
+
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![batch]],
+            schema,
+            projection.cloned(),
+        )?)
+    }
+}
+
+/// Loads the visible index manifest entries of the table's latest snapshot.
+///
+/// Returns an empty list when the table has no snapshot, when the latest
+/// snapshot names no index manifest, or when the named manifest file does not
+/// exist. Entries that are read are filtered through `visible_index_entries`.
+///
+/// # Errors
+/// Propagates failures from looking up the snapshot, probing the manifest
+/// path, or reading the manifest.
+async fn collect_index_entries(table: &Table) -> paimon::Result<Vec<IndexManifestEntry>> {
+    let file_io = table.file_io();
+    let snapshots = SnapshotManager::new(file_io.clone(), table.location().to_string());
+
+    let Some(latest) = snapshots.get_latest_snapshot().await? else {
+        return Ok(Vec::new());
+    };
+    let manifest_path = match latest.index_manifest() {
+        Some(name) => snapshots.manifest_path(name),
+        None => return Ok(Vec::new()),
+    };
+    if !file_io.exists(&manifest_path).await? {
+        return Ok(Vec::new());
+    }
+
+    let raw_entries = IndexManifest::read(file_io, &manifest_path).await?;
+    Ok(visible_index_entries(raw_entries))
+}
+
+/// Keeps only the entries whose kind is `FileKind::Add`, preserving their
+/// relative order; every other entry is dropped.
+fn visible_index_entries(entries: Vec<IndexManifestEntry>) -> Vec<IndexManifestEntry> {
+    let mut visible = entries;
+    visible.retain(|entry| entry.kind == FileKind::Add);
+    visible
+}
+
+/// Decodes the serialized partition bytes into a `BinaryRow` and renders it
+/// with `format_row_as_java_cast_string` using `partition_fields`.
+///
+/// # Errors
+/// Either step's failure is converted with `to_datafusion_error`.
+fn format_partition(partition: &[u8], partition_fields: &[DataField]) -> DFResult<String> {
+    let decoded = BinaryRow::from_serialized_bytes(partition).map_err(to_datafusion_error)?;
+    let rendered =
+        format_row_as_java_cast_string(&decoded, partition_fields).map_err(to_datafusion_error)?;
+    Ok(rendered)
+}
+
+/// Creates an empty builder for the `dv_ranges` column.
+///
+/// The child builders are listed in the same order as `dv_meta_fields`, and
+/// the list element field is set explicitly to a nullable struct named `item`.
+fn dv_ranges_builder() -> ListBuilder<StructBuilder> {
+    let struct_fields = dv_meta_fields();
+    let item_field = Arc::new(Field::new(
+        "item",
+        DataType::Struct(struct_fields.clone()),
+        true,
+    ));
+    let member_builders: Vec<Box<dyn ArrayBuilder>> = vec![
+        Box::new(StringBuilder::new()),
+        Box::new(Int32Builder::new()),
+        Box::new(Int32Builder::new()),
+        Box::new(Int64Builder::new()),
+    ];
+    let struct_builder = StructBuilder::new(struct_fields, member_builders);
+    ListBuilder::new(struct_builder).with_field(item_field)
+}
+
+/// Appends one list slot to the `dv_ranges` builder.
+///
+/// `None` appends a null list. `Some(ranges)` appends a list holding one
+/// struct per `(data file name, deletion vector meta)` pair, in iteration
+/// order; an empty iterator therefore produces an empty, non-null list.
+///
+/// # Panics
+/// Panics if a child builder does not have the type that
+/// `dv_ranges_builder` registers for its position.
+fn append_dv_ranges<'a, I>(builder: &mut ListBuilder<StructBuilder>, ranges: Option<I>)
+where
+    I: IntoIterator<Item = (&'a String, &'a DeletionVectorMeta)>,
+{
+    match ranges {
+        None => builder.append(false),
+        Some(ranges) => {
+            for (data_file_name, meta) in ranges {
+                let element = builder.values();
+                element
+                    .field_builder::<StringBuilder>(0)
+                    .expect("dv f0 builder")
+                    .append_value(data_file_name);
+                element
+                    .field_builder::<Int32Builder>(1)
+                    .expect("dv f1 builder")
+                    .append_value(meta.offset);
+                element
+                    .field_builder::<Int32Builder>(2)
+                    .expect("dv f2 builder")
+                    .append_value(meta.length);
+                let cardinality_slot = element
+                    .field_builder::<Int64Builder>(3)
+                    .expect("dv _CARDINALITY builder");
+                match meta.cardinality {
+                    Some(count) => cardinality_slot.append_value(count),
+                    None => cardinality_slot.append_null(),
+                }
+                // Close the struct element after all four members are written.
+                element.append(true);
+            }
+            // Close the list slot for this row.
+            builder.append(true);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use paimon::spec::IndexFileMeta;
+
+    /// Minimal manifest entry of the given `kind` for an index file named
+    /// `file_name`, with no deletion-vector ranges and no global index meta.
+    fn index_entry(kind: FileKind, file_name: &str) -> IndexManifestEntry {
+        let index_file = IndexFileMeta {
+            index_type: String::from("DELETION_VECTORS"),
+            file_name: String::from(file_name),
+            file_size: 1,
+            row_count: 1,
+            deletion_vectors_ranges: None,
+            global_index_meta: None,
+        };
+        IndexManifestEntry {
+            kind,
+            partition: Vec::new(),
+            bucket: 0,
+            index_file,
+            version: 1,
+        }
+    }
+
+    /// Only the `Add` entry survives the visibility filter.
+    #[test]
+    fn visible_index_entries_skips_delete_entries() {
+        let visible = visible_index_entries(vec![
+            index_entry(FileKind::Add, "live.idx"),
+            index_entry(FileKind::Delete, "deleted.idx"),
+        ]);
+
+        assert_eq!(visible.len(), 1);
+        let survivor = &visible[0];
+        assert_eq!(survivor.kind, FileKind::Add);
+        assert_eq!(survivor.index_file.file_name, "live.idx");
+    }
+}
diff --git a/crates/integrations/datafusion/tests/system_tables.rs
b/crates/integrations/datafusion/tests/system_tables.rs
index 078f18b..e6d1717 100644
--- a/crates/integrations/datafusion/tests/system_tables.rs
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -21,8 +21,10 @@ mod common;
use std::sync::Arc;
-use datafusion::arrow::array::{Array, BooleanArray, Int32Array, Int64Array,
StringArray};
-use datafusion::arrow::datatypes::{DataType, TimeUnit};
+use datafusion::arrow::array::{
+ Array, BooleanArray, Int32Array, Int64Array, ListArray, StringArray,
+};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use paimon::catalog::Identifier;
use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
@@ -115,6 +117,176 @@ async fn test_options_system_table() {
assert_eq!(actual, expected, "$options rows should match table options");
}
+/// End-to-end check of `$table_indexes`: verifies the column names and types,
+/// then compares every returned row with the `Add` entries read directly from
+/// the latest snapshot's index manifest.
+#[tokio::test]
+async fn test_table_indexes_system_table() {
+    /// Downcasts column `index` of `batch` to the concrete array type `T`,
+    /// panicking with `message` when the column has a different type.
+    fn typed_column<'a, T: 'static>(batch: &'a RecordBatch, index: usize, message: &str) -> &'a T {
+        batch
+            .column(index)
+            .as_any()
+            .downcast_ref::<T>()
+            .expect(message)
+    }
+
+    let (ctx, catalog, _tmp) = create_context().await;
+    let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$table_indexes");
+    let batches = run_sql(&ctx, &sql).await;
+
+    assert!(!batches.is_empty(), "$table_indexes should return ≥1 batch");
+
+    let dv_meta_fields = vec![
+        Arc::new(Field::new("f0", DataType::Utf8, false)),
+        Arc::new(Field::new("f1", DataType::Int32, false)),
+        Arc::new(Field::new("f2", DataType::Int32, false)),
+        Arc::new(Field::new("_CARDINALITY", DataType::Int64, true)),
+    ]
+    .into();
+    let expected_columns = [
+        ("partition", DataType::Utf8),
+        ("bucket", DataType::Int32),
+        ("index_type", DataType::Utf8),
+        ("file_name", DataType::Utf8),
+        ("file_size", DataType::Int64),
+        ("row_count", DataType::Int64),
+        (
+            "dv_ranges",
+            DataType::List(Arc::new(Field::new(
+                "item",
+                DataType::Struct(dv_meta_fields),
+                true,
+            ))),
+        ),
+        ("row_range_start", DataType::Int64),
+        ("row_range_end", DataType::Int64),
+        ("index_field_id", DataType::Int32),
+        ("index_field_name", DataType::Utf8),
+    ];
+    let arrow_schema = batches[0].schema();
+    for (i, (name, dtype)) in expected_columns.iter().enumerate() {
+        let field = arrow_schema.field(i);
+        assert_eq!(field.name(), name, "column {i} name");
+        assert_eq!(field.data_type(), dtype, "column {i} type");
+    }
+
+    // Read the expected rows straight from the latest snapshot's index
+    // manifest, keeping only `Add` entries.
+    let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
+    let table = catalog
+        .get_table(&identifier)
+        .await
+        .expect("fixture table should load");
+    let sm =
+        paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string());
+    let latest = sm
+        .get_latest_snapshot()
+        .await
+        .unwrap()
+        .expect("fixture has snapshots");
+    let index_manifest = latest
+        .index_manifest()
+        .expect("fixture should have an index manifest");
+    let expected_entries =
+        paimon::spec::IndexManifest::read(table.file_io(), &sm.manifest_path(index_manifest))
+            .await
+            .unwrap()
+            .into_iter()
+            .filter(|entry| entry.kind == paimon::spec::FileKind::Add)
+            .collect::<Vec<_>>();
+
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(
+        total_rows,
+        expected_entries.len(),
+        "$table_indexes rows should match the latest index manifest entries"
+    );
+
+    // Merge all batches before the row-wise checks: indexing only the first
+    // batch would go out of bounds if the result were split across batches.
+    let merged = datafusion::arrow::compute::concat_batches(&arrow_schema, &batches)
+        .expect("$table_indexes batches should share one schema");
+    let batch = &merged;
+
+    let partitions = typed_column::<StringArray>(batch, 0, "partition is Utf8");
+    let buckets = typed_column::<Int32Array>(batch, 1, "bucket is Int32");
+    let index_types = typed_column::<StringArray>(batch, 2, "index_type is Utf8");
+    let file_names = typed_column::<StringArray>(batch, 3, "file_name is Utf8");
+    let file_sizes = typed_column::<Int64Array>(batch, 4, "file_size is Int64");
+    let row_counts = typed_column::<Int64Array>(batch, 5, "row_count is Int64");
+    let dv_ranges = typed_column::<ListArray>(batch, 6, "dv_ranges is ListArray");
+    let row_range_starts = typed_column::<Int64Array>(batch, 7, "row_range_start is Int64");
+    let row_range_ends = typed_column::<Int64Array>(batch, 8, "row_range_end is Int64");
+    let index_field_ids = typed_column::<Int32Array>(batch, 9, "index_field_id is Int32");
+    let index_field_names = typed_column::<StringArray>(batch, 10, "index_field_name is Utf8");
+
+    for (row, expected) in expected_entries.iter().enumerate() {
+        assert!(!partitions.is_null(row), "partition should be non-null");
+        assert_eq!(buckets.value(row), expected.bucket);
+        assert_eq!(index_types.value(row), expected.index_file.index_type);
+        assert_eq!(file_names.value(row), expected.index_file.file_name);
+        assert_eq!(
+            file_sizes.value(row),
+            i64::from(expected.index_file.file_size)
+        );
+        assert_eq!(
+            row_counts.value(row),
+            i64::from(expected.index_file.row_count)
+        );
+        assert_eq!(
+            dv_ranges.is_null(row),
+            expected.index_file.deletion_vectors_ranges.is_none()
+        );
+
+        if let Some(global_meta) = &expected.index_file.global_index_meta {
+            assert_eq!(row_range_starts.value(row), global_meta.row_range_start);
+            assert_eq!(row_range_ends.value(row), global_meta.row_range_end);
+            assert_eq!(index_field_ids.value(row), global_meta.index_field_id);
+            let expected_field_name = table
+                .schema()
+                .fields()
+                .iter()
+                .find(|field| field.id() == global_meta.index_field_id)
+                .map(|field| field.name());
+            assert_eq!(
+                (!index_field_names.is_null(row)).then(|| index_field_names.value(row)),
+                expected_field_name
+            );
+        } else {
+            // Without global index metadata all four derived columns are null.
+            assert!(row_range_starts.is_null(row));
+            assert!(row_range_ends.is_null(row));
+            assert!(index_field_ids.is_null(row));
+            assert!(index_field_names.is_null(row));
+        }
+    }
+}
+
#[tokio::test]
async fn test_unknown_system_table_name_returns_not_found() {
let (ctx, _catalog, _tmp) = create_context().await;