This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 045c0b2e2 Cleanup integration tests (#2074)
045c0b2e2 is described below
commit 045c0b2e298fee03e6d658bba78092a7fcf65747
Author: Renjie Liu <[email protected]>
AuthorDate: Wed Jan 28 12:23:50 2026 +0800
Cleanup integration tests (#2074)
---
Cargo.lock | 3 -
crates/integration_tests/Cargo.toml | 2 -
crates/integration_tests/src/lib.rs | 50 +--
.../tests/shared_tests/append_data_file_test.rs | 138 --------
.../append_partition_data_file_test.rs | 235 -------------
.../tests/shared_tests/datafusion.rs | 142 --------
crates/integration_tests/tests/shared_tests/mod.rs | 4 -
.../tests/shared_tests/scan_all_type.rs | 366 ---------------------
.../sqllogictest/testdata/schedules/df_test.toml | 8 +
.../testdata/slts/df_test/basic_queries.slt | 212 ++++++++++++
.../testdata/slts/df_test/scan_all_types.slt | 128 +++++++
crates/test_utils/Cargo.toml | 1 -
crates/test_utils/src/cmd.rs | 58 ----
crates/test_utils/src/docker.rs | 165 ----------
crates/test_utils/src/lib.rs | 5 -
15 files changed, 349 insertions(+), 1168 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index c0c46ce67..211b6416c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3547,11 +3547,9 @@ version = "0.8.0"
dependencies = [
"arrow-array",
"arrow-schema",
- "datafusion",
"futures",
"iceberg",
"iceberg-catalog-rest",
- "iceberg-datafusion",
"iceberg_test_utils",
"ordered-float 2.10.1",
"parquet",
@@ -3608,7 +3606,6 @@ name = "iceberg_test_utils"
version = "0.8.0"
dependencies = [
"iceberg",
- "tracing",
"tracing-subscriber",
]
diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml
index 07291f29e..2ed211769 100644
--- a/crates/integration_tests/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -27,11 +27,9 @@ version = { workspace = true }
[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
-datafusion = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
-iceberg-datafusion = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
parquet = { workspace = true }
tokio = { workspace = true }
diff --git a/crates/integration_tests/src/lib.rs b/crates/integration_tests/src/lib.rs
index e99c74df6..4bf8f4d19 100644
--- a/crates/integration_tests/src/lib.rs
+++ b/crates/integration_tests/src/lib.rs
@@ -20,19 +20,7 @@ use std::sync::OnceLock;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
-use iceberg_test_utils::docker::DockerCompose;
-use iceberg_test_utils::{
- get_minio_endpoint, get_rest_catalog_endpoint, normalize_test_name, set_up,
-};
-
-const REST_CATALOG_PORT: u16 = 8181;
-
-/// Test fixture that manages Docker containers.
-/// This is kept for backward compatibility but deprecated in favor of GlobalTestFixture.
-pub struct TestFixture {
- pub _docker_compose: DockerCompose,
- pub catalog_config: HashMap<String, String>,
-}
+use iceberg_test_utils::{get_minio_endpoint, get_rest_catalog_endpoint, set_up};
/// Global test fixture that uses environment-based configuration.
/// This assumes Docker containers are started externally (e.g., via `make docker-up`).
@@ -68,39 +56,3 @@ impl GlobalTestFixture {
pub fn get_test_fixture() -> &'static GlobalTestFixture {
GLOBAL_FIXTURE.get_or_init(GlobalTestFixture::from_env)
}
-
-/// Legacy function to create a test fixture with Docker container management.
-/// Deprecated: prefer using `get_test_fixture()` with externally managed containers.
-pub fn set_test_fixture(func: &str) -> TestFixture {
- set_up();
- let docker_compose = DockerCompose::new(
- normalize_test_name(format!("{}_{func}", module_path!())),
- format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
- );
-
- // Stop any containers from previous runs and start new ones
- docker_compose.down();
- docker_compose.up();
-
- let rest_catalog_ip = docker_compose.get_container_ip("rest");
- let minio_ip = docker_compose.get_container_ip("minio");
-
- let catalog_config = HashMap::from([
- (
- REST_CATALOG_PROP_URI.to_string(),
- format!("http://{rest_catalog_ip}:{REST_CATALOG_PORT}"),
- ),
- (
- S3_ENDPOINT.to_string(),
- format!("http://{}:{}", minio_ip, 9000),
- ),
- (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
- (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
- (S3_REGION.to_string(), "us-east-1".to_string()),
- ]);
-
- TestFixture {
- _docker_compose: docker_compose,
- catalog_config,
- }
-}
diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
deleted file mode 100644
index bedc97510..000000000
--- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Integration tests for rest catalog.
-
-use std::sync::Arc;
-
-use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
-use futures::TryStreamExt;
-use iceberg::transaction::{ApplyTransactionAction, Transaction};
-use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
-use iceberg::writer::file_writer::ParquetWriterBuilder;
-use iceberg::writer::file_writer::location_generator::{
- DefaultFileNameGenerator, DefaultLocationGenerator,
-};
-use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
-use iceberg::{Catalog, CatalogBuilder, TableCreation};
-use iceberg_catalog_rest::RestCatalogBuilder;
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
-use parquet::file::properties::WriterProperties;
-
-use crate::get_shared_containers;
-use crate::shared_tests::{random_ns, test_schema};
-
-#[tokio::test]
-async fn test_append_data_file() {
- let fixture = get_shared_containers();
- let rest_catalog = RestCatalogBuilder::default()
- .load("rest", fixture.catalog_config.clone())
- .await
- .unwrap();
- let ns = random_ns().await;
- let schema = test_schema();
-
- let table_creation = TableCreation::builder()
- .name("t1".to_string())
- .schema(schema.clone())
- .build();
-
- let table = rest_catalog
- .create_table(ns.name(), table_creation)
- .await
- .unwrap();
-
- // Create the writer and write the data
- let schema: Arc<arrow_schema::Schema> = Arc::new(
- table
- .metadata()
- .current_schema()
- .as_ref()
- .try_into()
- .unwrap(),
- );
- let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
- let file_name_generator = DefaultFileNameGenerator::new(
- "test".to_string(),
- None,
- iceberg::spec::DataFileFormat::Parquet,
- );
- let parquet_writer_builder = ParquetWriterBuilder::new(
- WriterProperties::default(),
- table.metadata().current_schema().clone(),
- );
- let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
- parquet_writer_builder,
- table.file_io().clone(),
- location_generator.clone(),
- file_name_generator.clone(),
- );
- let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
- let mut data_file_writer = data_file_writer_builder.build(None).await.unwrap();
- let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
- let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
- let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
- let batch = RecordBatch::try_new(schema.clone(), vec![
- Arc::new(col1) as ArrayRef,
- Arc::new(col2) as ArrayRef,
- Arc::new(col3) as ArrayRef,
- ])
- .unwrap();
- data_file_writer.write(batch.clone()).await.unwrap();
- let data_file = data_file_writer.close().await.unwrap();
-
- // check parquet file schema
- let content = table
- .file_io()
- .new_input(data_file[0].file_path())
- .unwrap()
- .read()
- .await
- .unwrap();
- let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load(
- &content,
- ArrowReaderOptions::default(),
- )
- .unwrap();
- let field_ids: Vec<i32> = parquet_reader
- .parquet_schema()
- .columns()
- .iter()
- .map(|col| col.self_type().get_basic_info().id())
- .collect();
- assert_eq!(field_ids, vec![1, 2, 3]);
-
- // commit result
- let tx = Transaction::new(&table);
- let append_action = tx.fast_append().add_data_files(data_file.clone());
- let tx = append_action.apply(tx).unwrap();
- let table = tx.commit(&rest_catalog).await.unwrap();
-
- // check result
- let batch_stream = table
- .scan()
- .select_all()
- .build()
- .unwrap()
- .to_arrow()
- .await
- .unwrap();
- let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
- assert_eq!(batches.len(), 1);
- assert_eq!(batches[0], batch);
-}
diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
deleted file mode 100644
index a305ec084..000000000
--- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
+++ /dev/null
@@ -1,235 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Integration test for partition data file
-
-use std::sync::Arc;
-
-use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
-use futures::TryStreamExt;
-use iceberg::spec::{
- Literal, PartitionKey, PrimitiveLiteral, Struct, Transform, UnboundPartitionSpec,
-};
-use iceberg::table::Table;
-use iceberg::transaction::{ApplyTransactionAction, Transaction};
-use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
-use iceberg::writer::file_writer::ParquetWriterBuilder;
-use iceberg::writer::file_writer::location_generator::{
- DefaultFileNameGenerator, DefaultLocationGenerator,
-};
-use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
-use iceberg::{Catalog, CatalogBuilder, TableCreation};
-use iceberg_catalog_rest::RestCatalogBuilder;
-use parquet::file::properties::WriterProperties;
-
-use crate::get_shared_containers;
-use crate::shared_tests::{random_ns, test_schema};
-
-#[tokio::test]
-async fn test_append_partition_data_file() {
- let fixture = get_shared_containers();
- let rest_catalog = RestCatalogBuilder::default()
- .load("rest", fixture.catalog_config.clone())
- .await
- .unwrap();
- let ns = random_ns().await;
- let schema = test_schema();
-
- let unbound_partition_spec = UnboundPartitionSpec::builder()
- .add_partition_field(2, "id", Transform::Identity)
- .expect("could not add partition field")
- .build();
-
- let partition_spec = unbound_partition_spec
- .bind(schema.clone())
- .expect("could not bind to schema");
-
- let table_creation = TableCreation::builder()
- .name("t1".to_string())
- .schema(schema.clone())
- .partition_spec(partition_spec.clone())
- .build();
-
- let table = rest_catalog
- .create_table(ns.name(), table_creation)
- .await
- .unwrap();
-
- // Create the writer and write the data
- let schema: Arc<arrow_schema::Schema> = Arc::new(
- table
- .metadata()
- .current_schema()
- .as_ref()
- .try_into()
- .unwrap(),
- );
-
- let first_partition_id_value = 100;
- let partition_key = PartitionKey::new(
- partition_spec.clone(),
- table.metadata().current_schema().clone(),
- Struct::from_iter(vec![Some(Literal::int(first_partition_id_value))]),
- );
-
- let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
- let file_name_generator = DefaultFileNameGenerator::new(
- "test".to_string(),
- None,
- iceberg::spec::DataFileFormat::Parquet,
- );
-
- let parquet_writer_builder = ParquetWriterBuilder::new(
- WriterProperties::default(),
- table.metadata().current_schema().clone(),
- );
-
- let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
- parquet_writer_builder.clone(),
- table.file_io().clone(),
- location_generator.clone(),
- file_name_generator.clone(),
- );
-
- let mut data_file_writer_valid =
- DataFileWriterBuilder::new(rolling_file_writer_builder.clone())
- .build(Some(partition_key.clone()))
- .await
- .unwrap();
-
- let col1 = StringArray::from(vec![Some("foo1"), Some("foo2")]);
- let col2 = Int32Array::from(vec![
- Some(first_partition_id_value),
- Some(first_partition_id_value),
- ]);
- let col3 = BooleanArray::from(vec![Some(true), Some(false)]);
- let batch = RecordBatch::try_new(schema.clone(), vec![
- Arc::new(col1) as ArrayRef,
- Arc::new(col2) as ArrayRef,
- Arc::new(col3) as ArrayRef,
- ])
- .unwrap();
-
- data_file_writer_valid.write(batch.clone()).await.unwrap();
- let data_file_valid = data_file_writer_valid.close().await.unwrap();
-
- // commit result
- let tx = Transaction::new(&table);
- let append_action = tx.fast_append().add_data_files(data_file_valid.clone());
- let tx = append_action.apply(tx).unwrap();
- let table = tx.commit(&rest_catalog).await.unwrap();
-
- // check result
- let batch_stream = table
- .scan()
- .select_all()
- .build()
- .unwrap()
- .to_arrow()
- .await
- .unwrap();
- let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
- assert_eq!(batches.len(), 1);
- assert_eq!(batches[0], batch);
-
- let partition_key = partition_key.copy_with_data(Struct::from_iter([Some(
- Literal::Primitive(PrimitiveLiteral::Boolean(true)),
- )]));
- test_schema_incompatible_partition_type(
- rolling_file_writer_builder.clone(),
- batch.clone(),
- partition_key.clone(),
- table.clone(),
- &rest_catalog,
- )
- .await;
-
- let partition_key = partition_key.copy_with_data(Struct::from_iter([
- Some(Literal::Primitive(PrimitiveLiteral::Int(
- first_partition_id_value,
- ))),
- Some(Literal::Primitive(PrimitiveLiteral::Int(
- first_partition_id_value,
- ))),
- ]));
- test_schema_incompatible_partition_fields(
- rolling_file_writer_builder.clone(),
- batch,
- partition_key,
- table,
- &rest_catalog,
- )
- .await;
-}
-
-async fn test_schema_incompatible_partition_type(
- rolling_file_writer_builder: RollingFileWriterBuilder<
- ParquetWriterBuilder,
- DefaultLocationGenerator,
- DefaultFileNameGenerator,
- >,
- batch: RecordBatch,
- partition_key: PartitionKey,
- table: Table,
- catalog: &dyn Catalog,
-) {
- // test writing different "type" of partition than mentioned in schema
- let mut data_file_writer_invalid = DataFileWriterBuilder::new(rolling_file_writer_builder)
- .build(Some(partition_key))
- .await
- .unwrap();
-
- data_file_writer_invalid.write(batch.clone()).await.unwrap();
- let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
-
- let tx = Transaction::new(&table);
- let append_action = tx.fast_append().add_data_files(data_file_invalid.clone());
- let tx = append_action.apply(tx).unwrap();
-
- if tx.commit(catalog).await.is_ok() {
- panic!("diverging partition info should have returned error");
- }
-}
-
-async fn test_schema_incompatible_partition_fields(
- rolling_file_writer_builder: RollingFileWriterBuilder<
- ParquetWriterBuilder,
- DefaultLocationGenerator,
- DefaultFileNameGenerator,
- >,
- batch: RecordBatch,
- partition_key: PartitionKey,
- table: Table,
- catalog: &dyn Catalog,
-) {
- // test writing different number of partition fields than mentioned in schema
- let mut data_file_writer_invalid = DataFileWriterBuilder::new(rolling_file_writer_builder)
- .build(Some(partition_key))
- .await
- .unwrap();
-
- data_file_writer_invalid.write(batch.clone()).await.unwrap();
- let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
-
- let tx = Transaction::new(&table);
- let append_action = tx.fast_append().add_data_files(data_file_invalid.clone());
- let tx = append_action.apply(tx).unwrap();
- if tx.commit(catalog).await.is_ok() {
- panic!("passing different number of partition fields should have returned error");
- }
-}
diff --git a/crates/integration_tests/tests/shared_tests/datafusion.rs b/crates/integration_tests/tests/shared_tests/datafusion.rs
deleted file mode 100644
index 60dd9f36c..000000000
--- a/crates/integration_tests/tests/shared_tests/datafusion.rs
+++ /dev/null
@@ -1,142 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use arrow_schema::TimeUnit;
-use datafusion::arrow::datatypes::{DataType, Field, Schema};
-use datafusion::assert_batches_eq;
-use datafusion::catalog::TableProvider;
-use datafusion::error::DataFusionError;
-use datafusion::prelude::SessionContext;
-use iceberg::{Catalog, CatalogBuilder, TableIdent};
-use iceberg_catalog_rest::RestCatalogBuilder;
-use iceberg_datafusion::IcebergStaticTableProvider;
-use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
-
-use crate::get_shared_containers;
-
-#[tokio::test]
-async fn test_basic_queries() -> Result<(), DataFusionError> {
- let fixture = get_shared_containers();
- let rest_catalog = RestCatalogBuilder::default()
- .load("rest", fixture.catalog_config.clone())
- .await
- .unwrap();
-
- let table = rest_catalog
- .load_table(&TableIdent::from_strs(["default", "types_test"]).unwrap())
- .await
- .unwrap();
-
- let ctx = SessionContext::new();
-
- let table_provider = Arc::new(
- IcebergStaticTableProvider::try_new_from_table(table)
- .await
- .unwrap(),
- );
-
- let schema = table_provider.schema();
-
- assert_eq!(
- schema.as_ref(),
- &Schema::new(vec![
- Field::new("cboolean", DataType::Boolean, true).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "1".to_string(),
- )])),
- Field::new("ctinyint", DataType::Int32, true).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "2".to_string(),
- )])),
- Field::new("csmallint", DataType::Int32, true).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "3".to_string(),
- )])),
- Field::new("cint", DataType::Int32, true).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "4".to_string(),
- )])),
- Field::new("cbigint", DataType::Int64, true).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "5".to_string(),
- )])),
- Field::new("cfloat", DataType::Float32, true).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "6".to_string(),
- )])),
- Field::new("cdouble", DataType::Float64, true).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )])),
- Field::new("cdecimal", DataType::Decimal128(8, 2), true).with_metadata(HashMap::from(
- [(PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string(),)]
- )),
- Field::new("cdate", DataType::Date32, true).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "9".to_string(),
- )])),
- Field::new(
- "ctimestamp_ntz",
- DataType::Timestamp(TimeUnit::Microsecond, None),
- true
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "10".to_string(),
- )])),
- Field::new(
- "ctimestamp",
- DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("+00:00"))),
- true
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "11".to_string(),
- )])),
- Field::new("cstring", DataType::Utf8, true).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "12".to_string(),
- )])),
- Field::new("cbinary", DataType::LargeBinary, true).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "13".to_string(),
- )])),
- ])
- );
-
- ctx.register_table("types_table", table_provider)?;
-
- let batches = ctx
- .sql("SELECT * FROM types_table ORDER BY cbigint LIMIT 3")
- .await?
- .collect()
- .await?;
- let expected = [
- "+----------+----------+-----------+------+---------+--------+---------+----------+------------+---------------------+----------------------+---------+----------+",
- "| cboolean | ctinyint | csmallint | cint | cbigint | cfloat | cdouble | cdecimal | cdate | ctimestamp_ntz | ctimestamp | cstring | cbinary |",
- "+----------+----------+-----------+------+---------+--------+---------+----------+------------+---------------------+----------------------+---------+----------+",
- "| false | -128 | 0 | 0 | 0 | 0.0 | 0.0 | 0.00 | 1970-01-01 | 1970-01-01T00:00:00 | 1970-01-01T00:00:00Z | 0 | 00000000 |",
- "| true | -127 | 1 | 1 | 1 | 1.0 | 1.0 | 0.01 | 1970-01-02 | 1970-01-01T00:00:01 | 1970-01-01T00:00:01Z | 1 | 00000001 |",
- "| false | -126 | 2 | 2 | 2 | 2.0 | 2.0 | 0.02 | 1970-01-03 | 1970-01-01T00:00:02 | 1970-01-01T00:00:02Z | 2 | 00000002 |",
- "+----------+----------+-----------+------+---------+--------+---------+----------+------------+---------------------+----------------------+---------+----------+",
- ];
- assert_batches_eq!(expected, &batches);
- Ok(())
-}
diff --git a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs
index 1e695c588..8fd56ab52 100644
--- a/crates/integration_tests/tests/shared_tests/mod.rs
+++ b/crates/integration_tests/tests/shared_tests/mod.rs
@@ -23,13 +23,9 @@ use iceberg_catalog_rest::RestCatalogBuilder;
use crate::get_shared_containers;
-mod append_data_file_test;
-mod append_partition_data_file_test;
mod conflict_commit_test;
-mod datafusion;
mod read_evolved_schema;
mod read_positional_deletes;
-mod scan_all_type;
pub async fn random_ns() -> Namespace {
let fixture = get_shared_containers();
diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs
deleted file mode 100644
index 7a2907d4c..000000000
--- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs
+++ /dev/null
@@ -1,366 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Integration tests for rest catalog.
-
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use arrow_array::builder::{Int32Builder, ListBuilder, MapBuilder, StringBuilder};
-use arrow_array::{
- Array, ArrayRef, BooleanArray, Date32Array, Decimal128Array, FixedSizeBinaryArray,
- Float32Array, Float64Array, Int32Array, Int64Array, LargeBinaryArray, MapArray, RecordBatch,
- StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray,
-};
-use arrow_schema::{DataType, Field, Fields};
-use futures::TryStreamExt;
-use iceberg::arrow::{DEFAULT_MAP_FIELD_NAME, UTC_TIME_ZONE};
-use iceberg::spec::{
- LIST_FIELD_NAME, ListType, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedField,
- PrimitiveType, Schema, StructType, Type,
-};
-use iceberg::transaction::{ApplyTransactionAction, Transaction};
-use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
-use iceberg::writer::file_writer::ParquetWriterBuilder;
-use iceberg::writer::file_writer::location_generator::{
- DefaultFileNameGenerator, DefaultLocationGenerator,
-};
-use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
-use iceberg::{Catalog, CatalogBuilder, TableCreation};
-use iceberg_catalog_rest::RestCatalogBuilder;
-use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
-use parquet::file::properties::WriterProperties;
-use uuid::Uuid;
-
-use crate::get_shared_containers;
-use crate::shared_tests::random_ns;
-
-#[tokio::test]
-async fn test_scan_all_type() {
- let fixture = get_shared_containers();
- let rest_catalog = RestCatalogBuilder::default()
- .load("rest", fixture.catalog_config.clone())
- .await
- .unwrap();
- let ns = random_ns().await;
-
- let schema = Schema::builder()
- .with_schema_id(1)
- .with_identifier_field_ids(vec![2])
- .with_fields(vec![
- // test all type
- NestedField::required(1, "int", Type::Primitive(PrimitiveType::Int)).into(),
- NestedField::required(2, "long", Type::Primitive(PrimitiveType::Long)).into(),
- NestedField::required(3, "float", Type::Primitive(PrimitiveType::Float)).into(),
- NestedField::required(4, "double", Type::Primitive(PrimitiveType::Double)).into(),
- NestedField::required(
- 5,
- "decimal",
- Type::Primitive(PrimitiveType::Decimal {
- precision: 20,
- scale: 5,
- }),
- )
- .into(),
- NestedField::required(6, "string", Type::Primitive(PrimitiveType::String)).into(),
- NestedField::required(7, "boolean", Type::Primitive(PrimitiveType::Boolean)).into(),
- NestedField::required(8, "binary", Type::Primitive(PrimitiveType::Binary)).into(),
- NestedField::required(9, "date", Type::Primitive(PrimitiveType::Date)).into(),
- NestedField::required(10, "time", Type::Primitive(PrimitiveType::Time)).into(),
- NestedField::required(11, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
- .into(),
- NestedField::required(12, "fixed", Type::Primitive(PrimitiveType::Fixed(10))).into(),
- NestedField::required(13, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(),
- NestedField::required(
- 14,
- "timestamptz",
- Type::Primitive(PrimitiveType::Timestamptz),
- )
- .into(),
- NestedField::required(
- 15,
- "struct",
- Type::Struct(StructType::new(vec![
- NestedField::required(18, "int", Type::Primitive(PrimitiveType::Int)).into(),
- NestedField::required(19, "string", Type::Primitive(PrimitiveType::String))
- .into(),
- ])),
- )
- .into(),
- NestedField::required(
- 16,
- "list",
- Type::List(ListType::new(
- NestedField::list_element(20, Type::Primitive(PrimitiveType::Int), true).into(),
- )),
- )
- .into(),
- NestedField::required(
- 17,
- "map",
- Type::Map(MapType::new(
- NestedField::map_key_element(21, Type::Primitive(PrimitiveType::Int)).into(),
- NestedField::map_value_element(
- 22,
- Type::Primitive(PrimitiveType::String),
- true,
- )
- .into(),
- )),
- )
- .into(),
- ])
- .build()
- .unwrap();
-
- let table_creation = TableCreation::builder()
- .name("t1".to_string())
- .schema(schema.clone())
- .build();
-
- let table = rest_catalog
- .create_table(ns.name(), table_creation)
- .await
- .unwrap();
-
- // Create the writer and write the data
- let schema: Arc<arrow_schema::Schema> = Arc::new(
- table
- .metadata()
- .current_schema()
- .as_ref()
- .try_into()
- .unwrap(),
- );
- let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
- let file_name_generator = DefaultFileNameGenerator::new(
- "test".to_string(),
- None,
- iceberg::spec::DataFileFormat::Parquet,
- );
- let parquet_writer_builder = ParquetWriterBuilder::new(
- WriterProperties::default(),
- table.metadata().current_schema().clone(),
- );
- let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
- parquet_writer_builder,
- table.file_io().clone(),
- location_generator.clone(),
- file_name_generator.clone(),
- );
- let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
- let mut data_file_writer = data_file_writer_builder.build(None).await.unwrap();
-
- // Prepare data
- let col1 = Int32Array::from(vec![1, 2, 3, 4, 5]);
- let col2 = Int64Array::from(vec![1, 2, 3, 4, 5]);
- let col3 = Float32Array::from(vec![1.1, 2.2, 3.3, 4.4, 5.5]);
- let col4 = Float64Array::from(vec![1.1, 2.2, 3.3, 4.4, 5.5]);
- let col5 = Decimal128Array::from(vec![
- Some(1.into()),
- Some(2.into()),
- Some(3.into()),
- Some(4.into()),
- Some(5.into()),
- ])
- .with_data_type(DataType::Decimal128(20, 5));
- let col6 = StringArray::from(vec!["a", "b", "c", "d", "e"]);
- let col7 = BooleanArray::from(vec![true, false, true, false, true]);
- let col8 = LargeBinaryArray::from_opt_vec(vec![
- Some(b"a"),
- Some(b"b"),
- Some(b"c"),
- Some(b"d"),
- Some(b"e"),
- ]);
- let col9 = Date32Array::from(vec![1, 2, 3, 4, 5]);
- let col10 = Time64MicrosecondArray::from(vec![1, 2, 3, 4, 5]);
- let col11 = TimestampMicrosecondArray::from(vec![1, 2, 3, 4, 5]);
- let col12 = FixedSizeBinaryArray::try_from_iter(
- vec![
- vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
- vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
- vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
- vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
- vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
- ]
- .into_iter(),
- )
- .unwrap();
- assert_eq!(col12.data_type(), &DataType::FixedSizeBinary(10));
- let col13 = FixedSizeBinaryArray::try_from_iter(
- vec![
- Uuid::new_v4().as_bytes().to_vec(),
- Uuid::new_v4().as_bytes().to_vec(),
- Uuid::new_v4().as_bytes().to_vec(),
- Uuid::new_v4().as_bytes().to_vec(),
- Uuid::new_v4().as_bytes().to_vec(),
- ]
- .into_iter(),
- )
- .unwrap();
- assert_eq!(col13.data_type(), &DataType::FixedSizeBinary(16));
- let col14 = TimestampMicrosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone(UTC_TIME_ZONE);
- let col15 = StructArray::from(vec![
- (
- Arc::new(
- Field::new("int", DataType::Int32, false).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- 18.to_string(),
- )])),
- ),
- Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef,
- ),
- (
- Arc::new(
- Field::new("string", DataType::Utf8, false).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- 19.to_string(),
- )])),
- ),
- Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])) as ArrayRef,
- ),
- ]);
- let col16 = {
- let mut builder = ListBuilder::new(Int32Builder::new()).with_field(Arc::new(
- Field::new(LIST_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- 20.to_string(),
- )])),
- ));
- builder.append_value([Some(1), Some(2), Some(3), Some(4), Some(5)]);
- builder.append_value([Some(1), Some(2), Some(3), Some(4), Some(5)]);
- builder.append_value([Some(1), Some(2), Some(3), Some(4), Some(5)]);
- builder.append_value([Some(1), Some(2), Some(3), Some(4), Some(5)]);
- builder.append_value([Some(1), Some(2), Some(3), Some(4), Some(5)]);
- builder.finish()
- };
- let col17 = {
- let string_builder = StringBuilder::new();
- let int_builder = Int32Builder::with_capacity(4);
- let mut builder = MapBuilder::new(None, int_builder, string_builder);
- builder.keys().append_value(1);
- builder.values().append_value("a");
- builder.append(true).unwrap();
- builder.keys().append_value(2);
- builder.values().append_value("b");
- builder.append(true).unwrap();
- builder.keys().append_value(3);
- builder.values().append_value("c");
- builder.append(true).unwrap();
- builder.keys().append_value(4);
- builder.values().append_value("d");
- builder.append(true).unwrap();
- builder.keys().append_value(5);
- builder.values().append_value("e");
- builder.append(true).unwrap();
- let array = builder.finish();
- let (_field, offsets, entries, nulls, ordered) = array.into_parts();
- let new_struct_fields = Fields::from(vec![
- Field::new(MAP_KEY_FIELD_NAME, DataType::Int32, false).with_metadata(HashMap::from([
- (PARQUET_FIELD_ID_META_KEY.to_string(), 21.to_string()),
- ])),
- Field::new(MAP_VALUE_FIELD_NAME, DataType::Utf8, false).with_metadata(HashMap::from([
- (PARQUET_FIELD_ID_META_KEY.to_string(), 22.to_string()),
- ])),
- ]);
- let entries = {
- let (_, arrays, nulls) = entries.into_parts();
- StructArray::new(new_struct_fields.clone(), arrays, nulls)
- };
- let field = Arc::new(Field::new(
- DEFAULT_MAP_FIELD_NAME,
- DataType::Struct(new_struct_fields),
- false,
- ));
- MapArray::new(field, offsets, entries, nulls, ordered)
- };
-
- let batch = RecordBatch::try_new(schema.clone(), vec![
- Arc::new(col1) as ArrayRef,
- Arc::new(col2) as ArrayRef,
- Arc::new(col3) as ArrayRef,
- Arc::new(col4) as ArrayRef,
- Arc::new(col5) as ArrayRef,
- Arc::new(col6) as ArrayRef,
- Arc::new(col7) as ArrayRef,
- Arc::new(col8) as ArrayRef,
- Arc::new(col9) as ArrayRef,
- Arc::new(col10) as ArrayRef,
- Arc::new(col11) as ArrayRef,
- Arc::new(col12) as ArrayRef,
- Arc::new(col13) as ArrayRef,
- Arc::new(col14) as ArrayRef,
- Arc::new(col15) as ArrayRef,
- Arc::new(col16) as ArrayRef,
- Arc::new(col17) as ArrayRef,
- ])
- .unwrap();
- data_file_writer.write(batch.clone()).await.unwrap();
- let data_file = data_file_writer.close().await.unwrap();
-
- // commit result
- let tx = Transaction::new(&table);
- let append_action = tx.fast_append().add_data_files(data_file.clone());
- let tx = append_action.apply(tx).unwrap();
- let table = tx.commit(&rest_catalog).await.unwrap();
-
- // check result
- let batch_stream = table
- .scan()
- .select(vec![
- "int",
- "long",
- "float",
- "double",
- "decimal",
- "string",
- "boolean",
- "binary",
- "date",
- "time",
- "timestamp",
- "fixed",
- "uuid",
- "timestamptz",
- "struct",
- "list",
- "map",
- ])
- .build()
- .unwrap()
- .to_arrow()
- .await
- .unwrap();
- let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
- assert_eq!(batches.len(), 1);
- assert_eq!(batches[0], batch);
-
- // check result
- let batch_stream = table
- .scan()
- .select_all()
- .build()
- .unwrap()
- .to_arrow()
- .await
- .unwrap();
- let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
- assert_eq!(batches.len(), 1);
- assert_eq!(batches[0], batch);
-}
diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml b/crates/sqllogictest/testdata/schedules/df_test.toml
index 8f4860043..ef5a2e492 100644
--- a/crates/sqllogictest/testdata/schedules/df_test.toml
+++ b/crates/sqllogictest/testdata/schedules/df_test.toml
@@ -30,6 +30,14 @@ slt = "df_test/create_table.slt"
engine = "df"
slt = "df_test/insert_into.slt"
+[[steps]]
+engine = "df"
+slt = "df_test/scan_all_types.slt"
+
+[[steps]]
+engine = "df"
+slt = "df_test/basic_queries.slt"
+
[[steps]]
engine = "df"
slt = "df_test/binary_predicate_pushdown.slt"
diff --git a/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt b/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt
new file mode 100644
index 000000000..5d8889f15
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt
@@ -0,0 +1,212 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test basic SELECT queries with ORDER BY, LIMIT, and various clauses
+# This covers the functionality from the original datafusion.rs integration test
+
+# Create a test table with multiple columns for query testing
+statement ok
+CREATE TABLE default.default.query_test_table (
+ id INT NOT NULL,
+ name STRING,
+ score DOUBLE,
+ category STRING
+)
+
+# Insert test data
+query I
+INSERT INTO default.default.query_test_table VALUES
+ (1, 'Alice', 95.5, 'A'),
+ (2, 'Bob', 87.0, 'B'),
+ (3, 'Charlie', 92.5, 'A'),
+ (4, 'Diana', 78.5, 'C'),
+ (5, 'Eve', 88.0, 'B'),
+ (6, 'Frank', 91.0, 'A'),
+ (7, 'Grace', 85.5, 'B'),
+ (8, 'Henry', 79.0, 'C'),
+ (9, 'Ivy', 96.5, 'A'),
+ (10, 'Jack', 82.0, 'C')
+----
+10
+
+# Test SELECT * with ORDER BY and LIMIT
+query ITRT
+SELECT * FROM default.default.query_test_table ORDER BY id LIMIT 3
+----
+1 Alice 95.5 A
+2 Bob 87 B
+3 Charlie 92.5 A
+
+# Test ORDER BY DESC with LIMIT
+query ITRT
+SELECT * FROM default.default.query_test_table ORDER BY score DESC LIMIT 5
+----
+9 Ivy 96.5 A
+1 Alice 95.5 A
+3 Charlie 92.5 A
+6 Frank 91 A
+5 Eve 88 B
+
+# Test ORDER BY with multiple columns
+query ITR
+SELECT id, name, score FROM default.default.query_test_table ORDER BY category, score DESC
+----
+9 Ivy 96.5
+1 Alice 95.5
+3 Charlie 92.5
+6 Frank 91
+5 Eve 88
+2 Bob 87
+7 Grace 85.5
+10 Jack 82
+8 Henry 79
+4 Diana 78.5
+
+# Test LIMIT with OFFSET
+query IT
+SELECT id, name FROM default.default.query_test_table ORDER BY id LIMIT 3 OFFSET 5
+----
+6 Frank
+7 Grace
+8 Henry
+
+# Test WHERE clause with ORDER BY
+query ITR
+SELECT id, name, score FROM default.default.query_test_table WHERE score > 90 ORDER BY score DESC
+----
+9 Ivy 96.5
+1 Alice 95.5
+3 Charlie 92.5
+6 Frank 91
+
+# Test WHERE clause with string comparison
+query ITT rowsort
+SELECT id, name, category FROM default.default.query_test_table WHERE category = 'A'
+----
+1 Alice A
+3 Charlie A
+6 Frank A
+9 Ivy A
+
+# Test aggregation with GROUP BY
+query TIR rowsort
+SELECT category, COUNT(*), AVG(score) FROM default.default.query_test_table GROUP BY category
+----
+A 4 93.875
+B 3 86.833333333333
+C 3 79.833333333333
+
+# Test aggregation with HAVING clause
+query TI rowsort
+SELECT category, COUNT(*) as cnt FROM default.default.query_test_table GROUP BY category HAVING COUNT(*) >= 4
+----
+A 4
+
+# Test DISTINCT
+query T rowsort
+SELECT DISTINCT category FROM default.default.query_test_table
+----
+A
+B
+C
+
+# Test COUNT with DISTINCT
+query I
+SELECT COUNT(DISTINCT category) FROM default.default.query_test_table
+----
+3
+
+# Test MIN and MAX
+query RR
+SELECT MIN(score), MAX(score) FROM default.default.query_test_table
+----
+78.5 96.5
+
+# Test SUM
+query R
+SELECT SUM(score) FROM default.default.query_test_table
+----
+875.5
+
+# Test combined WHERE, ORDER BY, and LIMIT
+query ITR
+SELECT id, name, score FROM default.default.query_test_table
+WHERE category IN ('A', 'B') AND score > 85
+ORDER BY score DESC
+LIMIT 4
+----
+9 Ivy 96.5
+1 Alice 95.5
+3 Charlie 92.5
+6 Frank 91
+
+# Test aliasing
+query IT
+SELECT id AS user_id, name AS user_name FROM default.default.query_test_table ORDER BY user_id LIMIT 2
+----
+1 Alice
+2 Bob
+
+# Test expression in SELECT
+query ITR
+SELECT id, name, score * 1.1 AS adjusted_score FROM default.default.query_test_table ORDER BY id LIMIT 3
+----
+1 Alice 105.05
+2 Bob 95.7
+3 Charlie 101.75
+
+# Test BETWEEN
+query IT rowsort
+SELECT id, name FROM default.default.query_test_table WHERE score BETWEEN 85 AND 92
+----
+2 Bob
+5 Eve
+6 Frank
+7 Grace
+
+# Test NOT IN
+query IT rowsort
+SELECT id, name FROM default.default.query_test_table WHERE category NOT IN ('A')
+----
+10 Jack
+2 Bob
+4 Diana
+5 Eve
+7 Grace
+8 Henry
+
+# Test LIKE pattern matching
+query IT rowsort
+SELECT id, name FROM default.default.query_test_table WHERE name LIKE 'A%'
+----
+1 Alice
+
+# Test OR condition
+query ITT rowsort
+SELECT id, name, category FROM default.default.query_test_table WHERE category = 'A' OR score > 90
+----
+1 Alice A
+3 Charlie A
+6 Frank A
+9 Ivy A
+
+# Test AND condition
+query ITR rowsort
+SELECT id, name, score FROM default.default.query_test_table WHERE category = 'A' AND score > 93
+----
+1 Alice 95.5
+9 Ivy 96.5
diff --git a/crates/sqllogictest/testdata/slts/df_test/scan_all_types.slt b/crates/sqllogictest/testdata/slts/df_test/scan_all_types.slt
new file mode 100644
index 000000000..d93e46133
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/df_test/scan_all_types.slt
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test scanning all primitive types supported by Iceberg
+# This tests INT, BIGINT, FLOAT, DOUBLE, DECIMAL, STRING, BOOLEAN, DATE, TIMESTAMP
+
+# Create a table with all primitive types
+statement ok
+CREATE TABLE default.default.all_primitive_types (
+ col_int INT NOT NULL,
+ col_long BIGINT NOT NULL,
+ col_float FLOAT NOT NULL,
+ col_double DOUBLE NOT NULL,
+ col_decimal DECIMAL(10, 2),
+ col_string STRING,
+ col_boolean BOOLEAN
+)
+
+# Verify the table is initially empty
+query IIRRTTT rowsort
+SELECT * FROM default.default.all_primitive_types
+----
+
+# Insert test data with various primitive types
+query I
+INSERT INTO default.default.all_primitive_types VALUES
+ (1, 100, 1.5, 2.5, 123.45, 'hello', true),
+ (2, 200, 2.5, 3.5, 234.56, 'world', false),
+ (3, 300, 3.5, 4.5, 345.67, NULL, true),
+ (-1, -100, -1.5, -2.5, -123.45, 'negative', false),
+ (0, 0, 0.0, 0.0, 0.00, 'zero', NULL)
+----
+5
+
+# Verify all inserted rows
+query IIRRTTT rowsort
+SELECT * FROM default.default.all_primitive_types
+----
+-1 -100 -1.5 -2.5 -123.45 negative false
+0 0 0 0 0 zero NULL
+1 100 1.5 2.5 123.45 hello true
+2 200 2.5 3.5 234.56 world false
+3 300 3.5 4.5 345.67 NULL true
+
+# Test selecting specific columns with string filter
+query IT rowsort
+SELECT col_int, col_string FROM default.default.all_primitive_types WHERE col_string = 'hello'
+----
+1 hello
+
+# Test numeric comparisons
+query IR rowsort
+SELECT col_int, col_decimal FROM default.default.all_primitive_types WHERE col_decimal > 200.00
+----
+2 234.56
+3 345.67
+
+# Test filtering on int column
+query IIRRTTT rowsort
+SELECT * FROM default.default.all_primitive_types WHERE col_int >= 0 ORDER BY col_int
+----
+0 0 0 0 0 zero NULL
+1 100 1.5 2.5 123.45 hello true
+2 200 2.5 3.5 234.56 world false
+3 300 3.5 4.5 345.67 NULL true
+
+# Test filtering with NULL
+query IT rowsort
+SELECT col_int, col_string FROM default.default.all_primitive_types WHERE col_string IS NOT NULL
+----
+-1 negative
+0 zero
+1 hello
+2 world
+
+# Test aggregations on numeric types
+query IIRR
+SELECT COUNT(*), SUM(col_int), AVG(col_float), MAX(col_double) FROM default.default.all_primitive_types
+----
+5 5 1.2 4.5
+
+# Create a table with date and timestamp types
+statement ok
+CREATE TABLE default.default.temporal_types (
+ id INT NOT NULL,
+ col_date DATE,
+ col_timestamp TIMESTAMP
+)
+
+# Insert temporal data
+query I
+INSERT INTO default.default.temporal_types VALUES
+ (1, '2024-01-15', '2024-01-15 10:30:00'),
+ (2, '2024-06-20', '2024-06-20 14:45:30'),
+ (3, '2023-12-31', '2023-12-31 23:59:59'),
+ (4, NULL, NULL)
+----
+4
+
+# Verify temporal data
+query IDP rowsort
+SELECT * FROM default.default.temporal_types
+----
+1 2024-01-15 2024-01-15T10:30:00
+2 2024-06-20 2024-06-20T14:45:30
+3 2023-12-31 2023-12-31T23:59:59
+4 NULL NULL
+
+# Test date comparisons
+query ID rowsort
+SELECT id, col_date FROM default.default.temporal_types WHERE col_date >= '2024-01-01'
+----
+1 2024-01-15
+2 2024-06-20
diff --git a/crates/test_utils/Cargo.toml b/crates/test_utils/Cargo.toml
index 58f930779..383448881 100644
--- a/crates/test_utils/Cargo.toml
+++ b/crates/test_utils/Cargo.toml
@@ -27,7 +27,6 @@ repository = { workspace = true }
[dependencies]
iceberg = { workspace = true }
-tracing = { workspace = true }
tracing-subscriber = { workspace = true }
[features]
diff --git a/crates/test_utils/src/cmd.rs b/crates/test_utils/src/cmd.rs
deleted file mode 100644
index e5e3f09e4..000000000
--- a/crates/test_utils/src/cmd.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::process::Command;
-
-use tracing::{error, info};
-
-pub fn run_command(mut cmd: Command, desc: impl ToString) -> bool {
- let desc = desc.to_string();
- info!("Starting to {}, command: {:?}", &desc, cmd);
- let exit = cmd.status().unwrap();
- if exit.success() {
- info!("{} succeed!", desc);
- true
- } else {
- error!("{} failed: {:?}", desc, exit);
- false
- }
-}
-
-pub fn get_cmd_output_result(mut cmd: Command, desc: impl ToString) -> Result<String, String> {
- let desc = desc.to_string();
- info!("Starting to {}, command: {:?}", &desc, cmd);
- let result = cmd.output();
- match result {
- Ok(output) => {
- if output.status.success() {
- info!("{} succeed!", desc);
- Ok(String::from_utf8(output.stdout).unwrap())
- } else {
- Err(format!("{} failed with rc: {:?}", desc, output.status))
- }
- }
- Err(err) => Err(format!("{} failed with error: {}", desc, { err })),
- }
-}
-
-pub fn get_cmd_output(cmd: Command, desc: impl ToString) -> String {
- let result = get_cmd_output_result(cmd, desc);
- match result {
- Ok(output_str) => output_str,
- Err(err) => panic!("{}", err),
- }
-}
diff --git a/crates/test_utils/src/docker.rs b/crates/test_utils/src/docker.rs
deleted file mode 100644
index 078966ddb..000000000
--- a/crates/test_utils/src/docker.rs
+++ /dev/null
@@ -1,165 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::net::IpAddr;
-use std::process::Command;
-
-use tracing::error;
-
-use crate::cmd::{get_cmd_output, get_cmd_output_result, run_command};
-
-/// A utility to manage the lifecycle of `docker compose`.
-///
-/// It will start `docker compose` when calling the `run` method and will be stopped via [`Drop`].
-#[derive(Debug)]
-pub struct DockerCompose {
- project_name: String,
- docker_compose_dir: String,
-}
-
-impl DockerCompose {
- pub fn new(project_name: impl ToString, docker_compose_dir: impl ToString) -> Self {
- Self {
- project_name: project_name.to_string(),
- docker_compose_dir: docker_compose_dir.to_string(),
- }
- }
-
- pub fn project_name(&self) -> &str {
- self.project_name.as_str()
- }
-
- fn get_os_arch() -> String {
- let mut cmd = Command::new("docker");
- cmd.arg("info")
- .arg("--format")
- .arg("{{.OSType}}/{{.Architecture}}");
-
- let result = get_cmd_output_result(cmd, "Get os arch".to_string());
- match result {
- Ok(value) => value.trim().to_string(),
- Err(_err) => {
- // docker/podman do not consistently place OSArch info in the same json path across OS and versions
- // Below tries an alternative path if the above path fails
- let mut alt_cmd = Command::new("docker");
- alt_cmd
- .arg("info")
- .arg("--format")
- .arg("{{.Version.OsArch}}");
- get_cmd_output(alt_cmd, "Get os arch".to_string())
- .trim()
- .to_string()
- }
- }
- }
-
- pub fn up(&self) {
- let mut cmd = Command::new("docker");
- cmd.current_dir(&self.docker_compose_dir);
-
- cmd.env("DOCKER_DEFAULT_PLATFORM", Self::get_os_arch());
-
- cmd.args(vec![
- "compose",
- "-p",
- self.project_name.as_str(),
- "up",
- "-d",
- "--build",
- "--wait",
- "--timeout",
- "1200000",
- ]);
-
- let ret = run_command(
- cmd,
- format!(
- "Starting docker compose in {}, project name: {}",
- self.docker_compose_dir, self.project_name
- ),
- );
-
- if !ret {
- let mut cmd = Command::new("docker");
- cmd.current_dir(&self.docker_compose_dir);
-
- cmd.env("DOCKER_DEFAULT_PLATFORM", Self::get_os_arch());
-
- cmd.args(vec![
- "compose",
- "-p",
- self.project_name.as_str(),
- "logs",
- "spark-iceberg",
- ]);
- run_command(cmd, "Docker compose logs");
- panic!("Docker compose up failed!")
- }
- }
-
- pub fn down(&self) {
- let mut cmd = Command::new("docker");
- cmd.current_dir(&self.docker_compose_dir);
-
- cmd.args(vec![
- "compose",
- "-p",
- self.project_name.as_str(),
- "down",
- "-v",
- "--remove-orphans",
- ]);
-
- let ret = run_command(
- cmd,
- format!(
- "Stopping docker compose in {}, project name: {}",
- self.docker_compose_dir, self.project_name
- ),
- );
-
- if !ret {
- panic!("Failed to stop docker compose")
- }
- }
-
- pub fn get_container_ip(&self, service_name: impl AsRef<str>) -> IpAddr {
- let container_name = format!("{}-{}-1", self.project_name, service_name.as_ref());
- let mut cmd = Command::new("docker");
- cmd.arg("inspect")
- .arg("-f")
- .arg("{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}")
- .arg(&container_name);
-
- let ip_result = get_cmd_output(cmd, format!("Get container ip of {container_name}"))
- .trim()
- .parse::<IpAddr>();
- match ip_result {
- Ok(ip) => ip,
- Err(e) => {
- error!("Invalid IP, {e}");
- panic!("Failed to parse IP for {container_name}")
- }
- }
- }
-}
-
-impl Drop for DockerCompose {
- fn drop(&mut self) {
- self.down()
- }
-}
diff --git a/crates/test_utils/src/lib.rs b/crates/test_utils/src/lib.rs
index 54ec54193..4511231d7 100644
--- a/crates/test_utils/src/lib.rs
+++ b/crates/test_utils/src/lib.rs
@@ -19,11 +19,6 @@
//!
//! It's not intended for use outside of `iceberg-rust`.
-#[cfg(feature = "tests")]
-mod cmd;
-#[cfg(feature = "tests")]
-pub mod docker;
-
#[cfg(feature = "tests")]
pub use common::*;