This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new d949e5fe added a BallistaContext to ballista to allow for Remote or
standalone (#1100)
d949e5fe is described below
commit d949e5fe33ad22da1388657dc8a6dc5f400023bc
Author: Trevor <[email protected]>
AuthorDate: Mon Nov 18 22:53:59 2024 -0800
added a BallistaContext to ballista to allow for Remote or standalone
(#1100)
* added a pycontext to ballista
* added a pycontext to ballista
* added a pycontext to ballista
* updated python to have two static methods for creating a ballista context
* updated python to have two static methods for creating a ballista context
* updated python to have two static methods for creating a ballista context
* updated python to have two static methods for creating a ballista context
* updated python to have two static methods for creating a ballista context
* updated python to have two static methods for creating a ballista context
* updated python to have two static methods for creating a ballista context
* updated python to have two static methods for creating a ballista context
* updating the pyballista package to ballista
* changing the packaging naming convention from pyballista to ballista
* changing the packaging naming convention from pyballista to ballista
* updated python to have two static methods for creating a ballista context
* updated python to have two static methods for creating a ballista context
* updated python to have two static methods for creating a ballista context
* updated python to have two static methods for creating a ballista context
* Updating BallistaContext and Config
* Updating BallistaContext and Config
* updated python to have two static methods for creating a ballista context
* Updating BallistaContext and Config, calling it for the night, will
complete tomorrow
* Updating BallistaContext and Config, calling it for the night, will
complete tomorrow
* Adding config to ballista context
* Adding config to ballista context
* Adding config to ballista context
* Adding config to ballista context
* Updated Builder and Docs
* Updated Builder and Docs
* Updated Builder and Docs
* Updated Builder and Docs
* Updated Builder and Docs
* Updated Builder and Docs
---------
Co-authored-by: Trevor Barnes <[email protected]>
---
docs/source/user-guide/python.md | 20 +-
python/Cargo.toml | 10 +-
python/README.md | 4 +-
python/{pyballista => ballista}/__init__.py | 8 +-
python/{pyballista => ballista}/tests/__init__.py | 0
.../{pyballista => ballista}/tests/test_context.py | 21 +-
.../__init__.py => examples/example.py} | 30 +-
python/pyproject.toml | 6 +-
python/src/context.rs | 353 ---------------------
python/src/lib.rs | 85 ++++-
python/testdata/test.csv | 0
11 files changed, 131 insertions(+), 406 deletions(-)
diff --git a/docs/source/user-guide/python.md b/docs/source/user-guide/python.md
index 674850c7..f17ac68d 100644
--- a/docs/source/user-guide/python.md
+++ b/docs/source/user-guide/python.md
@@ -28,9 +28,20 @@ popular file formats files, run it in a distributed
environment, and obtain the
The following code demonstrates how to create a Ballista context and connect
to a scheduler.
+If you are running a standalone cluster (runs locally), all you need to do is
call the standalone cluster method `standalone()` on your BallistaContext. If
you are running a cluster in remote mode, you need to provide the URL
`Ballista.remote("http://my-remote-ip:50050")`.
+
```text
->>> import ballista
->>> ctx = ballista.BallistaContext("localhost", 50050)
+>>> from ballista import BallistaBuilder
+>>> # for a standalone instance
+>>> # Ballista will initiate with an empty config
+>>> # set config variables with `config()`
+>>> ballista = BallistaBuilder()\
+>>> .config("ballista.job.name", "example ballista")
+>>>
+>>> ctx = ballista.standalone()
+>>>
+>>> # for a remote instance provide the URL
+>>> ctx = ballista.remote("df://url-path-to-scheduler:50050")
```
## SQL
@@ -103,14 +114,15 @@ The `explain` method can be used to show the logical and
physical query plans fo
The following example demonstrates creating arrays with PyArrow and then
creating a Ballista DataFrame.
```python
-import ballista
+from ballista import BallistaBuilder
import pyarrow
# an alias
+# TODO implement Functions
f = ballista.functions
# create a context
-ctx = ballista.BallistaContext("localhost", 50050)
+ctx = BallistaBuilder().standalone()
# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
diff --git a/python/Cargo.toml b/python/Cargo.toml
index dbe419d2..b03f1e99 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -26,14 +26,14 @@ readme = "README.md"
license = "Apache-2.0"
edition = "2021"
rust-version = "1.72"
-include = ["/src", "/pyballista", "/LICENSE.txt", "pyproject.toml",
"Cargo.toml", "Cargo.lock"]
+include = ["/src", "/ballista", "/LICENSE.txt", "pyproject.toml",
"Cargo.toml", "Cargo.lock"]
publish = false
[dependencies]
async-trait = "0.1.77"
-ballista = { path = "../ballista/client", version = "0.12.0" }
+ballista = { path = "../ballista/client", version = "0.12.0", features =
["standalone"] }
ballista-core = { path = "../ballista/core", version = "0.12.0" }
-datafusion = { version = "42" }
+datafusion = { version = "42", features = ["pyarrow", "avro"] }
datafusion-proto = { version = "42" }
datafusion-python = { version = "42" }
@@ -43,6 +43,4 @@ tokio = { version = "1.35", features = ["macros", "rt",
"rt-multi-thread", "sync
[lib]
crate-type = ["cdylib"]
-name = "pyballista"
-
-
+name = "ballista"
diff --git a/python/README.md b/python/README.md
index 2898cb16..01b0a7f9 100644
--- a/python/README.md
+++ b/python/README.md
@@ -29,8 +29,8 @@ part of the default Cargo workspace so that it doesn't cause
overhead for mainta
Creates a new context and connects to a Ballista scheduler process.
```python
-from pyballista import SessionContext
->>> ctx = SessionContext("localhost", 50050)
+from ballista import BallistaBuilder
+>>> ctx = BallistaBuilder().standalone()
```
## Example SQL Usage
diff --git a/python/pyballista/__init__.py b/python/ballista/__init__.py
similarity index 92%
copy from python/pyballista/__init__.py
copy to python/ballista/__init__.py
index 62a6bc79..a143f17e 100644
--- a/python/pyballista/__init__.py
+++ b/python/ballista/__init__.py
@@ -25,12 +25,12 @@ except ImportError:
import pyarrow as pa
-from .pyballista_internal import (
- SessionContext,
+from .ballista_internal import (
+ BallistaBuilder,
)
__version__ = importlib_metadata.version(__name__)
__all__ = [
- "SessionContext",
-]
+ "BallistaBuilder",
+]
\ No newline at end of file
diff --git a/python/pyballista/tests/__init__.py
b/python/ballista/tests/__init__.py
similarity index 100%
rename from python/pyballista/tests/__init__.py
rename to python/ballista/tests/__init__.py
diff --git a/python/pyballista/tests/test_context.py
b/python/ballista/tests/test_context.py
similarity index 81%
rename from python/pyballista/tests/test_context.py
rename to python/ballista/tests/test_context.py
index b440bb27..a0af9592 100644
--- a/python/pyballista/tests/test_context.py
+++ b/python/ballista/tests/test_context.py
@@ -15,27 +15,27 @@
# specific language governing permissions and limitations
# under the License.
-from pyballista import SessionContext
+from ballista import BallistaBuilder
import pytest
def test_create_context():
- SessionContext("localhost", 50050)
+ BallistaBuilder().standalone()
def test_select_one():
- ctx = SessionContext("localhost", 50050)
+ ctx = BallistaBuilder().standalone()
df = ctx.sql("SELECT 1")
batches = df.collect()
assert len(batches) == 1
def test_read_csv():
- ctx = SessionContext("localhost", 50050)
+ ctx = BallistaBuilder().standalone()
df = ctx.read_csv("testdata/test.csv", has_header=True)
batches = df.collect()
assert len(batches) == 1
assert len(batches[0]) == 1
def test_register_csv():
- ctx = SessionContext("localhost", 50050)
+ ctx = BallistaBuilder().standalone()
ctx.register_csv("test", "testdata/test.csv", has_header=True)
df = ctx.sql("SELECT * FROM test")
batches = df.collect()
@@ -43,14 +43,14 @@ def test_register_csv():
assert len(batches[0]) == 1
def test_read_parquet():
- ctx = SessionContext("localhost", 50050)
+ ctx = BallistaBuilder().standalone()
df = ctx.read_parquet("testdata/test.parquet")
batches = df.collect()
assert len(batches) == 1
assert len(batches[0]) == 8
def test_register_parquet():
- ctx = SessionContext("localhost", 50050)
+ ctx = BallistaBuilder().standalone()
ctx.register_parquet("test", "testdata/test.parquet")
df = ctx.sql("SELECT * FROM test")
batches = df.collect()
@@ -58,7 +58,7 @@ def test_register_parquet():
assert len(batches[0]) == 8
def test_read_dataframe_api():
- ctx = SessionContext("localhost", 50050)
+ ctx = BallistaBuilder().standalone()
df = ctx.read_csv("testdata/test.csv", has_header=True) \
.select_columns('a', 'b') \
.limit(1)
@@ -67,11 +67,12 @@ def test_read_dataframe_api():
assert len(batches[0]) == 1
def test_execute_plan():
- ctx = SessionContext("localhost", 50050)
+ ctx = BallistaBuilder().standalone()
df = ctx.read_csv("testdata/test.csv", has_header=True) \
.select_columns('a', 'b') \
.limit(1)
- df = ctx.execute_logical_plan(df.logical_plan())
+ # TODO research SessionContext Logical Plan for DataFusionPython
+ #df = ctx.execute_logical_plan(df.logical_plan())
batches = df.collect()
assert len(batches) == 1
assert len(batches[0]) == 1
diff --git a/python/pyballista/__init__.py b/python/examples/example.py
similarity index 61%
rename from python/pyballista/__init__.py
rename to python/examples/example.py
index 62a6bc79..61a9abbd 100644
--- a/python/pyballista/__init__.py
+++ b/python/examples/example.py
@@ -15,22 +15,18 @@
# specific language governing permissions and limitations
# under the License.
-from abc import ABCMeta, abstractmethod
-from typing import List
+from ballista import BallistaBuilder
+from datafusion.context import SessionContext
-try:
- import importlib.metadata as importlib_metadata
-except ImportError:
- import importlib_metadata
+# Ballista will initiate with an empty config
+# set config variables with `config`
+ctx: SessionContext = BallistaBuilder()\
+ .config("ballista.job.name", "example ballista")\
+ .config("ballista.shuffle.partitions", "16")\
+ .standalone()
+
+#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050)
-import pyarrow as pa
-
-from .pyballista_internal import (
- SessionContext,
-)
-
-__version__ = importlib_metadata.version(__name__)
-
-__all__ = [
- "SessionContext",
-]
+# Select 1 to verify its working
+ctx.sql("SELECT 1").show()
+#ctx_remote.sql("SELECT 2").show()
\ No newline at end of file
diff --git a/python/pyproject.toml b/python/pyproject.toml
index dbb76e59..2d06b225 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -20,7 +20,7 @@ requires = ["maturin>=0.15,<0.16"]
build-backend = "maturin"
[project]
-name = "pyballista"
+name = "ballista"
description = "Python client for Apache Arrow Ballista Distributed SQL Query
Engine"
readme = "README.md"
license = {file = "LICENSE.txt"}
@@ -55,10 +55,10 @@ repository = "https://github.com/apache/arrow-ballista"
profile = "black"
[tool.maturin]
-module-name = "pyballista.pyballista_internal"
+module-name = "ballista.ballista_internal"
include = [
{ path = "Cargo.lock", format = "sdist" }
]
exclude = [".github/**", "ci/**", ".asf.yaml"]
# Require Cargo.lock is up to date
-locked = true
+locked = true
\ No newline at end of file
diff --git a/python/src/context.rs b/python/src/context.rs
deleted file mode 100644
index d27d5314..00000000
--- a/python/src/context.rs
+++ /dev/null
@@ -1,353 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::utils::to_pyerr;
-use datafusion::logical_expr::SortExpr;
-use pyo3::exceptions::PyValueError;
-use pyo3::prelude::*;
-use std::path::PathBuf;
-
-use ballista::prelude::*;
-use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::pyarrow::PyArrowType;
-use datafusion::prelude::*;
-use datafusion_python::catalog::PyTable;
-use datafusion_python::context::{
- convert_table_partition_cols, parse_file_compression_type,
-};
-use datafusion_python::dataframe::PyDataFrame;
-use datafusion_python::errors::DataFusionError;
-use datafusion_python::expr::sort_expr::PySortExpr;
-use datafusion_python::sql::logical::PyLogicalPlan;
-use datafusion_python::utils::wait_for_future;
-
-/// PyBallista session context. This is largely a duplicate of
-/// DataFusion's PySessionContext, with the main difference being
-/// that this operates on a BallistaContext instead of DataFusion's
-/// SessionContext. We could probably add extra extension points to
-/// DataFusion to allow for a pluggable context and remove much of
-/// this code.
-#[pyclass(name = "SessionContext", module = "pyballista", subclass)]
-pub struct PySessionContext {
- ctx: BallistaContext,
-}
-
-#[pymethods]
-impl PySessionContext {
- /// Create a new SessionContext by connecting to a Ballista scheduler
process.
- #[new]
- pub fn new(host: &str, port: u16, py: Python) -> PyResult<Self> {
- let config = BallistaConfig::default();
- let ballista_context = BallistaContext::remote(host, port, &config);
- let ctx = wait_for_future(py, ballista_context).map_err(to_pyerr)?;
- Ok(Self { ctx })
- }
-
- pub fn sql(&mut self, query: &str, py: Python) -> PyResult<PyDataFrame> {
- let result = self.ctx.sql(query);
- let df = wait_for_future(py, result)?;
- Ok(PyDataFrame::new(df))
- }
-
- #[allow(clippy::too_many_arguments)]
- #[pyo3(signature = (path, schema=None, table_partition_cols=vec![],
file_extension=".avro"))]
- pub fn read_avro(
- &self,
- path: &str,
- schema: Option<PyArrowType<Schema>>,
- table_partition_cols: Vec<(String, String)>,
- file_extension: &str,
- py: Python,
- ) -> PyResult<PyDataFrame> {
- let mut options = AvroReadOptions::default()
-
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
- options.file_extension = file_extension;
- let df = if let Some(schema) = schema {
- options.schema = Some(&schema.0);
- let read_future = self.ctx.read_avro(path, options);
- wait_for_future(py, read_future).map_err(DataFusionError::from)?
- } else {
- let read_future = self.ctx.read_avro(path, options);
- wait_for_future(py, read_future).map_err(DataFusionError::from)?
- };
- Ok(PyDataFrame::new(df))
- }
-
- #[allow(clippy::too_many_arguments)]
- #[pyo3(signature = (
- path,
- schema=None,
- has_header=true,
- delimiter=",",
- schema_infer_max_records=1000,
- file_extension=".csv",
- table_partition_cols=vec![],
- file_compression_type=None))]
- pub fn read_csv(
- &self,
- path: PathBuf,
- schema: Option<PyArrowType<Schema>>,
- has_header: bool,
- delimiter: &str,
- schema_infer_max_records: usize,
- file_extension: &str,
- table_partition_cols: Vec<(String, String)>,
- file_compression_type: Option<String>,
- py: Python,
- ) -> PyResult<PyDataFrame> {
- let path = path
- .to_str()
- .ok_or_else(|| PyValueError::new_err("Unable to convert path to a
string"))?;
-
- let delimiter = delimiter.as_bytes();
- if delimiter.len() != 1 {
- return Err(PyValueError::new_err(
- "Delimiter must be a single character",
- ));
- };
-
- let mut options = CsvReadOptions::new()
- .has_header(has_header)
- .delimiter(delimiter[0])
- .schema_infer_max_records(schema_infer_max_records)
- .file_extension(file_extension)
-
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
-
.file_compression_type(parse_file_compression_type(file_compression_type)?);
-
- if let Some(py_schema) = schema {
- options.schema = Some(&py_schema.0);
- let result = self.ctx.read_csv(path, options);
- let df = PyDataFrame::new(wait_for_future(py, result)?);
- Ok(df)
- } else {
- let result = self.ctx.read_csv(path, options);
- let df = PyDataFrame::new(wait_for_future(py, result)?);
- Ok(df)
- }
- }
-
- #[allow(clippy::too_many_arguments)]
- #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000,
file_extension=".json", table_partition_cols=vec![],
file_compression_type=None))]
- pub fn read_json(
- &mut self,
- path: PathBuf,
- schema: Option<PyArrowType<Schema>>,
- schema_infer_max_records: usize,
- file_extension: &str,
- table_partition_cols: Vec<(String, String)>,
- file_compression_type: Option<String>,
- py: Python,
- ) -> PyResult<PyDataFrame> {
- let path = path
- .to_str()
- .ok_or_else(|| PyValueError::new_err("Unable to convert path to a
string"))?;
- let mut options = NdJsonReadOptions::default()
-
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
-
.file_compression_type(parse_file_compression_type(file_compression_type)?);
- options.schema_infer_max_records = schema_infer_max_records;
- options.file_extension = file_extension;
- let df = if let Some(schema) = schema {
- options.schema = Some(&schema.0);
- let result = self.ctx.read_json(path, options);
- wait_for_future(py, result).map_err(DataFusionError::from)?
- } else {
- let result = self.ctx.read_json(path, options);
- wait_for_future(py, result).map_err(DataFusionError::from)?
- };
- Ok(PyDataFrame::new(df))
- }
-
- #[allow(clippy::too_many_arguments)]
- #[pyo3(signature = (
- path,
- table_partition_cols=vec![],
- parquet_pruning=true,
- file_extension=".parquet",
- skip_metadata=true,
- schema=None,
- file_sort_order=None))]
- pub fn read_parquet(
- &self,
- path: &str,
- table_partition_cols: Vec<(String, String)>,
- parquet_pruning: bool,
- file_extension: &str,
- skip_metadata: bool,
- schema: Option<PyArrowType<Schema>>,
- file_sort_order: Option<Vec<Vec<PySortExpr>>>,
- py: Python,
- ) -> PyResult<PyDataFrame> {
- let mut options = ParquetReadOptions::default()
-
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
- .parquet_pruning(parquet_pruning)
- .skip_metadata(skip_metadata);
- options.file_extension = file_extension;
- options.schema = schema.as_ref().map(|x| &x.0);
- options.file_sort_order = file_sort_order
- .unwrap_or_default()
- .into_iter()
- .map(|e| {
- e.into_iter()
- .map(|f| {
- let sort_expr: SortExpr = f.into();
- sort_expr
- })
- .collect()
- })
- .collect();
-
- let result = self.ctx.read_parquet(path, options);
- let df =
- PyDataFrame::new(wait_for_future(py,
result).map_err(DataFusionError::from)?);
- Ok(df)
- }
-
- #[allow(clippy::too_many_arguments)]
- #[pyo3(signature = (name,
- path,
- schema=None,
- file_extension=".avro",
- table_partition_cols=vec![]))]
- pub fn register_avro(
- &mut self,
- name: &str,
- path: PathBuf,
- schema: Option<PyArrowType<Schema>>,
- file_extension: &str,
- table_partition_cols: Vec<(String, String)>,
- py: Python,
- ) -> PyResult<()> {
- let path = path
- .to_str()
- .ok_or_else(|| PyValueError::new_err("Unable to convert path to a
string"))?;
-
- let mut options = AvroReadOptions::default()
-
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?);
- options.file_extension = file_extension;
- options.schema = schema.as_ref().map(|x| &x.0);
-
- let result = self.ctx.register_avro(name, path, options);
- wait_for_future(py, result).map_err(DataFusionError::from)?;
-
- Ok(())
- }
-
- #[allow(clippy::too_many_arguments)]
- #[pyo3(signature = (name,
- path,
- schema=None,
- has_header=true,
- delimiter=",",
- schema_infer_max_records=1000,
- file_extension=".csv",
- file_compression_type=None))]
- pub fn register_csv(
- &mut self,
- name: &str,
- path: PathBuf,
- schema: Option<PyArrowType<Schema>>,
- has_header: bool,
- delimiter: &str,
- schema_infer_max_records: usize,
- file_extension: &str,
- file_compression_type: Option<String>,
- py: Python,
- ) -> PyResult<()> {
- let path = path
- .to_str()
- .ok_or_else(|| PyValueError::new_err("Unable to convert path to a
string"))?;
- let delimiter = delimiter.as_bytes();
- if delimiter.len() != 1 {
- return Err(PyValueError::new_err(
- "Delimiter must be a single character",
- ));
- }
-
- let mut options = CsvReadOptions::new()
- .has_header(has_header)
- .delimiter(delimiter[0])
- .schema_infer_max_records(schema_infer_max_records)
- .file_extension(file_extension)
-
.file_compression_type(parse_file_compression_type(file_compression_type)?);
- options.schema = schema.as_ref().map(|x| &x.0);
-
- let result = self.ctx.register_csv(name, path, options);
- wait_for_future(py, result).map_err(DataFusionError::from)?;
-
- Ok(())
- }
-
- #[allow(clippy::too_many_arguments)]
- #[pyo3(signature = (name, path, table_partition_cols=vec![],
- parquet_pruning=true,
- file_extension=".parquet",
- skip_metadata=true,
- schema=None,
- file_sort_order=None))]
- pub fn register_parquet(
- &mut self,
- name: &str,
- path: &str,
- table_partition_cols: Vec<(String, String)>,
- parquet_pruning: bool,
- file_extension: &str,
- skip_metadata: bool,
- schema: Option<PyArrowType<Schema>>,
- file_sort_order: Option<Vec<Vec<PySortExpr>>>,
- py: Python,
- ) -> PyResult<()> {
- let mut options = ParquetReadOptions::default()
-
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
- .parquet_pruning(parquet_pruning)
- .skip_metadata(skip_metadata);
- options.file_extension = file_extension;
- options.schema = schema.as_ref().map(|x| &x.0);
- options.file_sort_order = file_sort_order
- .unwrap_or_default()
- .into_iter()
- .map(|e| {
- e.into_iter()
- .map(|f| {
- let sort_expr: SortExpr = f.into();
- sort_expr
- })
- .collect()
- })
- .collect();
-
- let result = self.ctx.register_parquet(name, path, options);
- wait_for_future(py, result).map_err(DataFusionError::from)?;
- Ok(())
- }
-
- pub fn register_table(&mut self, name: &str, table: &PyTable) ->
PyResult<()> {
- self.ctx
- .register_table(name, table.table())
- .map_err(DataFusionError::from)?;
- Ok(())
- }
-
- pub fn execute_logical_plan(
- &mut self,
- logical_plan: PyLogicalPlan,
- py: Python,
- ) -> PyResult<PyDataFrame> {
- let result = self.ctx.execute_logical_plan(logical_plan.into());
- let df = wait_for_future(py, result).unwrap();
- Ok(PyDataFrame::new(df))
- }
-}
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 5fbd2491..41b4b6d3 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -15,18 +15,89 @@
// specific language governing permissions and limitations
// under the License.
+use ballista::prelude::*;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::prelude::*;
+use datafusion_python::context::PySessionContext;
+use datafusion_python::utils::wait_for_future;
+
+use std::collections::HashMap;
+
use pyo3::prelude::*;
-pub mod context;
mod utils;
-
-pub use crate::context::PySessionContext;
+use utils::to_pyerr;
#[pymodule]
-fn pyballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
+fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
pyo3_log::init();
- // Ballista structs
- m.add_class::<PySessionContext>()?;
- // DataFusion structs
+ // BallistaBuilder struct
+ m.add_class::<PyBallistaBuilder>()?;
+ // DataFusion struct
m.add_class::<datafusion_python::dataframe::PyDataFrame>()?;
Ok(())
}
+
+// Ballista Builder will take a HashMap/Dict Config
+#[pyclass(name = "BallistaBuilder", module = "ballista", subclass)]
+pub struct PyBallistaBuilder {
+ conf: HashMap<String, String>,
+}
+
+#[pymethods]
+impl PyBallistaBuilder {
+ #[new]
+ pub fn new() -> Self {
+ Self {
+ conf: HashMap::new(),
+ }
+ }
+
+ pub fn config(
+ mut slf: PyRefMut<'_, Self>,
+ k: &str,
+ v: &str,
+ py: Python,
+ ) -> PyResult<PyObject> {
+ slf.conf.insert(k.into(), v.into());
+
+ Ok(slf.into_py(py))
+ }
+
+ /// Construct the standalone instance from the SessionContext
+ pub fn standalone(&self, py: Python) -> PyResult<PySessionContext> {
+ // Build the config
+ let config: SessionConfig =
SessionConfig::from_string_hash_map(&self.conf)?;
+ // Build the state
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_default_features()
+ .build();
+ // Build the context
+ let standalone_session = SessionContext::standalone_with_state(state);
+
+ // SessionContext is an async function
+ let ctx = wait_for_future(py, standalone_session)?;
+
+ // Convert the SessionContext into a Python SessionContext
+ Ok(ctx.into())
+ }
+
+ /// Construct the remote instance from the SessionContext
+ pub fn remote(&self, url: &str, py: Python) -> PyResult<PySessionContext> {
+ // Build the config
+ let config: SessionConfig =
SessionConfig::from_string_hash_map(&self.conf)?;
+ // Build the state
+ let state = SessionStateBuilder::new()
+ .with_config(config)
+ .with_default_features()
+ .build();
+ // Build the context
+ let remote_session = SessionContext::remote_with_state(url, state);
+
+ // SessionContext is an async function
+ let ctx = wait_for_future(py, remote_session)?;
+
+ // Convert the SessionContext into a Python SessionContext
+ Ok(ctx.into())
+ }
+}
diff --git a/python/testdata/test.csv b/python/testdata/test.csv
old mode 100644
new mode 100755
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]