gstvg commented on code in PR #21193:
URL: https://github.com/apache/datafusion/pull/21193#discussion_r3208776104
##########
datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs:
##########
@@ -594,6 +662,124 @@ impl SubstraitConsumer for DefaultSubstraitConsumer<'_> {
let plan = plan.with_exprs_and_inputs(plan.expressions(), inputs)?;
Ok(LogicalPlan::Extension(Extension { node: plan }))
}
+
+ fn with_lambda_parameters(
+ &self,
+ lambda_parameters: &[Type],
+ input_schema: &DFSchema,
+ ) -> datafusion::common::Result<(Vec<String>, Self)> {
+ let (names, lambda_consumer) =
self.lambda_consumer.with_lambda_parameters(
+ self,
+ lambda_parameters,
+ input_schema,
+ )?;
+
+ Ok((
+ names,
+ Self {
+ extensions: self.extensions,
+ state: self.state,
+ outer_schemas:
RwLock::new(self.outer_schemas.read().unwrap().clone()),
+ lambda_consumer,
+ },
+ ))
+ }
+
+ fn lambda_variable(
+ &self,
+ steps_out: usize,
+ field_idx: usize,
+ ) -> datafusion::common::Result<Expr> {
+ self.lambda_consumer.lambda_variable(steps_out, field_idx)
+ }
+}
+
+/// Default implementation of lambda related methods of the
[SubstraitConsumer] trait
+///
+/// Can be embedded into a custom [SubstraitConsumer] to implement them
+pub struct DefaultSubstraitLambdaConsumer {
Review Comment:
The existing required methods for `trait SubstraitConsumer` are trivial to
implement, but that's not true for the newly added lambda methods. This is
just a convenience for custom implementations that don't want to customize the
default lambda handling. Should I remove it? For example:
```rust
// Example custom consumer that embeds `DefaultSubstraitLambdaConsumer`
// instead of re-implementing the lambda-related trait methods itself.
struct CustomSubstraitConsumer {
// Handed out by reference from `get_extensions`.
extensions: Arc<Extensions>,
// Used to resolve table references and returned as the function registry.
state: Arc<SessionState>,
// You can reuse existing consumer code related to lambdas
lambda_consumer: DefaultSubstraitLambdaConsumer,
}
// `SubstraitConsumer` implementation for the example consumer. The two lambda
// methods only forward to the embedded `lambda_consumer` field.
#[async_trait]
impl SubstraitConsumer for CustomSubstraitConsumer {
// Resolves `table_ref` by asking the session state for the matching schema
// and then looking the table up by name; errors from either step are
// propagated with `?`.
async fn resolve_table_ref(
&self,
table_ref: &TableReference,
) -> Result<Option<Arc<dyn TableProvider>>> {
let table = table_ref.table().to_string();
let schema = self.state.schema_for_ref(table_ref.clone())?;
let table_provider = schema.table(&table).await?;
Ok(table_provider)
}
// Borrows the extensions stored in this consumer.
fn get_extensions(&self) -> &Extensions {
self.extensions.as_ref()
}
// Exposes the stored session state as the function registry.
fn get_function_registry(&self) -> &impl FunctionRegistry {
self.state.as_ref()
}
// Delegates to the embedded lambda consumer (passing `self` along), then
// builds a new consumer that shares the same `extensions` and `state` `Arc`s
// and holds the lambda consumer returned by that call.
// NOTE(review): `names` is presumably the generated lambda parameter names;
// confirm against `DefaultSubstraitLambdaConsumer::with_lambda_parameters`.
fn with_lambda_parameters(
&self,
lambda_parameters: &[Type],
input_schema: &DFSchema,
) -> datafusion::common::Result<(Vec<String>, Self)> {
let (names, lambda_consumer) =
self.lambda_consumer.with_lambda_parameters(
self,
lambda_parameters,
input_schema,
)?;
Ok((
names,
Self {
extensions: self.extensions.clone(),
state: self.state.clone(),
lambda_consumer,
},
))
}
// Forwards the lookup unchanged to the embedded lambda consumer.
fn lambda_variable(
&self,
steps_out: usize,
field_idx: usize,
) -> datafusion::common::Result<Expr> {
self.lambda_consumer.lambda_variable(steps_out, field_idx)
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]