This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new d949e5fe added a BallistaContext to ballista to allow for Remote or 
standalone (#1100)
d949e5fe is described below

commit d949e5fe33ad22da1388657dc8a6dc5f400023bc
Author: Trevor <[email protected]>
AuthorDate: Mon Nov 18 22:53:59 2024 -0800

    added a BallistaContext to ballista to allow for Remote or standalone 
(#1100)
    
    * added a pycontext to ballista
    
    * added a pycontext to ballista
    
    * added a pycontext to ballista
    
    * updated python to have two static methods for creating a ballista context
    
    * updated python to have two static methods for creating a ballista context
    
    * updated python to have two static methods for creating a ballista context
    
    * updated python to have two static methods for creating a ballista context
    
    * updated python to have two static methods for creating a ballista context
    
    * updated python to have two static methods for creating a ballista context
    
    * updated python to have two static methods for creating a ballista context
    
    * updated python to have two static methods for creating a ballista context
    
    * updating the pyballista package to ballista
    
    * changing the packagaing naming convention from pyballista to ballista
    
    * changing the packagaing naming convention from pyballista to ballista
    
    * updated python to have two static methods for creating a ballista context
    
    * updated python to have two static methods for creating a ballista context
    
    * updated python to have two static methods for creating a ballista context
    
    * updated python to have two static methods for creating a ballista context
    
    * Updating BallistaContext and Config
    
    * Updating BallistaContext and Config
    
    * updated python to have two static methods for creating a ballista context
    
    * Updating BallistaContext and Config, calling it for the night, will 
complete tomorrow
    
    * Updating BallistaContext and Config, calling it for the night, will 
complete tomorrow
    
    * Adding config to ballista context
    
    * Adding config to ballista context
    
    * Adding config to ballista context
    
    * Adding config to ballista context
    
    * Updated Builder and Docs
    
    * Updated Builder and Docs
    
    * Updated Builder and Docs
    
    * Updated Builder and Docs
    
    * Updated Builder and Docs
    
    * Updated Builder and Docs
    
    ---------
    
    Co-authored-by: Trevor Barnes <[email protected]>
---
 docs/source/user-guide/python.md                   |  20 +-
 python/Cargo.toml                                  |  10 +-
 python/README.md                                   |   4 +-
 python/{pyballista => ballista}/__init__.py        |   8 +-
 python/{pyballista => ballista}/tests/__init__.py  |   0
 .../{pyballista => ballista}/tests/test_context.py |  21 +-
 .../__init__.py => examples/example.py}            |  30 +-
 python/pyproject.toml                              |   6 +-
 python/src/context.rs                              | 353 ---------------------
 python/src/lib.rs                                  |  85 ++++-
 python/testdata/test.csv                           |   0
 11 files changed, 131 insertions(+), 406 deletions(-)

diff --git a/docs/source/user-guide/python.md b/docs/source/user-guide/python.md
index 674850c7..f17ac68d 100644
--- a/docs/source/user-guide/python.md
+++ b/docs/source/user-guide/python.md
@@ -28,9 +28,20 @@ popular file formats files, run it in a distributed 
environment, and obtain the
 
 The following code demonstrates how to create a Ballista context and connect 
to a scheduler.
 
+If you are running a standalone cluster (runs locally), all you need to do is 
call the stand alone cluster method `standalone()` or your BallistaContext. If 
you are running a cluster in remote mode, you need to provide the URL 
`Ballista.remote("http://my-remote-ip:50050";)`.
+
 ```text
->>> import ballista
->>> ctx = ballista.BallistaContext("localhost", 50050)
+>>> from ballista import BallistaBuilder
+>>> # for a standalone instance
+>>> # Ballista will initiate with an empty config
+>>> # set config variables with `config()`
+>>> ballista = BallistaBuilder()\
+>>>    .config("ballista.job.name", "example ballista")
+>>>
+>>> ctx = ballista.standalone()
+>>>
+>>> # for a remote instance provide the URL
+>>> ctx = ballista.remote("df://url-path-to-scheduler:50050")
 ```
 
 ## SQL
@@ -103,14 +114,15 @@ The `explain` method can be used to show the logical and 
physical query plans fo
 The following example demonstrates creating arrays with PyArrow and then 
creating a Ballista DataFrame.
 
 ```python
-import ballista
+from ballista import BallistaBuilder
 import pyarrow
 
 # an alias
+# TODO implement Functions
 f = ballista.functions
 
 # create a context
-ctx = ballista.BallistaContext("localhost", 50050)
+ctx = Ballista().standalone()
 
 # create a RecordBatch and a new DataFrame from it
 batch = pyarrow.RecordBatch.from_arrays(
diff --git a/python/Cargo.toml b/python/Cargo.toml
index dbe419d2..b03f1e99 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -26,14 +26,14 @@ readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
 rust-version = "1.72"
-include = ["/src", "/pyballista", "/LICENSE.txt", "pyproject.toml", 
"Cargo.toml", "Cargo.lock"]
+include = ["/src", "/ballista", "/LICENSE.txt", "pyproject.toml", 
"Cargo.toml", "Cargo.lock"]
 publish = false
 
 [dependencies]
 async-trait = "0.1.77"
-ballista = { path = "../ballista/client", version = "0.12.0" }
+ballista = { path = "../ballista/client", version = "0.12.0", features = 
["standalone"] }
 ballista-core = { path = "../ballista/core", version = "0.12.0" }
-datafusion = { version = "42" }
+datafusion = { version = "42", features = ["pyarrow", "avro"] }
 datafusion-proto = { version = "42" }
 datafusion-python = { version = "42" }
 
@@ -43,6 +43,4 @@ tokio = { version = "1.35", features = ["macros", "rt", 
"rt-multi-thread", "sync
 
 [lib]
 crate-type = ["cdylib"]
-name = "pyballista"
-
-
+name = "ballista"
diff --git a/python/README.md b/python/README.md
index 2898cb16..01b0a7f9 100644
--- a/python/README.md
+++ b/python/README.md
@@ -29,8 +29,8 @@ part of the default Cargo workspace so that it doesn't cause 
overhead for mainta
 Creates a new context and connects to a Ballista scheduler process.
 
 ```python
-from pyballista import SessionContext
->>> ctx = SessionContext("localhost", 50050)
+from ballista import BallistaBuilder
+>>> ctx = BallistaBuilder().standalone()
 ```
 
 ## Example SQL Usage
diff --git a/python/pyballista/__init__.py b/python/ballista/__init__.py
similarity index 92%
copy from python/pyballista/__init__.py
copy to python/ballista/__init__.py
index 62a6bc79..a143f17e 100644
--- a/python/pyballista/__init__.py
+++ b/python/ballista/__init__.py
@@ -25,12 +25,12 @@ except ImportError:
 
 import pyarrow as pa
 
-from .pyballista_internal import (
-    SessionContext,
+from .ballista_internal import (
+    BallistaBuilder,
 )
 
 __version__ = importlib_metadata.version(__name__)
 
 __all__ = [
-    "SessionContext",
-]
+    "BallistaBuilder",
+]
\ No newline at end of file
diff --git a/python/pyballista/tests/__init__.py 
b/python/ballista/tests/__init__.py
similarity index 100%
rename from python/pyballista/tests/__init__.py
rename to python/ballista/tests/__init__.py
diff --git a/python/pyballista/tests/test_context.py 
b/python/ballista/tests/test_context.py
similarity index 81%
rename from python/pyballista/tests/test_context.py
rename to python/ballista/tests/test_context.py
index b440bb27..a0af9592 100644
--- a/python/pyballista/tests/test_context.py
+++ b/python/ballista/tests/test_context.py
@@ -15,27 +15,27 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from pyballista import SessionContext
+from ballista import BallistaBuilder
 import pytest
 
 def test_create_context():
-    SessionContext("localhost", 50050)
+    BallistaBuilder().standalone()
 
 def test_select_one():
-    ctx = SessionContext("localhost", 50050)
+    ctx = BallistaBuilder().standalone()
     df = ctx.sql("SELECT 1")
     batches = df.collect()
     assert len(batches) == 1
 
 def test_read_csv():
-    ctx = SessionContext("localhost", 50050)
+    ctx = BallistaBuilder().standalone()
     df = ctx.read_csv("testdata/test.csv", has_header=True)
     batches = df.collect()
     assert len(batches) == 1
     assert len(batches[0]) == 1
 
 def test_register_csv():
-    ctx = SessionContext("localhost", 50050)
+    ctx = BallistaBuilder().standalone()
     ctx.register_csv("test", "testdata/test.csv", has_header=True)
     df = ctx.sql("SELECT * FROM test")
     batches = df.collect()
@@ -43,14 +43,14 @@ def test_register_csv():
     assert len(batches[0]) == 1
 
 def test_read_parquet():
-    ctx = SessionContext("localhost", 50050)
+    ctx = BallistaBuilder().standalone()
     df = ctx.read_parquet("testdata/test.parquet")
     batches = df.collect()
     assert len(batches) == 1
     assert len(batches[0]) == 8
 
 def test_register_parquet():
-    ctx = SessionContext("localhost", 50050)
+    ctx = BallistaBuilder().standalone()
     ctx.register_parquet("test", "testdata/test.parquet")
     df = ctx.sql("SELECT * FROM test")
     batches = df.collect()
@@ -58,7 +58,7 @@ def test_register_parquet():
     assert len(batches[0]) == 8
 
 def test_read_dataframe_api():
-    ctx = SessionContext("localhost", 50050)
+    ctx = BallistaBuilder().standalone()
     df = ctx.read_csv("testdata/test.csv", has_header=True) \
         .select_columns('a', 'b') \
         .limit(1)
@@ -67,11 +67,12 @@ def test_read_dataframe_api():
     assert len(batches[0]) == 1
 
 def test_execute_plan():
-    ctx = SessionContext("localhost", 50050)
+    ctx = BallistaBuilder().standalone()
     df = ctx.read_csv("testdata/test.csv", has_header=True) \
         .select_columns('a', 'b') \
         .limit(1)
-    df = ctx.execute_logical_plan(df.logical_plan())
+    # TODO research SessionContext Logical Plan for DataFusionPython
+    #df = ctx.execute_logical_plan(df.logical_plan())
     batches = df.collect()
     assert len(batches) == 1
     assert len(batches[0]) == 1
diff --git a/python/pyballista/__init__.py b/python/examples/example.py
similarity index 61%
rename from python/pyballista/__init__.py
rename to python/examples/example.py
index 62a6bc79..61a9abbd 100644
--- a/python/pyballista/__init__.py
+++ b/python/examples/example.py
@@ -15,22 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from abc import ABCMeta, abstractmethod
-from typing import List
+from ballista import BallistaBuilder
+from datafusion.context import SessionContext
 
-try:
-    import importlib.metadata as importlib_metadata
-except ImportError:
-    import importlib_metadata
+# Ballista will initiate with an empty config
+# set config variables with `config`
+ctx: SessionContext = BallistaBuilder()\
+    .config("ballista.job.name", "example ballista")\
+    .config("ballista.shuffle.partitions", "16")\
+    .standalone()
+    
+#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050)
 
-import pyarrow as pa
-
-from .pyballista_internal import (
-    SessionContext,
-)
-
-__version__ = importlib_metadata.version(__name__)
-
-__all__ = [
-    "SessionContext",
-]
+# Select 1 to verify its working
+ctx.sql("SELECT 1").show()
+#ctx_remote.sql("SELECT 2").show()
\ No newline at end of file
diff --git a/python/pyproject.toml b/python/pyproject.toml
index dbb76e59..2d06b225 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -20,7 +20,7 @@ requires = ["maturin>=0.15,<0.16"]
 build-backend = "maturin"
 
 [project]
-name = "pyballista"
+name = "ballista"
 description = "Python client for Apache Arrow Ballista Distributed SQL Query 
Engine"
 readme = "README.md"
 license = {file = "LICENSE.txt"}
@@ -55,10 +55,10 @@ repository = "https://github.com/apache/arrow-ballista";
 profile = "black"
 
 [tool.maturin]
-module-name = "pyballista.pyballista_internal"
+module-name = "ballista.ballista_internal"
 include = [
     { path = "Cargo.lock", format = "sdist" }
 ]
 exclude = [".github/**", "ci/**", ".asf.yaml"]
 # Require Cargo.lock is up to date
-locked = true
+locked = true
\ No newline at end of file
diff --git a/python/src/context.rs b/python/src/context.rs
deleted file mode 100644
index d27d5314..00000000
--- a/python/src/context.rs
+++ /dev/null
@@ -1,353 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::utils::to_pyerr;
-use datafusion::logical_expr::SortExpr;
-use pyo3::exceptions::PyValueError;
-use pyo3::prelude::*;
-use std::path::PathBuf;
-
-use ballista::prelude::*;
-use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::pyarrow::PyArrowType;
-use datafusion::prelude::*;
-use datafusion_python::catalog::PyTable;
-use datafusion_python::context::{
-    convert_table_partition_cols, parse_file_compression_type,
-};
-use datafusion_python::dataframe::PyDataFrame;
-use datafusion_python::errors::DataFusionError;
-use datafusion_python::expr::sort_expr::PySortExpr;
-use datafusion_python::sql::logical::PyLogicalPlan;
-use datafusion_python::utils::wait_for_future;
-
-/// PyBallista session context. This is largely a duplicate of
-/// DataFusion's PySessionContext, with the main difference being
-/// that this operates on a BallistaContext instead of DataFusion's
-/// SessionContext. We could probably add extra extension points to
-/// DataFusion to allow for a pluggable context and remove much of
-/// this code.
-#[pyclass(name = "SessionContext", module = "pyballista", subclass)]
-pub struct PySessionContext {
-    ctx: BallistaContext,
-}
-
-#[pymethods]
-impl PySessionContext {
-    /// Create a new SessionContext by connecting to a Ballista scheduler 
process.
-    #[new]
-    pub fn new(host: &str, port: u16, py: Python) -> PyResult<Self> {
-        let config = BallistaConfig::default();
-        let ballista_context = BallistaContext::remote(host, port, &config);
-        let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?;
-        Ok(Self { ctx })
-    }
-
-    pub fn sql(&mut self, query: &str, py: Python) -> PyResult<PyDataFrame> {
-        let result = self.ctx.sql(query);
-        let df = wait_for_future(py, result)?;
-        Ok(PyDataFrame::new(df))
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    #[pyo3(signature = (path, schema=None, table_partition_cols=vec![], 
file_extension=".avro"))]
-    pub fn read_avro(
-        &self,
-        path: &str,
-        schema: Option<PyArrowType<Schema>>,
-        table_partition_cols: Vec<(String, String)>,
-        file_extension: &str,
-        py: Python,
-    ) -> PyResult<PyDataFrame> {
-        let mut options = AvroReadOptions::default()
-            
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
-        options.file_extension = file_extension;
-        let df = if let Some(schema) = schema {
-            options.schema = Some(&schema.0);
-            let read_future = self.ctx.read_avro(path, options);
-            wait_for_future(py, read_future).map_err(DataFusionError::from)?
-        } else {
-            let read_future = self.ctx.read_avro(path, options);
-            wait_for_future(py, read_future).map_err(DataFusionError::from)?
-        };
-        Ok(PyDataFrame::new(df))
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    #[pyo3(signature = (
-        path,
-        schema=None,
-        has_header=true,
-        delimiter=",",
-        schema_infer_max_records=1000,
-        file_extension=".csv",
-        table_partition_cols=vec![],
-        file_compression_type=None))]
-    pub fn read_csv(
-        &self,
-        path: PathBuf,
-        schema: Option<PyArrowType<Schema>>,
-        has_header: bool,
-        delimiter: &str,
-        schema_infer_max_records: usize,
-        file_extension: &str,
-        table_partition_cols: Vec<(String, String)>,
-        file_compression_type: Option<String>,
-        py: Python,
-    ) -> PyResult<PyDataFrame> {
-        let path = path
-            .to_str()
-            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a 
string"))?;
-
-        let delimiter = delimiter.as_bytes();
-        if delimiter.len() != 1 {
-            return Err(PyValueError::new_err(
-                "Delimiter must be a single character",
-            ));
-        };
-
-        let mut options = CsvReadOptions::new()
-            .has_header(has_header)
-            .delimiter(delimiter[0])
-            .schema_infer_max_records(schema_infer_max_records)
-            .file_extension(file_extension)
-            
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
-            
.file_compression_type(parse_file_compression_type(file_compression_type)?);
-
-        if let Some(py_schema) = schema {
-            options.schema = Some(&py_schema.0);
-            let result = self.ctx.read_csv(path, options);
-            let df = PyDataFrame::new(wait_for_future(py, result)?);
-            Ok(df)
-        } else {
-            let result = self.ctx.read_csv(path, options);
-            let df = PyDataFrame::new(wait_for_future(py, result)?);
-            Ok(df)
-        }
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, 
file_extension=".json", table_partition_cols=vec![], 
file_compression_type=None))]
-    pub fn read_json(
-        &mut self,
-        path: PathBuf,
-        schema: Option<PyArrowType<Schema>>,
-        schema_infer_max_records: usize,
-        file_extension: &str,
-        table_partition_cols: Vec<(String, String)>,
-        file_compression_type: Option<String>,
-        py: Python,
-    ) -> PyResult<PyDataFrame> {
-        let path = path
-            .to_str()
-            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a 
string"))?;
-        let mut options = NdJsonReadOptions::default()
-            
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
-            
.file_compression_type(parse_file_compression_type(file_compression_type)?);
-        options.schema_infer_max_records = schema_infer_max_records;
-        options.file_extension = file_extension;
-        let df = if let Some(schema) = schema {
-            options.schema = Some(&schema.0);
-            let result = self.ctx.read_json(path, options);
-            wait_for_future(py, result).map_err(DataFusionError::from)?
-        } else {
-            let result = self.ctx.read_json(path, options);
-            wait_for_future(py, result).map_err(DataFusionError::from)?
-        };
-        Ok(PyDataFrame::new(df))
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    #[pyo3(signature = (
-        path,
-        table_partition_cols=vec![],
-        parquet_pruning=true,
-        file_extension=".parquet",
-        skip_metadata=true,
-        schema=None,
-        file_sort_order=None))]
-    pub fn read_parquet(
-        &self,
-        path: &str,
-        table_partition_cols: Vec<(String, String)>,
-        parquet_pruning: bool,
-        file_extension: &str,
-        skip_metadata: bool,
-        schema: Option<PyArrowType<Schema>>,
-        file_sort_order: Option<Vec<Vec<PySortExpr>>>,
-        py: Python,
-    ) -> PyResult<PyDataFrame> {
-        let mut options = ParquetReadOptions::default()
-            
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
-            .parquet_pruning(parquet_pruning)
-            .skip_metadata(skip_metadata);
-        options.file_extension = file_extension;
-        options.schema = schema.as_ref().map(|x| &x.0);
-        options.file_sort_order = file_sort_order
-            .unwrap_or_default()
-            .into_iter()
-            .map(|e| {
-                e.into_iter()
-                    .map(|f| {
-                        let sort_expr: SortExpr = f.into();
-                        sort_expr
-                    })
-                    .collect()
-            })
-            .collect();
-
-        let result = self.ctx.read_parquet(path, options);
-        let df =
-            PyDataFrame::new(wait_for_future(py, 
result).map_err(DataFusionError::from)?);
-        Ok(df)
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    #[pyo3(signature = (name,
-    path,
-    schema=None,
-    file_extension=".avro",
-    table_partition_cols=vec![]))]
-    pub fn register_avro(
-        &mut self,
-        name: &str,
-        path: PathBuf,
-        schema: Option<PyArrowType<Schema>>,
-        file_extension: &str,
-        table_partition_cols: Vec<(String, String)>,
-        py: Python,
-    ) -> PyResult<()> {
-        let path = path
-            .to_str()
-            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a 
string"))?;
-
-        let mut options = AvroReadOptions::default()
-            
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
-        options.file_extension = file_extension;
-        options.schema = schema.as_ref().map(|x| &x.0);
-
-        let result = self.ctx.register_avro(name, path, options);
-        wait_for_future(py, result).map_err(DataFusionError::from)?;
-
-        Ok(())
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    #[pyo3(signature = (name,
-    path,
-    schema=None,
-    has_header=true,
-    delimiter=",",
-    schema_infer_max_records=1000,
-    file_extension=".csv",
-    file_compression_type=None))]
-    pub fn register_csv(
-        &mut self,
-        name: &str,
-        path: PathBuf,
-        schema: Option<PyArrowType<Schema>>,
-        has_header: bool,
-        delimiter: &str,
-        schema_infer_max_records: usize,
-        file_extension: &str,
-        file_compression_type: Option<String>,
-        py: Python,
-    ) -> PyResult<()> {
-        let path = path
-            .to_str()
-            .ok_or_else(|| PyValueError::new_err("Unable to convert path to a 
string"))?;
-        let delimiter = delimiter.as_bytes();
-        if delimiter.len() != 1 {
-            return Err(PyValueError::new_err(
-                "Delimiter must be a single character",
-            ));
-        }
-
-        let mut options = CsvReadOptions::new()
-            .has_header(has_header)
-            .delimiter(delimiter[0])
-            .schema_infer_max_records(schema_infer_max_records)
-            .file_extension(file_extension)
-            
.file_compression_type(parse_file_compression_type(file_compression_type)?);
-        options.schema = schema.as_ref().map(|x| &x.0);
-
-        let result = self.ctx.register_csv(name, path, options);
-        wait_for_future(py, result).map_err(DataFusionError::from)?;
-
-        Ok(())
-    }
-
-    #[allow(clippy::too_many_arguments)]
-    #[pyo3(signature = (name, path, table_partition_cols=vec![],
-    parquet_pruning=true,
-    file_extension=".parquet",
-    skip_metadata=true,
-    schema=None,
-    file_sort_order=None))]
-    pub fn register_parquet(
-        &mut self,
-        name: &str,
-        path: &str,
-        table_partition_cols: Vec<(String, String)>,
-        parquet_pruning: bool,
-        file_extension: &str,
-        skip_metadata: bool,
-        schema: Option<PyArrowType<Schema>>,
-        file_sort_order: Option<Vec<Vec<PySortExpr>>>,
-        py: Python,
-    ) -> PyResult<()> {
-        let mut options = ParquetReadOptions::default()
-            
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
-            .parquet_pruning(parquet_pruning)
-            .skip_metadata(skip_metadata);
-        options.file_extension = file_extension;
-        options.schema = schema.as_ref().map(|x| &x.0);
-        options.file_sort_order = file_sort_order
-            .unwrap_or_default()
-            .into_iter()
-            .map(|e| {
-                e.into_iter()
-                    .map(|f| {
-                        let sort_expr: SortExpr = f.into();
-                        sort_expr
-                    })
-                    .collect()
-            })
-            .collect();
-
-        let result = self.ctx.register_parquet(name, path, options);
-        wait_for_future(py, result).map_err(DataFusionError::from)?;
-        Ok(())
-    }
-
-    pub fn register_table(&mut self, name: &str, table: &PyTable) -> 
PyResult<()> {
-        self.ctx
-            .register_table(name, table.table())
-            .map_err(DataFusionError::from)?;
-        Ok(())
-    }
-
-    pub fn execute_logical_plan(
-        &mut self,
-        logical_plan: PyLogicalPlan,
-        py: Python,
-    ) -> PyResult<PyDataFrame> {
-        let result = self.ctx.execute_logical_plan(logical_plan.into());
-        let df = wait_for_future(py, result).unwrap();
-        Ok(PyDataFrame::new(df))
-    }
-}
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 5fbd2491..41b4b6d3 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -15,18 +15,89 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use ballista::prelude::*;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::prelude::*;
+use datafusion_python::context::PySessionContext;
+use datafusion_python::utils::wait_for_future;
+
+use std::collections::HashMap;
+
 use pyo3::prelude::*;
-pub mod context;
 mod utils;
-
-pub use crate::context::PySessionContext;
+use utils::to_pyerr;
 
 #[pymodule]
-fn pyballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
+fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
     pyo3_log::init();
-    // Ballista structs
-    m.add_class::<PySessionContext>()?;
-    // DataFusion structs
+    // BallistaBuilder struct
+    m.add_class::<PyBallistaBuilder>()?;
+    // DataFusion struct
     m.add_class::<datafusion_python::dataframe::PyDataFrame>()?;
     Ok(())
 }
+
+// Ballista Builder will take a HasMap/Dict Cionfg
+#[pyclass(name = "BallistaBuilder", module = "ballista", subclass)]
+pub struct PyBallistaBuilder {
+    conf: HashMap<String, String>,
+}
+
+#[pymethods]
+impl PyBallistaBuilder {
+    #[new]
+    pub fn new() -> Self {
+        Self {
+            conf: HashMap::new(),
+        }
+    }
+
+    pub fn config(
+        mut slf: PyRefMut<'_, Self>,
+        k: &str,
+        v: &str,
+        py: Python,
+    ) -> PyResult<PyObject> {
+        slf.conf.insert(k.into(), v.into());
+
+        Ok(slf.into_py(py))
+    }
+
+    /// Construct the standalone instance from the SessionContext
+    pub fn standalone(&self, py: Python) -> PyResult<PySessionContext> {
+        // Build the config
+        let config: SessionConfig = 
SessionConfig::from_string_hash_map(&self.conf)?;
+        // Build the state
+        let state = SessionStateBuilder::new()
+            .with_config(config)
+            .with_default_features()
+            .build();
+        // Build the context
+        let standalone_session = SessionContext::standalone_with_state(state);
+
+        // SessionContext is an async function
+        let ctx = wait_for_future(py, standalone_session)?;
+
+        // Convert the SessionContext into a Python SessionContext
+        Ok(ctx.into())
+    }
+
+    /// Construct the remote instance from the SessionContext
+    pub fn remote(&self, url: &str, py: Python) -> PyResult<PySessionContext> {
+        // Build the config
+        let config: SessionConfig = 
SessionConfig::from_string_hash_map(&self.conf)?;
+        // Build the state
+        let state = SessionStateBuilder::new()
+            .with_config(config)
+            .with_default_features()
+            .build();
+        // Build the context
+        let remote_session = SessionContext::remote_with_state(url, state);
+
+        // SessionContext is an async function
+        let ctx = wait_for_future(py, remote_session)?;
+
+        // Convert the SessionContext into a Python SessionContext
+        Ok(ctx.into())
+    }
+}
diff --git a/python/testdata/test.csv b/python/testdata/test.csv
old mode 100644
new mode 100755


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to