This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ab0dbe  Table scan bindings (#178)
4ab0dbe is described below

commit 4ab0dbecd8329cd30c013d8f1cfe4f00adade763
Author: Jeremy Dyer <[email protected]>
AuthorDate: Tue Feb 14 19:03:51 2023 -0500

    Table scan bindings (#178)
---
 Cargo.lock                         |  56 +++++------------
 Cargo.toml                         |   1 +
 datafusion/__init__.py             |  13 ++--
 datafusion/expr.py                 |  23 +++++++
 datafusion/tests/test_context.py   |   3 +-
 datafusion/tests/test_dataframe.py |   4 +-
 datafusion/tests/test_imports.py   |  12 +++-
 docs/source/api.rst                |   2 +-
 docs/source/api/expression.rst     |   4 +-
 src/dataframe.rs                   |   2 +-
 src/{expression.rs => expr.rs}     |  13 +++-
 src/expr/table_scan.rs             | 121 +++++++++++++++++++++++++++++++++++++
 src/functions.rs                   |   8 +--
 src/lib.rs                         |   8 ++-
 src/udaf.rs                        |   2 +-
 src/udf.rs                         |   2 +-
 16 files changed, 210 insertions(+), 64 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2d6fffb..7c61090 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -423,18 +423,6 @@ dependencies = [
  "alloc-stdlib",
 ]
 
-[[package]]
-name = "bstr"
-version = "0.2.17"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
-dependencies = [
- "lazy_static",
- "memchr",
- "regex-automata",
- "serde",
-]
-
 [[package]]
 name = "bumpalo"
 version = "3.12.0"
@@ -596,13 +584,12 @@ dependencies = [
 
 [[package]]
 name = "csv"
-version = "1.1.6"
+version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
+checksum = "af91f40b7355f82b0a891f50e70399475945bb0b0da4f1700ce60761c9d3e359"
 dependencies = [
- "bstr",
  "csv-core",
- "itoa 0.4.8",
+ "itoa",
  "ryu",
  "serde",
 ]
@@ -808,6 +795,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr",
  "datafusion-optimizer",
+ "datafusion-sql",
  "datafusion-substrait",
  "futures",
  "mimalloc",
@@ -903,9 +891,9 @@ dependencies = [
 
 [[package]]
 name = "fastrand"
-version = "1.8.0"
+version = "1.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a7a407cfaa3385c4ae6b23e84623d48c2798d06e3e6a1878f7f59f17b3f86499"
+checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
 dependencies = [
  "instant",
 ]
@@ -1152,7 +1140,7 @@ checksum = 
"75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
 dependencies = [
  "bytes",
  "fnv",
- "itoa 1.0.5",
+ "itoa",
 ]
 
 [[package]]
@@ -1193,7 +1181,7 @@ dependencies = [
  "http-body",
  "httparse",
  "httpdate",
- "itoa 1.0.5",
+ "itoa",
  "pin-project-lite",
  "socket2",
  "tokio",
@@ -1295,12 +1283,6 @@ dependencies = [
  "either",
 ]
 
-[[package]]
-name = "itoa"
-version = "0.4.8"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
-
 [[package]]
 name = "itoa"
 version = "1.0.5"
@@ -1546,14 +1528,14 @@ dependencies = [
 
 [[package]]
 name = "mio"
-version = "0.8.5"
+version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de"
+checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9"
 dependencies = [
  "libc",
  "log",
  "wasi 0.11.0+wasi-snapshot-preview1",
- "windows-sys 0.42.0",
+ "windows-sys 0.45.0",
 ]
 
 [[package]]
@@ -1679,9 +1661,9 @@ dependencies = [
 
 [[package]]
 name = "once_cell"
-version = "1.17.0"
+version = "1.17.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
+checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
 
 [[package]]
 name = "ordered-float"
@@ -2060,12 +2042,6 @@ dependencies = [
  "regex-syntax",
 ]
 
-[[package]]
-name = "regex-automata"
-version = "0.1.10"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
-
 [[package]]
 name = "regex-syntax"
 version = "0.6.28"
@@ -2329,7 +2305,7 @@ version = "1.0.93"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76"
 dependencies = [
- "itoa 1.0.5",
+ "itoa",
  "ryu",
  "serde",
 ]
@@ -2352,7 +2328,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
 dependencies = [
  "form_urlencoded",
- "itoa 1.0.5",
+ "itoa",
  "ryu",
  "serde",
 ]
@@ -2364,7 +2340,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8fb06d4b6cdaef0e0c51fa881acb721bed3c924cfaa71d9c94a3b771dfdf6567"
 dependencies = [
  "indexmap",
- "itoa 1.0.5",
+ "itoa",
  "ryu",
  "serde",
  "unsafe-libyaml",
diff --git a/Cargo.toml b/Cargo.toml
index 642da2d..08c5640 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -38,6 +38,7 @@ datafusion = { version = "18.0.0", features = ["pyarrow", 
"avro"] }
 datafusion-expr = "18.0.0"
 datafusion-optimizer = "18.0.0"
 datafusion-common = { version = "18.0.0", features = ["pyarrow"] }
+datafusion-sql = "18.0.0"
 datafusion-substrait = "18.0.0"
 uuid = { version = "1.2", features = ["v4"] }
 mimalloc = { version = "*", optional = true, default-features = false }
diff --git a/datafusion/__init__.py b/datafusion/__init__.py
index ddab950..0b4e896 100644
--- a/datafusion/__init__.py
+++ b/datafusion/__init__.py
@@ -32,10 +32,14 @@ from ._internal import (
     SessionContext,
     SessionConfig,
     RuntimeConfig,
-    Expression,
     ScalarUDF,
 )
 
+from .expr import (
+    Expr,
+    TableScan,
+)
+
 __version__ = importlib_metadata.version(__name__)
 
 __all__ = [
@@ -44,11 +48,12 @@ __all__ = [
     "SessionContext",
     "SessionConfig",
     "RuntimeConfig",
-    "Expression",
+    "Expr",
     "AggregateUDF",
     "ScalarUDF",
     "column",
     "literal",
+    "TableScan",
 ]
 
 
@@ -71,7 +76,7 @@ class Accumulator(metaclass=ABCMeta):
 
 
 def column(value):
-    return Expression.column(value)
+    return Expr.column(value)
 
 
 col = column
@@ -80,7 +85,7 @@ col = column
 def literal(value):
     if not isinstance(value, pa.Scalar):
         value = pa.scalar(value)
-    return Expression.literal(value)
+    return Expr.literal(value)
 
 
 lit = literal
diff --git a/datafusion/expr.py b/datafusion/expr.py
new file mode 100644
index 0000000..e914b85
--- /dev/null
+++ b/datafusion/expr.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from ._internal import expr
+
+
+def __getattr__(name):
+    return getattr(expr, name)
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 48d41c1..6faffaf 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -172,7 +172,8 @@ def test_dataset_filter_nested_data(ctx):
 
     df = ctx.table("t")
 
-    # This filter will not be pushed down to DatasetExec since it isn't 
supported
+    # This filter will not be pushed down to DatasetExec since it
+    # isn't supported
     df = df.select(
         column("nested_data")["a"] + column("nested_data")["b"],
         column("nested_data")["a"] - column("nested_data")["b"],
diff --git a/datafusion/tests/test_dataframe.py 
b/datafusion/tests/test_dataframe.py
index 4d70845..30327ee 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -314,8 +314,8 @@ def test_execution_plan(aggregate_df):
 
     indent = plan.display_indent()
 
-    # indent plan will be different for everyone due to absolute path to 
filename, so
-    # we just check for some expected content
+    # indent plan will be different for everyone due to absolute path
+    # to filename, so we just check for some expected content
     assert "ProjectionExec:" in indent
     assert "AggregateExec:" in indent
     assert "CoalesceBatchesExec:" in indent
diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index f571bc4..1e8c796 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -22,11 +22,15 @@ from datafusion import (
     AggregateUDF,
     DataFrame,
     SessionContext,
-    Expression,
     ScalarUDF,
     functions,
 )
 
+from datafusion.expr import (
+    Expr,
+    TableScan,
+)
+
 
 def test_import_datafusion():
     assert datafusion.__name__ == "datafusion"
@@ -39,13 +43,15 @@ def test_datafusion_python_version():
 def test_class_module_is_datafusion():
     for klass in [
         SessionContext,
-        Expression,
         DataFrame,
         ScalarUDF,
         AggregateUDF,
     ]:
         assert klass.__module__ == "datafusion"
 
+    for klass in [Expr, TableScan]:
+        assert klass.__module__ == "datafusion.expr"
+
 
 def test_import_from_functions_submodule():
     from datafusion.functions import abs, sin  # noqa
@@ -62,7 +68,7 @@ def test_classes_are_inheritable():
     class MyExecContext(SessionContext):
         pass
 
-    class MyExpression(Expression):
+    class MyExpression(Expr):
         pass
 
     class MyDataFrame(DataFrame):
diff --git a/docs/source/api.rst b/docs/source/api.rst
index a5d6543..a3e7e24 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -27,6 +27,6 @@ API Reference
    api/config
    api/dataframe
    api/execution_context
-   api/expression
+   api/expr
    api/functions
    api/object_store
diff --git a/docs/source/api/expression.rst b/docs/source/api/expression.rst
index 45923fb..30137d1 100644
--- a/docs/source/api/expression.rst
+++ b/docs/source/api/expression.rst
@@ -18,10 +18,10 @@
 .. _api.expression:
 .. currentmodule:: datafusion
 
-Expression
+Expr
 ==========
 
 .. autosummary::
    :toctree: ../generated/
 
-   Expression
+   Expr
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 0f73757..4b9fbca 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -18,7 +18,7 @@
 use crate::physical_plan::PyExecutionPlan;
 use crate::sql::logical::PyLogicalPlan;
 use crate::utils::wait_for_future;
-use crate::{errors::DataFusionError, expression::PyExpr};
+use crate::{errors::DataFusionError, expr::PyExpr};
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowException, 
PyArrowType};
 use datafusion::arrow::util::pretty;
diff --git a/src/expression.rs b/src/expr.rs
similarity index 90%
rename from src/expression.rs
rename to src/expr.rs
index 588b492..dceedc1 100644
--- a/src/expression.rs
+++ b/src/expr.rs
@@ -24,8 +24,10 @@ use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};
 
 use datafusion::scalar::ScalarValue;
 
-/// An PyExpr that can be used on a DataFrame
-#[pyclass(name = "Expression", module = "datafusion", subclass)]
+pub mod table_scan;
+
+/// A PyExpr that can be used on a DataFrame
+#[pyclass(name = "Expr", module = "datafusion.expr", subclass)]
 #[derive(Debug, Clone)]
 pub(crate) struct PyExpr {
     pub(crate) expr: Expr,
@@ -133,3 +135,10 @@ impl PyExpr {
         expr.into()
     }
 }
+
+/// Initializes the `expr` module to match the pattern of `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/
+pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
+    m.add_class::<PyExpr>()?;
+    m.add_class::<table_scan::PyTableScan>()?;
+    Ok(())
+}
diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs
new file mode 100644
index 0000000..bc7d68a
--- /dev/null
+++ b/src/expr/table_scan.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::TableScan;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::expr::PyExpr;
+
+#[pyclass(name = "TableScan", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyTableScan {
+    table_scan: TableScan,
+}
+
+impl From<PyTableScan> for TableScan {
+    fn from(tbl_scan: PyTableScan) -> TableScan {
+        tbl_scan.table_scan
+    }
+}
+
+impl From<TableScan> for PyTableScan {
+    fn from(table_scan: TableScan) -> PyTableScan {
+        PyTableScan { table_scan }
+    }
+}
+
+impl Display for PyTableScan {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "TableScan\nTable Name: {}
+            \nProjections: {:?}
+            \nProjected Schema: {:?}
+            \nFilters: {:?}",
+            &self.table_scan.table_name,
+            &self.py_projections(),
+            self.table_scan.projected_schema,
+            self.py_filters(),
+        )
+    }
+}
+
+#[pymethods]
+impl PyTableScan {
+    /// Retrieves the name of the table represented by this `TableScan` instance
+    #[pyo3(name = "table_name")]
+    fn py_table_name(&self) -> PyResult<&str> {
+        Ok(&self.table_scan.table_name)
+    }
+
+    /// TODO: Bindings for `TableSource` need to exist first. Left as a
+    /// placeholder to display intention to add when able to.
+    // #[pyo3(name = "source")]
+    // fn py_source(&self) -> PyResult<Arc<dyn TableSource>> {
+    //     Ok(self.table_scan.source)
+    // }
+
+    /// The column indexes that should be read. Note if this is empty then
+    /// all columns should be read by the `TableProvider`. This function
+    /// provides a Tuple of the (index, column_name) to make things simpler
+    /// for the calling code since often the name is preferred to
+    /// the index, which is a lower level abstraction.
+    #[pyo3(name = "projection")]
+    fn py_projections(&self) -> PyResult<Vec<(usize, String)>> {
+        match &self.table_scan.projection {
+            Some(indices) => {
+                let schema = self.table_scan.source.schema();
+                Ok(indices
+                    .iter()
+                    .map(|i| (*i, schema.field(*i).name().to_string()))
+                    .collect())
+            }
+            None => Ok(vec![]),
+        }
+    }
+
+    /// TODO: Bindings for `DFSchema` need to exist first. Left as a
+    /// placeholder to display intention to add when able to.
+    // /// Resulting schema from the `TableScan` operation
+    // #[pyo3(name = "projectedSchema")]
+    // fn py_projected_schema(&self) -> PyResult<DFSchemaRef> {
+    //     Ok(self.table_scan.projected_schema)
+    // }
+
+    /// Certain `TableProvider` physical readers offer the capability to filter rows that
+    /// are read at read time. These `filters` are contained here.
+    #[pyo3(name = "filters")]
+    fn py_filters(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .table_scan
+            .filters
+            .iter()
+            .map(|expr| PyExpr::from(expr.clone()))
+            .collect())
+    }
+
+    /// Optional number of rows that should be read at read time by the `TableProvider`
+    #[pyo3(name = "fetch")]
+    fn py_fetch(&self) -> PyResult<Option<usize>> {
+        Ok(self.table_scan.fetch)
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("TableScan({})", self))
+    }
+}
diff --git a/src/functions.rs b/src/functions.rs
index 5cabb12..8acffeb 100644
--- a/src/functions.rs
+++ b/src/functions.rs
@@ -24,7 +24,7 @@ use datafusion_expr::window_function::find_df_window_func;
 use datafusion_expr::{aggregate_function, lit, BuiltinScalarFunction, Expr, 
WindowFrame};
 
 use crate::errors::DataFusionError;
-use crate::expression::PyExpr;
+use crate::expr::PyExpr;
 
 #[pyfunction]
 fn in_list(expr: PyExpr, value: Vec<PyExpr>, negated: bool) -> PyExpr {
@@ -66,7 +66,7 @@ fn concat_ws(sep: String, args: Vec<PyExpr>) -> 
PyResult<PyExpr> {
     Ok(datafusion_expr::concat_ws(lit(sep), args).into())
 }
 
-/// Creates a new Sort expression
+/// Creates a new Sort Expr
 #[pyfunction]
 fn order_by(expr: PyExpr, asc: Option<bool>, nulls_first: Option<bool>) -> 
PyResult<PyExpr> {
     Ok(PyExpr {
@@ -78,7 +78,7 @@ fn order_by(expr: PyExpr, asc: Option<bool>, nulls_first: 
Option<bool>) -> PyRes
     })
 }
 
-/// Creates a new Alias expression
+/// Creates a new Alias Expr
 #[pyfunction]
 fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
     Ok(PyExpr {
@@ -86,7 +86,7 @@ fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
     })
 }
 
-/// Create a column reference expression
+/// Create a column reference Expr
 #[pyfunction]
 fn col(name: &str) -> PyResult<PyExpr> {
     Ok(PyExpr {
diff --git a/src/lib.rs b/src/lib.rs
index 5391de5..b16ef75 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -32,7 +32,7 @@ mod dataset;
 mod dataset_exec;
 pub mod errors;
 #[allow(clippy::borrow_deref_ref)]
-mod expression;
+mod expr;
 #[allow(clippy::borrow_deref_ref)]
 mod functions;
 pub mod physical_plan;
@@ -64,13 +64,17 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::<context::PySessionConfig>()?;
     m.add_class::<context::PySessionContext>()?;
     m.add_class::<dataframe::PyDataFrame>()?;
-    m.add_class::<expression::PyExpr>()?;
     m.add_class::<udf::PyScalarUDF>()?;
     m.add_class::<udaf::PyAggregateUDF>()?;
     m.add_class::<config::PyConfig>()?;
     m.add_class::<sql::logical::PyLogicalPlan>()?;
     m.add_class::<physical_plan::PyExecutionPlan>()?;
 
+    // Register `expr` as a submodule. Matching `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/
+    let expr = PyModule::new(py, "expr")?;
+    expr::init_module(expr)?;
+    m.add_submodule(expr)?;
+
     // Register the functions as a submodule
     let funcs = PyModule::new(py, "functions")?;
     functions::init_module(funcs)?;
diff --git a/src/udaf.rs b/src/udaf.rs
index 7a8bd49..d5866f8 100644
--- a/src/udaf.rs
+++ b/src/udaf.rs
@@ -26,7 +26,7 @@ use datafusion::common::ScalarValue;
 use datafusion::error::{DataFusionError, Result};
 use datafusion_expr::{create_udaf, Accumulator, 
AccumulatorFunctionImplementation, AggregateUDF};
 
-use crate::expression::PyExpr;
+use crate::expr::PyExpr;
 use crate::utils::parse_volatility;
 
 #[derive(Debug)]
diff --git a/src/udf.rs b/src/udf.rs
index 1849711..f3e6cfb 100644
--- a/src/udf.rs
+++ b/src/udf.rs
@@ -28,7 +28,7 @@ use datafusion::physical_plan::udf::ScalarUDF;
 use datafusion_expr::create_udf;
 use datafusion_expr::function::ScalarFunctionImplementation;
 
-use crate::expression::PyExpr;
+use crate::expr::PyExpr;
 use crate::utils::parse_volatility;
 
 /// Create a DataFusion's UDF implementation from a python function

Reply via email to