This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 75c5f34d Handle pagination via `next-page-token` in REST Catalog (#1097)
75c5f34d is described below

commit 75c5f34d034223a2e3b3268782816b07b028e2eb
Author: Phillip LeBlanc <[email protected]>
AuthorDate: Tue Mar 18 18:37:14 2025 +0900

    Handle pagination via `next-page-token` in REST Catalog (#1097)
    
    ## Which issue does this PR close?
    
    - Closes #1096
    
    ## What changes are included in this PR?
    
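    Implements pagination on the `GET /v1/namespaces` and `GET
    /v1/namespaces/my_namespace/tables` APIs by looking for a
    `next-page-token` and adding a `pageToken` query parameter on the next
    request with that token. The first request is sent without `pageToken`;
    each later request carries the token from the previous response, and
    listing stops once a response has a null or missing `next-page-token`.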
    
    This is permitted by the Iceberg Catalog REST API spec:
    - ListTablesResponse:
      https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml#L3898
    - ListNamespacesResponse:
      https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml#L3909
    - PageToken:
      https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml#L2009C5-L2029C40
    > PageToken:
    >   description:
    >     An opaque token that allows clients to make use of pagination for list APIs
    >     (e.g. ListTables). Clients may initiate the first paginated request by sending an empty
    >     query parameter `pageToken` to the server.
    >     Servers that support pagination should identify the `pageToken` parameter and return a
    >     `next-page-token` in the response if there are more results available. After the initial
    >     request, the value of `next-page-token` from each response must be used as the `pageToken`
    >     parameter value for the next request. The server must return `null` value for the
    >     `next-page-token` in the last response.
    >     Servers that support pagination must return all results in a single response with the value
    >     of `next-page-token` set to `null` if the query parameter `pageToken` is not set in the
    >     request.
    >     Servers that do not support pagination should ignore the `pageToken` parameter and return
    >     all results in a single response. The `next-page-token` must be omitted from the response.
    >     Clients must interpret either `null` or missing response value of `next-page-token` as
    >     the end of the listing results.
    
    ## Are these changes tested?
    
    Yes. Unit tests have been added for both APIs, covering a listing split
    across two pages and a listing split across five pages.
---
 crates/catalog/rest/src/catalog.rs | 479 +++++++++++++++++++++++++++++++++----
 crates/catalog/rest/src/types.rs   |   6 +
 2 files changed, 445 insertions(+), 40 deletions(-)

diff --git a/crates/catalog/rest/src/catalog.rs 
b/crates/catalog/rest/src/catalog.rs
index 8df5a4bd..e15fbb8b 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -321,34 +321,54 @@ impl Catalog for RestCatalog {
         parent: Option<&NamespaceIdent>,
     ) -> Result<Vec<NamespaceIdent>> {
         let context = self.context().await?;
+        let endpoint = context.config.namespaces_endpoint();
+        let mut namespaces = Vec::new();
+        let mut next_token = None;
 
-        let mut request_builder = context
-            .client
-            .request(Method::GET, context.config.namespaces_endpoint());
-        // Filter on `parent={namespace}` if a parent namespace exists.
-        if let Some(namespace) = parent {
-            request_builder = request_builder.query(&[("parent", 
namespace.to_url_string())]);
-        }
-        let request = request_builder.build()?;
+        loop {
+            let mut request = context.client.request(Method::GET, 
endpoint.clone());
 
-        let http_response = context.client.query_catalog(request).await?;
+            // Filter on `parent={namespace}` if a parent namespace exists.
+            if let Some(ns) = parent {
+                request = request.query(&[("parent", ns.to_url_string())]);
+            }
 
-        match http_response.status() {
-            StatusCode::OK => {
-                let response =
-                    
deserialize_catalog_response::<ListNamespaceResponse>(http_response).await?;
-                response
-                    .namespaces
-                    .into_iter()
-                    .map(NamespaceIdent::from_vec)
-                    .collect::<Result<Vec<NamespaceIdent>>>()
+            if let Some(token) = next_token {
+                request = request.query(&[("pageToken", token)]);
+            }
+
+            let http_response = 
context.client.query_catalog(request.build()?).await?;
+
+            match http_response.status() {
+                StatusCode::OK => {
+                    let response =
+                        
deserialize_catalog_response::<ListNamespaceResponse>(http_response)
+                            .await?;
+
+                    let ns_identifiers = response
+                        .namespaces
+                        .into_iter()
+                        .map(NamespaceIdent::from_vec)
+                        .collect::<Result<Vec<NamespaceIdent>>>()?;
+
+                    namespaces.extend(ns_identifiers);
+
+                    match response.next_page_token {
+                        Some(token) => next_token = Some(token),
+                        None => break,
+                    }
+                }
+                StatusCode::NOT_FOUND => {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "The parent parameter of the namespace provided does 
not exist",
+                    ));
+                }
+                _ => return 
Err(deserialize_unexpected_catalog_error(http_response).await),
             }
-            StatusCode::NOT_FOUND => Err(Error::new(
-                ErrorKind::Unexpected,
-                "The parent parameter of the namespace provided does not 
exist",
-            )),
-            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
         }
+
+        Ok(namespaces)
     }
 
     async fn create_namespace(
@@ -457,26 +477,42 @@ impl Catalog for RestCatalog {
 
     async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
         let context = self.context().await?;
+        let endpoint = context.config.tables_endpoint(namespace);
+        let mut identifiers = Vec::new();
+        let mut next_token = None;
 
-        let request = context
-            .client
-            .request(Method::GET, context.config.tables_endpoint(namespace))
-            .build()?;
+        loop {
+            let mut request = context.client.request(Method::GET, 
endpoint.clone());
 
-        let http_response = context.client.query_catalog(request).await?;
+            if let Some(token) = next_token {
+                request = request.query(&[("pageToken", token)]);
+            }
 
-        match http_response.status() {
-            StatusCode::OK => Ok(
-                
deserialize_catalog_response::<ListTableResponse>(http_response)
-                    .await?
-                    .identifiers,
-            ),
-            StatusCode::NOT_FOUND => Err(Error::new(
-                ErrorKind::Unexpected,
-                "Tried to list tables of a namespace that does not exist",
-            )),
-            _ => 
Err(deserialize_unexpected_catalog_error(http_response).await),
+            let http_response = 
context.client.query_catalog(request.build()?).await?;
+
+            match http_response.status() {
+                StatusCode::OK => {
+                    let response =
+                        
deserialize_catalog_response::<ListTableResponse>(http_response).await?;
+
+                    identifiers.extend(response.identifiers);
+
+                    match response.next_page_token {
+                        Some(token) => next_token = Some(token),
+                        None => break,
+                    }
+                }
+                StatusCode::NOT_FOUND => {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "Tried to list tables of a namespace that does not 
exist",
+                    ));
+                }
+                _ => return 
Err(deserialize_unexpected_catalog_error(http_response).await),
+            }
         }
+
+        Ok(identifiers)
     }
 
     /// Create a new table inside the namespace.
@@ -566,7 +602,6 @@ impl Catalog for RestCatalog {
     /// If there are any config properties that are present in both the 
response from the REST
     /// server and the config provided when creating this `RestCatalog` 
instance, then the value
     /// provided locally to the `RestCatalog` will take precedence.
-
     async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
         let context = self.context().await?;
 
@@ -1085,6 +1120,166 @@ mod tests {
         list_ns_mock.assert_async().await;
     }
 
+    #[tokio::test]
+    async fn test_list_namespace_with_pagination() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let list_ns_mock_page1 = server
+            .mock("GET", "/v1/namespaces")
+            .with_body(
+                r#"{
+                "namespaces": [
+                    ["ns1", "ns11"],
+                    ["ns2"]
+                ],
+                "next-page-token": "token123"
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let list_ns_mock_page2 = server
+            .mock("GET", "/v1/namespaces?pageToken=token123")
+            .with_body(
+                r#"{
+                "namespaces": [
+                    ["ns3"],
+                    ["ns4", "ns41"]
+                ]
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let catalog = 
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
+
+        let namespaces = catalog.list_namespaces(None).await.unwrap();
+
+        let expected_ns = vec![
+            NamespaceIdent::from_vec(vec!["ns1".to_string(), 
"ns11".to_string()]).unwrap(),
+            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
+            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
+            NamespaceIdent::from_vec(vec!["ns4".to_string(), 
"ns41".to_string()]).unwrap(),
+        ];
+
+        assert_eq!(expected_ns, namespaces);
+
+        config_mock.assert_async().await;
+        list_ns_mock_page1.assert_async().await;
+        list_ns_mock_page2.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_list_namespace_with_multiple_pages() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        // Page 1
+        let list_ns_mock_page1 = server
+            .mock("GET", "/v1/namespaces")
+            .with_body(
+                r#"{
+                "namespaces": [
+                    ["ns1", "ns11"],
+                    ["ns2"]
+                ],
+                "next-page-token": "page2"
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Page 2
+        let list_ns_mock_page2 = server
+            .mock("GET", "/v1/namespaces?pageToken=page2")
+            .with_body(
+                r#"{
+                "namespaces": [
+                    ["ns3"],
+                    ["ns4", "ns41"]
+                ],
+                "next-page-token": "page3"
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Page 3
+        let list_ns_mock_page3 = server
+            .mock("GET", "/v1/namespaces?pageToken=page3")
+            .with_body(
+                r#"{
+                "namespaces": [
+                    ["ns5", "ns51", "ns511"]
+                ],
+                "next-page-token": "page4"
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Page 4
+        let list_ns_mock_page4 = server
+            .mock("GET", "/v1/namespaces?pageToken=page4")
+            .with_body(
+                r#"{
+                "namespaces": [
+                    ["ns6"],
+                    ["ns7"]
+                ],
+                "next-page-token": "page5"
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Page 5 (final page)
+        let list_ns_mock_page5 = server
+            .mock("GET", "/v1/namespaces?pageToken=page5")
+            .with_body(
+                r#"{
+                "namespaces": [
+                    ["ns8", "ns81"]
+                ]
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let catalog = 
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
+
+        let namespaces = catalog.list_namespaces(None).await.unwrap();
+
+        let expected_ns = vec![
+            NamespaceIdent::from_vec(vec!["ns1".to_string(), 
"ns11".to_string()]).unwrap(),
+            NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(),
+            NamespaceIdent::from_vec(vec!["ns3".to_string()]).unwrap(),
+            NamespaceIdent::from_vec(vec!["ns4".to_string(), 
"ns41".to_string()]).unwrap(),
+            NamespaceIdent::from_vec(vec![
+                "ns5".to_string(),
+                "ns51".to_string(),
+                "ns511".to_string(),
+            ])
+            .unwrap(),
+            NamespaceIdent::from_vec(vec!["ns6".to_string()]).unwrap(),
+            NamespaceIdent::from_vec(vec!["ns7".to_string()]).unwrap(),
+            NamespaceIdent::from_vec(vec!["ns8".to_string(), 
"ns81".to_string()]).unwrap(),
+        ];
+
+        assert_eq!(expected_ns, namespaces);
+
+        // Verify all page requests were made
+        config_mock.assert_async().await;
+        list_ns_mock_page1.assert_async().await;
+        list_ns_mock_page2.assert_async().await;
+        list_ns_mock_page3.assert_async().await;
+        list_ns_mock_page4.assert_async().await;
+        list_ns_mock_page5.assert_async().await;
+    }
+
     #[tokio::test]
     async fn test_create_namespace() {
         let mut server = Server::new_async().await;
@@ -1252,6 +1447,210 @@ mod tests {
         list_tables_mock.assert_async().await;
     }
 
+    #[tokio::test]
+    async fn test_list_tables_with_pagination() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        let list_tables_mock_page1 = server
+            .mock("GET", "/v1/namespaces/ns1/tables")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "identifiers": [
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table1"
+                    },
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table2"
+                    }
+                ],
+                "next-page-token": "token456"
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let list_tables_mock_page2 = server
+            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=token456")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "identifiers": [
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table3"
+                    },
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table4"
+                    }
+                ]
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let catalog = 
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
+
+        let tables = catalog
+            .list_tables(&NamespaceIdent::new("ns1".to_string()))
+            .await
+            .unwrap();
+
+        let expected_tables = vec![
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table1".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table2".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table3".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table4".to_string()),
+        ];
+
+        assert_eq!(tables, expected_tables);
+
+        config_mock.assert_async().await;
+        list_tables_mock_page1.assert_async().await;
+        list_tables_mock_page2.assert_async().await;
+    }
+
+    #[tokio::test]
+    async fn test_list_tables_with_multiple_pages() {
+        let mut server = Server::new_async().await;
+
+        let config_mock = create_config_mock(&mut server).await;
+
+        // Page 1
+        let list_tables_mock_page1 = server
+            .mock("GET", "/v1/namespaces/ns1/tables")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "identifiers": [
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table1"
+                    },
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table2"
+                    }
+                ],
+                "next-page-token": "page2"
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Page 2
+        let list_tables_mock_page2 = server
+            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page2")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "identifiers": [
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table3"
+                    },
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table4"
+                    }
+                ],
+                "next-page-token": "page3"
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Page 3
+        let list_tables_mock_page3 = server
+            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page3")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "identifiers": [
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table5"
+                    }
+                ],
+                "next-page-token": "page4"
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Page 4
+        let list_tables_mock_page4 = server
+            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page4")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "identifiers": [
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table6"
+                    },
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table7"
+                    }
+                ],
+                "next-page-token": "page5"
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        // Page 5 (final page)
+        let list_tables_mock_page5 = server
+            .mock("GET", "/v1/namespaces/ns1/tables?pageToken=page5")
+            .with_status(200)
+            .with_body(
+                r#"{
+                "identifiers": [
+                    {
+                        "namespace": ["ns1"],
+                        "name": "table8"
+                    }
+                ]
+            }"#,
+            )
+            .create_async()
+            .await;
+
+        let catalog = 
RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build());
+
+        let tables = catalog
+            .list_tables(&NamespaceIdent::new("ns1".to_string()))
+            .await
+            .unwrap();
+
+        let expected_tables = vec![
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table1".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table2".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table3".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table4".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table5".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table6".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table7".to_string()),
+            TableIdent::new(NamespaceIdent::new("ns1".to_string()), 
"table8".to_string()),
+        ];
+
+        assert_eq!(tables, expected_tables);
+
+        // Verify all page requests were made
+        config_mock.assert_async().await;
+        list_tables_mock_page1.assert_async().await;
+        list_tables_mock_page2.assert_async().await;
+        list_tables_mock_page3.assert_async().await;
+        list_tables_mock_page4.assert_async().await;
+        list_tables_mock_page5.assert_async().await;
+    }
+
     #[tokio::test]
     async fn test_drop_tables() {
         let mut server = Server::new_async().await;
diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs
index 3c0349cc..4ae2907a 100644
--- a/crates/catalog/rest/src/types.rs
+++ b/crates/catalog/rest/src/types.rs
@@ -122,8 +122,11 @@ impl From<&Namespace> for NamespaceSerde {
 }
 
 #[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
 pub(super) struct ListNamespaceResponse {
     pub(super) namespaces: Vec<Vec<String>>,
+    #[serde(default)]
+    pub(super) next_page_token: Option<String>,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -140,8 +143,11 @@ pub(super) struct UpdateNamespacePropsResponse {
 }
 
 #[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "kebab-case")]
 pub(super) struct ListTableResponse {
     pub(super) identifiers: Vec<TableIdent>,
+    #[serde(default)]
+    pub(super) next_page_token: Option<String>,
 }
 
 #[derive(Debug, Serialize, Deserialize)]

Reply via email to