This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0f7fc20 Use namespace location or warehouse location if table location is missing (#511)
0f7fc20 is described below
commit 0f7fc208c04bcfd2583d324b1a004d468755c334
Author: Farooq Qaiser <[email protected]>
AuthorDate: Fri Aug 2 22:15:13 2024 -0400
Use namespace location or warehouse location if table location is missing (#511)
---
Cargo.toml | 1 +
crates/catalog/memory/Cargo.toml | 1 +
crates/catalog/memory/src/catalog.rs | 265 ++++++++++++++++++++++++++++++++---
3 files changed, 251 insertions(+), 16 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 70eadf7..642c99b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -73,6 +73,7 @@ parquet = "52"
pilota = "0.11.2"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
+regex = "1.10.5"
reqwest = { version = "^0.12", default-features = false, features = ["json"] }
rust_decimal = "1.31.0"
serde = { version = "^1.0", features = ["rc"] }
diff --git a/crates/catalog/memory/Cargo.toml b/crates/catalog/memory/Cargo.toml
index c62974a..011479e 100644
--- a/crates/catalog/memory/Cargo.toml
+++ b/crates/catalog/memory/Cargo.toml
@@ -37,5 +37,6 @@ serde_json = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
+regex = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs
index 1e7e77f..d86bbfe 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -33,19 +33,24 @@ use uuid::Uuid;
use crate::namespace_state::NamespaceState;
+/// namespace `location` property
+const LOCATION: &str = "location";
+
/// Memory catalog implementation.
#[derive(Debug)]
pub struct MemoryCatalog {
root_namespace_state: Mutex<NamespaceState>,
file_io: FileIO,
+ warehouse_location: Option<String>,
}
impl MemoryCatalog {
/// Creates an memory catalog.
- pub fn new(file_io: FileIO) -> Self {
+ pub fn new(file_io: FileIO, warehouse_location: Option<String>) -> Self {
Self {
root_namespace_state: Mutex::new(NamespaceState::default()),
file_io,
+ warehouse_location,
}
}
}
@@ -165,11 +170,20 @@ impl Catalog for MemoryCatalog {
let (table_creation, location) = match table_creation.location.clone() {
Some(location) => (table_creation, location),
None => {
- let location = format!(
- "{}/{}",
- table_ident.namespace().join("/"),
- table_ident.name()
- );
+ let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
+ let location_prefix = match namespace_properties.get(LOCATION) {
+ Some(namespace_location) => Ok(namespace_location.clone()),
+ None => match self.warehouse_location.clone() {
+ Some(warehouse_location) => Ok(format!("{}/{}", warehouse_location, namespace_ident.join("/"))),
+ None => Err(Error::new(ErrorKind::Unexpected,
+ format!(
+ "Cannot create table {:?}. No default path is set, please specify a location when creating a table.",
+ &table_ident
+ )))
+ },
+ }?;
+
+ let location = format!("{}/{}", location_prefix, table_ident.name());
let new_table_creation = TableCreation {
location: Some(location.clone()),
@@ -273,13 +287,20 @@ mod tests {
use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
+ use regex::Regex;
use tempfile::TempDir;
use super::*;
+ fn temp_path() -> String {
+ let temp_dir = TempDir::new().unwrap();
+ temp_dir.path().to_str().unwrap().to_string()
+ }
+
fn new_memory_catalog() -> impl Catalog {
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
- MemoryCatalog::new(file_io)
+ let warehouse_location = temp_path();
+ MemoryCatalog::new(file_io, Some(warehouse_location))
}
async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
@@ -312,16 +333,12 @@ mod tests {
}
async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
- let tmp_dir = TempDir::new().unwrap();
- let location = tmp_dir.path().to_str().unwrap().to_string();
-
let _ = catalog
.create_table(
&table_ident.namespace,
TableCreation::builder()
.name(table_ident.name().into())
.schema(simple_table_schema())
- .location(location)
.build(),
)
.await
@@ -374,6 +391,14 @@ mod tests {
assert!(!table.readonly());
}
+ const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
+
+ fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
+ let actual = table.metadata_location().unwrap().to_string();
+ let regex = Regex::new(regex_str).unwrap();
+ assert!(regex.is_match(&actual))
+ }
+
#[tokio::test]
async fn test_list_namespaces_returns_empty_vector() {
let catalog = new_memory_catalog();
@@ -990,12 +1015,220 @@ mod tests {
.metadata_location()
.unwrap()
.to_string()
- .starts_with(&location));
+ .starts_with(&location))
+ }
- assert_table_eq(
- &catalog.load_table(&expected_table_ident).await.unwrap(),
- &expected_table_ident,
- &simple_table_schema(),
+ #[tokio::test]
+ async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+ let warehouse_location = temp_path();
+ let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone()));
+
+ let namespace_ident = NamespaceIdent::new("a".into());
+ let mut namespace_properties = HashMap::new();
+ let namespace_location = temp_path();
+ namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
+ catalog
+ .create_namespace(&namespace_ident, namespace_properties)
+ .await
+ .unwrap();
+
+ let table_name = "tbl1";
+ let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
+ let expected_table_metadata_location_regex = format!(
+ "^{}/tbl1/metadata/0-{}.metadata.json$",
+ namespace_location, UUID_REGEX_STR,
+ );
+
+ let table = catalog
+ .create_table(
+ &namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ // no location specified for table
+ .build(),
+ )
+ .await
+ .unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
+
+ let table = catalog.load_table(&expected_table_ident).await.unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
+ }
+
+ #[tokio::test]
+ async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing(
+ ) {
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+ let warehouse_location = temp_path();
+ let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone()));
+
+ let namespace_ident = NamespaceIdent::new("a".into());
+ let mut namespace_properties = HashMap::new();
+ let namespace_location = temp_path();
+ namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
+ catalog
+ .create_namespace(&namespace_ident, namespace_properties)
+ .await
+ .unwrap();
+
+ let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
+ let mut nested_namespace_properties = HashMap::new();
+ let nested_namespace_location = temp_path();
+ nested_namespace_properties
+ .insert(LOCATION.to_string(), nested_namespace_location.to_string());
+ catalog
+ .create_namespace(&nested_namespace_ident, nested_namespace_properties)
+ .await
+ .unwrap();
+
+ let table_name = "tbl1";
+ let expected_table_ident =
+ TableIdent::new(nested_namespace_ident.clone(), table_name.into());
+ let expected_table_metadata_location_regex = format!(
+ "^{}/tbl1/metadata/0-{}.metadata.json$",
+ nested_namespace_location, UUID_REGEX_STR,
+ );
+
+ let table = catalog
+ .create_table(
+ &nested_namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ // no location specified for table
+ .build(),
+ )
+ .await
+ .unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
+
+ let table = catalog.load_table(&expected_table_ident).await.unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
+ }
+
+ #[tokio::test]
+ async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing(
+ ) {
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+ let warehouse_location = temp_path();
+ let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone()));
+
+ let namespace_ident = NamespaceIdent::new("a".into());
+ // note: no location specified in namespace_properties
+ let namespace_properties = HashMap::new();
+ catalog
+ .create_namespace(&namespace_ident, namespace_properties)
+ .await
+ .unwrap();
+
+ let table_name = "tbl1";
+ let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
+ let expected_table_metadata_location_regex = format!(
+ "^{}/a/tbl1/metadata/0-{}.metadata.json$",
+ warehouse_location, UUID_REGEX_STR
+ );
+
+ let table = catalog
+ .create_table(
+ &namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ // no location specified for table
+ .build(),
+ )
+ .await
+ .unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
+
+ let table = catalog.load_table(&expected_table_ident).await.unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
+ }
+
+ #[tokio::test]
+ async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing(
+ ) {
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+ let warehouse_location = temp_path();
+ let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone()));
+
+ let namespace_ident = NamespaceIdent::new("a".into());
+ catalog
+ // note: no location specified in namespace_properties
+ .create_namespace(&namespace_ident, HashMap::new())
+ .await
+ .unwrap();
+
+ let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
+ catalog
+ // note: no location specified in namespace_properties
+ .create_namespace(&nested_namespace_ident, HashMap::new())
+ .await
+ .unwrap();
+
+ let table_name = "tbl1";
+ let expected_table_ident =
+ TableIdent::new(nested_namespace_ident.clone(), table_name.into());
+ let expected_table_metadata_location_regex = format!(
+ "^{}/a/b/tbl1/metadata/0-{}.metadata.json$",
+ warehouse_location, UUID_REGEX_STR
+ );
+
+ let table = catalog
+ .create_table(
+ &nested_namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ // no location specified for table
+ .build(),
+ )
+ .await
+ .unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
+
+ let table = catalog.load_table(&expected_table_ident).await.unwrap();
+ assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+ assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
+ }
+
+ #[tokio::test]
+ async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing(
+ ) {
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+ let catalog = MemoryCatalog::new(file_io, None);
+
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(&catalog, &namespace_ident).await;
+
+ let table_name = "tbl1";
+ let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
+
+ assert_eq!(
+ catalog
+ .create_table(
+ &namespace_ident,
+ TableCreation::builder()
+ .name(table_name.into())
+ .schema(simple_table_schema())
+ .build(),
+ )
+ .await
+ .unwrap_err()
+ .to_string(),
+ format!(
+ "Unexpected => Cannot create table {:?}. No default path is set, please specify a location when creating a table.",
+ &expected_table_ident
+ )
)
}