This is an automated email from the ASF dual-hosted git repository. liurenjie1024 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push: new 4f5d8e2e Rest: Implement register table (#1521) 4f5d8e2e is described below commit 4f5d8e2e9d6d2a07814c69cd42a702078cf4d887 Author: Gabriel Igliozzi <gaboig...@gmail.com> AuthorDate: Thu Jul 31 05:55:01 2025 -0700 Rest: Implement register table (#1521) ## Which issue does this PR close? - Closes #1516. ## What changes are included in this PR? - New type for RegisterTableRequest - New method in rest catalog for implementing register table ## Are these changes tested? - 2 unit tests for a successful registered table operation and a failed one because the namespace does not exist - Integration tests for a successful registered table operation I'm new to Rust so any feedback is welcome! :) --- crates/catalog/rest/src/catalog.rs | 150 +++++++++++++++++++++++-- crates/catalog/rest/src/types.rs | 8 ++ crates/catalog/rest/tests/rest_catalog_test.rs | 36 ++++++ 3 files changed, 186 insertions(+), 8 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 5c9e6e15..7d81982f 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -42,7 +42,7 @@ use crate::client::{ use crate::types::{ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest, ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde, - RenameTableRequest, + RegisterTableRequest, RenameTableRequest, }; const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; @@ -101,6 +101,10 @@ impl RestCatalogConfig { self.url_prefixed(&["tables", "rename"]) } + fn register_table_endpoint(&self, ns: &NamespaceIdent) -> String { + self.url_prefixed(&["namespaces", &ns.to_url_string(), "register"]) + } + fn table_endpoint(&self, table: &TableIdent) -> String { self.url_prefixed(&[ "namespaces", @@ -238,7 +242,7 @@ struct RestContext { pub struct RestCatalog { /// User config is stored as-is and never be changed. 
/// - /// It's could be different from the config fetched from the server and used at runtime. + /// It could be different from the config fetched from the server and used at runtime. user_config: RestCatalogConfig, ctx: OnceCell<RestContext>, /// Extensions for the FileIOBuilder. @@ -755,13 +759,60 @@ impl Catalog for RestCatalog { async fn register_table( &self, - _table_ident: &TableIdent, - _metadata_location: String, + table_ident: &TableIdent, + metadata_location: String, ) -> Result<Table> { - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Registering a table is not supported yet", - )) + let context = self.context().await?; + + let request = context + .client + .request( + Method::POST, + context + .config + .register_table_endpoint(table_ident.namespace()), + ) + .json(&RegisterTableRequest { + name: table_ident.name.clone(), + metadata_location: metadata_location.clone(), + overwrite: Some(false), + }) + .build()?; + + let http_response = context.client.query_catalog(request).await?; + + let response: LoadTableResponse = match http_response.status() { + StatusCode::OK => { + deserialize_catalog_response::<LoadTableResponse>(http_response).await? 
+ } + StatusCode::NOT_FOUND => { + return Err(Error::new( + ErrorKind::NamespaceNotFound, + "The namespace specified does not exist.", + )); + } + StatusCode::CONFLICT => { + return Err(Error::new( + ErrorKind::TableAlreadyExists, + "The given table already exists.", + )); + } + _ => return Err(deserialize_unexpected_catalog_error(http_response).await), + }; + + let metadata_location = response.metadata_location.as_ref().ok_or(Error::new( + ErrorKind::DataInvalid, + "Metadata location missing in `register_table` response!", + ))?; + + let file_io = self.load_file_io(Some(metadata_location), None).await?; + + Table::builder() + .identifier(table_ident.clone()) + .file_io(file_io) + .metadata(response.metadata) + .metadata_location(metadata_location.clone()) + .build() } async fn update_table(&self, mut commit: TableCommit) -> Result<Table> { @@ -2470,4 +2521,87 @@ mod tests { update_table_mock.assert_async().await; load_table_mock.assert_async().await; } + + #[tokio::test] + async fn test_register_table() { + let mut server = Server::new_async().await; + + let config_mock = create_config_mock(&mut server).await; + + let register_table_mock = server + .mock("POST", "/v1/namespaces/ns1/register") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "load_table_response.json" + )) + .create_async() + .await; + + let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()); + let table_ident = + TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string()); + let metadata_location = String::from( + "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", + ); + + let table = catalog + .register_table(&table_ident, metadata_location) + .await + .unwrap(); + + assert_eq!( + &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(), + table.identifier() + ); + assert_eq!( + 
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", + table.metadata_location().unwrap() + ); + + config_mock.assert_async().await; + register_table_mock.assert_async().await; + } + + #[tokio::test] + async fn test_register_table_404() { + let mut server = Server::new_async().await; + + let config_mock = create_config_mock(&mut server).await; + + let register_table_mock = server + .mock("POST", "/v1/namespaces/ns1/register") + .with_status(404) + .with_body( + r#" +{ + "error": { + "message": "The namespace specified does not exist", + "type": "NoSuchNamespaceErrorException", + "code": 404 + } +} + "#, + ) + .create_async() + .await; + + let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()); + + let table_ident = + TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string()); + let metadata_location = String::from( + "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", + ); + let table = catalog + .register_table(&table_ident, metadata_location) + .await; + + assert!(table.is_err()); + assert!(table.err().unwrap().message().contains("does not exist")); + + config_mock.assert_async().await; + register_table_mock.assert_async().await; + } } diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index 63be9b76..70ed7205 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -191,3 +191,11 @@ pub(super) struct CommitTableResponse { pub(super) metadata_location: String, pub(super) metadata: TableMetadata, } + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub(super) struct RegisterTableRequest { + pub(super) name: String, + pub(super) metadata_location: String, + pub(super) overwrite: Option<bool>, +} diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 94d2e392..393b2435 
100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -407,3 +407,39 @@ async fn test_list_empty_multi_level_namespace() { .unwrap(); assert!(nss.is_empty()); } + +#[tokio::test] +async fn test_register_table() { + let catalog = get_catalog().await; + + // Create namespace + let ns = NamespaceIdent::from_strs(["ns"]).unwrap(); + catalog.create_namespace(&ns, HashMap::new()).await.unwrap(); + + // Create the table, store the metadata location, drop the table + let empty_schema = Schema::builder().build().unwrap(); + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(empty_schema) + .build(); + + let table = catalog.create_table(&ns, table_creation).await.unwrap(); + + let metadata_location = table.metadata_location().unwrap(); + catalog.drop_table(table.identifier()).await.unwrap(); + + let new_table_identifier = TableIdent::from_strs(["ns", "t2"]).unwrap(); + let table_registered = catalog + .register_table(&new_table_identifier, metadata_location.to_string()) + .await + .unwrap(); + + assert_eq!( + table.metadata_location(), + table_registered.metadata_location() + ); + assert_ne!( + table.identifier().to_string(), + table_registered.identifier().to_string() + ); +}