GaneshPatil7517 commented on code in PR #19619:
URL: https://github.com/apache/datafusion/pull/19619#discussion_r2660775150
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -93,53 +93,129 @@ pub struct FilterExec {
fetch: Option<usize>,
}
+/// Builder for [`FilterExec`] to set optional parameters
+pub struct FilterExecBuilder {
+ predicate: Arc<dyn PhysicalExpr>,
+ input: Arc<dyn ExecutionPlan>,
+ projection: Option<Vec<usize>>,
+ default_selectivity: u8,
+ batch_size: usize,
+ fetch: Option<usize>,
+}
+
+impl FilterExecBuilder {
+ /// Create a new builder with required parameters (predicate and input)
+ pub fn new(predicate: Arc<dyn PhysicalExpr>, input: Arc<dyn
ExecutionPlan>) -> Self {
+ Self {
+ predicate,
+ input,
+ projection: None,
+ default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY,
+ batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
+ fetch: None,
+ }
+ }
+
+ /// Set the projection
+ pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Set the default selectivity
+ pub fn with_default_selectivity(mut self, default_selectivity: u8) -> Self
{
+ self.default_selectivity = default_selectivity;
+ self
+ }
+
+ /// Set the batch size
+ pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+ self.batch_size = batch_size;
+ self
+ }
+
+ /// Set the fetch limit
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
+ }
+
+ /// Build the FilterExec, computing properties once with all configured
parameters
+ pub fn build(self) -> Result<FilterExec> {
+ // Validate predicate type
+ match self.predicate.data_type(self.input.schema().as_ref())? {
+ DataType::Boolean => {}
+ other => {
+ return plan_err!(
+ "Filter predicate must return BOOLEAN values, got
{other:?}"
+ );
+ }
+ }
+
+ // Validate selectivity
+ if self.default_selectivity > 100 {
+ return plan_err!(
+ "Default filter selectivity value needs to be less than or
equal to 100"
+ );
+ }
+
+ // Validate projection if provided
+ if let Some(ref proj) = self.projection {
+ can_project(&self.input.schema(), Some(proj))?;
+ }
+
+ // Compute properties once with all parameters
+ let cache = FilterExec::compute_properties(
+ &self.input,
+ &self.predicate,
+ self.default_selectivity,
+ self.projection.as_ref(),
+ )?;
+
+ Ok(FilterExec {
+ predicate: self.predicate,
+ input: self.input,
+ metrics: ExecutionPlanMetricsSet::new(),
+ default_selectivity: self.default_selectivity,
+ cache,
+ projection: self.projection,
+ batch_size: self.batch_size,
+ fetch: self.fetch,
+ })
+ }
+}
+
impl FilterExec {
- /// Create a FilterExec on an input
- #[expect(clippy::needless_pass_by_value)]
+ /// Create a FilterExec on an input using the builder pattern
pub fn try_new(
predicate: Arc<dyn PhysicalExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
- match predicate.data_type(input.schema().as_ref())? {
- DataType::Boolean => {
- let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
- let cache = Self::compute_properties(
- &input,
- &predicate,
- default_selectivity,
- None,
- )?;
- Ok(Self {
- predicate,
- input: Arc::clone(&input),
- metrics: ExecutionPlanMetricsSet::new(),
- default_selectivity,
- cache,
- projection: None,
- batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE,
- fetch: None,
- })
- }
- other => {
- plan_err!("Filter predicate must return BOOLEAN values, got
{other:?}")
- }
- }
+ FilterExecBuilder::new(predicate, input).build()
}
+ /// Set the default selectivity
+ ///
+ /// # Deprecated
+ /// Use [`FilterExecBuilder::with_default_selectivity`] instead
+ #[deprecated(since = "52.0.0", note = "Use
FilterExecBuilder::with_default_selectivity instead")]
pub fn with_default_selectivity(
- mut self,
+ self,
default_selectivity: u8,
) -> Result<Self, DataFusionError> {
- if default_selectivity > 100 {
- return plan_err!(
- "Default filter selectivity value needs to be less than or
equal to 100"
- );
- }
- self.default_selectivity = default_selectivity;
- Ok(self)
+ FilterExecBuilder::new(Arc::clone(&self.predicate),
Arc::clone(&self.input))
+ .with_projection(self.projection.clone())
+ .with_default_selectivity(default_selectivity)
+ .with_batch_size(self.batch_size)
+ .with_fetch(self.fetch)
+ .build()
}
Review Comment:
OK, I'll work on it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]