toutane commented on code in PR #2298:
URL: https://github.com/apache/iceberg-rust/pull/2298#discussion_r3419277261
##########
crates/integrations/datafusion/src/table/mod.rs:
##########
@@ -865,4 +933,382 @@ mod tests {
"Limit should be None when not specified"
);
}
+
+ // ── Bucketed scan tests
──────────────────────────────────────────────────
+
+ async fn make_catalog_and_table_for_bucketing()
+ -> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
+ use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+ use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+ use iceberg::{CatalogBuilder, TableCreation};
+
+ let temp_dir = tempfile::TempDir::new().unwrap();
+ let warehouse = temp_dir.path().to_str().unwrap().to_string();
+
+ let catalog = Arc::new(
+ MemoryCatalogBuilder::default()
+ .load(
+ "memory",
+ std::collections::HashMap::from([(
+ MEMORY_CATALOG_WAREHOUSE.to_string(),
+ warehouse.clone(),
+ )]),
+ )
+ .await
+ .unwrap(),
+ );
+
+ let namespace = NamespaceIdent::new("ns".to_string());
+ catalog
+ .create_namespace(&namespace, std::collections::HashMap::new())
+ .await
+ .unwrap();
+
+ let schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ catalog
+ .create_table(
+ &namespace,
+ TableCreation::builder()
+ .name("t".to_string())
+ .location(format!("{warehouse}/t"))
+ .schema(schema)
+ .properties(std::collections::HashMap::new())
+ .build(),
+ )
+ .await
+ .unwrap();
+
+ (catalog, namespace, "t".to_string(), temp_dir)
+ }
+
+ /// Registers `n` synthetic data files in the table metadata via the
iceberg
+ /// transaction API. No actual parquet files are written, only the metadata
+ /// entries that `plan_files()` reads are created.
+ async fn append_fake_data_files(
+ catalog: &Arc<dyn Catalog>,
+ namespace: &NamespaceIdent,
+ table_name: &str,
+ n: usize,
+ ) {
+ use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat};
+ use iceberg::transaction::{ApplyTransactionAction, Transaction};
+
+ let table = catalog
+ .load_table(&TableIdent::new(namespace.clone(),
table_name.to_string()))
+ .await
+ .unwrap();
+
+ let data_files = (0..n)
+ .map(|i| {
+ DataFileBuilder::default()
+ .content(DataContentType::Data)
+ .file_path(format!(
+ "{}/data/fake_{i}.parquet",
+ table.metadata().location()
+ ))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(128)
+ .record_count(1)
+
.partition_spec_id(table.metadata().default_partition_spec_id())
+ .build()
+ .unwrap()
+ })
+ .collect::<Vec<_>>();
+
+ let tx = Transaction::new(&table);
+ let action = tx.fast_append().add_data_files(data_files);
+ action
+ .apply(tx)
+ .unwrap()
+ .commit(catalog.as_ref())
+ .await
+ .unwrap();
+ }
+
+ fn ctx_with_target_partitions(n: usize) -> SessionContext {
+ use datafusion::prelude::SessionConfig;
+
SessionContext::new_with_config(SessionConfig::new().with_target_partitions(n))
+ }
+
+ /// An empty table must produce a single empty-bucket scan so that
DataFusion
+ /// can schedule the plan normally. execute(0) on an empty bucket simply
+ /// returns an empty record-batch stream.
+ #[tokio::test]
+ async fn test_empty_table_single_empty_bucket() {
+ let (catalog, namespace, table_name, _temp_dir) =
+ make_catalog_and_table_for_bucketing().await;
+ // no files appended
+ let provider = IcebergTableProvider::try_new(catalog, namespace,
table_name)
+ .await
+ .unwrap();
+ let plan = provider
+ .scan(&ctx_with_target_partitions(8).state(), None, &[], None)
+ .await
+ .unwrap();
+ let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+ assert_eq!(scan.buckets().len(), 1);
+ assert_eq!(scan.buckets()[0].len(), 0);
+ assert_eq!(scan.properties().partitioning.partition_count(), 1);
+ }
+
+ /// When the table has no identity-partition columns, every task takes the
+ /// fallback (file_path) bucket path, so the declaration must drop to
+ /// `UnknownPartitioning`. The bucket count should still equal
+ /// min(target_partitions, num_files).
+ #[tokio::test]
+ async fn test_unpartitioned_falls_back_to_unknown() {
+ use datafusion::physical_plan::Partitioning;
+
+ let (catalog, namespace, table_name, _temp_dir) =
+ make_catalog_and_table_for_bucketing().await;
+ append_fake_data_files(&catalog, &namespace, &table_name, 5).await;
+
+ let provider = IcebergTableProvider::try_new(catalog, namespace,
table_name)
+ .await
+ .unwrap();
+ let plan = provider
+ .scan(&ctx_with_target_partitions(3).state(), None, &[], None)
+ .await
+ .unwrap();
+ let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+ let total_files: usize = scan.buckets().iter().map(|b| b.len()).sum();
+ assert_eq!(total_files, 5);
+ assert_eq!(scan.buckets().len(), 3);
+ assert!(matches!(
+ scan.properties().partitioning,
+ Partitioning::UnknownPartitioning(3)
+ ));
+ }
+
+ /// Bucket count must be capped at the number of files: spinning up more
+ /// DataFusion partitions than there are tasks would just leave empty
+ /// streams, wasting scheduler slots.
+ #[tokio::test]
+ async fn test_bucket_count_capped_at_file_count() {
+ let (catalog, namespace, table_name, _temp_dir) =
+ make_catalog_and_table_for_bucketing().await;
+ append_fake_data_files(&catalog, &namespace, &table_name, 2).await;
+
+ let provider = IcebergTableProvider::try_new(catalog, namespace,
table_name)
+ .await
+ .unwrap();
+ let plan = provider
+ .scan(&ctx_with_target_partitions(16).state(), None, &[], None)
+ .await
+ .unwrap();
+ let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+ assert_eq!(scan.buckets().len(), 2);
+ }
+
+ /// target_partitions = 1 collapses every task into a single bucket, giving
+ /// the same execution profile as a single-partition scan.
+ #[tokio::test]
+ async fn test_single_target_partition_single_bucket() {
+ let (catalog, namespace, table_name, _temp_dir) =
+ make_catalog_and_table_for_bucketing().await;
+ append_fake_data_files(&catalog, &namespace, &table_name, 4).await;
+
+ let provider = IcebergTableProvider::try_new(catalog, namespace,
table_name)
+ .await
+ .unwrap();
+ let plan = provider
+ .scan(&ctx_with_target_partitions(1).state(), None, &[], None)
+ .await
+ .unwrap();
+ let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+ assert_eq!(scan.buckets().len(), 1);
+ assert_eq!(scan.buckets()[0].len(), 4);
+ }
+
+ async fn make_partitioned_catalog_and_table_for_bucketing()
+ -> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
+ use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+ use iceberg::spec::{
+ NestedField, PrimitiveType, Schema, Transform, Type,
UnboundPartitionSpec,
+ };
+ use iceberg::{CatalogBuilder, TableCreation};
+
+ let temp_dir = tempfile::TempDir::new().unwrap();
+ let warehouse = temp_dir.path().to_str().unwrap().to_string();
+
+ let catalog = Arc::new(
+ MemoryCatalogBuilder::default()
+ .load(
+ "memory",
+ std::collections::HashMap::from([(
+ MEMORY_CATALOG_WAREHOUSE.to_string(),
+ warehouse.clone(),
+ )]),
+ )
+ .await
+ .unwrap(),
+ );
+
+ let namespace = NamespaceIdent::new("ns".to_string());
+ catalog
+ .create_namespace(&namespace, std::collections::HashMap::new())
+ .await
+ .unwrap();
+
+ let schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let partition_spec = UnboundPartitionSpec::builder()
+ .with_spec_id(0)
+ .add_partition_field(2, "name_part", Transform::Identity)
+ .unwrap()
+ .build();
+
+ catalog
+ .create_table(
+ &namespace,
+ TableCreation::builder()
+ .name("t".to_string())
+ .location(format!("{warehouse}/t"))
+ .schema(schema)
+ .partition_spec(partition_spec)
+ .properties(std::collections::HashMap::new())
+ .build(),
+ )
+ .await
+ .unwrap();
+
+ (catalog, namespace, "t".to_string(), temp_dir)
+ }
+
+ /// Like [`append_fake_data_files`] but each file carries a partition tuple
+ /// matching the table's identity-partition spec on `name`.
+ async fn append_partitioned_fake_data_files(
+ catalog: &Arc<dyn Catalog>,
+ namespace: &NamespaceIdent,
+ table_name: &str,
+ partition_values: Vec<&str>,
+ ) {
+ use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat,
Literal, Struct};
+ use iceberg::transaction::{ApplyTransactionAction, Transaction};
+
+ let table = catalog
+ .load_table(&TableIdent::new(namespace.clone(),
table_name.to_string()))
+ .await
+ .unwrap();
+
+ let data_files = partition_values
+ .iter()
+ .enumerate()
+ .map(|(i, value)| {
+ DataFileBuilder::default()
+ .content(DataContentType::Data)
+ .file_path(format!(
+ "{}/data/fake_{i}.parquet",
+ table.metadata().location()
+ ))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(128)
+ .record_count(1)
+
.partition_spec_id(table.metadata().default_partition_spec_id())
+
.partition(Struct::from_iter(vec![Some(Literal::string(*value))]))
+ .build()
+ .unwrap()
+ })
+ .collect::<Vec<_>>();
+
+ let tx = Transaction::new(&table);
+ let action = tx.fast_append().add_data_files(data_files);
+ action
+ .apply(tx)
+ .unwrap()
+ .commit(catalog.as_ref())
+ .await
+ .unwrap();
+ }
+
+ /// Identity-partitioned table whose source column is in the projection
+ /// must produce `Partitioning::Hash` referencing that column.
Review Comment:
Added the missing provider-level coverage in
`crates/integrations/datafusion/src/table/mod.rs`:
- Co-location vs DataFusion repartition hash was already covered by
`test_identity_partitioned_hash_buckets_match_datafusion_repartition`
- Added `test_spec_evolution_falls_back_to_unknown_partitioning`
- Added
`test_unsupported_output_partition_dtype_falls_back_to_unknown_partitioning`
(timestamp is currently supported by this bucketing path, so I used `Utf8View`
as the unsupported dtype.)
- Added `test_null_partition_value_falls_back_to_unknown_partitioning`

I also kept the existing lower-level null-fallback coverage in
`bucketing.rs`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]