martin-g commented on code in PR #1906:
URL:
https://github.com/apache/datafusion-ballista/pull/1906#discussion_r3490146420
##########
ballista/core/src/serde/mod.rs:
##########
@@ -381,15 +382,15 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
)
})?;
let converter = DefaultPhysicalProtoConverter {};
+ let decode_ctx = PhysicalPlanDecodeContext::new(ctx, self.default_codec.as_ref());
Review Comment:
```suggestion
let decode_ctx = PhysicalPlanDecodeContext::new(ctx, self);
```
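The suggestion doesn't spell out its reasoning. One plausible motivation, assuming the codec stored in the decode context is the one used to decode nested nodes, is that passing only the inner default codec would make Ballista-specific nodes nested deeper in the plan undecodable. Here is a minimal sketch of that delegation pattern. `Codec`, `Node`, `DecodeContext`, `DefaultCodec`, `WrapperCodec` and `decode_with_context_codec` are hypothetical stand-ins, not the DataFusion/Ballista API:

```rust
// Hypothetical, simplified stand-ins; NOT the real DataFusion/Ballista types.
trait Codec {
    /// Decode one node; children are decoded through `ctx`.
    fn decode(&self, node: &Node, ctx: &DecodeContext) -> Result<String, String>;
}

struct Node {
    name: &'static str,
    children: Vec<Node>,
}

/// Carries the codec that will be used for nested nodes.
struct DecodeContext<'a> {
    codec: &'a dyn Codec,
}

impl<'a> DecodeContext<'a> {
    fn decode_child(&self, node: &Node) -> Result<String, String> {
        self.codec.decode(node, self)
    }
}

/// Only understands "builtin" nodes.
struct DefaultCodec;

impl Codec for DefaultCodec {
    fn decode(&self, node: &Node, ctx: &DecodeContext) -> Result<String, String> {
        match node.name {
            "builtin" => {
                let kids = node
                    .children
                    .iter()
                    .map(|c| ctx.decode_child(c))
                    .collect::<Result<Vec<_>, _>>()?;
                Ok(format!("builtin[{}]", kids.join(",")))
            }
            other => Err(format!("unknown node: {other}")),
        }
    }
}

/// Handles "extension" nodes itself and delegates everything else.
struct WrapperCodec {
    default_codec: DefaultCodec,
}

impl WrapperCodec {
    /// `use_self = true` mirrors `new(ctx, self)`;
    /// `false` mirrors `new(ctx, self.default_codec.as_ref())`.
    fn decode_with_context_codec(&self, node: &Node, use_self: bool) -> Result<String, String> {
        let ctx = if use_self {
            DecodeContext { codec: self }
        } else {
            DecodeContext { codec: &self.default_codec }
        };
        self.decode(node, &ctx)
    }
}

impl Codec for WrapperCodec {
    fn decode(&self, node: &Node, ctx: &DecodeContext) -> Result<String, String> {
        match node.name {
            "extension" => {
                let kids = node
                    .children
                    .iter()
                    .map(|c| ctx.decode_child(c))
                    .collect::<Result<Vec<_>, _>>()?;
                Ok(format!("extension[{}]", kids.join(",")))
            }
            _ => self.default_codec.decode(node, ctx),
        }
    }
}
```

With `use_self = false`, an `extension` node nested under a `builtin` node fails with "unknown node". With `use_self = true`, the whole tree decodes because every level goes back through the wrapper.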
##########
ballista/executor/src/execution_engine.rs:
##########
@@ -101,19 +106,61 @@ impl DefaultExecutionEngine {
}
}
+/// Restrict a `DataSourceExec` to the file group for `partition_id`.
+///
+/// DataFusion 54's `DataSourceExec` hands file groups to partition streams from
+/// a shared work-queue that is only divided across partitions when all
+/// partitions of one plan instance are polled concurrently. Ballista runs one
+/// partition per task on its own plan instance, so a task that polls a single
+/// partition in isolation would otherwise drain the whole queue and scan the
+/// entire table. Keeping only this task's file group (other slots emptied,
+/// partition count preserved) makes the lone `execute(partition_id)` read just
+/// that group.
+///
+/// Returns `None` for any node that is not a file-backed `DataSourceExec`, and
+/// for a `partition_id` outside the source's file groups (e.g. when an operator
+/// between the scan and the stage output changed the partition count).
+fn restrict_scan_to_partition(
+ plan: &Arc<dyn ExecutionPlan>,
+ partition_id: usize,
+) -> Option<Arc<dyn ExecutionPlan>> {
+ let exec = plan.downcast_ref::<DataSourceExec>()?;
+ let source: &dyn Any = exec.data_source().as_ref();
+ let config = source.downcast_ref::<FileScanConfig>()?;
Review Comment:
If the config is a different type, e.g. `MemorySourceConfig`, then
[Ok(Transformed::no(p))](https://github.com/apache/datafusion-ballista/pull/1906/changes#diff-708c49b754b125819c73edc3dd46dee0f92f2031bd09426a11990ecd05251601R179)
will be executed, and the unrestricted scan may return duplicate results, no?
Maybe log a WARN here before returning `None`?
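A minimal sketch of the restriction the doc comment describes (keep only this task's file group, empty the other slots, preserve the partition count), plus the suggested WARN on the non-file path. `Source` and `restrict_to_partition` are hypothetical simplified stand-ins for `DataSourceExec`/`FileScanConfig`, and `eprintln!` stands in for whatever logging macro the crate actually uses (e.g. `log::warn!`):

```rust
/// Hypothetical stand-in for a scan's data source; NOT DataFusion's types.
#[derive(Debug, Clone, PartialEq)]
enum Source {
    /// File-backed scan: one group of file paths per partition.
    Files(Vec<Vec<String>>),
    /// In-memory scan (stand-in for e.g. `MemorySourceConfig`).
    Memory { partitions: usize },
}

/// Keep only `partition_id`'s file group, emptying every other slot but
/// preserving the partition count. Returns `None` for non-file sources and
/// for a `partition_id` outside the source's file groups.
fn restrict_to_partition(source: &Source, partition_id: usize) -> Option<Source> {
    let groups = match source {
        Source::Files(groups) => groups,
        Source::Memory { partitions } => {
            // Make the unrestricted fallback visible instead of silent.
            eprintln!(
                "WARN: cannot restrict {partitions}-partition in-memory source \
                 to partition {partition_id}; scan is left unrestricted"
            );
            return None;
        }
    };
    // Out-of-range partition: `get` returns None and `?` propagates it.
    let kept = groups.get(partition_id)?;
    let restricted = (0..groups.len())
        .map(|i| if i == partition_id { kept.clone() } else { Vec::new() })
        .collect();
    Some(Source::Files(restricted))
}
```

Keeping the empty slots rather than shrinking the list means `execute(partition_id)` still addresses the same index as before.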
##########
ballista/core/src/registry.rs:
##########
@@ -106,6 +107,19 @@ impl FunctionRegistry for BallistaFunctionRegistry {
self.window_functions.keys().cloned().collect()
}
+ fn higher_order_function_names(&self) -> HashSet<String> {
+ HashSet::new()
Review Comment:
This will break the built-in HOFs from DataFusion.
IMO it would be better to copy them from the state at
https://github.com/apache/datafusion-ballista/blob/85805db74ffaa7aca25009099ecd06819163dc22/ballista/core/src/registry.rs#L154-L166
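A minimal sketch of the suggested direction: populate the registry's higher-order functions from the state and derive the names with the same `keys().cloned().collect()` pattern the registry already uses for window functions. `HigherOrderFunction`, `SessionStateLike`, `Registry` and `from_state` are hypothetical stand-ins, not the actual Ballista/DataFusion types:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

/// Hypothetical stand-in for a higher-order function implementation.
struct HigherOrderFunction;

/// Hypothetical stand-in for the session state that already holds the built-ins.
struct SessionStateLike {
    higher_order_functions: HashMap<String, Arc<HigherOrderFunction>>,
}

/// Simplified registry that copies its HOFs from the state instead of starting empty.
struct Registry {
    higher_order_functions: HashMap<String, Arc<HigherOrderFunction>>,
}

impl Registry {
    fn from_state(state: &SessionStateLike) -> Self {
        // Cloning the map only bumps the Arc reference counts.
        Self {
            higher_order_functions: state.higher_order_functions.clone(),
        }
    }

    /// Same `keys().cloned().collect()` pattern as for window functions.
    fn higher_order_function_names(&self) -> HashSet<String> {
        self.higher_order_functions.keys().cloned().collect()
    }
}
```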
--
This is an automated message from the Apache Git Service.