This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 4ab0dbe Table scan bindings (#178)
4ab0dbe is described below
commit 4ab0dbecd8329cd30c013d8f1cfe4f00adade763
Author: Jeremy Dyer <[email protected]>
AuthorDate: Tue Feb 14 19:03:51 2023 -0500
Table scan bindings (#178)
---
Cargo.lock | 56 +++++------------
Cargo.toml | 1 +
datafusion/__init__.py | 13 ++--
datafusion/expr.py | 23 +++++++
datafusion/tests/test_context.py | 3 +-
datafusion/tests/test_dataframe.py | 4 +-
datafusion/tests/test_imports.py | 12 +++-
docs/source/api.rst | 2 +-
docs/source/api/expression.rst | 4 +-
src/dataframe.rs | 2 +-
src/{expression.rs => expr.rs} | 13 +++-
src/expr/table_scan.rs | 121 +++++++++++++++++++++++++++++++++++++
src/functions.rs | 8 +--
src/lib.rs | 8 ++-
src/udaf.rs | 2 +-
src/udf.rs | 2 +-
16 files changed, 210 insertions(+), 64 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 2d6fffb..7c61090 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -423,18 +423,6 @@ dependencies = [
"alloc-stdlib",
]
-[[package]]
-name = "bstr"
-version = "0.2.17"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
-dependencies = [
- "lazy_static",
- "memchr",
- "regex-automata",
- "serde",
-]
-
[[package]]
name = "bumpalo"
version = "3.12.0"
@@ -596,13 +584,12 @@ dependencies = [
[[package]]
name = "csv"
-version = "1.1.6"
+version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
+checksum = "af91f40b7355f82b0a891f50e70399475945bb0b0da4f1700ce60761c9d3e359"
dependencies = [
- "bstr",
"csv-core",
- "itoa 0.4.8",
+ "itoa",
"ryu",
"serde",
]
@@ -808,6 +795,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datafusion-optimizer",
+ "datafusion-sql",
"datafusion-substrait",
"futures",
"mimalloc",
@@ -903,9 +891,9 @@ dependencies = [
[[package]]
name = "fastrand"
-version = "1.8.0"
+version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a7a407cfaa3385c4ae6b23e84623d48c2798d06e3e6a1878f7f59f17b3f86499"
+checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
dependencies = [
"instant",
]
@@ -1152,7 +1140,7 @@ checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
dependencies = [
"bytes",
"fnv",
- "itoa 1.0.5",
+ "itoa",
]
[[package]]
@@ -1193,7 +1181,7 @@ dependencies = [
"http-body",
"httparse",
"httpdate",
- "itoa 1.0.5",
+ "itoa",
"pin-project-lite",
"socket2",
"tokio",
@@ -1295,12 +1283,6 @@ dependencies = [
"either",
]
-[[package]]
-name = "itoa"
-version = "0.4.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
-
[[package]]
name = "itoa"
version = "1.0.5"
@@ -1546,14 +1528,14 @@ dependencies = [
[[package]]
name = "mio"
-version = "0.8.5"
+version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de"
+checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9"
dependencies = [
"libc",
"log",
"wasi 0.11.0+wasi-snapshot-preview1",
- "windows-sys 0.42.0",
+ "windows-sys 0.45.0",
]
[[package]]
@@ -1679,9 +1661,9 @@ dependencies = [
[[package]]
name = "once_cell"
-version = "1.17.0"
+version = "1.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
+checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
[[package]]
name = "ordered-float"
@@ -2060,12 +2042,6 @@ dependencies = [
"regex-syntax",
]
-[[package]]
-name = "regex-automata"
-version = "0.1.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
-
[[package]]
name = "regex-syntax"
version = "0.6.28"
@@ -2329,7 +2305,7 @@ version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76"
dependencies = [
- "itoa 1.0.5",
+ "itoa",
"ryu",
"serde",
]
@@ -2352,7 +2328,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
- "itoa 1.0.5",
+ "itoa",
"ryu",
"serde",
]
@@ -2364,7 +2340,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb06d4b6cdaef0e0c51fa881acb721bed3c924cfaa71d9c94a3b771dfdf6567"
dependencies = [
"indexmap",
- "itoa 1.0.5",
+ "itoa",
"ryu",
"serde",
"unsafe-libyaml",
diff --git a/Cargo.toml b/Cargo.toml
index 642da2d..08c5640 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -38,6 +38,7 @@ datafusion = { version = "18.0.0", features = ["pyarrow", "avro"] }
datafusion-expr = "18.0.0"
datafusion-optimizer = "18.0.0"
datafusion-common = { version = "18.0.0", features = ["pyarrow"] }
+datafusion-sql = "18.0.0"
datafusion-substrait = "18.0.0"
uuid = { version = "1.2", features = ["v4"] }
mimalloc = { version = "*", optional = true, default-features = false }
diff --git a/datafusion/__init__.py b/datafusion/__init__.py
index ddab950..0b4e896 100644
--- a/datafusion/__init__.py
+++ b/datafusion/__init__.py
@@ -32,10 +32,14 @@ from ._internal import (
SessionContext,
SessionConfig,
RuntimeConfig,
- Expression,
ScalarUDF,
)
+from .expr import (
+ Expr,
+ TableScan,
+)
+
__version__ = importlib_metadata.version(__name__)
__all__ = [
@@ -44,11 +48,12 @@ __all__ = [
"SessionContext",
"SessionConfig",
"RuntimeConfig",
- "Expression",
+ "Expr",
"AggregateUDF",
"ScalarUDF",
"column",
"literal",
+ "TableScan",
]
@@ -71,7 +76,7 @@ class Accumulator(metaclass=ABCMeta):
def column(value):
- return Expression.column(value)
+ return Expr.column(value)
col = column
@@ -80,7 +85,7 @@ col = column
def literal(value):
if not isinstance(value, pa.Scalar):
value = pa.scalar(value)
- return Expression.literal(value)
+ return Expr.literal(value)
lit = literal
diff --git a/datafusion/expr.py b/datafusion/expr.py
new file mode 100644
index 0000000..e914b85
--- /dev/null
+++ b/datafusion/expr.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from ._internal import expr
+
+
+def __getattr__(name):
+ return getattr(expr, name)
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 48d41c1..6faffaf 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -172,7 +172,8 @@ def test_dataset_filter_nested_data(ctx):
df = ctx.table("t")
- # This filter will not be pushed down to DatasetExec since it isn't supported
+ # This filter will not be pushed down to DatasetExec since it
+ # isn't supported
df = df.select(
column("nested_data")["a"] + column("nested_data")["b"],
column("nested_data")["a"] - column("nested_data")["b"],
diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py
index 4d70845..30327ee 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -314,8 +314,8 @@ def test_execution_plan(aggregate_df):
indent = plan.display_indent()
- # indent plan will be different for everyone due to absolute path to filename, so
- # we just check for some expected content
+ # indent plan will be different for everyone due to absolute path
+ # to filename, so we just check for some expected content
assert "ProjectionExec:" in indent
assert "AggregateExec:" in indent
assert "CoalesceBatchesExec:" in indent
diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index f571bc4..1e8c796 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -22,11 +22,15 @@ from datafusion import (
AggregateUDF,
DataFrame,
SessionContext,
- Expression,
ScalarUDF,
functions,
)
+from datafusion.expr import (
+ Expr,
+ TableScan,
+)
+
def test_import_datafusion():
assert datafusion.__name__ == "datafusion"
@@ -39,13 +43,15 @@ def test_datafusion_python_version():
def test_class_module_is_datafusion():
for klass in [
SessionContext,
- Expression,
DataFrame,
ScalarUDF,
AggregateUDF,
]:
assert klass.__module__ == "datafusion"
+ for klass in [Expr, TableScan]:
+ assert klass.__module__ == "datafusion.expr"
+
def test_import_from_functions_submodule():
from datafusion.functions import abs, sin # noqa
@@ -62,7 +68,7 @@ def test_classes_are_inheritable():
class MyExecContext(SessionContext):
pass
- class MyExpression(Expression):
+ class MyExpression(Expr):
pass
class MyDataFrame(DataFrame):
diff --git a/docs/source/api.rst b/docs/source/api.rst
index a5d6543..a3e7e24 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -27,6 +27,6 @@ API Reference
api/config
api/dataframe
api/execution_context
- api/expression
+ api/expr
api/functions
api/object_store
diff --git a/docs/source/api/expression.rst b/docs/source/api/expression.rst
index 45923fb..30137d1 100644
--- a/docs/source/api/expression.rst
+++ b/docs/source/api/expression.rst
@@ -18,10 +18,10 @@
.. _api.expression:
.. currentmodule:: datafusion
-Expression
+Expr
==========
.. autosummary::
:toctree: ../generated/
- Expression
+ Expr
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 0f73757..4b9fbca 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -18,7 +18,7 @@
use crate::physical_plan::PyExecutionPlan;
use crate::sql::logical::PyLogicalPlan;
use crate::utils::wait_for_future;
-use crate::{errors::DataFusionError, expression::PyExpr};
+use crate::{errors::DataFusionError, expr::PyExpr};
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType};
use datafusion::arrow::util::pretty;
diff --git a/src/expression.rs b/src/expr.rs
similarity index 90%
rename from src/expression.rs
rename to src/expr.rs
index 588b492..dceedc1 100644
--- a/src/expression.rs
+++ b/src/expr.rs
@@ -24,8 +24,10 @@ use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};
use datafusion::scalar::ScalarValue;
-/// An PyExpr that can be used on a DataFrame
-#[pyclass(name = "Expression", module = "datafusion", subclass)]
+pub mod table_scan;
+
+/// A PyExpr that can be used on a DataFrame
+#[pyclass(name = "Expr", module = "datafusion.expr", subclass)]
#[derive(Debug, Clone)]
pub(crate) struct PyExpr {
pub(crate) expr: Expr,
@@ -133,3 +135,10 @@ impl PyExpr {
expr.into()
}
}
+
+/// Initializes the `expr` module to match the pattern of `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/
+pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
+ m.add_class::<PyExpr>()?;
+ m.add_class::<table_scan::PyTableScan>()?;
+ Ok(())
+}
diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs
new file mode 100644
index 0000000..bc7d68a
--- /dev/null
+++ b/src/expr/table_scan.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::TableScan;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::expr::PyExpr;
+
+#[pyclass(name = "TableScan", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyTableScan {
+ table_scan: TableScan,
+}
+
+impl From<PyTableScan> for TableScan {
+ fn from(tbl_scan: PyTableScan) -> TableScan {
+ tbl_scan.table_scan
+ }
+}
+
+impl From<TableScan> for PyTableScan {
+ fn from(table_scan: TableScan) -> PyTableScan {
+ PyTableScan { table_scan }
+ }
+}
+
+impl Display for PyTableScan {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ write!(
+ f,
+ "TableScan\nTable Name: {}
+ \nProjections: {:?}
+ \nProjected Schema: {:?}
+ \nFilters: {:?}",
+ &self.table_scan.table_name,
+ &self.py_projections(),
+ self.table_scan.projected_schema,
+ self.py_filters(),
+ )
+ }
+}
+
+#[pymethods]
+impl PyTableScan {
+ /// Retrieves the name of the table represented by this `TableScan` instance
+ #[pyo3(name = "table_name")]
+ fn py_table_name(&self) -> PyResult<&str> {
+ Ok(&self.table_scan.table_name)
+ }
+
+ /// TODO: Bindings for `TableSource` need to exist first. Left as a
+ /// placeholder to display intention to add when able to.
+ // #[pyo3(name = "source")]
+ // fn py_source(&self) -> PyResult<Arc<dyn TableSource>> {
+ // Ok(self.table_scan.source)
+ // }
+
+ /// The column indexes that should be read. Note if this is empty then
+ /// all columns should be read by the `TableProvider`. This function
+ /// provides a Tuple of the (index, column_name) to make things simpler
+ /// for the calling code since often times the name is preferred to
+ /// the index which is a lower level abstraction.
+ #[pyo3(name = "projection")]
+ fn py_projections(&self) -> PyResult<Vec<(usize, String)>> {
+ match &self.table_scan.projection {
+ Some(indices) => {
+ let schema = self.table_scan.source.schema();
+ Ok(indices
+ .iter()
+ .map(|i| (*i, schema.field(*i).name().to_string()))
+ .collect())
+ }
+ None => Ok(vec![]),
+ }
+ }
+
+ /// TODO: Bindings for `DFSchema` need to exist first. Left as a
+ /// placeholder to display intention to add when able to.
+ // /// Resulting schema from the `TableScan` operation
+ // #[pyo3(name = "projectedSchema")]
+ // fn py_projected_schema(&self) -> PyResult<DFSchemaRef> {
+ // Ok(self.table_scan.projected_schema)
+ // }
+
+ /// Certain `TableProvider` physical readers offer the capability to filter rows that
+ /// are read at read time. These `filters` are contained here.
+ #[pyo3(name = "filters")]
+ fn py_filters(&self) -> PyResult<Vec<PyExpr>> {
+ Ok(self
+ .table_scan
+ .filters
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect())
+ }
+
+ /// Optional number of rows that should be read at read time by the `TableProvider`
+ #[pyo3(name = "fetch")]
+ fn py_fetch(&self) -> PyResult<Option<usize>> {
+ Ok(self.table_scan.fetch)
+ }
+
+ fn __repr__(&self) -> PyResult<String> {
+ Ok(format!("TableScan({})", self))
+ }
+}
diff --git a/src/functions.rs b/src/functions.rs
index 5cabb12..8acffeb 100644
--- a/src/functions.rs
+++ b/src/functions.rs
@@ -24,7 +24,7 @@ use datafusion_expr::window_function::find_df_window_func;
use datafusion_expr::{aggregate_function, lit, BuiltinScalarFunction, Expr, WindowFrame};
use crate::errors::DataFusionError;
-use crate::expression::PyExpr;
+use crate::expr::PyExpr;
#[pyfunction]
fn in_list(expr: PyExpr, value: Vec<PyExpr>, negated: bool) -> PyExpr {
@@ -66,7 +66,7 @@ fn concat_ws(sep: String, args: Vec<PyExpr>) -> PyResult<PyExpr> {
Ok(datafusion_expr::concat_ws(lit(sep), args).into())
}
-/// Creates a new Sort expression
+/// Creates a new Sort Expr
#[pyfunction]
fn order_by(expr: PyExpr, asc: Option<bool>, nulls_first: Option<bool>) -> PyResult<PyExpr> {
Ok(PyExpr {
@@ -78,7 +78,7 @@ fn order_by(expr: PyExpr, asc: Option<bool>, nulls_first: Option<bool>) -> PyRes
})
}
-/// Creates a new Alias expression
+/// Creates a new Alias Expr
#[pyfunction]
fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
Ok(PyExpr {
@@ -86,7 +86,7 @@ fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
})
}
-/// Create a column reference expression
+/// Create a column reference Expr
#[pyfunction]
fn col(name: &str) -> PyResult<PyExpr> {
Ok(PyExpr {
diff --git a/src/lib.rs b/src/lib.rs
index 5391de5..b16ef75 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -32,7 +32,7 @@ mod dataset;
mod dataset_exec;
pub mod errors;
#[allow(clippy::borrow_deref_ref)]
-mod expression;
+mod expr;
#[allow(clippy::borrow_deref_ref)]
mod functions;
pub mod physical_plan;
@@ -64,13 +64,17 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<context::PySessionConfig>()?;
m.add_class::<context::PySessionContext>()?;
m.add_class::<dataframe::PyDataFrame>()?;
- m.add_class::<expression::PyExpr>()?;
m.add_class::<udf::PyScalarUDF>()?;
m.add_class::<udaf::PyAggregateUDF>()?;
m.add_class::<config::PyConfig>()?;
m.add_class::<sql::logical::PyLogicalPlan>()?;
m.add_class::<physical_plan::PyExecutionPlan>()?;
+ // Register `expr` as a submodule. Matching `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/
+ let expr = PyModule::new(py, "expr")?;
+ expr::init_module(expr)?;
+ m.add_submodule(expr)?;
+
// Register the functions as a submodule
let funcs = PyModule::new(py, "functions")?;
functions::init_module(funcs)?;
diff --git a/src/udaf.rs b/src/udaf.rs
index 7a8bd49..d5866f8 100644
--- a/src/udaf.rs
+++ b/src/udaf.rs
@@ -26,7 +26,7 @@ use datafusion::common::ScalarValue;
use datafusion::error::{DataFusionError, Result};
use datafusion_expr::{create_udaf, Accumulator, AccumulatorFunctionImplementation, AggregateUDF};
-use crate::expression::PyExpr;
+use crate::expr::PyExpr;
use crate::utils::parse_volatility;
#[derive(Debug)]
diff --git a/src/udf.rs b/src/udf.rs
index 1849711..f3e6cfb 100644
--- a/src/udf.rs
+++ b/src/udf.rs
@@ -28,7 +28,7 @@ use datafusion::physical_plan::udf::ScalarUDF;
use datafusion_expr::create_udf;
use datafusion_expr::function::ScalarFunctionImplementation;
-use crate::expression::PyExpr;
+use crate::expr::PyExpr;
use crate::utils::parse_volatility;
/// Create a DataFusion's UDF implementation from a python function