andygrove commented on code in PR #14:
URL: https://github.com/apache/datafusion-ray/pull/14#discussion_r1807437886


##########
src/context.rs:
##########
@@ -44,75 +41,61 @@ type PyResultSet = Vec<PyObject>;
 
 #[pyclass(name = "Context", module = "datafusion_ray", subclass)]
 pub struct PyContext {
-    pub(crate) ctx: SessionContext,
+    pub(crate) py_ctx: PyObject,
+}
+
+pub(crate) fn execution_plan_from_pyany(
+    py_plan: &Bound<PyAny>,
+) -> PyResult<Arc<dyn ExecutionPlan>> {
+    let py_proto = py_plan.call_method0("to_proto")?;
+    let plan_bytes: &[u8] = py_proto.extract()?;
+    let plan_node = protobuf::PhysicalPlanNode::try_decode(plan_bytes).map_err(|e| {
+        PyRuntimeError::new_err(format!(
+            "Unable to decode physical plan protobuf message: {}",
+            e
+        ))
+    })?;
+
+    let codec = ShuffleCodec {};
+    let runtime = RuntimeEnv::default();
+    let registry = SessionContext::new();
+    plan_node
+        .try_into_physical_plan(&registry, &runtime, &codec)
+        .map_err(|e| e.into())
 }
 
 #[pymethods]
 impl PyContext {
     #[new]
-    pub fn new(target_partitions: usize) -> Result<Self> {
-        let config = SessionConfig::default()
-            .with_target_partitions(target_partitions)
-            .with_batch_size(16 * 1024)
-            .with_repartition_aggregations(true)
-            .with_repartition_windows(true)
-            .with_repartition_joins(true)
-            .with_parquet_pruning(true);
-
-        let mem_pool_size = 1024 * 1024 * 1024;
-        let runtime_config = datafusion::execution::runtime_env::RuntimeConfig::new()
-            .with_memory_pool(Arc::new(FairSpillPool::new(mem_pool_size)))
-            .with_disk_manager(DiskManagerConfig::new_specified(vec!["/tmp".into()]));
-        let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
-        let ctx = SessionContext::new_with_config_rt(config, runtime);
-        Ok(Self { ctx })
-    }
-
-    pub fn register_csv(
-        &self,
-        name: &str,
-        path: &str,
-        has_header: bool,
-        py: Python,
-    ) -> PyResult<()> {
-        let options = CsvReadOptions::default().has_header(has_header);
-        wait_for_future(py, self.ctx.register_csv(name, path, options))?;
-        Ok(())
-    }
-
-    pub fn register_parquet(&self, name: &str, path: &str, py: Python) -> PyResult<()> {
-        let options = ParquetReadOptions::default();
-        wait_for_future(py, self.ctx.register_parquet(name, path, options))?;
-        Ok(())
-    }
-
-    pub fn register_datalake_table(
-        &self,
-        _name: &str,
-        _path: Vec<String>,
-        _py: Python,
-    ) -> PyResult<()> {
-        // let options = ParquetReadOptions::default();
-        // let listing_options = options.to_listing_options(&self.ctx.state().config());
-        // wait_for_future(py, self.ctx.register_listing_table(name, path, listing_options, None, None))?;
-        // Ok(())
-        unimplemented!()
+    pub fn new(session_ctx: PyObject) -> Result<Self> {
+        Ok(Self {
+            py_ctx: session_ctx,
+        })
     }
 
     /// Execute SQL directly against the DataFusion context. Useful for statements
     /// such as "create view" or "drop view"
-    pub fn sql(&self, sql: &str, py: Python) -> PyResult<()> {
-        println!("Executing {}", sql);
-        let _df = wait_for_future(py, self.ctx.sql(sql))?;
+    pub fn sql(&self, query: &str, py: Python) -> PyResult<()> {
+        println!("Executing {}", query);
+        // let _df = wait_for_future(py, self.ctx.sql(sql))?;
+        let _df = self.run_sql(query, py);
         Ok(())
     }
 
+    fn run_sql(&self, query: &str, py: Python) -> PyResult<Py<PyAny>> {
+        let args = PyTuple::new_bound(py, [query]);
+        self.py_ctx.call_method1(py, "sql", args)
+    }
+
     /// Plan a distributed SELECT query for executing against the Ray workers
-    pub fn plan(&self, sql: &str, py: Python) -> PyResult<PyExecutionGraph> {
-        println!("Planning {}", sql);
-        let df = wait_for_future(py, self.ctx.sql(sql))?;
-        let plan = wait_for_future(py, df.create_physical_plan())?;
+    pub fn plan(&self, plan: &Bound<PyAny>) -> PyResult<PyExecutionGraph> {
+        // println!("Planning {}", sql);
+        // let df = wait_for_future(py, self.ctx.sql(sql))?;
+        // let py_df = self.run_sql(sql, py)?;
+        // let py_plan = py_df.call_method0(py, "execution_plan")?;
+        // let py_plan = py_plan.bind(py);

Review Comment:
   Can we remove this commented-out code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to