This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch dp_poc1
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git

commit 8a23ac92d13d1f7b23c1bc27e46621c38b789cf9
Author: Yijie Shen <[email protected]>
AuthorDate: Thu Aug 22 17:01:07 2024 -0700

    wip: overall framework on comet side
---
 native/Cargo.lock                                  |  1 +
 native/core/Cargo.toml                             |  1 +
 native/core/src/errors.rs                          |  6 ++++
 .../core/src/execution/datafusion/ir_translator.rs |  2 +-
 native/core/src/execution/datafusion/planner.rs    | 39 ++++++++++++++++++++--
 .../core/src/execution/operators/falcon/filter.rs  |  8 +++--
 native/core/src/execution/operators/falcon/mod.rs  | 32 +++++++++++++++++-
 .../src/execution/operators/falcon/projection.rs   |  8 +++--
 native/core/src/execution/operators/mod.rs         |  1 +
 9 files changed, 87 insertions(+), 11 deletions(-)

diff --git a/native/Cargo.lock b/native/Cargo.lock
index a4b9cb96..6a741a37 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -2330,6 +2330,7 @@ dependencies = [
  "flate2",
  "futures",
  "half",
+ "help",
  "hex",
  "insight",
  "itertools 0.11.0",
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index c471ffc0..29867bef 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -82,6 +82,7 @@ datafusion-comet-proto = { workspace = true }
 # DataPelago dependencies
 qflow = { path = "../../../nucleus/qflow", default-features = false }
 insight = { path = "../../../nucleus/insight", default-features = false }
+help = { path = "../../../nucleus/help" }
 
 [dev-dependencies]
 pprof = { version = "0.13.0", features = ["flamegraph"] }
diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs
index 92799bcf..05d1db07 100644
--- a/native/core/src/errors.rs
+++ b/native/core/src/errors.rs
@@ -142,6 +142,12 @@ pub enum CometError {
         msg: String,
         throwable: GlobalRef,
     },
+
+    #[error(transparent)]
+    Nucleus {
+        #[from]
+        source: help::Error,
+    }
 }
 
 pub fn init() {
diff --git a/native/core/src/execution/datafusion/ir_translator.rs b/native/core/src/execution/datafusion/ir_translator.rs
index 5c54ddd4..547baff1 100644
--- a/native/core/src/execution/datafusion/ir_translator.rs
+++ b/native/core/src/execution/datafusion/ir_translator.rs
@@ -13,7 +13,7 @@ use datafusion_comet_proto::spark_operator::operator::OpStruct;
 
 use crate::execution::datafusion::{TranslateResult as Result, 
TranslationError};
 
-fn translate_operator(op: &Operator) -> Result<Node> {
+pub(crate) fn translate_operator(op: &Operator) -> Result<Node> {
     debug!("Translating operator: {:?}", op.op_struct);
     match &op.op_struct {
         Some(OpStruct::Projection(proj)) => translate_project(proj, op),
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index b66e05de..b94fb5dc 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -19,7 +19,7 @@
 
 use super::expressions::EvalMode;
 use 
crate::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun;
-use crate::execution::operators::{CopyMode, FilterExec, FalconCore, 
FalconFilterProjectExec};
+use crate::execution::operators::{CopyMode, FilterExec, FalconProjectionExec, 
FalconFilterExec};
 use crate::{
     errors::ExpressionError,
     execution::{
@@ -112,6 +112,8 @@ use jni::objects::GlobalRef;
 use num::{BigInt, ToPrimitive};
 use std::cmp::max;
 use std::{collections::HashMap, sync::Arc};
+use log::warn;
+use crate::execution::datafusion::ir_translator::translate_operator;
 
 // For clippy error on type_complexity.
 type ExecResult<T> = Result<T, ExecutionError>;
@@ -766,7 +768,23 @@ impl PhysicalPlanner {
                             .map(|r| (r, format!("col_{}", idx)))
                     })
                     .collect();
-                Ok((scans, Arc::new(ProjectionExec::try_new(exprs?, child)?)))
+
+                // Attempt to translate Spark operator to QFlow node
+                match translate_operator(spark_plan) {
+                    Ok(qflow_node) => {
+                        // Create FalconProjectionExec if translation succeeded
+                        Ok((scans, Arc::new(FalconProjectionExec::try_new(
+                            exprs?,
+                            child.clone(),
+                            child,
+                            qflow_node,
+                        )?)))
+                    },
+                    Err(e) => {
+                        warn!("Falling back to regular ProjectionExec: {}", e);
+                        Ok((scans, Arc::new(ProjectionExec::try_new(exprs?, 
child)?)))
+                    }
+                }
             }
             OpStruct::Filter(filter) => {
                 assert!(children.len() == 1);
@@ -774,7 +792,22 @@ impl PhysicalPlanner {
                 let predicate =
                     self.create_expr(filter.predicate.as_ref().unwrap(), 
child.schema())?;
 
-                Ok((scans, Arc::new(FilterExec::try_new(predicate, child)?)))
+                // Attempt to translate Spark operator to QFlow node
+                match translate_operator(spark_plan) {
+                    Ok(qflow_node) => {
+                        // Create FalconFilterExec if translation succeeded
+                        Ok((scans, Arc::new(FalconFilterExec::try_new(
+                            predicate,
+                            child.clone(),
+                            child,
+                            qflow_node,
+                        )?)))
+                    },
+                    Err(e) => {
+                        warn!("Falling back to regular FilterExec: {}", e);
+                        Ok((scans, Arc::new(FilterExec::try_new(predicate, 
child)?)))
+                    }
+                }
             }
             OpStruct::HashAgg(agg) => {
                 assert!(children.len() == 1);
diff --git a/native/core/src/execution/operators/falcon/filter.rs b/native/core/src/execution/operators/falcon/filter.rs
index 1fad71fe..a5c6cba9 100644
--- a/native/core/src/execution/operators/falcon/filter.rs
+++ b/native/core/src/execution/operators/falcon/filter.rs
@@ -15,6 +15,7 @@ use futures::stream::{Stream, StreamExt};
 use datafusion_common::Result;
 use log::trace;
 use qflow::ir::plan::Node;
+use crate::execution::operators::{compile, run_dfg, DFG};
 use crate::execution::operators::falcon::{get_falcon_input_stream, FalconPlan};
 
 #[derive(Debug)]
@@ -88,6 +89,7 @@ impl ExecutionPlan for FalconFilterExec {
         trace!("Start FalconFilterExec::execute for partition {} of context 
session_id {} and task_id {:?}", partition, context.session_id(), 
context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         Ok(Box::pin(FalconFilterExecStream {
+            dfg: compile(&self.qnode)?,
             schema: self.input.schema(),
             input: self.input.execute(partition, context)?,
             baseline_metrics,
@@ -101,7 +103,7 @@ impl ExecutionPlan for FalconFilterExec {
 
 impl FalconPlan for FalconFilterExec {
     fn qflow_node(&self) -> &Node {
-        todo!()
+        &self.qnode
     }
 
     fn input_stream(&self, partition: usize, context: Arc<TaskContext>) -> 
Result<SendableRecordBatchStream> {
@@ -110,6 +112,7 @@ impl FalconPlan for FalconFilterExec {
 }
 
 struct FalconFilterExecStream {
+    dfg: DFG,
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     baseline_metrics: BaselineMetrics,
@@ -117,8 +120,7 @@ struct FalconFilterExecStream {
 
 impl FalconFilterExecStream {
     fn filter_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        // FIXME: call Falcon to filter the batch
-        Ok(batch)
+        run_dfg(&self.dfg, &batch)
     }
 }
 
diff --git a/native/core/src/execution/operators/falcon/mod.rs b/native/core/src/execution/operators/falcon/mod.rs
index 3a88f1de..aa73c166 100644
--- a/native/core/src/execution/operators/falcon/mod.rs
+++ b/native/core/src/execution/operators/falcon/mod.rs
@@ -1,11 +1,16 @@
 use std::sync::Arc;
+use arrow_array::RecordBatch;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_common::{Result};
+use insight::Graph;
+use qflow::ir;
+use qflow::ir::Plan;
 use qflow::ir::plan::Node;
-
+use qflow::ir::plan::plan::CacheLocation;
 pub use projection::FalconProjectionExec;
 pub use filter::FalconFilterExec;
+use crate::errors::CometResult;
 
 mod projection;
 mod filter;
@@ -32,3 +37,28 @@ pub fn get_falcon_input_stream(plan: &Arc<dyn ExecutionPlan>, partition: usize,
         plan.execute(partition, context)
     }
 }
+
+pub type DFG = (Graph, Vec<CacheLocation>);
+
+pub fn compile(node: &Node) -> CometResult<DFG> {
+    let plan = Plan::with_root(node.clone());
+    let mut plan = ir::IR::Plan(plan);
+
+    let mut context = ir::compiler::Context::empty();
+    let dfg = plan.compile(&mut context)?;
+    Ok(dfg)
+}
+
+pub fn run_dfg(_dfg: &DFG, input: &RecordBatch) -> Result<RecordBatch> {
+    // FIXME:
+    // 1. record batch to falcon format
+    // 2. run the dfg
+    // 3. falcon format to record batch
+
+    // let (graph, _cache_locations) = dfg;
+    // let input = input.convert_to_falcon();
+    // let output = graph.run(input);
+    // let output = output.convert_to_arrow();
+
+    Ok(input.clone())
+}
diff --git a/native/core/src/execution/operators/falcon/projection.rs b/native/core/src/execution/operators/falcon/projection.rs
index 566845ba..8c103366 100644
--- a/native/core/src/execution/operators/falcon/projection.rs
+++ b/native/core/src/execution/operators/falcon/projection.rs
@@ -15,6 +15,7 @@ use futures::stream::{Stream, StreamExt};
 use datafusion_common::Result;
 use log::trace;
 use qflow::ir::plan::Node;
+use crate::execution::operators::{compile, run_dfg, DFG};
 use crate::execution::operators::falcon::{get_falcon_input_stream, FalconPlan};
 
 #[derive(Debug)]
@@ -100,6 +101,7 @@ impl ExecutionPlan for FalconProjectionExec {
     fn execute(&self, partition: usize, context: Arc<TaskContext>) -> 
Result<SendableRecordBatchStream> {
         trace!("Start FalconProjectionExec::execute for partition {} of 
context session_id {} and task_id {:?}", partition, context.session_id(), 
context.task_id());
         Ok(Box::pin(FalconProjectionStream {
+            dfg: compile(&self.qnode)?,
             schema: self.schema(),
             input: self.input_stream(partition, context.clone())?,
             baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
@@ -123,6 +125,7 @@ impl FalconPlan for FalconProjectionExec {
 }
 
 struct FalconProjectionStream {
+    dfg: DFG,
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     baseline_metrics: BaselineMetrics,
@@ -132,9 +135,8 @@ impl FalconProjectionStream {
     fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
         // record time on drop
         let _timer = self.baseline_metrics.elapsed_compute().timer();
-
-        // FIXME: call Falcon to project the batch
-        Ok(batch.clone())
+        let output = run_dfg(&self.dfg, batch)?;
+        Ok(output)
     }
 }
 
diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs
index 353367cf..d11e9515 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -25,6 +25,7 @@ pub use copy::*;
 pub use filter::comet_filter_record_batch;
 pub use filter::FilterExec;
 pub use scan::*;
+pub use falcon::*;
 
 mod copy;
 mod filter;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to