This is an automated email from the ASF dual-hosted git repository. yjshen pushed a commit to branch dp_poc1 in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
commit 8a23ac92d13d1f7b23c1bc27e46621c38b789cf9 Author: Yijie Shen <[email protected]> AuthorDate: Thu Aug 22 17:01:07 2024 -0700 wip: overall framework on comet side --- native/Cargo.lock | 1 + native/core/Cargo.toml | 1 + native/core/src/errors.rs | 6 ++++ .../core/src/execution/datafusion/ir_translator.rs | 2 +- native/core/src/execution/datafusion/planner.rs | 39 ++++++++++++++++++++-- .../core/src/execution/operators/falcon/filter.rs | 8 +++-- native/core/src/execution/operators/falcon/mod.rs | 32 +++++++++++++++++- .../src/execution/operators/falcon/projection.rs | 8 +++-- native/core/src/execution/operators/mod.rs | 1 + 9 files changed, 87 insertions(+), 11 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index a4b9cb96..6a741a37 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -2330,6 +2330,7 @@ dependencies = [ "flate2", "futures", "half", + "help", "hex", "insight", "itertools 0.11.0", diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index c471ffc0..29867bef 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -82,6 +82,7 @@ datafusion-comet-proto = { workspace = true } # DataPelago dependencies qflow = { path = "../../../nucleus/qflow", default-features = false } insight = { path = "../../../nucleus/insight", default-features = false } +help = { path = "../../../nucleus/help" } [dev-dependencies] pprof = { version = "0.13.0", features = ["flamegraph"] } diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs index 92799bcf..05d1db07 100644 --- a/native/core/src/errors.rs +++ b/native/core/src/errors.rs @@ -142,6 +142,12 @@ pub enum CometError { msg: String, throwable: GlobalRef, }, + + #[error(transparent)] + Nucleus { + #[from] + source: help::Error, + } } pub fn init() { diff --git a/native/core/src/execution/datafusion/ir_translator.rs b/native/core/src/execution/datafusion/ir_translator.rs index 5c54ddd4..547baff1 100644 --- a/native/core/src/execution/datafusion/ir_translator.rs +++ b/native/core/src/execution/datafusion/ir_translator.rs @@ -13,7 +13,7 @@ use datafusion_comet_proto::spark_operator::operator::OpStruct; use crate::execution::datafusion::{TranslateResult as Result, TranslationError}; -fn translate_operator(op: &Operator) -> Result<Node> { +pub(crate) fn translate_operator(op: &Operator) -> Result<Node> { debug!("Translating operator: {:?}", op.op_struct); match &op.op_struct { Some(OpStruct::Projection(proj)) => translate_project(proj, op), diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index b66e05de..b94fb5dc 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -19,7 +19,7 @@ use super::expressions::EvalMode; use crate::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun; -use crate::execution::operators::{CopyMode, FilterExec, FalconCore, FalconFilterProjectExec}; +use crate::execution::operators::{CopyMode, FilterExec, FalconProjectionExec, FalconFilterExec}; use crate::{ errors::ExpressionError, execution::{ @@ -112,6 +112,8 @@ use jni::objects::GlobalRef; use num::{BigInt, ToPrimitive}; use std::cmp::max; use std::{collections::HashMap, sync::Arc}; +use log::warn; +use crate::execution::datafusion::ir_translator::translate_operator; // For clippy error on type_complexity. type ExecResult<T> = Result<T, ExecutionError>; @@ -766,7 +768,23 @@ impl PhysicalPlanner { .map(|r| (r, format!("col_{}", idx))) }) .collect(); - Ok((scans, Arc::new(ProjectionExec::try_new(exprs?, child)?))) + + // Attempt to translate Spark operator to QFlow node + match translate_operator(spark_plan) { + Ok(qflow_node) => { + // Create FalconProjectionExec if translation succeeded + Ok((scans, Arc::new(FalconProjectionExec::try_new( + exprs?, + child.clone(), + child, + qflow_node, + )?))) + }, + Err(e) => { + warn!("Falling back to regular ProjectionExec: {}", e); + Ok((scans, Arc::new(ProjectionExec::try_new(exprs?, child)?))) + } + } } OpStruct::Filter(filter) => { assert!(children.len() == 1); @@ -774,7 +792,22 @@ impl PhysicalPlanner { let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; - Ok((scans, Arc::new(FilterExec::try_new(predicate, child)?))) + // Attempt to translate Spark operator to QFlow node + match translate_operator(spark_plan) { + Ok(qflow_node) => { + // Create FalconFilterExec if translation succeeded + Ok((scans, Arc::new(FalconFilterExec::try_new( + predicate, + child.clone(), + child, + qflow_node, + )?))) + }, + Err(e) => { + warn!("Falling back to regular FilterExec: {}", e); + Ok((scans, Arc::new(FilterExec::try_new(predicate, child)?))) + } + } } OpStruct::HashAgg(agg) => { assert!(children.len() == 1); diff --git a/native/core/src/execution/operators/falcon/filter.rs b/native/core/src/execution/operators/falcon/filter.rs index 1fad71fe..a5c6cba9 100644 --- a/native/core/src/execution/operators/falcon/filter.rs +++ b/native/core/src/execution/operators/falcon/filter.rs @@ -15,6 +15,7 @@ use futures::stream::{Stream, StreamExt}; use datafusion_common::Result; use log::trace; use qflow::ir::plan::Node; +use crate::execution::operators::{compile, run_dfg, DFG}; use crate::execution::operators::falcon::{get_falcon_input_stream, FalconPlan}; #[derive(Debug)] @@ -88,6 +89,7 @@ impl ExecutionPlan for FalconFilterExec { trace!("Start FalconFilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin(FalconFilterExecStream { + dfg: compile(&self.qnode)?, schema: self.input.schema(), input: self.input.execute(partition, context)?, baseline_metrics, @@ -101,7 +103,7 @@ impl ExecutionPlan for FalconFilterExec { impl FalconPlan for FalconFilterExec { fn qflow_node(&self) -> &Node { - todo!() + &self.qnode } fn input_stream(&self, partition: usize, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> { @@ -110,6 +112,7 @@ impl FalconPlan for FalconFilterExec { } struct FalconFilterExecStream { + dfg: DFG, schema: SchemaRef, input: SendableRecordBatchStream, baseline_metrics: BaselineMetrics, @@ -117,8 +120,7 @@ struct FalconFilterExecStream { impl FalconFilterExecStream { fn filter_batch(&self, batch: RecordBatch) -> Result<RecordBatch> { - // FIXME: call Falcon to filter the batch - Ok(batch) + run_dfg(&self.dfg, &batch) } } diff --git a/native/core/src/execution/operators/falcon/mod.rs b/native/core/src/execution/operators/falcon/mod.rs index 3a88f1de..aa73c166 100644 --- a/native/core/src/execution/operators/falcon/mod.rs +++ b/native/core/src/execution/operators/falcon/mod.rs @@ -1,11 +1,16 @@ use std::sync::Arc; +use arrow_array::RecordBatch; use datafusion::physical_plan::ExecutionPlan; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_common::{Result}; +use insight::Graph; +use qflow::ir; +use qflow::ir::Plan; use qflow::ir::plan::Node; - +use qflow::ir::plan::plan::CacheLocation; pub use projection::FalconProjectionExec; pub use filter::FalconFilterExec; +use crate::errors::CometResult; mod projection; mod filter; @@ -32,3 +37,28 @@ pub fn get_falcon_input_stream(plan: &Arc<dyn ExecutionPlan>, partition: usize, plan.execute(partition, context) } } + +pub type DFG = (Graph, Vec<CacheLocation>); + +pub fn compile(node: &Node) -> CometResult<DFG> { + let plan = Plan::with_root(node.clone()); + let mut plan = ir::IR::Plan(plan); + + let mut context = ir::compiler::Context::empty(); + let dfg = plan.compile(&mut context)?; + Ok(dfg) +} + +pub fn run_dfg(_dfg: &DFG, input: &RecordBatch) -> Result<RecordBatch> { + // FIXME: + // 1. record batch to falcon format + // 2. run the dfg + // 3. falcon format to record batch + + // let (graph, _cache_locations) = dfg; + // let input = input.convert_to_falcon(); + // let output = graph.run(input); + // let output = output.convert_to_arrow(); + + Ok(input.clone()) +} diff --git a/native/core/src/execution/operators/falcon/projection.rs b/native/core/src/execution/operators/falcon/projection.rs index 566845ba..8c103366 100644 --- a/native/core/src/execution/operators/falcon/projection.rs +++ b/native/core/src/execution/operators/falcon/projection.rs @@ -15,6 +15,7 @@ use futures::stream::{Stream, StreamExt}; use datafusion_common::Result; use log::trace; use qflow::ir::plan::Node; +use crate::execution::operators::{compile, run_dfg, DFG}; use crate::execution::operators::falcon::{get_falcon_input_stream, FalconPlan}; #[derive(Debug)] @@ -100,6 +101,7 @@ impl ExecutionPlan for FalconProjectionExec { fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> { trace!("Start FalconProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); Ok(Box::pin(FalconProjectionStream { + dfg: compile(&self.qnode)?, schema: self.schema(), input: self.input_stream(partition, context.clone())?, baseline_metrics: BaselineMetrics::new(&self.metrics, partition), @@ -123,6 +125,7 @@ impl FalconPlan for FalconProjectionExec { } struct FalconProjectionStream { + dfg: DFG, schema: SchemaRef, input: SendableRecordBatchStream, baseline_metrics: BaselineMetrics, @@ -132,9 +135,8 @@ impl FalconProjectionStream { fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> { // record time on drop let _timer = self.baseline_metrics.elapsed_compute().timer(); - - // FIXME: call Falcon to project the batch - Ok(batch.clone()) + let output = run_dfg(&self.dfg, batch)?; + Ok(output) } } diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index 353367cf..d11e9515 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -25,6 +25,7 @@ pub use copy::*; pub use filter::comet_filter_record_batch; pub use filter::FilterExec; pub use scan::*; +pub use falcon::*; mod copy; mod filter; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
