This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 12c4c862f8 feat: Use `SchemaRef` in `JoinFilter` (#14182)
12c4c862f8 is described below
commit 12c4c862f805a324beea0e24417d8c565f462ed4
Author: irenjj <[email protected]>
AuthorDate: Mon Jan 20 09:07:00 2025 +0800
feat: Use `SchemaRef` in `JoinFilter` (#14182)
* feat: Use `SchemaRef` in `JoinFilter`
* Update datafusion/core/src/physical_optimizer/projection_pushdown.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/physical-plan/src/joins/join_filter.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/physical-plan/src/joins/join_filter.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/physical-plan/src/joins/join_filter.rs
Co-authored-by: Andrew Lamb <[email protected]>
* fix
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../src/physical_optimizer/projection_pushdown.rs | 16 ++++-----
datafusion/core/src/physical_planner.rs | 2 +-
datafusion/core/tests/fuzz_cases/join_fuzz.rs | 7 ++--
.../physical-optimizer/src/join_selection.rs | 2 +-
datafusion/physical-plan/src/joins/hash_join.rs | 42 +++++++++++++++-------
datafusion/physical-plan/src/joins/join_filter.rs | 10 +++---
.../physical-plan/src/joins/nested_loop_join.rs | 12 +++++--
.../physical-plan/src/joins/sort_merge_join.rs | 10 +++---
.../physical-plan/src/joins/stream_join_utils.rs | 9 +++--
.../physical-plan/src/joins/symmetric_hash_join.rs | 36 ++++++++++++-------
datafusion/proto/src/physical_plan/mod.rs | 12 +++----
11 files changed, 98 insertions(+), 60 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 2f1cf3a8e6..cf8d5e352e 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1255,7 +1255,7 @@ fn update_join_filter(
side: col_idx.side,
})
.collect(),
- join_filter.schema().clone(),
+ Arc::clone(join_filter.schema()),
)
})
}
@@ -2246,11 +2246,11 @@ mod tests {
side: JoinSide::Left,
},
],
- Schema::new(vec![
+ Arc::new(Schema::new(vec![
Field::new("b_left_inter", DataType::Int32, true),
Field::new("a_right_inter", DataType::Int32, true),
Field::new("c_left_inter", DataType::Int32, true),
- ]),
+ ])),
)),
&JoinType::Inner,
true,
@@ -2360,11 +2360,11 @@ mod tests {
side: JoinSide::Left,
},
],
- Schema::new(vec![
+ Arc::new(Schema::new(vec![
Field::new("b_left_inter", DataType::Int32, true),
Field::new("a_right_inter", DataType::Int32, true),
Field::new("c_left_inter", DataType::Int32, true),
- ]),
+ ])),
)),
&JoinType::Inner,
true,
@@ -2462,7 +2462,7 @@ mod tests {
Some(JoinFilter::new(
filter_expr,
filter_column_indices,
- filter_schema,
+ Arc::new(filter_schema),
)),
&JoinType::Inner,
None,
@@ -2536,11 +2536,11 @@ mod tests {
side: JoinSide::Left,
},
],
- Schema::new(vec![
+ Arc::new(Schema::new(vec![
Field::new("b_left_inter", DataType::Int32, true),
Field::new("a_right_inter", DataType::Int32, true),
Field::new("c_left_inter", DataType::Int32, true),
- ]),
+ ])),
)),
&JoinType::Inner,
None,
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 77bf4bb71d..03ba5688de 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1090,7 +1090,7 @@ impl DefaultPhysicalPlanner {
Some(join_utils::JoinFilter::new(
filter_expr,
column_indices,
- filter_schema,
+ Arc::new(filter_schema),
))
}
_ => None,
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index 04eb48e41c..41c12193f0 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -86,7 +86,7 @@ fn col_lt_col_filter(schema1: Arc<Schema>, schema2: Arc<Schema>) -> JoinFilter {
.with_nullable(true),
]);
- JoinFilter::new(less_filter, column_indices, intermediate_schema)
+ JoinFilter::new(less_filter, column_indices, Arc::new(intermediate_schema))
}
#[tokio::test]
@@ -327,7 +327,7 @@ impl JoinFuzzTestCase {
/// on-condition schema
fn intermediate_schema(&self) -> Schema {
let filter_schema = if let Some(filter) = self.join_filter() {
- filter.schema().to_owned()
+ filter.schema().as_ref().to_owned()
} else {
Schema::empty()
};
@@ -483,7 +483,8 @@ impl JoinFuzzTestCase {
let intermediate_schema = self.intermediate_schema();
let expression = self.composite_filter_expression();
- let filter = JoinFilter::new(expression, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(expression, column_indices,
Arc::new(intermediate_schema));
Arc::new(
NestedLoopJoinExec::try_new(left, right, Some(filter),
&self.join_type, None)
diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs
index 5f7f1f396a..d5f70938a7 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -716,7 +716,7 @@ mod tests_statistical {
Some(JoinFilter::new(
expression,
column_indices,
- intermediate_schema,
+ Arc::new(intermediate_schema),
))
}
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 8ccefb0397..d130b42a9d 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -2576,7 +2576,7 @@ mod tests {
let filter = JoinFilter::new(
filter_expression,
column_indices.clone(),
- intermediate_schema.clone(),
+ Arc::new(intermediate_schema.clone()),
);
let join = join_with_filter(
@@ -2611,8 +2611,11 @@ mod tests {
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
)) as Arc<dyn PhysicalExpr>;
- let filter =
- JoinFilter::new(filter_expression, column_indices,
intermediate_schema);
+ let filter = JoinFilter::new(
+ filter_expression,
+ column_indices,
+ Arc::new(intermediate_schema),
+ );
let join = join_with_filter(left, right, on, filter,
&JoinType::LeftSemi, false)?;
@@ -2700,7 +2703,7 @@ mod tests {
let filter = JoinFilter::new(
filter_expression,
column_indices.clone(),
- intermediate_schema.clone(),
+ Arc::new(intermediate_schema.clone()),
);
let join = join_with_filter(
@@ -2738,8 +2741,11 @@ mod tests {
Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
)) as Arc<dyn PhysicalExpr>;
- let filter =
- JoinFilter::new(filter_expression, column_indices,
intermediate_schema);
+ let filter = JoinFilter::new(
+ filter_expression,
+ column_indices,
+ Arc::new(intermediate_schema.clone()),
+ );
let join =
join_with_filter(left, right, on, filter, &JoinType::RightSemi,
false)?;
@@ -2822,7 +2828,7 @@ mod tests {
let filter = JoinFilter::new(
filter_expression,
column_indices.clone(),
- intermediate_schema.clone(),
+ Arc::new(intermediate_schema.clone()),
);
let join = join_with_filter(
@@ -2861,8 +2867,11 @@ mod tests {
Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
)) as Arc<dyn PhysicalExpr>;
- let filter =
- JoinFilter::new(filter_expression, column_indices,
intermediate_schema);
+ let filter = JoinFilter::new(
+ filter_expression,
+ column_indices,
+ Arc::new(intermediate_schema),
+ );
let join = join_with_filter(left, right, on, filter,
&JoinType::LeftAnti, false)?;
@@ -2951,7 +2960,7 @@ mod tests {
let filter = JoinFilter::new(
filter_expression,
column_indices,
- intermediate_schema.clone(),
+ Arc::new(intermediate_schema.clone()),
);
let join = join_with_filter(
@@ -2995,8 +3004,11 @@ mod tests {
Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
)) as Arc<dyn PhysicalExpr>;
- let filter =
- JoinFilter::new(filter_expression, column_indices,
intermediate_schema);
+ let filter = JoinFilter::new(
+ filter_expression,
+ column_indices,
+ Arc::new(intermediate_schema),
+ );
let join =
join_with_filter(left, right, on, filter, &JoinType::RightAnti,
false)?;
@@ -3359,7 +3371,11 @@ mod tests {
Arc::new(Column::new("c", 1)),
)) as Arc<dyn PhysicalExpr>;
- JoinFilter::new(filter_expression, column_indices, intermediate_schema)
+ JoinFilter::new(
+ filter_expression,
+ column_indices,
+ Arc::new(intermediate_schema),
+ )
}
#[apply(batch_sizes)]
diff --git a/datafusion/physical-plan/src/joins/join_filter.rs b/datafusion/physical-plan/src/joins/join_filter.rs
index b99afd87c9..cfc7ad2c10 100644
--- a/datafusion/physical-plan/src/joins/join_filter.rs
+++ b/datafusion/physical-plan/src/joins/join_filter.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::joins::utils::ColumnIndex;
-use arrow_schema::Schema;
+use arrow_schema::SchemaRef;
use datafusion_common::JoinSide;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use std::sync::Arc;
@@ -30,7 +30,7 @@ pub struct JoinFilter {
/// Column indices required to construct intermediate batch for filtering
pub(crate) column_indices: Vec<ColumnIndex>,
/// Physical schema of intermediate batch
- pub(crate) schema: Schema,
+ pub(crate) schema: SchemaRef,
}
impl JoinFilter {
@@ -38,7 +38,7 @@ impl JoinFilter {
pub fn new(
expression: Arc<dyn PhysicalExpr>,
column_indices: Vec<ColumnIndex>,
- schema: Schema,
+ schema: SchemaRef,
) -> JoinFilter {
JoinFilter {
expression,
@@ -76,7 +76,7 @@ impl JoinFilter {
}
/// Intermediate batch schema
- pub fn schema(&self) -> &Schema {
+ pub fn schema(&self) -> &SchemaRef {
&self.schema
}
@@ -94,7 +94,7 @@ impl JoinFilter {
JoinFilter::new(
Arc::clone(self.expression()),
column_indices,
- self.schema().clone(),
+ Arc::clone(self.schema()),
)
}
}
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index f416694e38..d2fa2fdc7b 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -1104,7 +1104,11 @@ pub(crate) mod tests {
Arc::new(BinaryExpr::new(left_filter, Operator::And, right_filter))
as Arc<dyn PhysicalExpr>;
- JoinFilter::new(filter_expression, column_indices, intermediate_schema)
+ JoinFilter::new(
+ filter_expression,
+ column_indices,
+ Arc::new(intermediate_schema),
+ )
}
pub(crate) async fn multi_partitioned_join_collect(
@@ -1514,7 +1518,11 @@ pub(crate) mod tests {
Arc::new(BinaryExpr::new(left_filter, Operator::And, right_filter))
as Arc<dyn PhysicalExpr>;
- JoinFilter::new(filter_expression, column_indices, intermediate_schema)
+ JoinFilter::new(
+ filter_expression,
+ column_indices,
+ Arc::new(intermediate_schema),
+ )
}
fn generate_columns(num_columns: usize, num_rows: usize) -> Vec<Vec<i32>> {
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index bcacc7dcae..650d4c1de1 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -1759,10 +1759,8 @@ impl SortMergeJoinStream {
if !filter_columns.is_empty() {
if let Some(f) = &self.filter {
// Construct batch with only filter columns
- let filter_batch = RecordBatch::try_new(
- Arc::new(f.schema().clone()),
- filter_columns,
- )?;
+ let filter_batch =
+ RecordBatch::try_new(Arc::clone(f.schema()),
filter_columns)?;
let filter_result = f
.expression()
@@ -3182,10 +3180,10 @@ mod tests {
side: JoinSide::Right,
},
],
- Schema::new(vec![
+ Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Int32, true),
- ]),
+ ])),
);
let (_, batches) =
join_collect_with_filter(left, right, on, filter,
RightAnti).await?;
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index cea04ccad3..6d4f06b3ae 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -856,7 +856,8 @@ pub mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
let left_sort_filter_expr = build_filter_input_order(
JoinSide::Left,
@@ -983,7 +984,8 @@ pub mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
let left_schema = Arc::new(left_schema);
let right_schema = Arc::new(right_schema);
@@ -1055,7 +1057,8 @@ pub mod tests {
side: JoinSide::Left,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 72fd5a0feb..b050d3adfe 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -1799,7 +1799,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
@@ -1864,7 +1865,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
@@ -1916,7 +1918,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
@@ -2013,7 +2016,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
@@ -2071,7 +2075,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
}
@@ -2129,7 +2134,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
@@ -2189,7 +2195,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
@@ -2246,7 +2253,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
@@ -2310,7 +2318,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
@@ -2394,7 +2403,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
}
@@ -2468,7 +2478,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
@@ -2539,7 +2550,8 @@ mod tests {
side: JoinSide::Right,
},
];
- let filter = JoinFilter::new(filter_expr, column_indices,
intermediate_schema);
+ let filter =
+ JoinFilter::new(filter_expr, column_indices,
Arc::new(intermediate_schema));
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 25fca611df..a266d55b46 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -625,7 +625,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
})
.collect::<Result<Vec<_>>>()?;
- Ok(JoinFilter::new(expression, column_indices, schema))
+ Ok(JoinFilter::new(expression, column_indices,
Arc::new(schema)))
})
.map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
@@ -739,7 +739,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
})
.collect::<Result<_>>()?;
- Ok(JoinFilter::new(expression, column_indices, schema))
+ Ok(JoinFilter::new(expression, column_indices,
Arc::new(schema)))
})
.map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
@@ -992,7 +992,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
})
.collect::<Result<Vec<_>>>()?;
- Ok(JoinFilter::new(expression, column_indices, schema))
+ Ok(JoinFilter::new(expression, column_indices,
Arc::new(schema)))
})
.map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
@@ -1316,7 +1316,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
}
})
.collect();
- let schema = f.schema().try_into()?;
+ let schema = f.schema().as_ref().try_into()?;
Ok(protobuf::JoinFilter {
expression: Some(expression),
column_indices,
@@ -1388,7 +1388,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
}
})
.collect();
- let schema = f.schema().try_into()?;
+ let schema = f.schema().as_ref().try_into()?;
Ok(protobuf::JoinFilter {
expression: Some(expression),
column_indices,
@@ -1833,7 +1833,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
}
})
.collect();
- let schema = f.schema().try_into()?;
+ let schema = f.schema().as_ref().try_into()?;
Ok(protobuf::JoinFilter {
expression: Some(expression),
column_indices,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]