This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new fcf14ca test: refactor datafusion test with memory catalog (#557)
fcf14ca is described below
commit fcf14ca2373a13181d211e6f431f2f24f8bb6dd8
Author: FANNG <[email protected]>
AuthorDate: Sat Aug 17 00:42:32 2024 +0800
test: refactor datafusion test with memory catalog (#557)
* add memory catalog
* fix style
* fix style
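
    The refactor swaps the dockerized HMS/MinIO fixture for an in-process
    MemoryCatalog backed by local-filesystem FileIO. A minimal sketch of the
    new setup, mirroring the test file added below (assumes tokio is built
    with the "macros" and runtime features; names follow the diff):

        use std::sync::Arc;

        use datafusion::execution::context::SessionContext;
        use iceberg::io::FileIOBuilder;
        use iceberg_catalog_memory::MemoryCatalog;
        use iceberg_datafusion::IcebergCatalogProvider;
        use tempfile::TempDir;

        #[tokio::main]
        async fn main() -> iceberg::Result<()> {
            // Local-filesystem FileIO plus a throwaway warehouse dir replace MinIO.
            let temp_dir = TempDir::new().unwrap();
            let file_io = FileIOBuilder::new_fs_io().build().unwrap();
            let warehouse = temp_dir.path().to_str().unwrap().to_string();
            let iceberg_catalog = MemoryCatalog::new(file_io, Some(warehouse));

            // Expose the Iceberg catalog to DataFusion, as the tests do.
            let client = Arc::new(iceberg_catalog);
            let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
            let ctx = SessionContext::new();
            ctx.register_catalog("catalog", catalog);
            Ok(())
        }
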
---
crates/integrations/datafusion/Cargo.toml | 7 +-
.../datafusion/testdata/docker-compose.yaml | 49 -----
.../datafusion/testdata/hms_catalog/Dockerfile | 34 ---
.../datafusion/testdata/hms_catalog/core-site.xml | 51 -----
.../tests/integration_datafusion_hms_test.rs | 235 ---------------------
.../tests/integration_datafusion_test.rs | 149 +++++++++++++
6 files changed, 151 insertions(+), 374 deletions(-)
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index 8e036a6..87e809c 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -34,11 +34,8 @@ async-trait = { workspace = true }
datafusion = { version = "41.0.0" }
futures = { workspace = true }
iceberg = { workspace = true }
-log = { workspace = true }
tokio = { workspace = true }
[dev-dependencies]
-ctor = { workspace = true }
-iceberg-catalog-hms = { workspace = true }
-iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
-port_scanner = { workspace = true }
+iceberg-catalog-memory = { workspace = true }
+tempfile = { workspace = true }
diff --git a/crates/integrations/datafusion/testdata/docker-compose.yaml b/crates/integrations/datafusion/testdata/docker-compose.yaml
deleted file mode 100644
index be915ab..0000000
--- a/crates/integrations/datafusion/testdata/docker-compose.yaml
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-services:
- minio:
- image: minio/minio:RELEASE.2024-03-07T00-43-48Z
- expose:
- - 9000
- - 9001
- environment:
- - MINIO_ROOT_USER=admin
- - MINIO_ROOT_PASSWORD=password
- - MINIO_DOMAIN=minio
- command: [ "server", "/data", "--console-address", ":9001" ]
-
- mc:
- depends_on:
- - minio
- image: minio/mc:RELEASE.2024-03-07T00-31-49Z
- environment:
- - AWS_ACCESS_KEY_ID=admin
- - AWS_SECRET_ACCESS_KEY=password
- - AWS_REGION=us-east-1
- entrypoint: >
-      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f /dev/null "
-
- hive-metastore:
- image: iceberg-hive-metastore
- build: ./hms_catalog/
- platform: ${DOCKER_DEFAULT_PLATFORM}
- expose:
- - 9083
- environment:
- SERVICE_NAME: "metastore"
- SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/"
diff --git a/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile b/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile
deleted file mode 100644
index abece56..0000000
--- a/crates/integrations/datafusion/testdata/hms_catalog/Dockerfile
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM --platform=$BUILDPLATFORM openjdk:8-jre-slim AS build
-
-RUN apt-get update -qq && apt-get -qq -y install curl
-
-ENV AWSSDK_VERSION=2.20.18
-ENV HADOOP_VERSION=3.1.0
-
-RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar -Lo /tmp/aws-java-sdk-bundle-1.11.271.jar
-RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar
-
-
-FROM apache/hive:3.1.3
-
-ENV AWSSDK_VERSION=2.20.18
-ENV HADOOP_VERSION=3.1.0
-
-COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar
-COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar /opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar
-COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml
\ No newline at end of file
diff --git a/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml b/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml
deleted file mode 100644
index f0583a0..0000000
--- a/crates/integrations/datafusion/testdata/hms_catalog/core-site.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<configuration>
- <property>
- <name>fs.defaultFS</name>
- <value>s3a://warehouse/hive</value>
- </property>
- <property>
- <name>fs.s3a.impl</name>
- <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
- </property>
- <property>
- <name>fs.s3a.fast.upload</name>
- <value>true</value>
- </property>
- <property>
- <name>fs.s3a.endpoint</name>
- <value>http://minio:9000</value>
- </property>
- <property>
- <name>fs.s3a.access.key</name>
- <value>admin</value>
- </property>
- <property>
- <name>fs.s3a.secret.key</name>
- <value>password</value>
- </property>
- <property>
- <name>fs.s3a.connection.ssl.enabled</name>
- <value>false</value>
- </property>
- <property>
- <name>fs.s3a.path.style.access</name>
- <value>true</value>
- </property>
-</configuration>
\ No newline at end of file
diff --git a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
deleted file mode 100644
index 292cd8b..0000000
--- a/crates/integrations/datafusion/tests/integration_datafusion_hms_test.rs
+++ /dev/null
@@ -1,235 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Integration tests for Iceberg Datafusion with Hive Metastore.
-
-use std::collections::HashMap;
-use std::net::SocketAddr;
-use std::sync::{Arc, RwLock};
-
-use ctor::{ctor, dtor};
-use datafusion::arrow::datatypes::DataType;
-use datafusion::execution::context::SessionContext;
-use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
-use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
-use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
-use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
-use iceberg_datafusion::IcebergCatalogProvider;
-use iceberg_test_utils::docker::DockerCompose;
-use iceberg_test_utils::{normalize_test_name, set_up};
-use port_scanner::scan_port_addr;
-use tokio::time::sleep;
-
-const HMS_CATALOG_PORT: u16 = 9083;
-const MINIO_PORT: u16 = 9000;
-static DOCKER_COMPOSE_ENV: RwLock<Option<DockerCompose>> = RwLock::new(None);
-
-struct TestFixture {
- hms_catalog: HmsCatalog,
- props: HashMap<String, String>,
- hms_catalog_socket_addr: SocketAddr,
-}
-
-#[ctor]
-fn before_all() {
- let mut guard = DOCKER_COMPOSE_ENV.write().unwrap();
- let docker_compose = DockerCompose::new(
- normalize_test_name(module_path!()),
- format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
- );
- docker_compose.run();
- guard.replace(docker_compose);
-}
-
-#[dtor]
-fn after_all() {
- let mut guard = DOCKER_COMPOSE_ENV.write().unwrap();
- guard.take();
-}
-
-impl TestFixture {
- fn get_catalog(&self) -> HmsCatalog {
- let config = HmsCatalogConfig::builder()
- .address(self.hms_catalog_socket_addr.to_string())
- .thrift_transport(HmsThriftTransport::Buffered)
- .warehouse("s3a://warehouse/hive".to_string())
- .props(self.props.clone())
- .build();
-
- HmsCatalog::new(config).unwrap()
- }
-}
-
-async fn get_test_fixture() -> TestFixture {
- set_up();
-
- let (hms_catalog_ip, minio_ip) = {
- let guard = DOCKER_COMPOSE_ENV.read().unwrap();
- let docker_compose = guard.as_ref().unwrap();
- (
- docker_compose.get_container_ip("hive-metastore"),
- docker_compose.get_container_ip("minio"),
- )
- };
-
-    let hms_catalog_socket_addr = SocketAddr::new(hms_catalog_ip, HMS_CATALOG_PORT);
- let minio_socket_addr = SocketAddr::new(minio_ip, MINIO_PORT);
- while !scan_port_addr(hms_catalog_socket_addr) {
- log::info!("Waiting for 1s hms catalog to ready...");
- sleep(std::time::Duration::from_millis(1000)).await;
- }
-
- let props = HashMap::from([
- (
- S3_ENDPOINT.to_string(),
- format!("http://{}", minio_socket_addr),
- ),
- (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
- (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
- (S3_REGION.to_string(), "us-east-1".to_string()),
- ]);
-
- let config = HmsCatalogConfig::builder()
- .address(hms_catalog_socket_addr.to_string())
- .thrift_transport(HmsThriftTransport::Buffered)
- .warehouse("s3a://warehouse/hive".to_string())
- .props(props.clone())
- .build();
-
- let hms_catalog = HmsCatalog::new(config).unwrap();
-
- TestFixture {
- hms_catalog,
- props,
- hms_catalog_socket_addr,
- }
-}
-
-async fn set_test_namespace(catalog: &HmsCatalog, namespace: &NamespaceIdent) -> Result<()> {
- let properties = HashMap::new();
-
- catalog.create_namespace(namespace, properties).await?;
-
- Ok(())
-}
-
-fn set_table_creation(location: impl ToString, name: impl ToString) -> Result<TableCreation> {
- let schema = Schema::builder()
- .with_schema_id(0)
- .with_fields(vec![
-            NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
-            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
- ])
- .build()?;
-
- let creation = TableCreation::builder()
- .location(location.to_string())
- .name(name.to_string())
- .properties(HashMap::new())
- .schema(schema)
- .build();
-
- Ok(creation)
-}
-
-#[tokio::test]
-async fn test_provider_get_table_schema() -> Result<()> {
- let fixture = get_test_fixture().await;
-    let namespace = NamespaceIdent::new("test_provider_get_table_schema".to_string());
- set_test_namespace(&fixture.hms_catalog, &namespace).await?;
-
- let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- fixture
- .hms_catalog
- .create_table(&namespace, creation)
- .await?;
-
- let client = Arc::new(fixture.get_catalog());
- let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
-
- let ctx = SessionContext::new();
- ctx.register_catalog("hive", catalog);
-
- let provider = ctx.catalog("hive").unwrap();
- let schema = provider.schema("test_provider_get_table_schema").unwrap();
-
- let table = schema.table("my_table").await.unwrap().unwrap();
- let table_schema = table.schema();
-
- let expected = [("foo", &DataType::Int32), ("bar", &DataType::Utf8)];
-
- for (field, exp) in table_schema.fields().iter().zip(expected.iter()) {
- assert_eq!(field.name(), exp.0);
- assert_eq!(field.data_type(), exp.1);
- assert!(!field.is_nullable())
- }
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_provider_list_table_names() -> Result<()> {
- let fixture = get_test_fixture().await;
-    let namespace = NamespaceIdent::new("test_provider_list_table_names".to_string());
- set_test_namespace(&fixture.hms_catalog, &namespace).await?;
-
- let creation = set_table_creation("s3a://warehouse/hive", "my_table")?;
- fixture
- .hms_catalog
- .create_table(&namespace, creation)
- .await?;
-
- let client = Arc::new(fixture.get_catalog());
- let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
-
- let ctx = SessionContext::new();
- ctx.register_catalog("hive", catalog);
-
- let provider = ctx.catalog("hive").unwrap();
- let schema = provider.schema("test_provider_list_table_names").unwrap();
-
- let expected = vec!["my_table"];
- let result = schema.table_names();
-
- assert_eq!(result, expected);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn test_provider_list_schema_names() -> Result<()> {
- let fixture = get_test_fixture().await;
-    let namespace = NamespaceIdent::new("test_provider_list_schema_names".to_string());
- set_test_namespace(&fixture.hms_catalog, &namespace).await?;
-
- set_table_creation("test_provider_list_schema_names", "my_table")?;
- let client = Arc::new(fixture.get_catalog());
- let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
-
- let ctx = SessionContext::new();
- ctx.register_catalog("hive", catalog);
-
- let provider = ctx.catalog("hive").unwrap();
-
- let expected = ["default", "test_provider_list_schema_names"];
- let result = provider.schema_names();
-
- assert!(expected
- .iter()
- .all(|item| result.contains(&item.to_string())));
- Ok(())
-}
diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
new file mode 100644
index 0000000..9e62930
--- /dev/null
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for Iceberg DataFusion with a memory catalog.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion::arrow::datatypes::DataType;
+use datafusion::execution::context::SessionContext;
+use iceberg::io::FileIOBuilder;
+use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
+use iceberg_catalog_memory::MemoryCatalog;
+use iceberg_datafusion::IcebergCatalogProvider;
+use tempfile::TempDir;
+
+fn temp_path() -> String {
+ let temp_dir = TempDir::new().unwrap();
+ temp_dir.path().to_str().unwrap().to_string()
+}
+
+fn get_iceberg_catalog() -> MemoryCatalog {
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+ MemoryCatalog::new(file_io, Some(temp_path()))
+}
+
+async fn set_test_namespace(catalog: &MemoryCatalog, namespace: &NamespaceIdent) -> Result<()> {
+ let properties = HashMap::new();
+
+ catalog.create_namespace(namespace, properties).await?;
+
+ Ok(())
+}
+
+fn set_table_creation(location: impl ToString, name: impl ToString) -> Result<TableCreation> {
+ let schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+            NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()?;
+
+ let creation = TableCreation::builder()
+ .location(location.to_string())
+ .name(name.to_string())
+ .properties(HashMap::new())
+ .schema(schema)
+ .build();
+
+ Ok(creation)
+}
+
+#[tokio::test]
+async fn test_provider_get_table_schema() -> Result<()> {
+ let iceberg_catalog = get_iceberg_catalog();
+    let namespace = NamespaceIdent::new("test_provider_get_table_schema".to_string());
+ set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+ let creation = set_table_creation(temp_path(), "my_table")?;
+ iceberg_catalog.create_table(&namespace, creation).await?;
+
+ let client = Arc::new(iceberg_catalog);
+ let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+ let ctx = SessionContext::new();
+ ctx.register_catalog("catalog", catalog);
+
+ let provider = ctx.catalog("catalog").unwrap();
+ let schema = provider.schema("test_provider_get_table_schema").unwrap();
+
+ let table = schema.table("my_table").await.unwrap().unwrap();
+ let table_schema = table.schema();
+
+ let expected = [("foo", &DataType::Int32), ("bar", &DataType::Utf8)];
+
+ for (field, exp) in table_schema.fields().iter().zip(expected.iter()) {
+ assert_eq!(field.name(), exp.0);
+ assert_eq!(field.data_type(), exp.1);
+ assert!(!field.is_nullable())
+ }
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_provider_list_table_names() -> Result<()> {
+ let iceberg_catalog = get_iceberg_catalog();
+    let namespace = NamespaceIdent::new("test_provider_list_table_names".to_string());
+ set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+ let creation = set_table_creation(temp_path(), "my_table")?;
+ iceberg_catalog.create_table(&namespace, creation).await?;
+
+ let client = Arc::new(iceberg_catalog);
+ let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+ let ctx = SessionContext::new();
+ ctx.register_catalog("catalog", catalog);
+
+ let provider = ctx.catalog("catalog").unwrap();
+ let schema = provider.schema("test_provider_list_table_names").unwrap();
+
+ let expected = vec!["my_table"];
+ let result = schema.table_names();
+
+ assert_eq!(result, expected);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_provider_list_schema_names() -> Result<()> {
+ let iceberg_catalog = get_iceberg_catalog();
+    let namespace = NamespaceIdent::new("test_provider_list_schema_names".to_string());
+ set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+ set_table_creation("test_provider_list_schema_names", "my_table")?;
+ let client = Arc::new(iceberg_catalog);
+ let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+ let ctx = SessionContext::new();
+ ctx.register_catalog("catalog", catalog);
+
+ let provider = ctx.catalog("catalog").unwrap();
+
+ let expected = ["test_provider_list_schema_names"];
+ let result = provider.schema_names();
+
+ assert!(expected
+ .iter()
+ .all(|item| result.contains(&item.to_string())));
+ Ok(())
+}