This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 43337f2 feat(spec, api, catalog): add
Catalog::list_partitions[_paged] for RESTCatalog (#288)
43337f2 is described below
commit 43337f216193b29d9aad6339c5e96426d80e8ec7
Author: Jiajia Li <[email protected]>
AuthorDate: Sat May 9 09:16:57 2026 +0800
feat(spec, api, catalog): add Catalog::list_partitions[_paged] for
RESTCatalog (#288)
---
crates/paimon/src/api/api_response.rs | 23 +++++
crates/paimon/src/api/mod.rs | 3 +-
crates/paimon/src/api/resource_paths.rs | 14 ++++
crates/paimon/src/api/rest_api.rs | 62 +++++++++++++-
crates/paimon/src/catalog/filesystem.rs | 69 +++++++++++++++
crates/paimon/src/catalog/mod.rs | 31 ++++++-
crates/paimon/src/catalog/partition_listing.rs | 105 +++++++++++++++++++++++
crates/paimon/src/catalog/rest/rest_catalog.rs | 42 +++++++++-
crates/paimon/src/spec/mod.rs | 2 +
crates/paimon/src/spec/partition.rs | 111 +++++++++++++++++++++++++
10 files changed, 456 insertions(+), 6 deletions(-)
diff --git a/crates/paimon/src/api/api_response.rs
b/crates/paimon/src/api/api_response.rs
index c3169a2..092008a 100644
--- a/crates/paimon/src/api/api_response.rs
+++ b/crates/paimon/src/api/api_response.rs
@@ -258,6 +258,29 @@ impl ListTablesResponse {
}
}
+/// Response for listing partitions: one page of results plus an optional
+/// continuation token. Field names are camelCase on the wire.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListPartitionsResponse {
+ /// Partitions in this page; the payload may omit the field entirely.
+ pub partitions: Option<Vec<crate::spec::Partition>>,
+ /// Token for the next page; `None` when there are no further pages.
+ pub next_page_token: Option<String>,
+}
+
+impl ListPartitionsResponse {
+ /// Create a new ListPartitionsResponse from a page of partitions and an
+ /// optional continuation token.
+ pub fn new(
+ partitions: Option<Vec<crate::spec::Partition>>,
+ next_page_token: Option<String>,
+ ) -> Self {
+ Self {
+ partitions,
+ next_page_token,
+ }
+ }
+}
+
/// A paginated list of elements with an optional next page token.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
diff --git a/crates/paimon/src/api/mod.rs b/crates/paimon/src/api/mod.rs
index 6307cf8..8584cb0 100644
--- a/crates/paimon/src/api/mod.rs
+++ b/crates/paimon/src/api/mod.rs
@@ -37,7 +37,8 @@ pub use api_request::{
// Re-export response types
pub use api_response::{
AuditRESTResponse, ConfigResponse, ErrorResponse, GetDatabaseResponse,
GetTableResponse,
- GetTableTokenResponse, ListDatabasesResponse, ListTablesResponse,
PagedList,
+ GetTableTokenResponse, ListDatabasesResponse, ListPartitionsResponse,
ListTablesResponse,
+ PagedList,
};
// Re-export error types
diff --git a/crates/paimon/src/api/resource_paths.rs
b/crates/paimon/src/api/resource_paths.rs
index 78dc8b5..45505da 100644
--- a/crates/paimon/src/api/resource_paths.rs
+++ b/crates/paimon/src/api/resource_paths.rs
@@ -32,6 +32,7 @@ impl ResourcePaths {
const DATABASES: &'static str = "databases";
const TABLES: &'static str = "tables";
const TABLE_DETAILS: &'static str = "table-details";
+ /// Path segment naming a table's partitions sub-resource.
+ const PARTITIONS: &'static str = "partitions";
/// Create a new ResourcePaths with the given prefix.
pub fn new(prefix: &str) -> Self {
@@ -155,6 +156,19 @@ impl ResourcePaths {
RESTUtil::encode_string(table_name)
)
}
+
+ /// Get the partitions endpoint path for a table.
+ ///
+ /// Yields `{base_path}/databases/{database}/tables/{table}/partitions`, where the
+ /// database and table names are passed through `RESTUtil::encode_string`.
+ pub fn partitions(&self, database_name: &str, table_name: &str) -> String {
+ format!(
+ "{}/{}/{}/{}/{}/{}",
+ self.base_path,
+ Self::DATABASES,
+ RESTUtil::encode_string(database_name),
+ Self::TABLES,
+ RESTUtil::encode_string(table_name),
+ Self::PARTITIONS
+ )
+ }
}
#[cfg(test)]
diff --git a/crates/paimon/src/api/rest_api.rs
b/crates/paimon/src/api/rest_api.rs
index dcd3d97..5333fca 100644
--- a/crates/paimon/src/api/rest_api.rs
+++ b/crates/paimon/src/api/rest_api.rs
@@ -25,7 +25,7 @@ use std::collections::HashMap;
use crate::api::rest_client::HttpClient;
use crate::catalog::Identifier;
use crate::common::{CatalogOptions, Options};
-use crate::spec::{PartitionStatistics, Schema, Snapshot};
+use crate::spec::{Partition, PartitionStatistics, Schema, Snapshot};
use crate::Result;
use super::api_request::{
@@ -33,7 +33,7 @@ use super::api_request::{
};
use super::api_response::{
ConfigResponse, GetDatabaseResponse, GetTableResponse,
ListDatabasesResponse,
- ListTablesResponse, PagedList,
+ ListPartitionsResponse, ListTablesResponse, PagedList,
};
use super::auth::{AuthProviderFactory, RESTAuthFunction};
use super::resource_paths::ResourcePaths;
@@ -376,6 +376,64 @@ impl RESTApi {
Ok(())
}
+    // ==================== Partition Operations ====================
+
+    /// List all partitions of a table, following `next_page_token` until the
+    /// server stops returning one.
+    ///
+    /// Pagination also stops on an empty page, so a server that keeps handing
+    /// back a token without any data cannot keep this loop spinning.
+    ///
+    /// # Errors
+    /// Returns an error if the database or table name is empty, or if any page
+    /// request fails.
+    pub async fn list_partitions(&self, identifier: &Identifier) -> Result<Vec<Partition>> {
+        let database = identifier.database();
+        let table = identifier.object();
+        validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?;
+
+        let mut results = Vec::new();
+        let mut page_token: Option<String> = None;
+
+        loop {
+            let paged = self
+                .list_partitions_paged(identifier, None, page_token.as_deref())
+                .await?;
+            let is_empty = paged.elements.is_empty();
+            results.extend(paged.elements);
+            page_token = paged.next_page_token;
+            if page_token.is_none() || is_empty {
+                break;
+            }
+        }
+
+        Ok(results)
+    }
+
+    /// List one page of partitions for a table.
+    ///
+    /// `max_results` and `page_token` are sent as query parameters only when they
+    /// are set. A response without a `partitions` field is treated as an empty page.
+    ///
+    /// # Errors
+    /// Returns an error if the database or table name is empty, or if the request
+    /// fails.
+    pub async fn list_partitions_paged(
+        &self,
+        identifier: &Identifier,
+        max_results: Option<u32>,
+        page_token: Option<&str>,
+    ) -> Result<PagedList<Partition>> {
+        let database = identifier.database();
+        let table = identifier.object();
+        validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?;
+        let path = self.resource_paths.partitions(database, table);
+        let mut params: Vec<(&str, String)> = Vec::new();
+
+        if let Some(max) = max_results {
+            params.push((Self::MAX_RESULTS, max.to_string()));
+        }
+        if let Some(token) = page_token {
+            params.push((Self::PAGE_TOKEN, token.to_string()));
+        }
+
+        let response: ListPartitionsResponse = if params.is_empty() {
+            self.client.get(&path, None::<&[(&str, &str)]>).await?
+        } else {
+            self.client.get(&path, Some(&params)).await?
+        };
+
+        Ok(PagedList::new(
+            response.partitions.unwrap_or_default(),
+            response.next_page_token,
+        ))
+    }
+
// ==================== Token Operations ====================
/// Load table token for data access.
diff --git a/crates/paimon/src/catalog/filesystem.rs
b/crates/paimon/src/catalog/filesystem.rs
index 4dbe8bd..c544a15 100644
--- a/crates/paimon/src/catalog/filesystem.rs
+++ b/crates/paimon/src/catalog/filesystem.rs
@@ -567,4 +567,73 @@ mod tests {
assert_eq!(tables.len(), 2);
assert!(!tables.contains(&"table1".to_string()));
}
+
+    /// The default `Catalog::list_partitions` surfaces a missing database or
+    /// table as an error instead of an empty list.
+    #[tokio::test]
+    async fn test_list_partitions_default_table_not_found_errors() {
+        let (_temp_dir, catalog) = create_test_catalog();
+        let id = Identifier::new("nope_db", "nope_table");
+        let result = catalog.list_partitions(&id).await;
+        assert!(
+            matches!(
+                result,
+                Err(Error::DatabaseNotExist { .. } | Error::TableNotExist { .. })
+            ),
+            "expected TableNotExist/DatabaseNotExist, got {result:?}"
+        );
+    }
+
+ /// A table that has only been created (nothing written, so no snapshot) lists
+ /// no partitions through the default implementation.
+ #[tokio::test]
+ async fn test_list_partitions_default_empty_table_returns_empty() {
+ let (_temp_dir, catalog) = create_test_catalog();
+ catalog
+ .create_database("db1", false, HashMap::new())
+ .await
+ .unwrap();
+ let id = Identifier::new("db1", "t1");
+ catalog
+ .create_table(&id, testing_schema(), false)
+ .await
+ .unwrap();
+ let parts = catalog.list_partitions(&id).await.unwrap();
+ assert!(
+ parts.is_empty(),
+ "table without snapshots should yield no partitions"
+ );
+ }
+
+    /// Mirrors Java `CatalogTestBase.testListPartitionsPaged`: the default impl
+    /// returns the same full result regardless of `max_results` / `page_token`.
+    #[tokio::test]
+    async fn test_list_partitions_paged_default_ignores_max_and_token() {
+        let (_temp_dir, catalog) = create_test_catalog();
+        catalog
+            .create_database("db1", false, HashMap::new())
+            .await
+            .unwrap();
+        let id = Identifier::new("db1", "t1");
+        catalog
+            .create_table(&id, testing_schema(), false)
+            .await
+            .unwrap();
+        for (max_results, page_token) in [
+            (None, None),
+            (Some(2), None),
+            (Some(2), Some("dt=20250101")),
+            (Some(8), None),
+            (Some(8), Some("dt=20250101")),
+        ] {
+            let page = catalog
+                .list_partitions_paged(&id, max_results, page_token)
+                .await
+                .unwrap();
+            assert!(
+                page.elements.is_empty(),
+                "empty table → empty page for max_results={max_results:?}, page_token={page_token:?}"
+            );
+            assert!(
+                page.next_page_token.is_none(),
+                "default impl never paginates"
+            );
+        }
+    }
}
diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs
index 3a9fee5..0c944a4 100644
--- a/crates/paimon/src/catalog/mod.rs
+++ b/crates/paimon/src/catalog/mod.rs
@@ -23,6 +23,7 @@
mod database;
mod factory;
mod filesystem;
+mod partition_listing;
mod rest;
use std::collections::HashMap;
@@ -31,6 +32,7 @@ use std::fmt;
pub use database::*;
pub use factory::*;
pub use filesystem::*;
+pub use partition_listing::list_partitions_from_file_system;
pub use rest::*;
use serde::{Deserialize, Serialize};
@@ -114,7 +116,8 @@ impl fmt::Debug for Identifier {
use async_trait::async_trait;
-use crate::spec::{Schema, SchemaChange};
+use crate::api::PagedList;
+use crate::spec::{Partition, Schema, SchemaChange};
use crate::table::Table;
use crate::Result;
@@ -227,4 +230,30 @@ pub trait Catalog: Send + Sync {
changes: Vec<SchemaChange>,
ignore_if_not_exists: bool,
) -> Result<()>;
+
+    /// List partitions for a table.
+    ///
+    /// Default impl scans the table's manifest entries via
+    /// [`list_partitions_from_file_system`], matching Java
+    /// `AbstractCatalog.listPartitions`. Catalogs with metastore-tracked
+    /// partitions (e.g. `RESTCatalog`) override to return audit fields too.
+    ///
+    /// # Errors
+    /// Propagates errors from [`Catalog::get_table`] (e.g. a missing database or
+    /// table) and from reading the table's snapshot and manifest files.
+    async fn list_partitions(&self, identifier: &Identifier) -> Result<Vec<Partition>> {
+        let table = self.get_table(identifier).await?;
+        list_partitions_from_file_system(&table).await
+    }
+
+    /// Like [`Self::list_partitions`] but paged. Default impl ignores
+    /// `max_results` and `page_token`, returning all partitions in a single page
+    /// whose `next_page_token` is `None`.
+    /// Catalogs that need true pagination (e.g. `RESTCatalog`) override this.
+    async fn list_partitions_paged(
+        &self,
+        identifier: &Identifier,
+        _max_results: Option<u32>,
+        _page_token: Option<&str>,
+    ) -> Result<PagedList<Partition>> {
+        Ok(PagedList::new(
+            self.list_partitions(identifier).await?,
+            None,
+        ))
+    }
}
diff --git a/crates/paimon/src/catalog/partition_listing.rs
b/crates/paimon/src/catalog/partition_listing.rs
new file mode 100644
index 0000000..b71e30d
--- /dev/null
+++ b/crates/paimon/src/catalog/partition_listing.rs
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java [CatalogUtils.listPartitionsFromFileSystem](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java).
+//!
+//! Used as the catalog-side fallback when a backend doesn't track partitions
+//! (e.g. `FileSystemCatalog`, or `RESTCatalog` when the server reports the
+//! partitions endpoint as not implemented).
+
+use std::collections::{BTreeMap, HashMap};
+
+use crate::spec::{
+    BinaryRow, CoreOptions, FileKind, Manifest, ManifestList, Partition, PartitionComputer,
+};
+use crate::table::{SnapshotManager, Table};
+use crate::Result;
+
+/// Scan a table's manifest entries and aggregate them into [`Partition`] rows,
+/// matching the shape catalogs would otherwise return from a metastore.
+///
+/// Entries from both the base and delta manifest lists of the latest snapshot
+/// are merged per partition. `DELETE` entries cancel the `ADD` entries they
+/// retire (their row count, file size and file count are subtracted), and
+/// partitions left with no live files are dropped. This mirrors Java
+/// `PartitionEntry` merging. Returns an empty list when the table has no
+/// snapshot yet.
+///
+/// File-system listing carries no audit metadata: `created_*`, `updated_*` and
+/// `options` are `None`, `total_buckets` is `0` and `done` is `false`.
+pub async fn list_partitions_from_file_system(table: &Table) -> Result<Vec<Partition>> {
+    let file_io = table.file_io();
+    let sm = SnapshotManager::new(file_io.clone(), table.location().to_string());
+    let snapshot = match sm.get_latest_snapshot().await? {
+        Some(s) => s,
+        None => return Ok(Vec::new()),
+    };
+
+    let base_path = sm.manifest_path(snapshot.base_manifest_list());
+    let delta_path = sm.manifest_path(snapshot.delta_manifest_list());
+    let (base_metas, delta_metas) = futures::try_join!(
+        ManifestList::read(file_io, &base_path),
+        ManifestList::read(file_io, &delta_path),
+    )?;
+
+    let mut all_entries = Vec::new();
+    for meta in base_metas.into_iter().chain(delta_metas) {
+        let manifest_path = sm.manifest_path(meta.file_name());
+        let entries = Manifest::read(file_io, &manifest_path).await?;
+        all_entries.extend(entries);
+    }
+
+    let schema = table.schema();
+    let core = CoreOptions::new(schema.options());
+    let computer = PartitionComputer::new(
+        schema.partition_keys(),
+        schema.fields(),
+        core.partition_default_name(),
+        core.legacy_partition_name(),
+    )?;
+
+    // Running per-partition totals, keyed by the serialized partition row; the
+    // BTreeMap keeps the output order deterministic.
+    #[derive(Default)]
+    struct Agg {
+        record_count: i64,
+        file_size: i64,
+        file_count: i64,
+        last_file_creation_time: i64,
+    }
+    let mut buckets: BTreeMap<Vec<u8>, Agg> = BTreeMap::new();
+    for entry in &all_entries {
+        let file = entry.file();
+        // A DELETE entry retires a file that an earlier ADD entry introduced
+        // (e.g. after compaction or overwrite), so it must subtract from the
+        // partition totals instead of adding to them.
+        let sign: i64 = match entry.kind() {
+            FileKind::Add => 1,
+            FileKind::Delete => -1,
+        };
+        let agg = buckets.entry(entry.partition().to_vec()).or_default();
+        agg.record_count += sign * file.row_count;
+        agg.file_size += sign * file.file_size;
+        agg.file_count += sign;
+        if let Some(ct) = file.creation_time {
+            agg.last_file_creation_time =
+                agg.last_file_creation_time.max(ct.timestamp_millis());
+        }
+    }
+
+    let mut result = Vec::with_capacity(buckets.len());
+    for (bytes, agg) in buckets {
+        // Every file of this partition has been deleted: it no longer exists.
+        if agg.file_count <= 0 {
+            continue;
+        }
+        let spec = if bytes.is_empty() {
+            // Empty partition bytes (presumably an unpartitioned table) map to
+            // an empty spec.
+            HashMap::new()
+        } else {
+            let row = BinaryRow::from_serialized_bytes(&bytes)?;
+            computer.generate_part_values(&row)?.into_iter().collect()
+        };
+        result.push(Partition {
+            spec,
+            record_count: agg.record_count,
+            file_size_in_bytes: agg.file_size,
+            file_count: agg.file_count,
+            last_file_creation_time: agg.last_file_creation_time,
+            total_buckets: 0,
+            done: false,
+            created_at: None,
+            created_by: None,
+            updated_at: None,
+            updated_by: None,
+            options: None,
+        });
+    }
+    Ok(result)
+}
diff --git a/crates/paimon/src/catalog/rest/rest_catalog.rs
b/crates/paimon/src/catalog/rest/rest_catalog.rs
index 889b720..09b9354 100644
--- a/crates/paimon/src/catalog/rest/rest_catalog.rs
+++ b/crates/paimon/src/catalog/rest/rest_catalog.rs
@@ -28,11 +28,13 @@ use async_trait::async_trait;
use crate::api::rest_api::RESTApi;
use crate::api::rest_error::RestError;
use crate::api::PagedList;
-use crate::catalog::{Catalog, Database, Identifier, DB_LOCATION_PROP};
+use crate::catalog::{
+ list_partitions_from_file_system, Catalog, Database, Identifier,
DB_LOCATION_PROP,
+};
use crate::common::{CatalogOptions, Options};
use crate::error::Error;
use crate::io::FileIO;
-use crate::spec::{Schema, SchemaChange, TableSchema};
+use crate::spec::{Partition, Schema, SchemaChange, TableSchema};
use crate::table::{RESTEnv, Table};
use crate::Result;
@@ -337,6 +339,42 @@ impl Catalog for RESTCatalog {
message: "Alter table is not yet implemented for REST
catalog".to_string(),
})
}
+
+    /// List partitions through the REST server.
+    ///
+    /// If the server reports the endpoint as `NotImplemented`, this falls back to
+    /// scanning the table's manifests. Any other REST error is passed through
+    /// `map_rest_error_for_table`.
+    async fn list_partitions(&self, identifier: &Identifier) -> Result<Vec<Partition>> {
+        match self.api.list_partitions(identifier).await {
+            Ok(parts) => Ok(parts),
+            Err(Error::RestApi {
+                source: RestError::NotImplemented { .. },
+            }) => {
+                let table = self.get_table(identifier).await?;
+                list_partitions_from_file_system(&table).await
+            }
+            Err(e) => Err(map_rest_error_for_table(e, identifier)),
+        }
+    }
+
+ /// Paged variant of `list_partitions`.
+ ///
+ /// Forwards `max_results` / `page_token` to the REST server. If the server
+ /// answers `NotImplemented`, falls back to a manifest scan and returns every
+ /// partition in a single page with no `next_page_token`. Other errors go
+ /// through `map_rest_error_for_table`.
+ async fn list_partitions_paged(
+ &self,
+ identifier: &Identifier,
+ max_results: Option<u32>,
+ page_token: Option<&str>,
+ ) -> Result<PagedList<Partition>> {
+ match self
+ .api
+ .list_partitions_paged(identifier, max_results, page_token)
+ .await
+ {
+ Ok(page) => Ok(page),
+ Err(Error::RestApi {
+ source: RestError::NotImplemented { .. },
+ }) => {
+ // Paging arguments are not applied to the fallback scan.
+ let table = self.get_table(identifier).await?;
+ let parts = list_partitions_from_file_system(&table).await?;
+ Ok(PagedList::new(parts, None))
+ }
+ Err(e) => Err(map_rest_error_for_table(e, identifier)),
+ }
+ }
}
// ============================================================================
// Error mapping helpers
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index e9bc085..6ad6533 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -65,6 +65,8 @@ pub(crate) mod avro;
pub(crate) mod stats;
mod types;
pub use types::*;
+mod partition;
+pub use partition::Partition;
mod partition_utils;
pub(crate) use partition_utils::PartitionComputer;
mod predicate;
diff --git a/crates/paimon/src/spec/partition.rs
b/crates/paimon/src/spec/partition.rs
new file mode 100644
index 0000000..478fa0b
--- /dev/null
+++ b/crates/paimon/src/spec/partition.rs
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java [Partition](https://github.com/apache/paimon/blob/release-1.4/paimon-api/src/main/java/org/apache/paimon/partition/Partition.java)
+//! and its [PartitionStatistics](https://github.com/apache/paimon/blob/release-1.4/paimon-api/src/main/java/org/apache/paimon/partition/PartitionStatistics.java) base.
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+
+/// A partition with aggregate statistics and audit metadata, as tracked by the catalog.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Partition {
+    /// Partition key/value pairs, e.g. `{"dt": "2024-01-01"}`.
+    pub spec: HashMap<String, String>,
+    /// Number of records in the partition.
+    pub record_count: i64,
+    /// Total size of the partition's data files, in bytes.
+    pub file_size_in_bytes: i64,
+    /// Number of data files in the partition.
+    pub file_count: i64,
+    /// Creation time of the most recently created data file, in epoch milliseconds.
+    pub last_file_creation_time: i64,
+    /// Bucket count of the partition; defaults to `0` when absent from the payload.
+    #[serde(default)]
+    pub total_buckets: i32,
+    /// Whether the partition is marked done; defaults to `false` when absent.
+    #[serde(default)]
+    pub done: bool,
+    /// Audit: when the partition was created (presumably epoch millis), if reported.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub created_at: Option<i64>,
+    /// Audit: who created the partition, if reported.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub created_by: Option<String>,
+    /// Audit: when the partition was last updated (presumably epoch millis), if reported.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub updated_at: Option<i64>,
+    /// Audit: who last updated the partition, if reported.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub updated_by: Option<String>,
+    /// Extra partition options, if reported.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub options: Option<HashMap<String, String>>,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_partition_roundtrip_minimal() {
+        let original = Partition {
+            spec: HashMap::from([("dt".to_string(), "2024-01-01".to_string())]),
+            record_count: 100,
+            file_size_in_bytes: 2048,
+            file_count: 3,
+            last_file_creation_time: 1700000000000,
+            total_buckets: 4,
+            done: false,
+            created_at: None,
+            created_by: None,
+            updated_at: None,
+            updated_by: None,
+            options: None,
+        };
+        let json = serde_json::to_string(&original).unwrap();
+        let decoded: Partition = serde_json::from_str(&json).unwrap();
+        assert_eq!(original, decoded);
+
+        // Wire format: camelCase keys, and `None` audit fields are omitted.
+        let value: serde_json::Value = serde_json::from_str(&json).unwrap();
+        assert_eq!(value["fileSizeInBytes"], 2048);
+        assert_eq!(value["lastFileCreationTime"], 1700000000000i64);
+        assert!(value.get("createdAt").is_none());
+        assert!(value.get("options").is_none());
+    }
+
+    #[test]
+    fn test_partition_roundtrip_full() {
+        let original = Partition {
+            spec: HashMap::from([
+                ("dt".to_string(), "2024-01-01".to_string()),
+                ("hr".to_string(), "12".to_string()),
+            ]),
+            record_count: 100,
+            file_size_in_bytes: 2048,
+            file_count: 3,
+            last_file_creation_time: 1700000000000,
+            total_buckets: 4,
+            done: true,
+            created_at: Some(1699000000000),
+            created_by: Some("user-a".to_string()),
+            updated_at: Some(1700000000000),
+            updated_by: Some("user-b".to_string()),
+            options: Some(HashMap::from([("k".to_string(), "v".to_string())])),
+        };
+        let json = serde_json::to_string(&original).unwrap();
+        let decoded: Partition = serde_json::from_str(&json).unwrap();
+        assert_eq!(original, decoded);
+    }
+
+    #[test]
+    fn test_partition_decode_with_unknown_fields() {
+        let json = r#"{
+            "spec": {"dt": "2024-01-01"},
+            "recordCount": 1,
+            "fileSizeInBytes": 1,
+            "fileCount": 1,
+            "lastFileCreationTime": 0,
+            "newField": "ignored"
+        }"#;
+        let decoded: Partition = serde_json::from_str(json).unwrap();
+        assert_eq!(decoded.spec.get("dt"), Some(&"2024-01-01".to_string()));
+        // Fields absent from the payload fall back to their serde defaults.
+        assert_eq!(decoded.total_buckets, 0);
+        assert!(!decoded.done);
+        assert_eq!(decoded.created_at, None);
+        assert_eq!(decoded.options, None);
+    }
+}