This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 75c5f34d Handle pagination via `next-page-token` in REST Catalog (#1097)
75c5f34d is described below
commit 75c5f34d034223a2e3b3268782816b07b028e2eb
Author: Phillip LeBlanc <[email protected]>
AuthorDate: Tue Mar 18 18:37:14 2025 +0900
Handle pagination via `next-page-token` in REST Catalog (#1097)
## Which issue does this PR close?
- Closes #1096
## What changes are included in this PR?
Implements pagination on the `GET /v1/namespaces` and
`GET /v1/namespaces/my_namespace/tables` APIs by looking for a
`next-page-token` in each response and, when one is present, passing it as
the `pageToken` query parameter on the next request.
This is allowed by the Iceberg Catalog REST API spec:
ListTablesResponse:
https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml#L3898
ListNamespacesResponse:
https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml#L3909
PageToken:
https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml#L2009C5-L2029C40
> PageToken:
> description:
> An opaque token that allows clients to make use of pagination for list APIs
> (e.g. ListTables). Clients may initiate the first paginated request by sending an empty
> query parameter `pageToken` to the server.
>
> Servers that support pagination should identify the `pageToken` parameter and return a
> `next-page-token` in the response if there are more results available. After the initial
> request, the value of `next-page-token` from each response must be used as the `pageToken`
> parameter value for the next request. The server must return `null` value for the
> `next-page-token` in the last response.
>
> Servers that support pagination must return all results in a single response with the value
> of `next-page-token` set to `null` if the query parameter `pageToken` is not set in the
> request.
>
> Servers that do not support pagination should ignore the `pageToken` parameter and return
> all results in a single response. The `next-page-token` must be omitted from the response.
>
> Clients must interpret either `null` or missing response value of `next-page-token` as
> the end of the listing results.
## Are these changes tested?
Yes. Unit tests have been added for both APIs, covering pagination across two
pages and across multiple (five) pages.
---
crates/catalog/rest/src/catalog.rs | 479 +++++++++++++++++++++++++++++++++----
crates/catalog/rest/src/types.rs | 6 +
2 files changed, 445 insertions(+), 40 deletions(-)
diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index 8df5a4bd..e15fbb8b 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -321,34 +321,54 @@ impl Catalog for RestCatalog {
parent: Option<&NamespaceIdent>,
) -> Result<Vec<NamespaceIdent>> {
let context = self.context().await?;
+ let endpoint = context.config.namespaces_endpoint();
+ let mut namespaces = Vec::new();
+ let mut next_token = None;
- let mut request_builder = context
- .client
- .request(Method::GET, context.config.namespaces_endpoint());
- // Filter on `parent={namespace}` if a parent namespace exists.
- if let Some(namespace) = parent {
- request_builder = request_builder.query(&[("parent",
namespace.to_url_string())]);
- }
- let request = request_builder.build()?;
+ loop {
+ let mut request = context.client.request(Method::GET,
endpoint.clone());
- let http_response = context.client.query_catalog(request).await?;
+ // Filter on `parent={namespace}` if a parent namespace exists.
+ if let Some(ns) = parent {
+ request = request.query(&[("parent", ns.to_url_string())]);
+ }
- match http_response.status() {
- StatusCode::OK => {
- let response =
-
deserialize_catalog_response::<ListNamespaceResponse>(http_response).await?;
- response
- .namespaces
- .into_iter()
- .map(NamespaceIdent::from_vec)
- .collect::<Result<Vec<NamespaceIdent>>>()
+ if let Some(token) = next_token {
+ request = request.query(&[("pageToken", token)]);
+ }
+
+ let http_response =
context.client.query_catalog(request.build()?).await?;
+
+ match http_response.status() {
+ StatusCode::OK => {
+ let response =
+
deserialize_catalog_response::<ListNamespaceResponse>(http_response)
+ .await?;
+
+ let ns_identifiers = response
+ .namespaces
+ .into_iter()
+ .map(NamespaceIdent::from_vec)
+ .collect::<Result<Vec<NamespaceIdent>>>()?;
+
+ namespaces.extend(ns_identifiers);
+
+ match response.next_page_token {
+ Some(token) => next_token = Some(token),
+ None => break,
+ }
+ }
+ StatusCode::NOT_FOUND => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "The parent parameter of the namespace provided does
not exist",
+ ));
+ }
+ _ => return
Err(deserialize_unexpected_catalog_error(http_response).await),
}
- StatusCode::NOT_FOUND => Err(Error::new(
- ErrorKind::Unexpected,
- "The parent parameter of the namespace provided does not
exist",
- )),
- _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
}
+
+ Ok(namespaces)
}
async fn create_namespace(
@@ -457,26 +477,42 @@ impl Catalog for RestCatalog {
async fn list_tables(&self, namespace: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
let context = self.context().await?;
+ let endpoint = context.config.tables_endpoint(namespace);
+ let mut identifiers = Vec::new();
+ let mut next_token = None;
- let request = context
- .client
- .request(Method::GET, context.config.tables_endpoint(namespace))
- .build()?;
+ loop {
+ let mut request = context.client.request(Method::GET,
endpoint.clone());
- let http_response = context.client.query_catalog(request).await?;
+ if let Some(token) = next_token {
+ request = request.query(&[("pageToken", token)]);
+ }
- match http_response.status() {
- StatusCode::OK => Ok(
-
deserialize_catalog_response::<ListTableResponse>(http_response)
- .await?
- .identifiers,
- ),
- StatusCode::NOT_FOUND => Err(Error::new(
- ErrorKind::Unexpected,
- "Tried to list tables of a namespace that does not exist",
- )),
- _ =>
Err(deserialize_unexpected_catalog_error(http_response).await),
+ let http_response =
context.client.query_catalog(request.build()?).await?;
+
+ match http_response.status() {
+ StatusCode::OK => {
+ let response =
+
deserialize_catalog_response::<ListTableResponse>(http_response).await?;
+
+ identifiers.extend(response.identifiers);
+
+ match response.next_page_token {
+ Some(token) => next_token = Some(token),
+ None => break,
+ }
+ }
+ StatusCode::NOT_FOUND => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Tried to list tables of a namespace that does not
exist",
+ ));
+ }
+ _ => return
Err(deserialize_unexpected_catalog_error(http_response).await),
+ }
}
+
+ Ok(identifiers)
}
/// Create a new table inside the namespace.
@@ -566,7 +602,6 @@ impl Catalog for RestCatalog {
/// If there are any config properties that are present in both the
response from the REST
/// server and the config provided when creating this `RestCatalog`
instance, then the value
/// provided locally to the `RestCatalog` will take precedence.
-
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
let context = self.context().await?;
@@ -1085,6 +1120,166 @@ mod tests {
list_ns_mock.assert_async().await;
}
+ #[tokio::test]
+ async fn test_list_namespace_with_pagination() {
+ let mut server = Server::new_async().await;
+
+ let config_mock = create_config_mock(&mut server).await;
+
+ let list_ns_mock_page1 = server
+ .mock("GET", "/v1/namespaces")
+ .with_body(
+ r#"{
+ "namespaces": [
+ ["ns1", "ns11"],
+ ["ns2"]
+ ],
+ "next-page-token": "token123"
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ let list_ns_mock_page2 = server
+ .mock("GET", "/v1/namespaces?pageToken=token123")
+ .with_body(
+ r#"{
+ "namespaces": [
+ ["ns3"],
+ ["ns4", "ns41"]
+ ]
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
+
+ let namespaces = catalog.list_namespaces(None).await.unwrap();
+
+ let expected_ns = vec![
+ NamespaceIdent::from_vec(vec!["ns1".to_string(),
"ns11".to_string()]).unwrap(),
+ NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
+ NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
+ NamespaceIdent::from_vec(vec!["ns4".to_string(),
"ns41".to_string()]).unwrap(),
+ ];
+
+ assert_eq!(expected_ns, namespaces);
+
+ config_mock.assert_async().await;
+ list_ns_mock_page1.assert_async().await;
+ list_ns_mock_page2.assert_async().await;
+ }
+
+ #[tokio::test]
+ async fn test_list_namespace_with_multiple_pages() {
+ let mut server = Server::new_async().await;
+
+ let config_mock = create_config_mock(&mut server).await;
+
+ // Page 1
+ let list_ns_mock_page1 = server
+ .mock("GET", "/v1/namespaces")
+ .with_body(
+ r#"{
+ "namespaces": [
+ ["ns1", "ns11"],
+ ["ns2"]
+ ],
+ "next-page-token": "page2"
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ // Page 2
+ let list_ns_mock_page2 = server
+ .mock("GET", "/v1/namespaces?pageToken=page2")
+ .with_body(
+ r#"{
+ "namespaces": [
+ ["ns3"],
+ ["ns4", "ns41"]
+ ],
+ "next-page-token": "page3"
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ // Page 3
+ let list_ns_mock_page3 = server
+ .mock("GET", "/v1/namespaces?pageToken=page3")
+ .with_body(
+ r#"{
+ "namespaces": [
+ ["ns5", "ns51", "ns511"]
+ ],
+ "next-page-token": "page4"
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ // Page 4
+ let list_ns_mock_page4 = server
+ .mock("GET", "/v1/namespaces?pageToken=page4")
+ .with_body(
+ r#"{
+ "namespaces": [
+ ["ns6"],
+ ["ns7"]
+ ],
+ "next-page-token": "page5"
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ // Page 5 (final page)
+ let list_ns_mock_page5 = server
+ .mock("GET", "/v1/namespaces?pageToken=page5")
+ .with_body(
+ r#"{
+ "namespaces": [
+ ["ns8", "ns81"]
+ ]
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
+
+ let namespaces = catalog.list_namespaces(None).await.unwrap();
+
+ let expected_ns = vec![
+ NamespaceIdent::from_vec(vec!["ns1".to_string(),
"ns11".to_string()]).unwrap(),
+ NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
+ NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
+ NamespaceIdent::from_vec(vec!["ns4".to_string(),
"ns41".to_string()]).unwrap(),
+ NamespaceIdent::from_vec(vec![
+ "ns5".to_string(),
+ "ns51".to_string(),
+ "ns511".to_string(),
+ ])
+ .unwrap(),
+ NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
+ NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
+ NamespaceIdent::from_vec(vec!["ns8".to_string(),
"ns81".to_string()]).unwrap(),
+ ];
+
+ assert_eq!(expected_ns, namespaces);
+
+ // Verify all page requests were made
+ config_mock.assert_async().await;
+ list_ns_mock_page1.assert_async().await;
+ list_ns_mock_page2.assert_async().await;
+ list_ns_mock_page3.assert_async().await;
+ list_ns_mock_page4.assert_async().await;
+ list_ns_mock_page5.assert_async().await;
+ }
+
#[tokio::test]
async fn test_create_namespace() {
let mut server = Server::new_async().await;
@@ -1252,6 +1447,210 @@ mod tests {
list_tables_mock.assert_async().await;
}
+ #[tokio::test]
+ async fn test_list_tables_with_pagination() {
+ let mut server = Server::new_async().await;
+
+ let config_mock = create_config_mock(&mut server).await;
+
+ let list_tables_mock_page1 = server
+ .mock("GET", "/v1/namespaces/ns1/tables")
+ .with_status(200)
+ .with_body(
+ r#"{
+ "identifiers": [
+ {
+ "namespace": ["ns1"],
+ "name": "table1"
+ },
+ {
+ "namespace": ["ns1"],
+ "name": "table2"
+ }
+ ],
+ "next-page-token": "token456"
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ let list_tables_mock_page2 = server
+ .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
+ .with_status(200)
+ .with_body(
+ r#"{
+ "identifiers": [
+ {
+ "namespace": ["ns1"],
+ "name": "table3"
+ },
+ {
+ "namespace": ["ns1"],
+ "name": "table4"
+ }
+ ]
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
+
+ let tables = catalog
+ .list_tables(&NamespaceIdent::new("ns1".to_string()))
+ .await
+ .unwrap();
+
+ let expected_tables = vec![
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table1".to_string()),
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table2".to_string()),
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table3".to_string()),
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table4".to_string()),
+ ];
+
+ assert_eq!(tables, expected_tables);
+
+ config_mock.assert_async().await;
+ list_tables_mock_page1.assert_async().await;
+ list_tables_mock_page2.assert_async().await;
+ }
+
+ #[tokio::test]
+ async fn test_list_tables_with_multiple_pages() {
+ let mut server = Server::new_async().await;
+
+ let config_mock = create_config_mock(&mut server).await;
+
+ // Page 1
+ let list_tables_mock_page1 = server
+ .mock("GET", "/v1/namespaces/ns1/tables")
+ .with_status(200)
+ .with_body(
+ r#"{
+ "identifiers": [
+ {
+ "namespace": ["ns1"],
+ "name": "table1"
+ },
+ {
+ "namespace": ["ns1"],
+ "name": "table2"
+ }
+ ],
+ "next-page-token": "page2"
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ // Page 2
+ let list_tables_mock_page2 = server
+ .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
+ .with_status(200)
+ .with_body(
+ r#"{
+ "identifiers": [
+ {
+ "namespace": ["ns1"],
+ "name": "table3"
+ },
+ {
+ "namespace": ["ns1"],
+ "name": "table4"
+ }
+ ],
+ "next-page-token": "page3"
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ // Page 3
+ let list_tables_mock_page3 = server
+ .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
+ .with_status(200)
+ .with_body(
+ r#"{
+ "identifiers": [
+ {
+ "namespace": ["ns1"],
+ "name": "table5"
+ }
+ ],
+ "next-page-token": "page4"
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ // Page 4
+ let list_tables_mock_page4 = server
+ .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
+ .with_status(200)
+ .with_body(
+ r#"{
+ "identifiers": [
+ {
+ "namespace": ["ns1"],
+ "name": "table6"
+ },
+ {
+ "namespace": ["ns1"],
+ "name": "table7"
+ }
+ ],
+ "next-page-token": "page5"
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ // Page 5 (final page)
+ let list_tables_mock_page5 = server
+ .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
+ .with_status(200)
+ .with_body(
+ r#"{
+ "identifiers": [
+ {
+ "namespace": ["ns1"],
+ "name": "table8"
+ }
+ ]
+ }"#,
+ )
+ .create_async()
+ .await;
+
+ let catalog =
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
+
+ let tables = catalog
+ .list_tables(&NamespaceIdent::new("ns1".to_string()))
+ .await
+ .unwrap();
+
+ let expected_tables = vec![
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table1".to_string()),
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table2".to_string()),
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table3".to_string()),
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table4".to_string()),
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table5".to_string()),
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table6".to_string()),
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table7".to_string()),
+ TableIdent::new(NamespaceIdent::new("ns1".to_string()),
"table8".to_string()),
+ ];
+
+ assert_eq!(tables, expected_tables);
+
+ // Verify all page requests were made
+ config_mock.assert_async().await;
+ list_tables_mock_page1.assert_async().await;
+ list_tables_mock_page2.assert_async().await;
+ list_tables_mock_page3.assert_async().await;
+ list_tables_mock_page4.assert_async().await;
+ list_tables_mock_page5.assert_async().await;
+ }
+
#[tokio::test]
async fn test_drop_tables() {
let mut server = Server::new_async().await;
diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs
index 3c0349cc..4ae2907a 100644
--- a/crates/catalog/rest/src/types.rs
+++ b/crates/catalog/rest/src/types.rs
@@ -122,8 +122,11 @@ impl From<&Namespace> for NamespaceSerde {
}
#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
pub(super) struct ListNamespaceResponse {
pub(super) namespaces: Vec<Vec<String>>,
+ #[serde(default)]
+ pub(super) next_page_token: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -140,8 +143,11 @@ pub(super) struct UpdateNamespacePropsResponse {
}
#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
pub(super) struct ListTableResponse {
pub(super) identifiers: Vec<TableIdent>,
+ #[serde(default)]
+ pub(super) next_page_token: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]