This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 36e128c Add in-memory catalog implementation (#475)
36e128c is described below
commit 36e128c8d2c94c182899c9d2e1d386aa13df625b
Author: Farooq Qaiser <[email protected]>
AuthorDate: Fri Jul 26 09:58:39 2024 -0400
Add in-memory catalog implementation (#475)
* feat: Add in-memory catalog
* Make clippy happy
* Make cargo sort happy
* Fix README links
* Configurable file_io
* Avoid nightly features
* Remove TempFile
* Use futures::lock::Mutex instead
* Minor clean up
* Make root configurable in FS FileIO and remove
default_table_root_location from Catalog
* Revert "Make root configurable in FS FileIO and remove
default_table_root_location from Catalog"
This reverts commit 807dd4cf649b5c367f25afc59f99341d6995c337.
* Remove default_table_root_location from Catalog and explicitly configure
a location for tables in unit tests
* lowercase catalog
* Use default instead of new
* Change references to memory
---
crates/catalog/inmemory/Cargo.toml | 41 +
crates/catalog/inmemory/README.md | 27 +
crates/catalog/inmemory/src/catalog.rs | 1466 ++++++++++++++++++++++++
crates/catalog/inmemory/src/lib.rs | 25 +
crates/catalog/inmemory/src/namespace_state.rs | 297 +++++
crates/iceberg/src/catalog/mod.rs | 2 +-
6 files changed, 1857 insertions(+), 1 deletion(-)
diff --git a/crates/catalog/inmemory/Cargo.toml
b/crates/catalog/inmemory/Cargo.toml
new file mode 100644
index 0000000..c62974a
--- /dev/null
+++ b/crates/catalog/inmemory/Cargo.toml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-catalog-memory"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+rust-version = { workspace = true }
+
+categories = ["database"]
+description = "Apache Iceberg Rust Memory Catalog API"
+repository = { workspace = true }
+license = { workspace = true }
+keywords = ["iceberg", "memory", "catalog"]
+
+[dependencies]
+async-trait = { workspace = true }
+futures = { workspace = true }
+iceberg = { workspace = true }
+itertools = { workspace = true }
+serde_json = { workspace = true }
+uuid = { workspace = true, features = ["v4"] }
+
+[dev-dependencies]
+tempfile = { workspace = true }
+tokio = { workspace = true }
diff --git a/crates/catalog/inmemory/README.md
b/crates/catalog/inmemory/README.md
new file mode 100644
index 0000000..5b04f78
--- /dev/null
+++ b/crates/catalog/inmemory/README.md
@@ -0,0 +1,27 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+-->
+
+# Apache Iceberg Memory Catalog Official Native Rust Implementation
+
+[](https://crates.io/crates/iceberg-catalog-memory)
+[](https://docs.rs/iceberg/latest/iceberg-catalog-memory/)
+
+This crate contains the official Native Rust implementation of Apache Iceberg
Memory Catalog.
+
+See the [API documentation](https://docs.rs/iceberg-catalog-memory/latest) for
examples and the full API.
diff --git a/crates/catalog/inmemory/src/catalog.rs
b/crates/catalog/inmemory/src/catalog.rs
new file mode 100644
index 0000000..eb2a545
--- /dev/null
+++ b/crates/catalog/inmemory/src/catalog.rs
@@ -0,0 +1,1466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains memory catalog implementation.
+
+use futures::lock::Mutex;
+use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
+use itertools::Itertools;
+use std::collections::HashMap;
+use uuid::Uuid;
+
+use async_trait::async_trait;
+
+use iceberg::table::Table;
+use iceberg::Result;
+use iceberg::{
+ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit,
TableCreation, TableIdent,
+};
+
+use crate::namespace_state::NamespaceState;
+
+/// Memory catalog implementation.
+#[derive(Debug)]
+pub struct MemoryCatalog {
+ root_namespace_state: Mutex<NamespaceState>,
+ file_io: FileIO,
+}
+
+impl MemoryCatalog {
+ /// Creates an memory catalog.
+ pub fn new(file_io: FileIO) -> Self {
+ Self {
+ root_namespace_state: Mutex::new(NamespaceState::default()),
+ file_io,
+ }
+ }
+}
+
+#[async_trait]
+impl Catalog for MemoryCatalog {
+ /// List namespaces inside the catalog.
+ async fn list_namespaces(
+ &self,
+ maybe_parent: Option<&NamespaceIdent>,
+ ) -> Result<Vec<NamespaceIdent>> {
+ let root_namespace_state = self.root_namespace_state.lock().await;
+
+ match maybe_parent {
+ None => {
+ let namespaces = root_namespace_state
+ .list_top_level_namespaces()
+ .into_iter()
+ .map(|str| NamespaceIdent::new(str.to_string()))
+ .collect_vec();
+
+ Ok(namespaces)
+ }
+ Some(parent_namespace_ident) => {
+ let namespaces = root_namespace_state
+ .list_namespaces_under(parent_namespace_ident)?
+ .into_iter()
+ .map(|name| NamespaceIdent::new(name.to_string()))
+ .collect_vec();
+
+ Ok(namespaces)
+ }
+ }
+ }
+
+ /// Create a new namespace inside the catalog.
+ async fn create_namespace(
+ &self,
+ namespace_ident: &NamespaceIdent,
+ properties: HashMap<String, String>,
+ ) -> Result<Namespace> {
+ let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+ root_namespace_state.insert_new_namespace(namespace_ident,
properties.clone())?;
+ let namespace = Namespace::with_properties(namespace_ident.clone(),
properties);
+
+ Ok(namespace)
+ }
+
+ /// Get a namespace information from the catalog.
+ async fn get_namespace(&self, namespace_ident: &NamespaceIdent) ->
Result<Namespace> {
+ let root_namespace_state = self.root_namespace_state.lock().await;
+
+ let namespace = Namespace::with_properties(
+ namespace_ident.clone(),
+ root_namespace_state
+ .get_properties(namespace_ident)?
+ .clone(),
+ );
+
+ Ok(namespace)
+ }
+
+ /// Check if namespace exists in catalog.
+ async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) ->
Result<bool> {
+ let guarded_namespaces = self.root_namespace_state.lock().await;
+
+ Ok(guarded_namespaces.namespace_exists(namespace_ident))
+ }
+
+ /// Update a namespace inside the catalog.
+ ///
+ /// # Behavior
+ ///
+ /// The properties must be the full set of namespace.
+ async fn update_namespace(
+ &self,
+ namespace_ident: &NamespaceIdent,
+ properties: HashMap<String, String>,
+ ) -> Result<()> {
+ let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+ root_namespace_state.replace_properties(namespace_ident, properties)
+ }
+
+ /// Drop a namespace from the catalog.
+ async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) ->
Result<()> {
+ let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+ root_namespace_state.remove_existing_namespace(namespace_ident)
+ }
+
+ /// List tables from namespace.
+ async fn list_tables(&self, namespace_ident: &NamespaceIdent) ->
Result<Vec<TableIdent>> {
+ let root_namespace_state = self.root_namespace_state.lock().await;
+
+ let table_names = root_namespace_state.list_tables(namespace_ident)?;
+ let table_idents = table_names
+ .into_iter()
+ .map(|table_name| TableIdent::new(namespace_ident.clone(),
table_name.clone()))
+ .collect_vec();
+
+ Ok(table_idents)
+ }
+
+ /// Create a new table inside the namespace.
+ async fn create_table(
+ &self,
+ namespace_ident: &NamespaceIdent,
+ table_creation: TableCreation,
+ ) -> Result<Table> {
+ let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+ let table_name = table_creation.name.clone();
+ let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
+
+ let (table_creation, location) = match table_creation.location.clone()
{
+ Some(location) => (table_creation, location),
+ None => {
+ let location = format!(
+ "{}/{}",
+ table_ident.namespace().join("/"),
+ table_ident.name()
+ );
+
+ let new_table_creation = TableCreation {
+ location: Some(location.clone()),
+ ..table_creation
+ };
+
+ (new_table_creation, location)
+ }
+ };
+
+ let metadata =
TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
+ let metadata_location = format!(
+ "{}/metadata/{}-{}.metadata.json",
+ &location,
+ 0,
+ Uuid::new_v4()
+ );
+
+ self.file_io
+ .new_output(&metadata_location)?
+ .write(serde_json::to_vec(&metadata)?.into())
+ .await?;
+
+ root_namespace_state.insert_new_table(&table_ident,
metadata_location.clone())?;
+
+ let table = Table::builder()
+ .file_io(self.file_io.clone())
+ .metadata_location(metadata_location)
+ .metadata(metadata)
+ .identifier(table_ident)
+ .build();
+
+ Ok(table)
+ }
+
+ /// Load table from the catalog.
+ async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
+ let root_namespace_state = self.root_namespace_state.lock().await;
+
+ let metadata_location =
root_namespace_state.get_existing_table_location(table_ident)?;
+ let input_file = self.file_io.new_input(metadata_location)?;
+ let metadata_content = input_file.read().await?;
+ let metadata =
serde_json::from_slice::<TableMetadata>(&metadata_content)?;
+ let table = Table::builder()
+ .file_io(self.file_io.clone())
+ .metadata_location(metadata_location.clone())
+ .metadata(metadata)
+ .identifier(table_ident.clone())
+ .build();
+
+ Ok(table)
+ }
+
+ /// Drop a table from the catalog.
+ async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
+ let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+ root_namespace_state.remove_existing_table(table_ident)
+ }
+
+ /// Check if a table exists in the catalog.
+ async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
+ let root_namespace_state = self.root_namespace_state.lock().await;
+
+ root_namespace_state.table_exists(table_ident)
+ }
+
+ /// Rename a table in the catalog.
+ async fn rename_table(
+ &self,
+ src_table_ident: &TableIdent,
+ dst_table_ident: &TableIdent,
+ ) -> Result<()> {
+ let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+ let mut new_root_namespace_state = root_namespace_state.clone();
+ let metadata_location = new_root_namespace_state
+ .get_existing_table_location(src_table_ident)?
+ .clone();
+ new_root_namespace_state.remove_existing_table(src_table_ident)?;
+ new_root_namespace_state.insert_new_table(dst_table_ident,
metadata_location)?;
+ *root_namespace_state = new_root_namespace_state;
+
+ Ok(())
+ }
+
+ /// Update a table to the catalog.
+ async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
+ Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "MemoryCatalog does not currently support updating tables.",
+ ))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use iceberg::io::FileIOBuilder;
+ use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema,
SortOrder, Type};
+ use std::collections::HashSet;
+ use std::hash::Hash;
+ use std::iter::FromIterator;
+ use tempfile::TempDir;
+
+ use super::*;
+
+ fn new_memory_catalog() -> impl Catalog {
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+ MemoryCatalog::new(file_io)
+ }
+
+ async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident:
&NamespaceIdent) {
+ let _ = catalog
+ .create_namespace(namespace_ident, HashMap::new())
+ .await
+ .unwrap();
+ }
+
+ async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents:
&Vec<&NamespaceIdent>) {
+ for namespace_ident in namespace_idents {
+ let _ = create_namespace(catalog, namespace_ident).await;
+ }
+ }
+
+ fn to_set<T: std::cmp::Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
+ HashSet::from_iter(vec)
+ }
+
+ fn simple_table_schema() -> Schema {
+ Schema::builder()
+ .with_fields(vec![NestedField::required(
+ 1,
+ "foo",
+ Type::Primitive(PrimitiveType::Int),
+ )
+ .into()])
+ .build()
+ .unwrap()
+ }
+
+ async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
+ let tmp_dir = TempDir::new().unwrap();
+ let location = tmp_dir.path().to_str().unwrap().to_string();
+
+ let _ = catalog
+ .create_table(
+ &table_ident.namespace,
+ TableCreation::builder()
+ .name(table_ident.name().into())
+ .schema(simple_table_schema())
+ .location(location)
+ .build(),
+ )
+ .await
+ .unwrap();
+ }
+
+ async fn create_tables<C: Catalog>(catalog: &C, table_idents:
Vec<&TableIdent>) {
+ for table_ident in table_idents {
+ create_table(catalog, table_ident).await;
+ }
+ }
+
+ fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent,
expected_schema: &Schema) {
+ assert_eq!(table.identifier(), expected_table_ident);
+
+ let metadata = table.metadata();
+
+ assert_eq!(metadata.current_schema().as_ref(), expected_schema);
+
+ let expected_partition_spec = PartitionSpec::builder()
+ .with_spec_id(0)
+ .with_fields(vec![])
+ .build()
+ .unwrap();
+
+ assert_eq!(
+ metadata
+ .partition_specs_iter()
+ .map(|p| p.as_ref())
+ .collect_vec(),
+ vec![&expected_partition_spec]
+ );
+
+ let expected_sorted_order = SortOrder::builder()
+ .with_order_id(0)
+ .with_fields(vec![])
+ .build(expected_schema.clone())
+ .unwrap();
+
+ assert_eq!(
+ metadata
+ .sort_orders_iter()
+ .map(|s| s.as_ref())
+ .collect_vec(),
+ vec![&expected_sorted_order]
+ );
+
+ assert_eq!(metadata.properties(), &HashMap::new());
+
+ assert!(!table.readonly());
+ }
+
+ #[tokio::test]
+ async fn test_list_namespaces_returns_empty_vector() {
+ let catalog = new_memory_catalog();
+
+ assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
+ }
+
+ #[tokio::test]
+ async fn test_list_namespaces_returns_single_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("abc".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ assert_eq!(
+ catalog.list_namespaces(None).await.unwrap(),
+ vec![namespace_ident]
+ );
+ }
+
+ #[tokio::test]
+ async fn test_list_namespaces_returns_multiple_namespaces() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_1 = NamespaceIdent::new("a".into());
+ let namespace_ident_2 = NamespaceIdent::new("b".into());
+ create_namespaces(&catalog, &vec![&namespace_ident_1,
&namespace_ident_2]).await;
+
+ assert_eq!(
+ to_set(catalog.list_namespaces(None).await.unwrap()),
+ to_set(vec![namespace_ident_1, namespace_ident_2])
+ );
+ }
+
+ #[tokio::test]
+ async fn test_list_namespaces_returns_only_top_level_namespaces() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_1 = NamespaceIdent::new("a".into());
+ let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ let namespace_ident_3 = NamespaceIdent::new("b".into());
+ create_namespaces(
+ &catalog,
+ &vec![&namespace_ident_1, &namespace_ident_2, &namespace_ident_3],
+ )
+ .await;
+
+ assert_eq!(
+ to_set(catalog.list_namespaces(None).await.unwrap()),
+ to_set(vec![namespace_ident_1, namespace_ident_3])
+ );
+ }
+
+ #[tokio::test]
+ async fn test_list_namespaces_returns_no_namespaces_under_parent() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_1 = NamespaceIdent::new("a".into());
+ let namespace_ident_2 = NamespaceIdent::new("b".into());
+ create_namespaces(&catalog, &vec![&namespace_ident_1,
&namespace_ident_2]).await;
+
+ assert_eq!(
+ catalog
+ .list_namespaces(Some(&namespace_ident_1))
+ .await
+ .unwrap(),
+ vec![]
+ );
+ }
+
+ #[tokio::test]
+ async fn test_list_namespaces_returns_namespace_under_parent() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_1 = NamespaceIdent::new("a".into());
+ let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ let namespace_ident_3 = NamespaceIdent::new("c".into());
+ create_namespaces(
+ &catalog,
+ &vec![&namespace_ident_1, &namespace_ident_2, &namespace_ident_3],
+ )
+ .await;
+
+ assert_eq!(
+ to_set(catalog.list_namespaces(None).await.unwrap()),
+ to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
+ );
+
+ assert_eq!(
+ catalog
+ .list_namespaces(Some(&namespace_ident_1))
+ .await
+ .unwrap(),
+ vec![NamespaceIdent::new("b".into())]
+ );
+ }
+
+ #[tokio::test]
+ async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_1 = NamespaceIdent::new("a".to_string());
+ let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a",
"a"]).unwrap();
+ let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a",
"c"]).unwrap();
+ let namespace_ident_5 = NamespaceIdent::new("b".into());
+ create_namespaces(
+ &catalog,
+ &vec![
+ &namespace_ident_1,
+ &namespace_ident_2,
+ &namespace_ident_3,
+ &namespace_ident_4,
+ &namespace_ident_5,
+ ],
+ )
+ .await;
+
+ assert_eq!(
+ to_set(
+ catalog
+ .list_namespaces(Some(&namespace_ident_1))
+ .await
+ .unwrap()
+ ),
+ to_set(vec![
+ NamespaceIdent::new("a".into()),
+ NamespaceIdent::new("b".into()),
+ NamespaceIdent::new("c".into()),
+ ])
+ );
+ }
+
+ #[tokio::test]
+ async fn test_namespace_exists_returns_false() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ assert!(!catalog
+ .namespace_exists(&NamespaceIdent::new("b".into()))
+ .await
+ .unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_namespace_exists_returns_true() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_create_namespace_with_empty_properties() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("a".into());
+
+ assert_eq!(
+ catalog
+ .create_namespace(&namespace_ident, HashMap::new())
+ .await
+ .unwrap(),
+ Namespace::new(namespace_ident.clone())
+ );
+
+ assert_eq!(
+ catalog.get_namespace(&namespace_ident).await.unwrap(),
+ Namespace::with_properties(namespace_ident, HashMap::new())
+ );
+ }
+
+ #[tokio::test]
+ async fn test_create_namespace_with_properties() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("abc".into());
+
+ let mut properties: HashMap<String, String> = HashMap::new();
+ properties.insert("k".into(), "v".into());
+
+ assert_eq!(
+ catalog
+ .create_namespace(&namespace_ident, properties.clone())
+ .await
+ .unwrap(),
+ Namespace::with_properties(namespace_ident.clone(),
properties.clone())
+ );
+
+ assert_eq!(
+ catalog.get_namespace(&namespace_ident).await.unwrap(),
+ Namespace::with_properties(namespace_ident, properties)
+ );
+ }
+
+ #[tokio::test]
+ async fn test_create_namespace_throws_error_if_namespace_already_exists() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ assert_eq!(
+ catalog
+ .create_namespace(&namespace_ident, HashMap::new())
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => Cannot create namespace {:?}. Namespace already
exists.",
+ &namespace_ident
+ )
+ );
+
+ assert_eq!(
+ catalog.get_namespace(&namespace_ident).await.unwrap(),
+ Namespace::with_properties(namespace_ident, HashMap::new())
+ );
+ }
+
+ #[tokio::test]
+ async fn test_create_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let parent_namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &parent_namespace_ident).await;
+
+ let child_namespace_ident = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+
+ assert_eq!(
+ catalog
+ .create_namespace(&child_namespace_ident, HashMap::new())
+ .await
+ .unwrap(),
+ Namespace::new(child_namespace_ident.clone())
+ );
+
+ assert_eq!(
+ catalog.get_namespace(&child_namespace_ident).await.unwrap(),
+ Namespace::with_properties(child_namespace_ident, HashMap::new())
+ );
+ }
+
+ #[tokio::test]
+ async fn test_create_deeply_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ create_namespaces(&catalog, &vec![&namespace_ident_a,
&namespace_ident_a_b]).await;
+
+ let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b",
"c"]).unwrap();
+
+ assert_eq!(
+ catalog
+ .create_namespace(&namespace_ident_a_b_c, HashMap::new())
+ .await
+ .unwrap(),
+ Namespace::new(namespace_ident_a_b_c.clone())
+ );
+
+ assert_eq!(
+ catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
+ Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
+ );
+ }
+
+ #[tokio::test]
+ async fn
test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist()
{
+ let catalog = new_memory_catalog();
+
+ let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+
+ assert_eq!(
+ catalog
+ .create_namespace(&nested_namespace_ident, HashMap::new())
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ NamespaceIdent::new("a".into())
+ )
+ );
+
+ assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
+ }
+
+ #[tokio::test]
+ async fn
test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist(
+ ) {
+ let catalog = new_memory_catalog();
+
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident_a).await;
+
+ let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b",
"c"]).unwrap();
+
+ assert_eq!(
+ catalog
+ .create_namespace(&namespace_ident_a_b_c, HashMap::new())
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
+ )
+ );
+
+ assert_eq!(
+ catalog.list_namespaces(None).await.unwrap(),
+ vec![namespace_ident_a.clone()]
+ );
+
+ assert_eq!(
+ catalog
+ .list_namespaces(Some(&namespace_ident_a))
+ .await
+ .unwrap(),
+ vec![]
+ );
+ }
+
+ #[tokio::test]
+ async fn test_get_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("abc".into());
+
+ let mut properties: HashMap<String, String> = HashMap::new();
+ properties.insert("k".into(), "v".into());
+ let _ = catalog
+ .create_namespace(&namespace_ident, properties.clone())
+ .await
+ .unwrap();
+
+ assert_eq!(
+ catalog.get_namespace(&namespace_ident).await.unwrap(),
+ Namespace::with_properties(namespace_ident, properties)
+ )
+ }
+
+ #[tokio::test]
+ async fn test_get_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ create_namespaces(&catalog, &vec![&namespace_ident_a,
&namespace_ident_a_b]).await;
+
+ assert_eq!(
+ catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
+ Namespace::with_properties(namespace_ident_a_b, HashMap::new())
+ );
+ }
+
+ #[tokio::test]
+ async fn test_get_deeply_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b",
"c"]).unwrap();
+ create_namespaces(
+ &catalog,
+ &vec![
+ &namespace_ident_a,
+ &namespace_ident_a_b,
+ &namespace_ident_a_b_c,
+ ],
+ )
+ .await;
+
+ assert_eq!(
+ catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
+ Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
+ );
+ }
+
+ #[tokio::test]
+ async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
+ let catalog = new_memory_catalog();
+ create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
+
+ let non_existent_namespace_ident = NamespaceIdent::new("b".into());
+ assert_eq!(
+ catalog
+ .get_namespace(&non_existent_namespace_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_namespace_ident
+ )
+ )
+ }
+
+ #[tokio::test]
+ async fn test_update_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("abc".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ let mut new_properties: HashMap<String, String> = HashMap::new();
+ new_properties.insert("k".into(), "v".into());
+
+ catalog
+ .update_namespace(&namespace_ident, new_properties.clone())
+ .await
+ .unwrap();
+
+ assert_eq!(
+ catalog.get_namespace(&namespace_ident).await.unwrap(),
+ Namespace::with_properties(namespace_ident, new_properties)
+ )
+ }
+
+ #[tokio::test]
+ async fn test_update_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ create_namespaces(&catalog, &vec![&namespace_ident_a,
&namespace_ident_a_b]).await;
+
+ let mut new_properties = HashMap::new();
+ new_properties.insert("k".into(), "v".into());
+
+ catalog
+ .update_namespace(&namespace_ident_a_b, new_properties.clone())
+ .await
+ .unwrap();
+
+ assert_eq!(
+ catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
+ Namespace::with_properties(namespace_ident_a_b, new_properties)
+ );
+ }
+
+ #[tokio::test]
+ async fn test_update_deeply_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b",
"c"]).unwrap();
+ create_namespaces(
+ &catalog,
+ &vec![
+ &namespace_ident_a,
+ &namespace_ident_a_b,
+ &namespace_ident_a_b_c,
+ ],
+ )
+ .await;
+
+ let mut new_properties = HashMap::new();
+ new_properties.insert("k".into(), "v".into());
+
+ catalog
+ .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
+ .await
+ .unwrap();
+
+ assert_eq!(
+ catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
+ Namespace::with_properties(namespace_ident_a_b_c, new_properties)
+ );
+ }
+
+ #[tokio::test]
+ async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
+ let catalog = new_memory_catalog();
+ create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
+
+ let non_existent_namespace_ident = NamespaceIdent::new("def".into());
+ assert_eq!(
+ catalog
+ .update_namespace(&non_existent_namespace_ident,
HashMap::new())
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_namespace_ident
+ )
+ )
+ }
+
+ #[tokio::test]
+ async fn test_drop_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("abc".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ catalog.drop_namespace(&namespace_ident).await.unwrap();
+
+ assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
+ }
+
+ #[tokio::test]
+ async fn test_drop_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ create_namespaces(&catalog, &vec![&namespace_ident_a,
&namespace_ident_a_b]).await;
+
+ catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
+
+ assert!(!catalog
+ .namespace_exists(&namespace_ident_a_b)
+ .await
+ .unwrap());
+
+ assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_drop_deeply_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b",
"c"]).unwrap();
+ create_namespaces(
+ &catalog,
+ &vec![
+ &namespace_ident_a,
+ &namespace_ident_a_b,
+ &namespace_ident_a_b_c,
+ ],
+ )
+ .await;
+
+ catalog
+ .drop_namespace(&namespace_ident_a_b_c)
+ .await
+ .unwrap();
+
+ assert!(!catalog
+ .namespace_exists(&namespace_ident_a_b_c)
+ .await
+ .unwrap());
+
+ assert!(catalog
+ .namespace_exists(&namespace_ident_a_b)
+ .await
+ .unwrap());
+
+ assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
+ let catalog = new_memory_catalog();
+
+ let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
+ assert_eq!(
+ catalog
+ .drop_namespace(&non_existent_namespace_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_namespace_ident
+ )
+ )
+ }
+
+ #[tokio::test]
+ async fn
test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
+ let catalog = new_memory_catalog();
+ create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
+
+ let non_existent_namespace_ident =
+ NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
+ assert_eq!(
+ catalog
+ .drop_namespace(&non_existent_namespace_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_namespace_ident
+ )
+ )
+ }
+
+ #[tokio::test]
+ async fn
test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ create_namespaces(&catalog, &vec![&namespace_ident_a,
&namespace_ident_a_b]).await;
+
+ catalog.drop_namespace(&namespace_ident_a).await.unwrap();
+
+ assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
+
+ assert!(!catalog
+ .namespace_exists(&namespace_ident_a_b)
+ .await
+ .unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_create_table_with_location() {
+ let tmp_dir = TempDir::new().unwrap();
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ let table_name = "abc";
+ let location = tmp_dir.path().to_str().unwrap().to_string();
+ let table_creation = TableCreation::builder()
+ .name(table_name.into())
+ .location(location.clone())
+ .schema(simple_table_schema())
+ .build();
+
+ let expected_table_ident = TableIdent::new(namespace_ident.clone(),
table_name.into());
+
+ assert_table_eq(
+ &catalog
+ .create_table(&namespace_ident, table_creation)
+ .await
+ .unwrap(),
+ &expected_table_ident,
+ &simple_table_schema(),
+ );
+
+ let table = catalog.load_table(&expected_table_ident).await.unwrap();
+
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+
+ assert!(table
+ .metadata_location()
+ .unwrap()
+ .to_string()
+ .starts_with(&location));
+
+ assert_table_eq(
+ &catalog.load_table(&expected_table_ident).await.unwrap(),
+ &expected_table_ident,
+ &simple_table_schema(),
+ )
+ }
+
+ #[tokio::test]
+ async fn
test_create_table_throws_error_if_table_with_same_name_already_exists() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let table_name = "tbl1";
+ let table_ident = TableIdent::new(namespace_ident.clone(),
table_name.into());
+ create_table(&catalog, &table_ident).await;
+
+ let tmp_dir = TempDir::new().unwrap();
+ let location = tmp_dir.path().to_str().unwrap().to_string();
+
+ assert_eq!(
+ catalog
+ .create_table(
+ &namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ .location(location)
+ .build()
+ )
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => Cannot create table {:?}. Table already
exists.",
+ &table_ident
+ )
+ );
+ }
+
+ #[tokio::test]
+ async fn test_list_tables_returns_empty_vector() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(),
vec![]);
+ }
+
+ #[tokio::test]
+ async fn test_list_tables_returns_a_single_table() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ let table_ident = TableIdent::new(namespace_ident.clone(),
"tbl1".into());
+ create_table(&catalog, &table_ident).await;
+
+ assert_eq!(
+ catalog.list_tables(&namespace_ident).await.unwrap(),
+ vec![table_ident]
+ );
+ }
+
+ #[tokio::test]
+ async fn test_list_tables_returns_multiple_tables() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ let table_ident_1 = TableIdent::new(namespace_ident.clone(),
"tbl1".into());
+ let table_ident_2 = TableIdent::new(namespace_ident.clone(),
"tbl2".into());
+ let _ = create_tables(&catalog, vec![&table_ident_1,
&table_ident_2]).await;
+
+ assert_eq!(
+ to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
+ to_set(vec![table_ident_1, table_ident_2])
+ );
+ }
+
+ #[tokio::test]
+ async fn test_list_tables_returns_tables_from_correct_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_1 = NamespaceIdent::new("n1".into());
+ let namespace_ident_2 = NamespaceIdent::new("n2".into());
+ create_namespaces(&catalog, &vec![&namespace_ident_1,
&namespace_ident_2]).await;
+
+ let table_ident_1 = TableIdent::new(namespace_ident_1.clone(),
"tbl1".into());
+ let table_ident_2 = TableIdent::new(namespace_ident_1.clone(),
"tbl2".into());
+ let table_ident_3 = TableIdent::new(namespace_ident_2.clone(),
"tbl1".into());
+ let _ = create_tables(
+ &catalog,
+ vec![&table_ident_1, &table_ident_2, &table_ident_3],
+ )
+ .await;
+
+ assert_eq!(
+ to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
+ to_set(vec![table_ident_1, table_ident_2])
+ );
+
+ assert_eq!(
+ to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
+ to_set(vec![table_ident_3])
+ );
+ }
+
+ #[tokio::test]
+ async fn test_list_tables_returns_table_under_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ create_namespaces(&catalog, &vec![&namespace_ident_a,
&namespace_ident_a_b]).await;
+
+ let table_ident = TableIdent::new(namespace_ident_a_b.clone(),
"tbl1".into());
+ create_table(&catalog, &table_ident).await;
+
+ assert_eq!(
+ catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
+ vec![table_ident]
+ );
+ }
+
+ #[tokio::test]
+ async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
+ let catalog = new_memory_catalog();
+
+ let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
+
+ assert_eq!(
+ catalog
+ .list_tables(&non_existent_namespace_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_namespace_ident
+ ),
+ );
+ }
+
+ #[tokio::test]
+ async fn test_drop_table() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let table_ident = TableIdent::new(namespace_ident.clone(),
"tbl1".into());
+ create_table(&catalog, &table_ident).await;
+
+ catalog.drop_table(&table_ident).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_drop_table_drops_table_under_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ create_namespaces(&catalog, &vec![&namespace_ident_a,
&namespace_ident_a_b]).await;
+
+ let table_ident = TableIdent::new(namespace_ident_a_b.clone(),
"tbl1".into());
+ create_table(&catalog, &table_ident).await;
+
+ catalog.drop_table(&table_ident).await.unwrap();
+
+ assert_eq!(
+ catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
+ vec![]
+ );
+ }
+
+ #[tokio::test]
+ async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
+ let catalog = new_memory_catalog();
+
+ let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
+ let non_existent_table_ident =
+ TableIdent::new(non_existent_namespace_ident.clone(),
"tbl1".into());
+
+ assert_eq!(
+ catalog
+ .drop_table(&non_existent_table_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_namespace_ident
+ ),
+ );
+ }
+
+ #[tokio::test]
+ async fn test_drop_table_throws_error_if_table_doesnt_exist() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ let non_existent_table_ident =
TableIdent::new(namespace_ident.clone(), "tbl1".into());
+
+ assert_eq!(
+ catalog
+ .drop_table(&non_existent_table_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such table: {:?}",
+ non_existent_table_ident
+ ),
+ );
+ }
+
+ #[tokio::test]
+ async fn test_table_exists_returns_true() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let table_ident = TableIdent::new(namespace_ident.clone(),
"tbl1".into());
+ create_table(&catalog, &table_ident).await;
+
+ assert!(catalog.table_exists(&table_ident).await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_table_exists_returns_false() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let non_existent_table_ident =
TableIdent::new(namespace_ident.clone(), "tbl1".into());
+
+ assert!(!catalog
+ .table_exists(&non_existent_table_ident)
+ .await
+ .unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_table_exists_under_nested_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ create_namespaces(&catalog, &vec![&namespace_ident_a,
&namespace_ident_a_b]).await;
+
+ let table_ident = TableIdent::new(namespace_ident_a_b.clone(),
"tbl1".into());
+ create_table(&catalog, &table_ident).await;
+
+ assert!(catalog.table_exists(&table_ident).await.unwrap());
+
+ let non_existent_table_ident =
TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
+ assert!(!catalog
+ .table_exists(&non_existent_table_ident)
+ .await
+ .unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
+ let catalog = new_memory_catalog();
+
+ let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
+ let non_existent_table_ident =
+ TableIdent::new(non_existent_namespace_ident.clone(),
"tbl1".into());
+
+ assert_eq!(
+ catalog
+ .table_exists(&non_existent_table_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_namespace_ident
+ ),
+ );
+ }
+
+ #[tokio::test]
+ async fn test_rename_table_in_same_namespace() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let src_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl1".into());
+ let dst_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl2".into());
+ create_table(&catalog, &src_table_ident).await;
+
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap();
+
+ assert_eq!(
+ catalog.list_tables(&namespace_ident).await.unwrap(),
+ vec![dst_table_ident],
+ );
+ }
+
+ #[tokio::test]
+ async fn test_rename_table_across_namespaces() {
+ let catalog = new_memory_catalog();
+ let src_namespace_ident = NamespaceIdent::new("a".into());
+ let dst_namespace_ident = NamespaceIdent::new("b".into());
+ create_namespaces(&catalog, &vec![&src_namespace_ident,
&dst_namespace_ident]).await;
+ let src_table_ident = TableIdent::new(src_namespace_ident.clone(),
"tbl1".into());
+ let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(),
"tbl2".into());
+ create_table(&catalog, &src_table_ident).await;
+
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap();
+
+ assert_eq!(
+ catalog.list_tables(&src_namespace_ident).await.unwrap(),
+ vec![],
+ );
+
+ assert_eq!(
+ catalog.list_tables(&dst_namespace_ident).await.unwrap(),
+ vec![dst_table_ident],
+ );
+ }
+
+ #[tokio::test]
+ async fn test_rename_table_src_table_is_same_as_dst_table() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let table_ident = TableIdent::new(namespace_ident.clone(),
"tbl".into());
+ create_table(&catalog, &table_ident).await;
+
+ catalog
+ .rename_table(&table_ident, &table_ident)
+ .await
+ .unwrap();
+
+ assert_eq!(
+ catalog.list_tables(&namespace_ident).await.unwrap(),
+ vec![table_ident],
+ );
+ }
+
+ #[tokio::test]
+ async fn test_rename_table_across_nested_namespaces() {
+ let catalog = new_memory_catalog();
+ let namespace_ident_a = NamespaceIdent::new("a".into());
+ let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a",
"b"]).unwrap();
+ let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b",
"c"]).unwrap();
+ create_namespaces(
+ &catalog,
+ &vec![
+ &namespace_ident_a,
+ &namespace_ident_a_b,
+ &namespace_ident_a_b_c,
+ ],
+ )
+ .await;
+
+ let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(),
"tbl1".into());
+ create_tables(&catalog, vec![&src_table_ident]).await;
+
+ let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(),
"tbl1".into());
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap();
+
+ assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
+
+ assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
+ let catalog = new_memory_catalog();
+
+ let non_existent_src_namespace_ident =
NamespaceIdent::new("n1".into());
+ let src_table_ident =
+ TableIdent::new(non_existent_src_namespace_ident.clone(),
"tbl1".into());
+
+ let dst_namespace_ident = NamespaceIdent::new("n2".into());
+ create_namespace(&catalog, &dst_namespace_ident).await;
+ let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(),
"tbl1".into());
+
+ assert_eq!(
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_src_namespace_ident
+ ),
+ );
+ }
+
+ #[tokio::test]
+ async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
+ let catalog = new_memory_catalog();
+ let src_namespace_ident = NamespaceIdent::new("n1".into());
+ let src_table_ident = TableIdent::new(src_namespace_ident.clone(),
"tbl1".into());
+ create_namespace(&catalog, &src_namespace_ident).await;
+ create_table(&catalog, &src_table_ident).await;
+
+ let non_existent_dst_namespace_ident =
NamespaceIdent::new("n2".into());
+ let dst_table_ident =
+ TableIdent::new(non_existent_dst_namespace_ident.clone(),
"tbl1".into());
+ assert_eq!(
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => No such namespace: {:?}",
+ non_existent_dst_namespace_ident
+ ),
+ );
+ }
+
+ #[tokio::test]
+ async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let src_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl1".into());
+ let dst_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl2".into());
+
+ assert_eq!(
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!("Unexpected => No such table: {:?}", src_table_ident),
+ );
+ }
+
+ #[tokio::test]
+ async fn test_rename_table_throws_error_if_dst_table_already_exists() {
+ let catalog = new_memory_catalog();
+ let namespace_ident = NamespaceIdent::new("n1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let src_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl1".into());
+ let dst_table_ident = TableIdent::new(namespace_ident.clone(),
"tbl2".into());
+ create_tables(&catalog, vec![&src_table_ident,
&dst_table_ident]).await;
+
+ assert_eq!(
+ catalog
+ .rename_table(&src_table_ident, &dst_table_ident)
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => Cannot create table {:? }. Table already
exists.",
+ &dst_table_ident
+ ),
+ );
+ }
+}
diff --git a/crates/catalog/inmemory/src/lib.rs
b/crates/catalog/inmemory/src/lib.rs
new file mode 100644
index 0000000..8988ac7
--- /dev/null
+++ b/crates/catalog/inmemory/src/lib.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Iceberg memory Catalog API implementation.
+
+#![deny(missing_docs)]
+
+mod catalog;
+mod namespace_state;
+
+pub use catalog::*;
diff --git a/crates/catalog/inmemory/src/namespace_state.rs
b/crates/catalog/inmemory/src/namespace_state.rs
new file mode 100644
index 0000000..875e0c7
--- /dev/null
+++ b/crates/catalog/inmemory/src/namespace_state.rs
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent};
+use itertools::Itertools;
+use std::collections::{hash_map, HashMap};
+
+// Represents the state of a namespace
+#[derive(Debug, Clone, Default)]
+pub(crate) struct NamespaceState {
+ // Properties of this namespace
+ properties: HashMap<String, String>,
+ // Namespaces nested inside this namespace
+ namespaces: HashMap<String, NamespaceState>,
+ // Mapping of tables to metadata locations in this namespace
+ table_metadata_locations: HashMap<String, String>,
+}
+
+fn no_such_namespace_err<T>(namespace_ident: &NamespaceIdent) -> Result<T> {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("No such namespace: {:?}", namespace_ident),
+ ))
+}
+
+fn no_such_table_err<T>(table_ident: &TableIdent) -> Result<T> {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("No such table: {:?}", table_ident),
+ ))
+}
+
+fn namespace_already_exists_err<T>(namespace_ident: &NamespaceIdent) ->
Result<T> {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Cannot create namespace {:?}. Namespace already exists.",
+ namespace_ident
+ ),
+ ))
+}
+
+fn table_already_exists_err<T>(table_ident: &TableIdent) -> Result<T> {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Cannot create table {:?}. Table already exists.",
+ table_ident
+ ),
+ ))
+}
+
+impl NamespaceState {
+ // Returns the state of the given namespace or an error if doesn't exist
+ fn get_namespace(&self, namespace_ident: &NamespaceIdent) ->
Result<&NamespaceState> {
+ let mut acc_name_parts = vec![];
+ let mut namespace_state = self;
+ for next_name in namespace_ident.iter() {
+ acc_name_parts.push(next_name);
+ match namespace_state.namespaces.get(next_name) {
+ None => {
+ let namespace_ident =
NamespaceIdent::from_strs(acc_name_parts)?;
+ return no_such_namespace_err(&namespace_ident);
+ }
+ Some(intermediate_namespace) => {
+ namespace_state = intermediate_namespace;
+ }
+ }
+ }
+
+ Ok(namespace_state)
+ }
+
+ // Returns the state of the given namespace or an error if doesn't exist
+ fn get_mut_namespace(
+ &mut self,
+ namespace_ident: &NamespaceIdent,
+ ) -> Result<&mut NamespaceState> {
+ let mut acc_name_parts = vec![];
+ let mut namespace_state = self;
+ for next_name in namespace_ident.iter() {
+ acc_name_parts.push(next_name);
+ match namespace_state.namespaces.get_mut(next_name) {
+ None => {
+ let namespace_ident =
NamespaceIdent::from_strs(acc_name_parts)?;
+ return no_such_namespace_err(&namespace_ident);
+ }
+ Some(intermediate_namespace) => {
+ namespace_state = intermediate_namespace;
+ }
+ }
+ }
+
+ Ok(namespace_state)
+ }
+
+ // Returns the state of the parent of the given namespace or an error if
doesn't exist
+ fn get_mut_parent_namespace_of(
+ &mut self,
+ namespace_ident: &NamespaceIdent,
+ ) -> Result<(&mut NamespaceState, String)> {
+ match namespace_ident.split_last() {
+ None => Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Namespace identifier can't be empty!",
+ )),
+ Some((child_namespace_name, parent_name_parts)) => {
+ let parent_namespace_state = if parent_name_parts.is_empty() {
+ Ok(self)
+ } else {
+ let parent_namespace_ident =
NamespaceIdent::from_strs(parent_name_parts)?;
+ self.get_mut_namespace(&parent_namespace_ident)
+ }?;
+
+ Ok((parent_namespace_state, child_namespace_name.clone()))
+ }
+ }
+ }
+
+ // Returns any top-level namespaces
+ pub(crate) fn list_top_level_namespaces(&self) -> Vec<&String> {
+ self.namespaces.keys().collect_vec()
+ }
+
+ // Returns any namespaces nested under the given namespace or an error if
the given namespace does not exist
+ pub(crate) fn list_namespaces_under(
+ &self,
+ namespace_ident: &NamespaceIdent,
+ ) -> Result<Vec<&String>> {
+ let nested_namespace_names = self
+ .get_namespace(namespace_ident)?
+ .namespaces
+ .keys()
+ .collect_vec();
+
+ Ok(nested_namespace_names)
+ }
+
+ // Returns true if the given namespace exists, otherwise false
+ pub(crate) fn namespace_exists(&self, namespace_ident: &NamespaceIdent) ->
bool {
+ self.get_namespace(namespace_ident).is_ok()
+ }
+
+ // Inserts the given namespace or returns an error if it already exists
+ pub(crate) fn insert_new_namespace(
+ &mut self,
+ namespace_ident: &NamespaceIdent,
+ properties: HashMap<String, String>,
+ ) -> Result<()> {
+ let (parent_namespace_state, child_namespace_name) =
+ self.get_mut_parent_namespace_of(namespace_ident)?;
+
+ match parent_namespace_state
+ .namespaces
+ .entry(child_namespace_name)
+ {
+ hash_map::Entry::Occupied(_) =>
namespace_already_exists_err(namespace_ident),
+ hash_map::Entry::Vacant(entry) => {
+ let _ = entry.insert(NamespaceState {
+ properties,
+ namespaces: HashMap::new(),
+ table_metadata_locations: HashMap::new(),
+ });
+
+ Ok(())
+ }
+ }
+ }
+
+ // Removes the given namespace or returns an error if doesn't exist
+ pub(crate) fn remove_existing_namespace(
+ &mut self,
+ namespace_ident: &NamespaceIdent,
+ ) -> Result<()> {
+ let (parent_namespace_state, child_namespace_name) =
+ self.get_mut_parent_namespace_of(namespace_ident)?;
+
+ match parent_namespace_state
+ .namespaces
+ .remove(&child_namespace_name)
+ {
+ None => no_such_namespace_err(namespace_ident),
+ Some(_) => Ok(()),
+ }
+ }
+
+ // Returns the properties of the given namespace or an error if doesn't
exist
+ pub(crate) fn get_properties(
+ &self,
+ namespace_ident: &NamespaceIdent,
+ ) -> Result<&HashMap<String, String>> {
+ let properties = &self.get_namespace(namespace_ident)?.properties;
+
+ Ok(properties)
+ }
+
+ // Returns the properties of this namespace or an error if doesn't exist
+ fn get_mut_properties(
+ &mut self,
+ namespace_ident: &NamespaceIdent,
+ ) -> Result<&mut HashMap<String, String>> {
+ let properties = &mut
self.get_mut_namespace(namespace_ident)?.properties;
+
+ Ok(properties)
+ }
+
+ // Replaces the properties of the given namespace or an error if doesn't
exist
+ pub(crate) fn replace_properties(
+ &mut self,
+ namespace_ident: &NamespaceIdent,
+ new_properties: HashMap<String, String>,
+ ) -> Result<()> {
+ let properties = self.get_mut_properties(namespace_ident)?;
+ *properties = new_properties;
+
+ Ok(())
+ }
+
+ // Returns the list of table names under the given namespace
+ pub(crate) fn list_tables(&self, namespace_ident: &NamespaceIdent) ->
Result<Vec<&String>> {
+ let table_names = self
+ .get_namespace(namespace_ident)?
+ .table_metadata_locations
+ .keys()
+ .collect_vec();
+
+ Ok(table_names)
+ }
+
+ // Returns true if the given table exists, otherwise false
+ pub(crate) fn table_exists(&self, table_ident: &TableIdent) ->
Result<bool> {
+ let namespace_state = self.get_namespace(table_ident.namespace())?;
+ let table_exists = namespace_state
+ .table_metadata_locations
+ .contains_key(&table_ident.name);
+
+ Ok(table_exists)
+ }
+
+ // Returns the metadata location of the given table or an error if doesn't
exist
+ pub(crate) fn get_existing_table_location(&self, table_ident: &TableIdent)
-> Result<&String> {
+ let namespace = self.get_namespace(table_ident.namespace())?;
+
+ match namespace.table_metadata_locations.get(table_ident.name()) {
+ None => no_such_table_err(table_ident),
+ Some(table_metadadata_location) => Ok(table_metadadata_location),
+ }
+ }
+
+ // Inserts the given table or returns an error if it already exists
+ pub(crate) fn insert_new_table(
+ &mut self,
+ table_ident: &TableIdent,
+ metadata_location: String,
+ ) -> Result<()> {
+ let namespace = self.get_mut_namespace(table_ident.namespace())?;
+
+ match namespace
+ .table_metadata_locations
+ .entry(table_ident.name().to_string())
+ {
+ hash_map::Entry::Occupied(_) =>
table_already_exists_err(table_ident),
+ hash_map::Entry::Vacant(entry) => {
+ let _ = entry.insert(metadata_location);
+
+ Ok(())
+ }
+ }
+ }
+
+ // Removes the given table or returns an error if doesn't exist
+ pub(crate) fn remove_existing_table(&mut self, table_ident: &TableIdent)
-> Result<()> {
+ let namespace = self.get_mut_namespace(table_ident.namespace())?;
+
+ match namespace
+ .table_metadata_locations
+ .remove(table_ident.name())
+ {
+ None => no_such_table_err(table_ident),
+ Some(_) => Ok(()),
+ }
+ }
+}
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index 4d7a47a..b49a80c 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -35,7 +35,7 @@ use uuid::Uuid;
/// The catalog API for Iceberg Rust.
#[async_trait]
pub trait Catalog: Debug + Sync + Send {
- /// List namespaces from table.
+ /// List namespaces inside the catalog.
async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
-> Result<Vec<NamespaceIdent>>;