This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 80c1399  feat: initialise SQL Catalog (#524)
80c1399 is described below

commit 80c13999e30599762f12b8ad6fb63758c2626f5c
Author: Callum Ryan <[email protected]>
AuthorDate: Fri Aug 9 09:59:48 2024 +0100

    feat: initialise SQL Catalog (#524)
    
    * feat: initialise SQL Catalog
    
    Signed-off-by: callum-ryan <[email protected]>
    
    * fix: remove rls-rustls
    
    Signed-off-by: callum-ryan <[email protected]>
    
    * feat: change to SqlBindStyle and rename consts
    
    Signed-off-by: callum-ryan <[email protected]>
    
    ---------
    
    Signed-off-by: callum-ryan <[email protected]>
---
 crates/catalog/sql/Cargo.toml     |  43 ++++++
 crates/catalog/sql/src/catalog.rs | 279 ++++++++++++++++++++++++++++++++++++++
 crates/catalog/sql/src/error.rs   |  27 ++++
 crates/catalog/sql/src/lib.rs     |  24 ++++
 4 files changed, 373 insertions(+)

diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml
new file mode 100644
index 0000000..5d14535
--- /dev/null
+++ b/crates/catalog/sql/Cargo.toml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-catalog-sql"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+rust-version = { workspace = true }
+
+categories = ["database"]
+description = "Apache Iceberg Rust Sql Catalog"
+repository = { workspace = true }
+license = { workspace = true }
+keywords = ["iceberg", "sql", "catalog"]
+
+[dependencies]
+async-trait = { workspace = true }
+iceberg = { workspace = true }
+sqlx = { version = "0.7.4", features = ["any"], default-features = false }
+typed-builder = { workspace = true }
+
+[dev-dependencies]
+iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+itertools = { workspace = true }
+regex = "1.10.5"
+sqlx = { version = "0.7.4", features = ["tls-rustls", "runtime-tokio", "any", 
"sqlite", "migrate"], default-features = false }
+tempfile = { workspace = true }
+tokio = { workspace = true }
diff --git a/crates/catalog/sql/src/catalog.rs 
b/crates/catalog/sql/src/catalog.rs
new file mode 100644
index 0000000..078fff6
--- /dev/null
+++ b/crates/catalog/sql/src/catalog.rs
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use iceberg::io::FileIO;
+use iceberg::table::Table;
+use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation, TableIdent};
+use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow};
+use sqlx::AnyPool;
+use typed_builder::TypedBuilder;
+
+use crate::error::from_sqlx_error;
+
+// Name of the SQL table that tracks Iceberg tables, followed by its column names.
+static CATALOG_TABLE_NAME: &str = "iceberg_tables";
+static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
+static CATALOG_FIELD_TABLE_NAME: &str = "table_name";
+static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
+static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
+static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = 
"previous_metadata_location";
+static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
+
+// Name of the SQL table that stores namespace properties, followed by its column names.
+static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
+static NAMESPACE_FIELD_NAME: &str = "namespace";
+static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
+static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";
+
+// Connection-pool defaults, used when the matching `pool.*` property is absent.
+static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if 
not provided
+static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per 
connection to 10s before it is closed
+static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each 
connection to enabled prior to returning
+
+/// Sql catalog config
+///
+/// Built through the generated `SqlCatalogConfig::builder()`; every field
+/// except `props` has to be set explicitly.
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    /// Connection string handed to the sqlx pool when the catalog is created.
+    uri: String,
+    /// Catalog name; stored on the catalog at construction.
+    name: String,
+    /// Warehouse location; stored on the catalog at construction.
+    warehouse_location: String,
+    /// File IO handle; stored on the catalog at construction.
+    file_io: FileIO,
+    /// Placeholder style the target database expects for bound parameters.
+    sql_bind_style: SqlBindStyle,
+    /// Optional settings. `SqlCatalog::new` reads `pool.max-connections`,
+    /// `pool.idle-timeout` and `pool.test-before-acquire` from this map.
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+///
+/// Fields prefixed with `_` are populated by `SqlCatalog::new` but are not
+/// read anywhere else in this file yet.
+pub struct SqlCatalog {
+    /// Catalog name taken from the config.
+    _name: String,
+    /// sqlx connection pool that statements are executed against.
+    connection: AnyPool,
+    /// Warehouse location taken from the config.
+    _warehouse_location: String,
+    /// File IO handle taken from the config.
+    _fileio: FileIO,
+    /// Decides whether `?` placeholders are rewritten to `$N` before execution.
+    sql_bind_style: SqlBindStyle,
+}
+
+#[derive(Debug, PartialEq)]
+/// Set the SQL parameter bind style to either $1..$N (Postgres style) or ? 
(SQLite/MySQL/MariaDB)
+pub enum SqlBindStyle {
+    /// DollarNumeric uses parameters of the form `$1..$N`, which is the 
Postgres style
+    DollarNumeric,
+    /// QMark uses parameters of the form `?` which is the style for other 
dialects (SQLite/MySQL/MariaDB)
+    QMark,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance.
+    ///
+    /// Opens a connection pool against `config.uri` and creates the
+    /// `iceberg_tables` and `iceberg_namespace_properties` tables when they
+    /// do not exist yet.
+    ///
+    /// The pool can be tuned through `config.props`:
+    /// - `pool.max-connections`: maximum number of connections (default 10)
+    /// - `pool.idle-timeout`: idle timeout in seconds (default 10)
+    /// - `pool.test-before-acquire`: health-check a connection before it is
+    ///   handed out (default `true`)
+    ///
+    /// # Errors
+    ///
+    /// Returns an error of kind `DataInvalid` when one of the pool properties
+    /// cannot be parsed, and an error wrapping the sqlx failure when
+    /// connecting or creating the tables fails.
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 =
+            Self::parse_pool_prop(&config.props, "pool.max-connections", MAX_CONNECTIONS)?;
+        let idle_timeout: u64 =
+            Self::parse_pool_prop(&config.props, "pool.idle-timeout", IDLE_TIMEOUT)?;
+        let test_before_acquire: bool = Self::parse_pool_prop(
+            &config.props,
+            "pool.test-before-acquire",
+            TEST_BEFORE_ACQUIRE,
+        )?;
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.uri)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        // `IF NOT EXISTS` keeps construction idempotent against an already
+        // initialised database.
+        sqlx::query(&format!(
+            "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
+                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
+                {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
+                {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
+                {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
+                {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
+                {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
+                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, 
{CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
+        ))
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        sqlx::query(&format!(
+            "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
+                {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
+                {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
+                {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
+                {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
+                PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, 
{NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
+        ))
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        Ok(SqlCatalog {
+            _name: config.name,
+            connection: pool,
+            _warehouse_location: config.warehouse_location,
+            _fileio: config.file_io,
+            sql_bind_style: config.sql_bind_style,
+        })
+    }
+
+    /// Look up `key` in `props` and parse it, falling back to `default` when
+    /// the key is absent. A value that is present but malformed is reported
+    /// as a `DataInvalid` error instead of panicking.
+    fn parse_pool_prop<T>(props: &HashMap<String, String>, key: &str, default: T) -> Result<T>
+    where
+        T: std::str::FromStr,
+        T::Err: std::fmt::Display,
+    {
+        match props.get(key) {
+            Some(raw) => raw.parse::<T>().map_err(|err| {
+                iceberg::Error::new(
+                    iceberg::ErrorKind::DataInvalid,
+                    format!("invalid value {raw:?} for catalog property {key:?}: {err}"),
+                )
+            }),
+            None => Ok(default),
+        }
+    }
+
+    /// Execute `query` with `args` bound in order and return every row.
+    ///
+    /// Queries are written with `?` placeholders. SQLX Any does not translate
+    /// them into PostgreSQL bindings, so for [`SqlBindStyle::DollarNumeric`]
+    /// the first `args.len()` occurrences of `?` are rewritten to `$1..$N`
+    /// before the statement is prepared.
+    ///
+    /// NOTE(review): the rewrite is purely textual, so a `?` inside a quoted
+    /// SQL literal is rewritten as well.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error wrapping the sqlx failure when execution fails.
+    pub async fn execute_statement(
+        &self,
+        query: &str,
+        args: Vec<Option<&String>>,
+    ) -> Result<Vec<AnyRow>> {
+        let query_with_placeholders: Cow<str> =
+            if self.sql_bind_style == SqlBindStyle::DollarNumeric {
+                // Single pass over the query: number placeholders left to
+                // right and stop rewriting once every argument has one.
+                let mut rewritten = String::with_capacity(query.len() + args.len());
+                let mut bound = 0usize;
+                for ch in query.chars() {
+                    if ch == '?' && bound < args.len() {
+                        bound += 1;
+                        rewritten.push('$');
+                        rewritten.push_str(&bound.to_string());
+                    } else {
+                        rewritten.push(ch);
+                    }
+                }
+                Cow::Owned(rewritten)
+            } else {
+                Cow::Borrowed(query)
+            };
+
+        let mut sqlx_query = sqlx::query(&query_with_placeholders);
+        for arg in args {
+            sqlx_query = sqlx_query.bind(arg);
+        }
+
+        sqlx_query
+            .fetch_all(&self.connection)
+            .await
+            .map_err(from_sqlx_error)
+    }
+}
+
+// NOTE: every method of this `Catalog` implementation is still a `todo!()`
+// stub and panics when called; this commit only wires up catalog
+// initialisation.
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        todo!()
+    }
+
+    async fn create_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        todo!()
+    }
+
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> 
Result<Namespace> {
+        todo!()
+    }
+
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> 
Result<bool> {
+        todo!()
+    }
+
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        todo!()
+    }
+
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn list_tables(&self, _namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
+        todo!()
+    }
+
+    async fn table_exists(&self, _identifier: &TableIdent) -> Result<bool> {
+        todo!()
+    }
+
+    async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn load_table(&self, _identifier: &TableIdent) -> Result<Table> {
+        todo!()
+    }
+
+    async fn create_table(
+        &self,
+        _namespace: &NamespaceIdent,
+        _creation: TableCreation,
+    ) -> Result<Table> {
+        todo!()
+    }
+
+    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> 
Result<()> {
+        todo!()
+    }
+
+    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
+        todo!()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use iceberg::io::FileIOBuilder;
+    use iceberg::Catalog;
+    use sqlx::migrate::MigrateDatabase;
+    use tempfile::TempDir;
+
+    use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
+
+    /// Return a unique filesystem path. The `TempDir` is dropped (and the
+    /// directory removed) on return, so only the unique name is reused.
+    fn temp_path() -> String {
+        let temp_dir = TempDir::new().unwrap();
+        temp_dir.path().to_str().unwrap().to_string()
+    }
+
+    /// Create a fresh SQLite database at a unique path and return its URI.
+    async fn new_sqlite_database() -> String {
+        let sql_lite_uri = format!("sqlite:{}", temp_path());
+        sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap();
+        sql_lite_uri
+    }
+
+    /// Build a catalog on top of the already existing database at `sql_lite_uri`.
+    async fn new_sql_catalog(sql_lite_uri: String, warehouse_location: String) -> impl Catalog {
+        let config = SqlCatalogConfig::builder()
+            .uri(sql_lite_uri)
+            .name("iceberg".to_string())
+            .warehouse_location(warehouse_location)
+            .file_io(FileIOBuilder::new_fs_io().build().unwrap())
+            .sql_bind_style(SqlBindStyle::QMark)
+            .build();
+
+        SqlCatalog::new(config).await.unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_initialized() {
+        let warehouse_loc = temp_path();
+        // All three catalogs share one database so that the second and third
+        // construction really run against existing tables.
+        let sql_lite_uri = new_sqlite_database().await;
+        new_sql_catalog(sql_lite_uri.clone(), warehouse_loc.clone()).await;
+        // catalog instantiation should not fail even if tables exist
+        new_sql_catalog(sql_lite_uri.clone(), warehouse_loc.clone()).await;
+        new_sql_catalog(sql_lite_uri.clone(), warehouse_loc.clone()).await;
+    }
+}
diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs
new file mode 100644
index 0000000..90bba1f
--- /dev/null
+++ b/crates/catalog/sql/src/error.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iceberg::{Error, ErrorKind};
+
+/// Format an sqlx error into iceberg error.
+pub fn from_sqlx_error(error: sqlx::Error) -> Error {
+    Error::new(
+        ErrorKind::Unexpected,
+        "operation failed for hitting sqlx error".to_string(),
+    )
+    .with_source(error)
+}
diff --git a/crates/catalog/sql/src/lib.rs b/crates/catalog/sql/src/lib.rs
new file mode 100644
index 0000000..6861dab
--- /dev/null
+++ b/crates/catalog/sql/src/lib.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Iceberg sql catalog implementation.
+
+#![deny(missing_docs)]
+
+mod catalog;
+mod error;
+pub use catalog::*;

Reply via email to