This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 641d058 test: Add integration tests for rest catalog. (#109)
641d058 is described below
commit 641d05800e97afe732fe6c8317c01d022e449092
Author: Renjie Liu <[email protected]>
AuthorDate: Fri Dec 8 23:49:02 2023 +0800
test: Add integration tests for rest catalog. (#109)
* test: Introduce integration test of rest catalog client against docker
container
* Fix comment
---
Cargo.toml | 4 +-
Makefile | 6 +-
crates/catalog/rest/Cargo.toml | 3 +
crates/catalog/rest/src/catalog.rs | 46 ++-
.../rest/testdata/rest_catalog/docker-compose.yaml | 65 ++++
crates/catalog/rest/tests/rest_catalog_test.rs | 376 +++++++++++++++++++++
crates/iceberg/src/catalog/mod.rs | 2 +-
crates/iceberg/src/spec/partition.rs | 74 ++++
crates/iceberg/src/spec/sort.rs | 7 +
Makefile => crates/test_utils/Cargo.toml | 25 +-
crates/test_utils/src/cmd.rs | 41 +++
crates/test_utils/src/docker.rs | 102 ++++++
crates/test_utils/src/lib.rs | 41 +++
13 files changed, 767 insertions(+), 25 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 49e10e0..78763aa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,7 @@
[workspace]
resolver = "2"
-members = ["crates/catalog/*", "crates/iceberg"]
+members = ["crates/catalog/*", "crates/iceberg", "crates/test_utils"]
[workspace.dependencies]
anyhow = "1.0.72"
@@ -31,6 +31,7 @@ bitvec = "1.0.1"
chrono = "0.4"
derive_builder = "0.12.0"
either = "1"
+env_logger = "0.10.0"
futures = "0.3"
iceberg = { path = "./crates/iceberg" }
iceberg-catalog-rest = { path = "./crates/catalog/rest" }
@@ -43,6 +44,7 @@ once_cell = "1"
opendal = "0.42"
ordered-float = "4.0.0"
pretty_assertions = "1.4.0"
+port_scanner = "0.1.5"
reqwest = { version = "^0.11", features = ["json"] }
rust_decimal = "1.31.0"
serde = { version = "^1.0", features = ["rc"] }
diff --git a/Makefile b/Makefile
index 63fcb64..d846303 100644
--- a/Makefile
+++ b/Makefile
@@ -15,9 +15,13 @@
# specific language governing permissions and limitations
# under the License.
+.EXPORT_ALL_VARIABLES:
+
+RUST_LOG = debug
+
build:
cargo build
-
+
check-fmt:
cargo fmt --all -- --check
diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml
index 43e2ae6..883f55c 100644
--- a/crates/catalog/rest/Cargo.toml
+++ b/crates/catalog/rest/Cargo.toml
@@ -31,6 +31,7 @@ keywords = ["iceberg", "rest", "catalog"]
async-trait = { workspace = true }
chrono = { workspace = true }
iceberg = { workspace = true }
+log = "0.4.20"
reqwest = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
@@ -40,5 +41,7 @@ urlencoding = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
+iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
mockito = { workspace = true }
+port_scanner = { workspace = true }
tokio = { workspace = true }
diff --git a/crates/catalog/rest/src/catalog.rs
b/crates/catalog/rest/src/catalog.rs
index 1dfbe79..7ccd108 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -21,7 +21,7 @@ use std::collections::HashMap;
use async_trait::async_trait;
use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
-use reqwest::{Client, Request};
+use reqwest::{Client, Request, Response, StatusCode};
use serde::de::DeserializeOwned;
use typed_builder::TypedBuilder;
use urlencoding::encode;
@@ -166,6 +166,7 @@ impl HttpClient {
if resp.status().as_u16() == SUCCESS_CODE {
Ok(())
} else {
+ let code = resp.status();
let text = resp.bytes().await?;
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
Error::new(
@@ -173,6 +174,33 @@ impl HttpClient {
"Failed to parse response from rest catalog server!",
)
.with_context("json", String::from_utf8_lossy(&text))
+ .with_context("code", code.to_string())
+ .with_source(e)
+ })?;
+ Err(e.into())
+ }
+ }
+
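+ /// More generic request handling for special cases like `HEAD`: `handler` returns `Some` to accept the response, or `None` to have the body parsed as an error.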
+ async fn do_execute<R, E: DeserializeOwned + Into<Error>>(
+ &self,
+ request: Request,
+ handler: impl FnOnce(&Response) -> Option<R>,
+ ) -> Result<R> {
+ let resp = self.0.execute(request).await?;
+
+ if let Some(ret) = handler(&resp) {
+ Ok(ret)
+ } else {
+ let code = resp.status();
+ let text = resp.bytes().await?;
+ let e = serde_json::from_slice::<E>(&text).map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Failed to parse response from rest catalog server!",
+ )
+ .with_context("code", code.to_string())
+ .with_context("json", String::from_utf8_lossy(&text))
.with_source(e)
})?;
Err(e.into())
@@ -273,9 +301,12 @@ impl Catalog for RestCatalog {
.build()?;
self.client
- .execute::<ErrorResponse, NO_CONTENT>(request)
+ .do_execute::<bool, ErrorResponse>(request, |resp| match
resp.status() {
+ StatusCode::NO_CONTENT => Some(true),
+ StatusCode::NOT_FOUND => Some(false),
+ _ => None,
+ })
.await
- .map(|_| true)
}
/// Drop a namespace from the catalog.
@@ -326,7 +357,7 @@ impl Catalog for RestCatalog {
partition_spec: creation.partition_spec,
write_order: creation.sort_order,
// We don't support stage create yet.
- stage_create: None,
+ stage_create: Some(false),
properties: if creation.properties.is_empty() {
None
} else {
@@ -406,9 +437,12 @@ impl Catalog for RestCatalog {
.build()?;
self.client
- .execute::<ErrorResponse, NO_CONTENT>(request)
+ .do_execute::<bool, ErrorResponse>(request, |resp| match
resp.status() {
+ StatusCode::NO_CONTENT => Some(true),
+ StatusCode::NOT_FOUND => Some(false),
+ _ => None,
+ })
.await
- .map(|_| true)
}
/// Rename a table in the catalog.
diff --git a/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
new file mode 100644
index 0000000..5c10146
--- /dev/null
+++ b/crates/catalog/rest/testdata/rest_catalog/docker-compose.yaml
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+  rest:
+    image: tabulario/iceberg-rest:0.10.0
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+      - CATALOG_CATALOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
+      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
+      - CATALOG_WAREHOUSE=s3://icebergdata/demo
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+    depends_on:
+      - minio
+    links:
+      - minio:icebergdata.minio
+    expose:
+      - 8181
+
+  minio:
+    image: minio/minio
+    environment:
+      - MINIO_ROOT_USER=admin
+      - MINIO_ROOT_PASSWORD=password
+      - MINIO_DOMAIN=minio
+    expose:
+      - 9001
+      - 9000
+    command: [ "server", "/data", "--console-address", ":9001" ]
+
+  mc:
+    depends_on:
+      - minio
+    image: minio/mc
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c "
+      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
+      /usr/bin/mc rm -r --force minio/icebergdata;
+      /usr/bin/mc mb minio/icebergdata;
+      /usr/bin/mc policy set public minio/icebergdata;
+      tail -f /dev/null
+      "
\ No newline at end of file
diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs
b/crates/catalog/rest/tests/rest_catalog_test.rs
new file mode 100644
index 0000000..a4d0795
--- /dev/null
+++ b/crates/catalog/rest/tests/rest_catalog_test.rs
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for rest catalog.
+
+use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type};
+use iceberg::transaction::Transaction;
+use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
+use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use iceberg_test_utils::docker::DockerCompose;
+use iceberg_test_utils::{normalize_test_name, set_up};
+use port_scanner::scan_port_addr;
+use std::collections::HashMap;
+use tokio::time::sleep;
+
+const REST_CATALOG_PORT: u16 = 8181;
+
+struct TestFixture {
+    _docker_compose: DockerCompose, // kept so the compose handle lives as long as the fixture
+    rest_catalog: RestCatalog,
+}
+
+/// Starts a docker compose project dedicated to the test `func` and builds a
+/// [`RestCatalog`] client pointing at its `rest` container.
+async fn set_test_fixture(func: &str) -> TestFixture {
+    set_up();
+    // The compose project name is derived from the module path and the test name.
+    let project_name = normalize_test_name(format!("{}_{func}", module_path!()));
+    let compose_dir = format!("{}/testdata/rest_catalog", env!("CARGO_MANIFEST_DIR"));
+    let docker_compose = DockerCompose::new(project_name, compose_dir);
+    docker_compose.run();
+
+    let rest_catalog_ip = docker_compose.get_container_ip("rest");
+    let rest_catalog_addr = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
+
+    // Re-check once per second until `scan_port_addr` returns true for the address.
+    while !scan_port_addr(&rest_catalog_addr) {
+        log::info!("Waiting for 1s rest catalog to ready...");
+        sleep(std::time::Duration::from_millis(1000)).await;
+    }
+
+    let rest_catalog = RestCatalog::new(
+        RestCatalogConfig::builder()
+            .uri(format!("http://{}", rest_catalog_addr))
+            .build(),
+    )
+    .await
+    .unwrap();
+
+    TestFixture {
+        _docker_compose: docker_compose,
+        rest_catalog,
+    }
+}
+#[tokio::test]
+async fn test_get_non_exist_namespace() {
+    let fixture = set_test_fixture("test_get_non_exist_namespace").await;
+
+    // Nothing has been created in this test's catalog yet, so looking up any
+    // namespace must fail.
+    let missing_ns = NamespaceIdent::from_strs(["demo"]).unwrap();
+    let result = fixture.rest_catalog.get_namespace(&missing_ns).await;
+    assert!(result.is_err());
+
+    // The error message must state that the namespace does not exist
+    // (matched as a substring of the error's display text).
+    let err_msg = result.unwrap_err().to_string();
+    assert!(err_msg.contains("Namespace does not exist"));
+}
+
+#[tokio::test]
+async fn test_get_namespace() {
+    let fixture = set_test_fixture("test_get_namespace").await;
+
+    let ns = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    // Verify that the namespace doesn't exist before it is created.
+    assert!(fixture.rest_catalog.get_namespace(ns.name()).await.is_err());
+
+    // Create this namespace and check what the catalog reports back.
+    let created_ns = fixture
+        .rest_catalog
+        .create_namespace(ns.name(), ns.properties().clone())
+        .await
+        .unwrap();
+
+    assert_eq!(ns.name(), created_ns.name());
+    assert_map_contains(ns.properties(), created_ns.properties());
+
+    // Fetch the namespace again and verify the *fetched* name and properties.
+    let get_ns = fixture.rest_catalog.get_namespace(ns.name()).await.unwrap();
+    assert_eq!(ns.name(), get_ns.name());
+    assert_map_contains(ns.properties(), get_ns.properties());
+}
+
+#[tokio::test]
+async fn test_list_namespace() {
+    let fixture = set_test_fixture("test_list_namespace").await;
+
+    let ns1 = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    let ns2 = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "macos"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "xuanwo".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    // The parent namespace `apple` doesn't exist yet, so listing it should return an error.
+    assert!(fixture
+        .rest_catalog
+        .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap()))
+        .await
+        .is_err());
+
+    // Create both namespaces, each with its own properties.
+    fixture
+        .rest_catalog
+        .create_namespace(ns1.name(), ns1.properties().clone())
+        .await
+        .unwrap();
+    fixture
+        .rest_catalog
+        .create_namespace(ns2.name(), ns2.properties().clone())
+        .await
+        .unwrap();
+
+    // List the namespaces under `apple`; sort so the assertions don't depend on listing order.
+    let mut nss = fixture
+        .rest_catalog
+        .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap()))
+        .await
+        .unwrap();
+    nss.sort();
+
+    assert_eq!(&nss[0], ns1.name());
+    assert_eq!(&nss[1], ns2.name());
+}
+
+#[tokio::test]
+async fn test_list_empty_namespace() {
+    let fixture = set_test_fixture("test_list_empty_namespace").await;
+
+    let ns_apple = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    // The namespace doesn't exist yet, so listing under it should return an error.
+    assert!(fixture
+        .rest_catalog
+        .list_namespaces(Some(ns_apple.name()))
+        .await
+        .is_err());
+
+    // Create the namespace itself, without any child namespaces.
+    fixture
+        .rest_catalog
+        .create_namespace(ns_apple.name(), ns_apple.properties().clone())
+        .await
+        .unwrap();
+
+    // Listing under the now-existing namespace succeeds and yields nothing.
+    let nss = fixture
+        .rest_catalog
+        .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap()))
+        .await
+        .unwrap();
+    assert!(nss.is_empty());
+}
+
+#[tokio::test]
+async fn test_list_root_namespace() {
+    let fixture = set_test_fixture("test_list_root_namespace").await;
+
+    let ns1 = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    let ns2 = Namespace::with_properties(
+        NamespaceIdent::from_strs(["google", "android"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "xuanwo".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    // The namespace `apple` doesn't exist yet, so listing under it should return an error.
+    assert!(fixture
+        .rest_catalog
+        .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap()))
+        .await
+        .is_err());
+
+    // Create both namespaces, each with its own properties.
+    fixture
+        .rest_catalog
+        .create_namespace(ns1.name(), ns1.properties().clone())
+        .await
+        .unwrap();
+    fixture
+        .rest_catalog
+        .create_namespace(ns2.name(), ns2.properties().clone())
+        .await
+        .unwrap();
+
+    // List from the root (no parent); sort so the assertions don't depend on listing order.
+    let mut nss = fixture.rest_catalog.list_namespaces(None).await.unwrap();
+    nss.sort();
+
+    assert_eq!(&nss[0], &NamespaceIdent::from_strs(["apple"]).unwrap());
+    assert_eq!(&nss[1], &NamespaceIdent::from_strs(["google"]).unwrap());
+}
+
+#[tokio::test]
+async fn test_create_table() {
+    let fixture = set_test_fixture("test_create_table").await;
+
+    let ns = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    // Create the namespace that will hold the table.
+    fixture
+        .rest_catalog
+        .create_namespace(ns.name(), ns.properties().clone())
+        .await
+        .unwrap();
+
+    let schema = Schema::builder()
+        .with_schema_id(1)
+        .with_identifier_field_ids(vec![2])
+        .with_fields(vec![
+            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+        ])
+        .build()
+        .unwrap();
+
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let table = fixture
+        .rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    assert_eq!(
+        table.identifier(),
+        &TableIdent::new(ns.name().clone(), "t1".to_string())
+    );
+
+    assert_eq!(
+        table.metadata().current_schema().as_struct(),
+        schema.as_struct()
+    );
+    assert_eq!(table.metadata().format_version(), FormatVersion::V2);
+    assert!(table.metadata().current_snapshot().is_none()); // no snapshot was committed
+    assert!(table.metadata().history().is_empty());
+    assert!(table.metadata().default_sort_order().unwrap().is_unsorted()); // no sort order was requested
+    assert!(table
+        .metadata()
+        .default_partition_spec()
+        .unwrap()
+        .is_unpartitioned()); // no partition spec was requested
+}
+
+#[tokio::test]
+async fn test_update_table() {
+    let fixture = set_test_fixture("test_update_table").await;
+
+    let ns = Namespace::with_properties(
+        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
+        HashMap::from([
+            ("owner".to_string(), "ray".to_string()),
+            ("community".to_string(), "apache".to_string()),
+        ]),
+    );
+
+    // Create the namespace that will hold the table.
+    fixture
+        .rest_catalog
+        .create_namespace(ns.name(), ns.properties().clone())
+        .await
+        .unwrap();
+
+    let schema = Schema::builder()
+        .with_schema_id(1)
+        .with_identifier_field_ids(vec![2])
+        .with_fields(vec![
+            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
+        ])
+        .build()
+        .unwrap();
+
+    // Now we create a table
+    let table_creation = TableCreation::builder()
+        .name("t1".to_string())
+        .schema(schema.clone())
+        .build();
+
+    let table = fixture
+        .rest_catalog
+        .create_table(ns.name(), table_creation)
+        .await
+        .unwrap();
+
+    assert_eq!(
+        table.identifier(),
+        &TableIdent::new(ns.name().clone(), "t1".to_string())
+    );
+
+    // Update the table by committing a transaction that sets one property.
+    let table2 = Transaction::new(&table)
+        .set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())]))
+        .unwrap()
+        .commit(&fixture.rest_catalog)
+        .await
+        .unwrap();
+
+    assert_map_contains(
+        &HashMap::from([("prop1".to_string(), "v1".to_string())]),
+        table2.metadata().properties(),
+    );
+}
+
+fn assert_map_contains(map1: &HashMap<String, String>, map2: &HashMap<String, String>) {
+    // Every entry of `map1` must be present in `map2`; extra keys in `map2` are allowed.
+    for (k, v) in map1 {
+        assert_eq!(map2.get(k), Some(v), "unexpected value for key {k:?}");
+    }
+}
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index 2ddeace..b688375 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -97,7 +97,7 @@ pub trait Catalog: std::fmt::Debug {
/// The namespace identifier is a list of strings, where each string is a
/// component of the namespace. It's catalog implementer's responsibility to
/// handle the namespace identifier correctly.
-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash,
PartialOrd, Ord)]
pub struct NamespaceIdent(Vec<String>);
impl NamespaceIdent {
diff --git a/crates/iceberg/src/spec/partition.rs
b/crates/iceberg/src/spec/partition.rs
index 16395dc..484ec7e 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -58,6 +58,17 @@ impl PartitionSpec {
pub fn builder() -> PartitionSpecBuilder {
PartitionSpecBuilder::default()
}
+
+    /// Returns `true` if this partition spec is unpartitioned.
+    ///
+    /// A [`PartitionSpec`] is unpartitioned if it has no fields or if all of its fields use the [`Transform::Void`] transform.
+    pub fn is_unpartitioned(&self) -> bool {
+        // `Iterator::all` returns `true` for an empty iterator, so a spec without
+        // fields is covered by the same check as a spec with only void fields.
+        self.fields
+            .iter()
+            .all(|field| matches!(field.transform, Transform::Void))
+    }
}
/// Reference to [`UnboundPartitionSpec`].
@@ -143,6 +154,69 @@ mod tests {
assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
}
+    #[test]
+    fn test_is_unpartitioned() {
+        let partition_spec = PartitionSpec::builder() // case 1: no fields at all
+            .with_spec_id(1)
+            .with_fields(vec![])
+            .build()
+            .unwrap();
+        assert!(
+            partition_spec.is_unpartitioned(),
+            "Empty partition spec should be unpartitioned"
+        );
+
+        let partition_spec = PartitionSpec::builder() // case 2: one identity field, one void field
+            .with_partition_field(
+                PartitionField::builder()
+                    .source_id(1)
+                    .field_id(1)
+                    .name("id".to_string())
+                    .transform(Transform::Identity)
+                    .build(),
+            )
+            .with_partition_field(
+                PartitionField::builder()
+                    .source_id(2)
+                    .field_id(2)
+                    .name("name".to_string())
+                    .transform(Transform::Void)
+                    .build(),
+            )
+            .with_spec_id(1)
+            .build()
+            .unwrap();
+        assert!(
+            !partition_spec.is_unpartitioned(),
+            "Partition spec with one non void transform should not be unpartitioned"
+        );
+
+        let partition_spec = PartitionSpec::builder() // case 3: only void fields
+            .with_spec_id(1)
+            .with_partition_field(
+                PartitionField::builder()
+                    .source_id(1)
+                    .field_id(1)
+                    .name("id".to_string())
+                    .transform(Transform::Void)
+                    .build(),
+            )
+            .with_partition_field(
+                PartitionField::builder()
+                    .source_id(2)
+                    .field_id(2)
+                    .name("name".to_string())
+                    .transform(Transform::Void)
+                    .build(),
+            )
+            .build()
+            .unwrap();
+        assert!(
+            partition_spec.is_unpartitioned(),
+            "Partition spec with all void field should be unpartitioned"
+        );
+    }
+
#[test]
fn test_unbound_partition_spec() {
let spec = r#"
diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs
index 2150421..01a1edd 100644
--- a/crates/iceberg/src/spec/sort.rs
+++ b/crates/iceberg/src/spec/sort.rs
@@ -81,6 +81,13 @@ impl SortOrder {
pub fn builder() -> SortOrderBuilder {
SortOrderBuilder::default()
}
+
+    /// Returns `true` if this sort order is unsorted.
+    ///
+    /// A [`SortOrder`] is unsorted if it has no sort fields.
+    pub fn is_unsorted(&self) -> bool {
+        self.fields.is_empty()
+    }
}
#[cfg(test)]
diff --git a/Makefile b/crates/test_utils/Cargo.toml
similarity index 67%
copy from Makefile
copy to crates/test_utils/Cargo.toml
index 63fcb64..91210c5 100644
--- a/Makefile
+++ b/crates/test_utils/Cargo.toml
@@ -15,21 +15,14 @@
# specific language governing permissions and limitations
# under the License.
-build:
- cargo build
-
-check-fmt:
- cargo fmt --all -- --check
+[package]
+name = "iceberg_test_utils"
+version = "0.1.0"
+edition = "2021"
-check-clippy:
- cargo clippy --all-targets --all-features --workspace -- -D warnings
+[dependencies]
+env_logger = { workspace = true }
+log = "0.4.20"
-cargo-sort:
- cargo install cargo-sort
- cargo sort -c -w
-
-check: check-fmt check-clippy cargo-sort
-
-test:
- cargo test --no-fail-fast --all-targets --all-features --workspace
- cargo test --no-fail-fast --doc --all-features --workspace
\ No newline at end of file
+[features]
+tests = []
diff --git a/crates/test_utils/src/cmd.rs b/crates/test_utils/src/cmd.rs
new file mode 100644
index 0000000..604d4a1
--- /dev/null
+++ b/crates/test_utils/src/cmd.rs
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::process::Command;
+
+/// Runs `cmd` to completion, describing the step as `desc` in log and panic messages.
+///
+/// Panics if the command cannot be executed or exits unsuccessfully.
+pub fn run_command(mut cmd: Command, desc: impl ToString) {
+    let desc = desc.to_string();
+    log::info!("Starting to {}, command: {:?}", &desc, cmd);
+    let exit = cmd.status().unwrap_or_else(|e| panic!("{} failed to execute: {:?}", desc, e));
+    assert!(exit.success(), "{} failed: {:?}", desc, exit);
+    log::info!("{} succeed!", desc)
+}
+
+/// Runs `cmd` and returns its captured stdout; panics (reporting stderr) if it fails.
+pub fn get_cmd_output(mut cmd: Command, desc: impl ToString) -> String {
+    let desc = desc.to_string();
+    log::info!("Starting to {}, command: {:?}", &desc, cmd);
+    let output = cmd.output().unwrap_or_else(|e| panic!("{} failed to execute: {:?}", desc, e));
+    // stderr is captured by `Command::output`, so surface it instead of dropping it.
+    let stderr = String::from_utf8_lossy(&output.stderr);
+    assert!(output.status.success(), "{} failed: {:?}, stderr: {}", desc, output.status, stderr);
+    log::info!("{} succeed!", desc);
+    String::from_utf8(output.stdout).unwrap()
+}
diff --git a/crates/test_utils/src/docker.rs b/crates/test_utils/src/docker.rs
new file mode 100644
index 0000000..6c5fbef
--- /dev/null
+++ b/crates/test_utils/src/docker.rs
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::cmd::{get_cmd_output, run_command};
+use std::process::Command;
+
+/// A utility to manage lifecycle of docker compose.
+///
+/// It starts docker compose when the `run` method is called, and stops it when dropped.
+#[derive(Debug)]
+pub struct DockerCompose {
+    project_name: String,
+    docker_compose_dir: String,
+}
+
+impl DockerCompose {
+    /// Creates a handle for the compose project named `project_name`; docker
+    /// commands for it are run from `docker_compose_dir`.
+    pub fn new(project_name: impl ToString, docker_compose_dir: impl ToString) -> Self {
+        let project_name = project_name.to_string();
+        let docker_compose_dir = docker_compose_dir.to_string();
+        Self {
+            project_name,
+            docker_compose_dir,
+        }
+    }
+
+    /// Project name passed to `docker compose -p`.
+    pub fn project_name(&self) -> &str {
+        &self.project_name
+    }
+
+    /// Runs `docker compose up -d --wait` for this project via `run_command`.
+    pub fn run(&self) {
+        let description = format!(
+            "Starting docker compose in {}, project name: {}",
+            self.docker_compose_dir, self.project_name
+        );
+        let project = self.project_name.as_str();
+
+        let mut cmd = Command::new("docker");
+        cmd.current_dir(&self.docker_compose_dir);
+        cmd.args(["compose", "-p", project, "up", "-d", "--wait"]);
+        // NOTE(review): `--timeout` is presumably given in seconds, which would make
+        // this value extremely large — confirm the intended unit.
+        cmd.args(["--timeout", "1200000"]);
+
+        run_command(cmd, description)
+    }
+
+    /// Returns the trimmed output of `docker inspect` for the container of `service_name`,
+    /// using a template that prints the IP address of each attached network.
+    pub fn get_container_ip(&self, service_name: impl AsRef<str>) -> String {
+        // NOTE(review): assumes compose names the container `<project>-<service>-1` — confirm.
+        let container_name = format!("{}-{}-1", self.project_name, service_name.as_ref());
+        let ip_template = "{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}";
+
+        let mut cmd = Command::new("docker");
+        cmd.args(["inspect", "-f", ip_template, container_name.as_str()]);
+
+        let output = get_cmd_output(cmd, format!("Get container ip of {container_name}"));
+        output.trim().to_string()
+    }
+}
+
+impl Drop for DockerCompose {
+    /// Tears the project down with `docker compose down -v --remove-orphans`.
+    fn drop(&mut self) {
+        let description = format!(
+            "Stopping docker compose in {}, project name: {}",
+            self.docker_compose_dir, self.project_name
+        );
+        let project = self.project_name.as_str();
+        let mut cmd = Command::new("docker");
+        cmd.current_dir(&self.docker_compose_dir);
+        cmd.args(["compose", "-p", project, "down", "-v", "--remove-orphans"]);
+
+        if !std::thread::panicking() {
+            return run_command(cmd, description);
+        }
+        // Already unwinding (e.g. a failed assertion): a panic from `run_command`
+        // would abort the process and hide the original failure, so only log here.
+        match cmd.status() {
+            Ok(exit) if exit.success() => log::info!("{} succeed!", description),
+            other => log::error!("{} failed: {:?}", description, other),
+        }
+    }
+}
diff --git a/crates/test_utils/src/lib.rs b/crates/test_utils/src/lib.rs
new file mode 100644
index 0000000..4f63b8d
--- /dev/null
+++ b/crates/test_utils/src/lib.rs
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This crate contains common utilities for testing.
+//!
+//! It's not intended for use outside of `iceberg-rust`.
+
+#[cfg(feature = "tests")]
+mod cmd;
+#[cfg(feature = "tests")]
+pub mod docker;
+
+#[cfg(feature = "tests")]
+pub use common::*;
+
+#[cfg(feature = "tests")]
+mod common {
+    /// Initializes `env_logger`; only the first call in a process has an effect.
+    pub fn set_up() {
+        static INIT: std::sync::Once = std::sync::Once::new();
+        INIT.call_once(env_logger::init);
+    }
+    /// Returns `s` with every `::` replaced by `__` and every `.` replaced by `_`.
+    pub fn normalize_test_name(s: impl ToString) -> String {
+        s.to_string().replace("::", "__").replace('.', "_")
+    }
+}