This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 6fa878590 feat(datafusion): Apply SortExec when writing in clustered
mode (#2005)
6fa878590 is described below
commit 6fa878590a70f4078e6fbefa2751513662d421b4
Author: Shawn Chang <[email protected]>
AuthorDate: Wed Jan 7 17:42:45 2026 -0800
feat(datafusion): Apply SortExec when writing in clustered mode (#2005)
## Which issue does this PR close?
- Closes #1540
## What changes are included in this PR?
- When writing in clustered mode, use `sort_by_partition` to sort the
data so the clustered writer can consume it
## Are these changes tested?
- Added simple unit tests to verify that the `SortExec` is applied correctly
---
.../datafusion/src/physical_plan/sort.rs | 3 -
crates/integrations/datafusion/src/table/mod.rs | 178 ++++++++++++++++++++-
2 files changed, 177 insertions(+), 4 deletions(-)
diff --git a/crates/integrations/datafusion/src/physical_plan/sort.rs
b/crates/integrations/datafusion/src/physical_plan/sort.rs
index ede254753..587ab120c 100644
--- a/crates/integrations/datafusion/src/physical_plan/sort.rs
+++ b/crates/integrations/datafusion/src/physical_plan/sort.rs
@@ -42,9 +42,6 @@ use iceberg::arrow::PROJECTED_PARTITION_VALUE_COLUMN;
/// # Returns
/// * `Ok(Arc<dyn ExecutionPlan>)` - A SortExec that sorts by partition values
/// * `Err` - If the partition column is not found
-///
-/// TODO remove dead_code mark when integrating with insert_into
-#[allow(dead_code)]
pub(crate) fn sort_by_partition(input: Arc<dyn ExecutionPlan>) ->
DFResult<Arc<dyn ExecutionPlan>> {
let schema = input.schema();
diff --git a/crates/integrations/datafusion/src/table/mod.rs
b/crates/integrations/datafusion/src/table/mod.rs
index 86a79611b..ad616542a 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -44,6 +44,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::inspect::MetadataTableType;
+use iceberg::spec::TableProperties;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;
@@ -53,6 +54,7 @@ use crate::physical_plan::commit::IcebergCommitExec;
use crate::physical_plan::project::project_with_partition;
use crate::physical_plan::repartition::repartition;
use crate::physical_plan::scan::IcebergTableScan;
+use crate::physical_plan::sort::sort_by_partition;
use crate::physical_plan::write::IcebergWriteExec;
/// Catalog-backed table provider with automatic metadata refresh.
@@ -185,9 +187,38 @@ impl TableProvider for IcebergTableProvider {
let repartitioned_plan =
repartition(plan_with_partition, table.metadata_ref(),
target_partitions)?;
+ // Apply sort node when it's not fanout mode
+ let fanout_enabled = table
+ .metadata()
+ .properties()
+ .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
+ .map(|value| {
+ value
+ .parse::<bool>()
+ .map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Invalid value for {}, expected 'true' or
'false'",
+
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
+ ),
+ )
+ .with_source(e)
+ })
+ .map_err(to_datafusion_error)
+ })
+ .transpose()?
+
.unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
+
+ let write_input = if fanout_enabled {
+ repartitioned_plan
+ } else {
+ sort_by_partition(repartitioned_plan)?
+ };
+
let write_plan = Arc::new(IcebergWriteExec::new(
table.clone(),
- repartitioned_plan,
+ write_input,
self.schema.clone(),
));
@@ -321,6 +352,7 @@ mod tests {
use std::sync::Arc;
use datafusion::common::Column;
+ use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use iceberg::io::FileIO;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
@@ -598,4 +630,148 @@ mod tests {
assert_eq!(logical_field.data_type(), physical_field.data_type());
}
}
+
+ async fn get_partitioned_test_catalog_and_table(
+ fanout_enabled: Option<bool>,
+ ) -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
+ use iceberg::spec::{Transform, UnboundPartitionSpec};
+
+ let temp_dir = TempDir::new().unwrap();
+ let warehouse_path = temp_dir.path().to_str().unwrap().to_string();
+
+ let catalog = MemoryCatalogBuilder::default()
+ .load(
+ "memory",
+ HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(),
warehouse_path.clone())]),
+ )
+ .await
+ .unwrap();
+
+ let namespace = NamespaceIdent::new("test_ns".to_string());
+ catalog
+ .create_namespace(&namespace, HashMap::new())
+ .await
+ .unwrap();
+
+ let schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "category",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let partition_spec = UnboundPartitionSpec::builder()
+ .with_spec_id(0)
+ .add_partition_field(2, "category", Transform::Identity)
+ .unwrap()
+ .build();
+
+ let mut properties = HashMap::new();
+ if let Some(enabled) = fanout_enabled {
+ properties.insert(
+
iceberg::spec::TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
+ .to_string(),
+ enabled.to_string(),
+ );
+ }
+
+ let table_creation = TableCreation::builder()
+ .name("partitioned_table".to_string())
+ .location(format!("{warehouse_path}/partitioned_table"))
+ .schema(schema)
+ .partition_spec(partition_spec)
+ .properties(properties)
+ .build();
+
+ catalog
+ .create_table(&namespace, table_creation)
+ .await
+ .unwrap();
+
+ (
+ Arc::new(catalog),
+ namespace,
+ "partitioned_table".to_string(),
+ temp_dir,
+ )
+ }
+
+ /// Helper to check if a plan contains a SortExec node
+ fn plan_contains_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
+ if plan.name() == "SortExec" {
+ return true;
+ }
+ for child in plan.children() {
+ if plan_contains_sort(child) {
+ return true;
+ }
+ }
+ false
+ }
+
+ #[tokio::test]
+ async fn test_insert_plan_fanout_enabled_no_sort() {
+ use datafusion::datasource::TableProvider;
+ use datafusion::logical_expr::dml::InsertOp;
+ use datafusion::physical_plan::empty::EmptyExec;
+
+ // When fanout is enabled (default), no sort node should be added
+ let (catalog, namespace, table_name, _temp_dir) =
+ get_partitioned_test_catalog_and_table(Some(true)).await;
+
+ let provider =
+ IcebergTableProvider::try_new(catalog.clone(), namespace.clone(),
table_name.clone())
+ .await
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ let input_schema = provider.schema();
+ let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn
ExecutionPlan>;
+
+ let state = ctx.state();
+ let insert_plan = provider
+ .insert_into(&state, input, InsertOp::Append)
+ .await
+ .unwrap();
+
+ // With fanout enabled, there should be no SortExec in the plan
+ assert!(
+ !plan_contains_sort(&insert_plan),
+ "Plan should NOT contain SortExec when fanout is enabled"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_insert_plan_fanout_disabled_has_sort() {
+ use datafusion::datasource::TableProvider;
+ use datafusion::logical_expr::dml::InsertOp;
+ use datafusion::physical_plan::empty::EmptyExec;
+
+ // When fanout is disabled, a sort node should be added
+ let (catalog, namespace, table_name, _temp_dir) =
+ get_partitioned_test_catalog_and_table(Some(false)).await;
+
+ let provider =
+ IcebergTableProvider::try_new(catalog.clone(), namespace.clone(),
table_name.clone())
+ .await
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ let input_schema = provider.schema();
+ let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn
ExecutionPlan>;
+
+ let state = ctx.state();
+ let insert_plan = provider
+ .insert_into(&state, input, InsertOp::Append)
+ .await
+ .unwrap();
+
+ // With fanout disabled, there should be a SortExec in the plan
+ assert!(
+ plan_contains_sort(&insert_plan),
+ "Plan should contain SortExec when fanout is disabled"
+ );
+ }
}