This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 12347d9  [branch-0.8] Merge from main (#201)
12347d9 is described below

commit 12347d9134c1ba1f1961e4e7ef8c1a6f240c2721
Author: Andy Grove <[email protected]>
AuthorDate: Wed Feb 22 05:42:24 2023 -0700

    [branch-0.8] Merge from main (#201)
    
    * changelog (#188)
    
    * Add Python wrapper for LogicalPlan::Sort (#196)
    
    * Add Python wrapper for LogicalPlan::Aggregate (#195)
    
    * Add Python wrapper for LogicalPlan::Limit (#193)
    
    * Add Python wrapper for LogicalPlan::Filter (#192)
    
    * Add Python wrapper for LogicalPlan::Filter
    
    * clippy
    
    * clippy
    
    * Update src/expr/filter.rs
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    ---------
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    
    * Add tests for recently added functionality (#199)
    
    * Add experimental support for executing SQL with Polars and Pandas (#190)
    
    * Run `maturin develop` instead of `cargo build` in verification script (#200)
    
    * Implement `to_pandas()` (#197)
    
    * Implement to_pandas()
    
    * Update documentation
    
    * Write unit test
    
    * Add support for cudf as a physical execution engine (#205)
    
    * Update README in preparation for 0.8 release (#206)
    
    * Analyze table bindings (#204)
    
    * method for getting the internal LogicalPlan instance
    
    * Add explain plan method
    
    * Add bindings for analyze table
    
    * Add to_variant
    
    * cargo fmt
    
    * blake and flake formatting
    
    * changelog (#209)
    
    ---------
    
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    Co-authored-by: Dejan Simic <[email protected]>
    Co-authored-by: Jeremy Dyer <[email protected]>
---
 CHANGELOG.md                                       |  94 ++++++++++++------
 Cargo.lock                                         |  20 ++--
 README.md                                          |  85 +++++++---------
 conda/environments/datafusion-dev.yaml             |   5 +-
 datafusion/__init__.py                             |   8 ++
 datafusion/cudf.py                                 |  62 ++++++++++++
 datafusion/pandas.py                               |  61 ++++++++++++
 datafusion/polars.py                               |  84 ++++++++++++++++
 datafusion/tests/test_context.py                   |   1 -
 datafusion/tests/test_dataframe.py                 |  11 +++
 datafusion/tests/test_expr.py                      | 110 +++++++++++++++++++++
 datafusion/tests/test_imports.py                   |  25 ++++-
 dev/release/verify-release-candidate.sh            |  12 ++-
 examples/README.md                                 |  34 +++++--
 examples/dataframe-parquet.py                      |   6 +-
 examples/{dataframe-parquet.py => sql-on-cudf.py}  |  13 +--
 .../{dataframe-parquet.py => sql-on-pandas.py}     |  11 +--
 .../{dataframe-parquet.py => sql-on-polars.py}     |  13 +--
 examples/sql-parquet.py                            |   4 +-
 examples/sql-to-pandas.py                          |  10 +-
 examples/substrait.py                              |  53 ++++++++++
 src/dataframe.rs                                   |  18 ++++
 src/expr.rs                                        |  42 ++++++++
 src/expr/aggregate.rs                              | 106 ++++++++++++++++++++
 src/expr/aggregate_expr.rs                         |  73 ++++++++++++++
 src/expr/analyze.rs                                |  76 ++++++++++++++
 src/expr/binary_expr.rs                            |  57 +++++++++++
 src/expr/column.rs                                 |  60 +++++++++++
 src/expr/filter.rs                                 |  83 ++++++++++++++++
 src/expr/limit.rs                                  |  88 +++++++++++++++++
 src/expr/literal.rs                                |  74 ++++++++++++++
 src/expr/projection.rs                             |  45 +++------
 src/expr/sort.rs                                   |  94 ++++++++++++++++++
 src/expr/table_scan.rs                             |  15 +++
 src/sql/logical.rs                                 |  43 +++++++-
 35 files changed, 1427 insertions(+), 169 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8d47fdf..f196916 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,50 +19,36 @@
 
 # Changelog
 
-## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) 
(2023-02-17)
+## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) 
(2023-02-22)
 
-[Full 
Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.8.0)
+[Full 
Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.8.0-rc1...0.8.0)
 
 **Implemented enhancements:**
 
-- Add bindings for datafusion\_common::DFField 
[\#184](https://github.com/apache/arrow-datafusion-python/issues/184)
-- Add bindings for DFSchema/DFSchemaRef 
[\#181](https://github.com/apache/arrow-datafusion-python/issues/181)
-- Add bindings for datafusion\_expr Projection 
[\#179](https://github.com/apache/arrow-datafusion-python/issues/179)
-- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` 
[\#177](https://github.com/apache/arrow-datafusion-python/issues/177)
-- Add a "mapping" struct for types 
[\#172](https://github.com/apache/arrow-datafusion-python/issues/172)
-- Improve string representation of datafusion classes \(dataframe, context, 
expression, ...\) 
[\#158](https://github.com/apache/arrow-datafusion-python/issues/158)
-- Add DataFrame count method 
[\#151](https://github.com/apache/arrow-datafusion-python/issues/151)
-- \[REQUEST\] Github Actions Improvements 
[\#146](https://github.com/apache/arrow-datafusion-python/issues/146)
-- Change default branch name from master to main 
[\#144](https://github.com/apache/arrow-datafusion-python/issues/144)
-- Bump pyo3 to 0.18.0 
[\#140](https://github.com/apache/arrow-datafusion-python/issues/140)
-- Add script for Python linting 
[\#134](https://github.com/apache/arrow-datafusion-python/issues/134)
-- Add Python bindings for substrait module 
[\#132](https://github.com/apache/arrow-datafusion-python/issues/132)
-- Expand unit tests for built-in functions 
[\#128](https://github.com/apache/arrow-datafusion-python/issues/128)
-- support creating arrow-datafusion-python conda environment 
[\#122](https://github.com/apache/arrow-datafusion-python/issues/122)
-- Build Python source distribution in GitHub workflow 
[\#81](https://github.com/apache/arrow-datafusion-python/issues/81)
-- EPIC: Add all functions to python binding `functions` 
[\#72](https://github.com/apache/arrow-datafusion-python/issues/72)
+- Add support for cuDF physical execution engine 
[\#202](https://github.com/apache/arrow-datafusion-python/issues/202)
+- Make it easier to create a Pandas dataframe from DataFusion query results 
[\#139](https://github.com/apache/arrow-datafusion-python/issues/139)
 
 **Fixed bugs:**
 
-- Build is broken 
[\#161](https://github.com/apache/arrow-datafusion-python/issues/161)
-- Out of memory when sorting 
[\#157](https://github.com/apache/arrow-datafusion-python/issues/157)
-- window\_lead test appears to be non-deterministic 
[\#135](https://github.com/apache/arrow-datafusion-python/issues/135)
-- Reading csv does not work 
[\#130](https://github.com/apache/arrow-datafusion-python/issues/130)
-- Github actions produce a lot of warnings 
[\#94](https://github.com/apache/arrow-datafusion-python/issues/94)
-- ASF source release tarball has wrong directory name 
[\#90](https://github.com/apache/arrow-datafusion-python/issues/90)
-- Python Release Build failing after upgrading to maturin 14.2 
[\#87](https://github.com/apache/arrow-datafusion-python/issues/87)
-- Maturin build hangs on Linux ARM64 
[\#84](https://github.com/apache/arrow-datafusion-python/issues/84)
-- Cannot install on Mac M1 from source tarball from testpypi 
[\#82](https://github.com/apache/arrow-datafusion-python/issues/82)
-- ImportPathMismatchError when running pytest locally 
[\#77](https://github.com/apache/arrow-datafusion-python/issues/77)
+- Build error: could not compile `thiserror` due to 2 previous errors 
[\#69](https://github.com/apache/arrow-datafusion-python/issues/69)
 
 **Closed issues:**
 
-- Publish documentation for Python bindings 
[\#39](https://github.com/apache/arrow-datafusion-python/issues/39)
-- Add Python binding for `approx_median` 
[\#32](https://github.com/apache/arrow-datafusion-python/issues/32)
-- Release version 0.7.0 
[\#7](https://github.com/apache/arrow-datafusion-python/issues/7)
+- Integrate with the new `object_store` crate 
[\#22](https://github.com/apache/arrow-datafusion-python/issues/22)
 
 **Merged pull requests:**
 
+- Update README in preparation for 0.8 release 
[\#206](https://github.com/apache/arrow-datafusion-python/pull/206) 
([andygrove](https://github.com/andygrove))
+- Add support for cudf as a physical execution engine 
[\#205](https://github.com/apache/arrow-datafusion-python/pull/205) 
([jdye64](https://github.com/jdye64))
+- Run `maturin develop` instead of `cargo build` in verification script 
[\#200](https://github.com/apache/arrow-datafusion-python/pull/200) 
([andygrove](https://github.com/andygrove))
+- Add tests for recently added functionality 
[\#199](https://github.com/apache/arrow-datafusion-python/pull/199) 
([andygrove](https://github.com/andygrove))
+- Implement `to_pandas()` 
[\#197](https://github.com/apache/arrow-datafusion-python/pull/197) 
([simicd](https://github.com/simicd))
+- Add Python wrapper for LogicalPlan::Sort 
[\#196](https://github.com/apache/arrow-datafusion-python/pull/196) 
([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Aggregate 
[\#195](https://github.com/apache/arrow-datafusion-python/pull/195) 
([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Limit 
[\#193](https://github.com/apache/arrow-datafusion-python/pull/193) 
([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Filter 
[\#192](https://github.com/apache/arrow-datafusion-python/pull/192) 
([andygrove](https://github.com/andygrove))
+- Add experimental support for executing SQL with Polars and Pandas 
[\#190](https://github.com/apache/arrow-datafusion-python/pull/190) 
([andygrove](https://github.com/andygrove))
+- Update changelog for 0.8 release 
[\#188](https://github.com/apache/arrow-datafusion-python/pull/188) 
([andygrove](https://github.com/andygrove))
 - Add ability to execute ExecutionPlan and get a stream of RecordBatch 
[\#186](https://github.com/apache/arrow-datafusion-python/pull/186) 
([andygrove](https://github.com/andygrove))
 - Dffield bindings 
[\#185](https://github.com/apache/arrow-datafusion-python/pull/185) 
([jdye64](https://github.com/jdye64))
 - Add bindings for DFSchema 
[\#183](https://github.com/apache/arrow-datafusion-python/pull/183) 
([jdye64](https://github.com/jdye64))
@@ -118,6 +104,52 @@
 - Update release instructions 
[\#83](https://github.com/apache/arrow-datafusion-python/pull/83) 
([andygrove](https://github.com/andygrove))
 - \[Functions\] - Add python function binding to `functions` 
[\#73](https://github.com/apache/arrow-datafusion-python/pull/73) 
([francis-du](https://github.com/francis-du))
 
+## 
[0.8.0-rc1](https://github.com/apache/arrow-datafusion-python/tree/0.8.0-rc1) 
(2023-02-17)
+
+[Full 
Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0-rc2...0.8.0-rc1)
+
+**Implemented enhancements:**
+
+- Add bindings for datafusion\_common::DFField 
[\#184](https://github.com/apache/arrow-datafusion-python/issues/184)
+- Add bindings for DFSchema/DFSchemaRef 
[\#181](https://github.com/apache/arrow-datafusion-python/issues/181)
+- Add bindings for datafusion\_expr Projection 
[\#179](https://github.com/apache/arrow-datafusion-python/issues/179)
+- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` 
[\#177](https://github.com/apache/arrow-datafusion-python/issues/177)
+- Add a "mapping" struct for types 
[\#172](https://github.com/apache/arrow-datafusion-python/issues/172)
+- Improve string representation of datafusion classes \(dataframe, context, 
expression, ...\) 
[\#158](https://github.com/apache/arrow-datafusion-python/issues/158)
+- Add DataFrame count method 
[\#151](https://github.com/apache/arrow-datafusion-python/issues/151)
+- \[REQUEST\] Github Actions Improvements 
[\#146](https://github.com/apache/arrow-datafusion-python/issues/146)
+- Change default branch name from master to main 
[\#144](https://github.com/apache/arrow-datafusion-python/issues/144)
+- Bump pyo3 to 0.18.0 
[\#140](https://github.com/apache/arrow-datafusion-python/issues/140)
+- Add script for Python linting 
[\#134](https://github.com/apache/arrow-datafusion-python/issues/134)
+- Add Python bindings for substrait module 
[\#132](https://github.com/apache/arrow-datafusion-python/issues/132)
+- Expand unit tests for built-in functions 
[\#128](https://github.com/apache/arrow-datafusion-python/issues/128)
+- support creating arrow-datafusion-python conda environment 
[\#122](https://github.com/apache/arrow-datafusion-python/issues/122)
+- Build Python source distribution in GitHub workflow 
[\#81](https://github.com/apache/arrow-datafusion-python/issues/81)
+- EPIC: Add all functions to python binding `functions` 
[\#72](https://github.com/apache/arrow-datafusion-python/issues/72)
+
+**Fixed bugs:**
+
+- Build is broken 
[\#161](https://github.com/apache/arrow-datafusion-python/issues/161)
+- Out of memory when sorting 
[\#157](https://github.com/apache/arrow-datafusion-python/issues/157)
+- window\_lead test appears to be non-deterministic 
[\#135](https://github.com/apache/arrow-datafusion-python/issues/135)
+- Reading csv does not work 
[\#130](https://github.com/apache/arrow-datafusion-python/issues/130)
+- Github actions produce a lot of warnings 
[\#94](https://github.com/apache/arrow-datafusion-python/issues/94)
+- ASF source release tarball has wrong directory name 
[\#90](https://github.com/apache/arrow-datafusion-python/issues/90)
+- Python Release Build failing after upgrading to maturin 14.2 
[\#87](https://github.com/apache/arrow-datafusion-python/issues/87)
+- Maturin build hangs on Linux ARM64 
[\#84](https://github.com/apache/arrow-datafusion-python/issues/84)
+- Cannot install on Mac M1 from source tarball from testpypi 
[\#82](https://github.com/apache/arrow-datafusion-python/issues/82)
+- ImportPathMismatchError when running pytest locally 
[\#77](https://github.com/apache/arrow-datafusion-python/issues/77)
+
+**Closed issues:**
+
+- Publish documentation for Python bindings 
[\#39](https://github.com/apache/arrow-datafusion-python/issues/39)
+- Add Python binding for `approx_median` 
[\#32](https://github.com/apache/arrow-datafusion-python/issues/32)
+- Release version 0.7.0 
[\#7](https://github.com/apache/arrow-datafusion-python/issues/7)
+
+## 
[0.7.0-rc2](https://github.com/apache/arrow-datafusion-python/tree/0.7.0-rc2) 
(2022-11-26)
+
+[Full 
Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.7.0-rc2)
+
 
 ## 
[Unreleased](https://github.com/datafusion-contrib/datafusion-python/tree/HEAD)
 
diff --git a/Cargo.lock b/Cargo.lock
index 5059afa..04a2ea8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1134,9 +1134,9 @@ dependencies = [
 
 [[package]]
 name = "http"
-version = "0.2.8"
+version = "0.2.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
+checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
 dependencies = [
  "bytes",
  "fnv",
@@ -1385,9 +1385,9 @@ checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79"
 
 [[package]]
 name = "libflate"
-version = "1.2.0"
+version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05605ab2bce11bcfc0e9c635ff29ef8b2ea83f29be257ee7d730cac3ee373093"
+checksum = "97822bf791bd4d5b403713886a5fbe8bf49520fe78e323b0dc480ca1a03e50b0"
 dependencies = [
  "adler32",
  "crc32fast",
@@ -1396,9 +1396,9 @@ dependencies = [
 
 [[package]]
 name = "libflate_lz77"
-version = "1.1.0"
+version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "39a734c0493409afcd49deee13c006a04e3586b9761a03543c6272c9c51f2f5a"
+checksum = "a52d3a8bfc85f250440e4424db7d857e241a3aebbbe301f3eb606ab15c39acbf"
 dependencies = [
  "rle-decode-fast",
 ]
@@ -2359,9 +2359,9 @@ dependencies = [
 
 [[package]]
 name = "slab"
-version = "0.4.7"
+version = "0.4.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
+checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d"
 dependencies = [
  "autocfg",
 ]
@@ -2635,9 +2635,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-stream"
-version = "0.1.11"
+version = "0.1.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
+checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"
 dependencies = [
  "futures-core",
  "pin-project-lite",
diff --git a/README.md b/README.md
index ab89ff6..e78f613 100644
--- a/README.md
+++ b/README.md
@@ -24,19 +24,30 @@
 
 This is a Python library that binds to [Apache 
Arrow](https://arrow.apache.org/) in-memory query engine 
[DataFusion](https://github.com/apache/arrow-datafusion).
 
-Like pyspark, it allows you to build a plan through SQL or a DataFrame API 
against in-memory data, parquet or CSV
-files, run it in a multi-threaded environment, and obtain the result back in 
Python.
+DataFusion's Python bindings can be used as an end-user tool as well as 
providing a foundation for building new systems.
 
-It also allows you to use UDFs and UDAFs for complex operations.
+## Features
 
-The major advantage of this library over other execution engines is that this 
library achieves zero-copy between
-Python and its execution engine: there is no cost in using UDFs, UDAFs, and 
collecting the results to Python apart
-from having to lock the GIL when running those operations.
+- Execute queries using SQL or DataFrames against CSV, Parquet, and JSON data 
sources
+- Queries are optimized using DataFusion's query optimizer
+- Execute user-defined Python code from SQL
+- Exchange data with Pandas and other DataFrame libraries that support PyArrow
+- Serialize and deserialize query plans in Substrait format
+- Experimental support for executing SQL queries against Polars, Pandas and 
cuDF
 
-Its query engine, DataFusion, is written in 
[Rust](https://www.rust-lang.org/), which makes strong assumptions
-about thread safety and lack of memory leaks.
+## Comparison with other projects
 
-Technically, zero-copy is achieved via the [c data 
interface](https://arrow.apache.org/docs/format/CDataInterface.html).
+Here is a comparison with similar projects that may help understand when 
DataFusion might be suitable and unsuitable 
+for your needs:
+
+- [DuckDB](http://www.duckdb.org/) is an open source, in-process analytic 
database. Like DataFusion, it supports 
+ very fast execution, both from its custom file format and directly from 
Parquet files. Unlike DataFusion, it is 
+ written in C/C++ and it is primarily used directly by users as a serverless 
database and query system rather than 
+ as a library for building such database systems.
+
+- [Polars](http://pola.rs/) is one of the fastest DataFrame libraries at the 
time of writing. Like DataFusion, it 
+ is also written in Rust and uses the Apache Arrow memory model, but unlike 
DataFusion it does not provide full SQL 
+ support, nor as many extension points.
 
 ## Example Usage
 
@@ -47,12 +58,8 @@ The Parquet file used in this example can be downloaded from the following page:
 
 - https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
 
-See the [examples](examples) directory for more examples.
-
 ```python
 from datafusion import SessionContext
-import pandas as pd
-import pyarrow as pa
 
 # Create a DataFusion context
 ctx = SessionContext()
@@ -67,17 +74,11 @@ df = ctx.sql("select passenger_count, count(*) "
              "group by passenger_count "
              "order by passenger_count")
 
-# collect as list of pyarrow.RecordBatch
-results = df.collect()
-
-# get first batch
-batch = results[0]
-
 # convert to Pandas
-df = batch.to_pandas()
+pandas_df = df.to_pandas()
 
 # create a chart
-fig = df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
+fig = pandas_df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
 fig.savefig('chart.png')
 ```
 
@@ -85,42 +86,30 @@ This produces the following chart:
 
 ![Chart](examples/chart.png)
 
-## Substrait Support
+## More Examples
 
-`arrow-datafusion-python` has bindings which allow for serializing a SQL query 
to substrait protobuf format and deserializing substrait protobuf bytes to a 
DataFusion `LogicalPlan`, `PyLogicalPlan` in a Python context, which can then 
be executed.
+See [examples](examples/README.md) for more information.
 
-### Example of Serializing/Deserializing Substrait Plans
+### Executing Queries with DataFusion
 
-```python
-from datafusion import SessionContext
-from datafusion import substrait as ss
+- [Query a Parquet file using SQL](./examples/sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./examples/dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./examples/sql-to-pandas.py)
+- [Query PyArrow Data](./examples/query-pyarrow-data.py)
 
-# Create a DataFusion context
-ctx = SessionContext()
-
-# Register table with context
-ctx.register_parquet('aggregate_test_data', 
'./testing/data/csv/aggregate_test_100.csv')
-
-substrait_plan = ss.substrait.serde.serialize_to_plan("SELECT * FROM 
aggregate_test_data", ctx)
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+### Running User-Defined Python Code
 
-# Alternative serialization approaches
-# type(substrait_bytes) -> <class 'list'>, at this point the bytes can be 
distributed to file, network, etc safely
-# where they could subsequently be deserialized on the receiving end.
-substrait_bytes = ss.substrait.serde.serialize_bytes("SELECT * FROM 
aggregate_test_data", ctx)
+- [Register a Python UDF with DataFusion](./examples/python-udf.py)
+- [Register a Python UDAF with DataFusion](./examples/python-udaf.py)
 
-# Imagine here bytes would be read from network, file, etc ... for example 
brevity this is omitted and variable is simply reused
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
-substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
+### Substrait Support
 
-# type(df_logical_plan) -> <class 'substrait.LogicalPlan'>
-df_logical_plan = ss.substrait.consumer.from_substrait_plan(ctx, 
substrait_plan)
+- [Serialize query plans using Substrait](./examples/substrait.py)
 
-# Back to Substrait Plan just for demonstration purposes
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
-substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
+### Executing SQL against DataFrame Libraries (Experimental)
 
-```
+- [Executing SQL on Polars](./examples/sql-on-polars.py)
+- [Executing SQL on Pandas](./examples/sql-on-pandas.py)
 
 ## How to install (from pip)
 
diff --git a/conda/environments/datafusion-dev.yaml b/conda/environments/datafusion-dev.yaml
index 0e17e16..d9405e4 100644
--- a/conda/environments/datafusion-dev.yaml
+++ b/conda/environments/datafusion-dev.yaml
@@ -28,7 +28,7 @@ dependencies:
 - pytest
 - toml
 - importlib_metadata
-- python>=3.7,<3.11
+- python>=3.10
 # Packages useful for building distributions and releasing
 - mamba
 - conda-build
@@ -38,4 +38,7 @@ dependencies:
 - pydata-sphinx-theme==0.8.0
 - myst-parser
 - jinja2
+# GPU packages
+- cudf
+- cudatoolkit=11.8
 name: datafusion-dev
diff --git a/datafusion/__init__.py b/datafusion/__init__.py
index b6cd517..46206f0 100644
--- a/datafusion/__init__.py
+++ b/datafusion/__init__.py
@@ -41,8 +41,12 @@ from .common import (
 )
 
 from .expr import (
+    Analyze,
     Expr,
+    Filter,
+    Limit,
     Projection,
+    Sort,
     TableScan,
 )
 
@@ -63,6 +67,10 @@ __all__ = [
     "Projection",
     "DFSchema",
     "DFField",
+    "Analyze",
+    "Sort",
+    "Limit",
+    "Filter",
 ]
 
 
diff --git a/datafusion/cudf.py b/datafusion/cudf.py
new file mode 100644
index 0000000..c38819c
--- /dev/null
+++ b/datafusion/cudf.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cudf
+import datafusion
+from datafusion.expr import Projection, TableScan, Column
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_cudf_expr(self, expr):
+
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return expr.name()
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_cudf_df(self, plan):
+        # recurse down first to translate inputs into cuDF data frames
+        inputs = [self.to_cudf_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_cudf_expr(expr) for expr in node.projections()]
+            return inputs[0][args]
+        elif isinstance(node, TableScan):
+            return cudf.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_cudf_df(plan)
diff --git a/datafusion/pandas.py b/datafusion/pandas.py
new file mode 100644
index 0000000..f8e5651
--- /dev/null
+++ b/datafusion/pandas.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pandas as pd
+import datafusion
+from datafusion.expr import Projection, TableScan, Column
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_pandas_expr(self, expr):
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return expr.name()
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_pandas_df(self, plan):
+        # recurse down first to translate inputs into pandas data frames
+        inputs = [self.to_pandas_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_pandas_expr(expr) for expr in node.projections()]
+            return inputs[0][args]
+        elif isinstance(node, TableScan):
+            return pd.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_pandas_df(plan)
diff --git a/datafusion/polars.py b/datafusion/polars.py
new file mode 100644
index 0000000..a1bafbe
--- /dev/null
+++ b/datafusion/polars.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import polars
+import datafusion
+from datafusion.expr import Projection, TableScan, Aggregate
+from datafusion.expr import Column, AggregateFunction
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_polars_expr(self, expr):
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return polars.col(expr.name())
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_polars_df(self, plan):
+        # recurse down first to translate inputs into Polars data frames
+        inputs = [self.to_polars_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_polars_expr(expr) for expr in node.projections()]
+            return inputs[0].select(*args)
+        elif isinstance(node, Aggregate):
+            groupby_expr = [
+                self.to_polars_expr(expr) for expr in node.group_by_exprs()
+            ]
+            aggs = []
+            for expr in node.aggregate_exprs():
+                expr = expr.to_variant()
+                if isinstance(expr, AggregateFunction):
+                    if expr.aggregate_type() == "COUNT":
+                        aggs.append(polars.count().alias("{}".format(expr)))
+                    else:
+                        raise Exception(
+                            "Unsupported aggregate function {}".format(
+                                expr.aggregate_type()
+                            )
+                        )
+                else:
+                    raise Exception(
+                        "Unsupported aggregate function {}".format(expr)
+                    )
+            df = inputs[0].groupby(groupby_expr).agg(aggs)
+            return df
+        elif isinstance(node, TableScan):
+            return polars.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_polars_df(plan)
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 6faffaf..efa2ede 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -35,7 +35,6 @@ def test_create_context_no_args():
 
 
 def test_create_context_with_all_valid_args():
-
     runtime = (
         RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
     )
diff --git a/datafusion/tests/test_dataframe.py 
b/datafusion/tests/test_dataframe.py
index 1894688..292a4b0 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -533,3 +533,14 @@ def test_cache(df):
 def test_count(df):
     # Get number of rows
     assert df.count() == 3
+
+
+def test_to_pandas(df):
+    # Skip test if pandas is not installed
+    pd = pytest.importorskip("pandas")
+
+    # Convert datafusion dataframe to pandas dataframe
+    pandas_df = df.to_pandas()
+    assert type(pandas_df) == pd.DataFrame
+    assert pandas_df.shape == (3, 3)
+    assert set(pandas_df.columns) == {"a", "b", "c"}
diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py
new file mode 100644
index 0000000..143eea6
--- /dev/null
+++ b/datafusion/tests/test_expr.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion.expr import Column, Literal, BinaryExpr, AggregateFunction
+from datafusion.expr import (
+    Projection,
+    Filter,
+    Aggregate,
+    Limit,
+    Sort,
+    TableScan,
+)
+import pytest
+
+
[email protected]
+def test_ctx():
+    ctx = SessionContext()
+    ctx.register_csv("test", "testing/data/csv/aggregate_test_100.csv")
+    return ctx
+
+
+def test_projection(test_ctx):
+    """Selecting a column, a literal and a comparison plans as a Projection
+    whose input is a TableScan."""
+    df = test_ctx.sql("select c1, 123, c1 < 123 from test")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Projection)
+
+    expr = plan.projections()
+
+    # first projected expression: the plain column reference
+    col1 = expr[0].to_variant()
+    assert isinstance(col1, Column)
+    assert col1.name() == "c1"
+    assert col1.qualified_name() == "test.c1"
+
+    # second projected expression: the integer literal
+    col2 = expr[1].to_variant()
+    assert isinstance(col2, Literal)
+    assert col2.data_type() == "Int64"
+    assert col2.value_i64() == 123
+
+    # third projected expression: the `<` comparison
+    col3 = expr[2].to_variant()
+    assert isinstance(col3, BinaryExpr)
+    assert isinstance(col3.left().to_variant(), Column)
+    assert col3.op() == "<"
+    assert isinstance(col3.right().to_variant(), Literal)
+
+    plan = plan.input().to_variant()
+    assert isinstance(plan, TableScan)
+
+
+def test_filter(test_ctx):
+    """A WHERE clause plans as a Filter directly beneath the Projection."""
+    df = test_ctx.sql("select c1 from test WHERE c1 > 5")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Projection)
+
+    plan = plan.input().to_variant()
+    assert isinstance(plan, Filter)
+
+
+def test_limit(test_ctx):
+    """A LIMIT clause puts a Limit node at the root of the plan."""
+    df = test_ctx.sql("select c1 from test LIMIT 10")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Limit)
+
+
+def test_aggregate_query(test_ctx):
+    """GROUP BY with count(*) plans as a Projection over an Aggregate."""
+    df = test_ctx.sql("select c1, count(*) from test group by c1")
+    plan = df.logical_plan()
+
+    projection = plan.to_variant()
+    assert isinstance(projection, Projection)
+
+    aggregate = projection.input().to_variant()
+    assert isinstance(aggregate, Aggregate)
+
+    # the single grouping expression is the column c1
+    col1 = aggregate.group_by_exprs()[0].to_variant()
+    assert isinstance(col1, Column)
+    assert col1.name() == "c1"
+    assert col1.qualified_name() == "test.c1"
+
+    # the single aggregate expression is an aggregate function
+    col2 = aggregate.aggregate_exprs()[0].to_variant()
+    assert isinstance(col2, AggregateFunction)
+
+
+def test_sort(test_ctx):
+    """An ORDER BY clause puts a Sort node at the root of the plan."""
+    df = test_ctx.sql("select c1 from test order by c1")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Sort)
diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index e5d9585..7eb8b7c 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -33,8 +33,17 @@ from datafusion.common import (
 
 from datafusion.expr import (
     Expr,
+    Column,
+    Literal,
+    BinaryExpr,
+    AggregateFunction,
     Projection,
     TableScan,
+    Filter,
+    Limit,
+    Aggregate,
+    Sort,
+    Analyze,
 )
 
 
@@ -55,9 +64,23 @@ def test_class_module_is_datafusion():
     ]:
         assert klass.__module__ == "datafusion"
 
-    for klass in [Expr, Projection, TableScan]:
+    # expressions
+    for klass in [Expr, Column, Literal, BinaryExpr, AggregateFunction]:
         assert klass.__module__ == "datafusion.expr"
 
+    # operators
+    for klass in [
+        Projection,
+        TableScan,
+        Aggregate,
+        Sort,
+        Limit,
+        Filter,
+        Analyze,
+    ]:
+        assert klass.__module__ == "datafusion.expr"
+
+    # schema
     for klass in [DFField, DFSchema]:
         assert klass.__module__ == "datafusion.common"
 
diff --git a/dev/release/verify-release-candidate.sh 
b/dev/release/verify-release-candidate.sh
index fee276c..be86f69 100755
--- a/dev/release/verify-release-candidate.sh
+++ b/dev/release/verify-release-candidate.sh
@@ -125,15 +125,19 @@ test_source_distribution() {
   git clone https://github.com/apache/arrow-testing.git testing
   git clone https://github.com/apache/parquet-testing.git parquet-testing
 
-  cargo build
-  cargo test --all
+  python3 -m venv venv
+  source venv/bin/activate
+  python3 -m pip install -U pip
+  python3 -m pip install -r requirements-310.txt
+  maturin develop
+
+  #TODO: we should really run tests here as well
+  #python3 -m pytest
 
   if ( find -iname 'Cargo.toml' | xargs grep SNAPSHOT ); then
     echo "Cargo.toml version should not contain SNAPSHOT for releases"
     exit 1
   fi
-
-  cargo publish --dry-run
 }
 
 TEST_SUCCESS=no
diff --git a/examples/README.md b/examples/README.md
index a3ae0ba..ce98600 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -19,9 +19,31 @@
 
 # DataFusion Python Examples
 
-- [Query a Parquet file using SQL](./sql-parquet.py)
-- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
-- [Run a SQL query and store the results in a Pandas 
DataFrame](./sql-to-pandas.py)
-- [Query PyArrow Data](./query-pyarrow-data.py)
-- [Register a Python UDF with DataFusion](./python-udf.py)
-- [Register a Python UDAF with DataFusion](./python-udaf.py)
+Some examples rely on data which can be downloaded from the following site:
+
+- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
+
+Here is a direct link to the file used in the examples:
+
+- 
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet
+
+### Executing Queries with DataFusion
+
+- [Query a Parquet file using SQL](./sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
+- [Query PyArrow Data](./query-pyarrow-data.py)
+
+### Running User-Defined Python Code
+
+- [Register a Python UDF with DataFusion](./python-udf.py)
+- [Register a Python UDAF with DataFusion](./python-udaf.py)
+
+### Substrait Support
+
+- [Serialize query plans using Substrait](./substrait.py)
+
+### Executing SQL against DataFrame Libraries (Experimental)
+
+- [Executing SQL on Polars](./sql-on-polars.py)
+- [Executing SQL on Pandas](./sql-on-pandas.py)
diff --git a/examples/dataframe-parquet.py b/examples/dataframe-parquet.py
index 31a8aa6..0f2e4b8 100644
--- a/examples/dataframe-parquet.py
+++ b/examples/dataframe-parquet.py
@@ -19,7 +19,7 @@ from datafusion import SessionContext
 from datafusion import functions as f
 
 ctx = SessionContext()
-df = ctx.read_parquet(
-    "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-).aggregate([f.col("passenger_count")], [f.count_star()])
+df = ctx.read_parquet("yellow_tripdata_2021-01.parquet").aggregate(
+    [f.col("passenger_count")], [f.count_star()]
+)
 df.show()
diff --git a/examples/dataframe-parquet.py b/examples/sql-on-cudf.py
similarity index 77%
copy from examples/dataframe-parquet.py
copy to examples/sql-on-cudf.py
index 31a8aa6..407cb1f 100644
--- a/examples/dataframe-parquet.py
+++ b/examples/sql-on-cudf.py
@@ -15,11 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datafusion import SessionContext
-from datafusion import functions as f
+from datafusion.cudf import SessionContext
+
 
 ctx = SessionContext()
-df = ctx.read_parquet(
-    "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-).aggregate([f.col("passenger_count")], [f.count_star()])
-df.show()
+ctx.register_parquet(
+    "taxi", "/home/jeremy/Downloads/yellow_tripdata_2021-01.parquet"
+)
+df = ctx.sql("select passenger_count from taxi")
+print(df)
diff --git a/examples/dataframe-parquet.py b/examples/sql-on-pandas.py
similarity index 77%
copy from examples/dataframe-parquet.py
copy to examples/sql-on-pandas.py
index 31a8aa6..0efd776 100644
--- a/examples/dataframe-parquet.py
+++ b/examples/sql-on-pandas.py
@@ -15,11 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datafusion import SessionContext
-from datafusion import functions as f
+from datafusion.pandas import SessionContext
+
 
 ctx = SessionContext()
-df = ctx.read_parquet(
-    "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-).aggregate([f.col("passenger_count")], [f.count_star()])
-df.show()
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
+df = ctx.sql("select passenger_count from taxi")
+print(df)
diff --git a/examples/dataframe-parquet.py b/examples/sql-on-polars.py
similarity index 77%
copy from examples/dataframe-parquet.py
copy to examples/sql-on-polars.py
index 31a8aa6..c208114 100644
--- a/examples/dataframe-parquet.py
+++ b/examples/sql-on-polars.py
@@ -15,11 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datafusion import SessionContext
-from datafusion import functions as f
+from datafusion.polars import SessionContext
+
 
 ctx = SessionContext()
-df = ctx.read_parquet(
-    "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-).aggregate([f.col("passenger_count")], [f.count_star()])
-df.show()
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
+df = ctx.sql(
+    "select passenger_count, count(*) from taxi group by passenger_count"
+)
+print(df)
diff --git a/examples/sql-parquet.py b/examples/sql-parquet.py
index 7b2db6f..3cc9fbd 100644
--- a/examples/sql-parquet.py
+++ b/examples/sql-parquet.py
@@ -18,9 +18,7 @@
 from datafusion import SessionContext
 
 ctx = SessionContext()
-ctx.register_parquet(
-    "taxi", "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-)
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
 df = ctx.sql(
     "select passenger_count, count(*) from taxi where passenger_count is not 
null group by passenger_count order by passenger_count"
 )
diff --git a/examples/sql-to-pandas.py b/examples/sql-to-pandas.py
index 3569e6d..3e99b22 100644
--- a/examples/sql-to-pandas.py
+++ b/examples/sql-to-pandas.py
@@ -33,17 +33,11 @@ df = ctx.sql(
     "order by passenger_count"
 )
 
-# collect as list of pyarrow.RecordBatch
-results = df.collect()
-
-# get first batch
-batch = results[0]
-
 # convert to Pandas
-df = batch.to_pandas()
+pandas_df = df.to_pandas()
 
 # create a chart
-fig = df.plot(
+fig = pandas_df.plot(
     kind="bar", title="Trip Count by Number of Passengers"
 ).get_figure()
 fig.savefig("chart.png")
diff --git a/examples/substrait.py b/examples/substrait.py
new file mode 100644
index 0000000..c167f7d
--- /dev/null
+++ b/examples/substrait.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion import substrait as ss
+
+
+# Create a DataFusion context
+ctx = SessionContext()
+
+# Register table with context
+ctx.register_parquet(
+    "aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv"
+)
+
+substrait_plan = ss.substrait.serde.serialize_to_plan(
+    "SELECT * FROM aggregate_test_data", ctx
+)
+# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+
+# Alternative serialization approaches
+# type(substrait_bytes) -> <class 'list'>, at this point the bytes can be 
distributed to file, network, etc safely
+# where they could subsequently be deserialized on the receiving end.
+substrait_bytes = ss.substrait.serde.serialize_bytes(
+    "SELECT * FROM aggregate_test_data", ctx
+)
+
+# Imagine here bytes would be read from network, file, etc ... for example 
brevity this is omitted and variable is simply reused
+# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
+
+# type(df_logical_plan) -> <class 'substrait.LogicalPlan'>
+df_logical_plan = ss.substrait.consumer.from_substrait_plan(
+    ctx, substrait_plan
+)
+
+# Back to Substrait Plan just for demonstration purposes
+# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 4b9fbca..a1c68dd 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -313,6 +313,24 @@ impl PyDataFrame {
         Ok(())
     }
 
+    /// Convert to pandas dataframe with pyarrow
+    /// Collect the batches, pass to Arrow Table & then convert to Pandas DataFrame
+    ///
+    /// Returns an error if collecting the batches fails or if any of the
+    /// pyarrow calls raise.
+    fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
+        // Propagate a failed collect with `?`. Handing the `PyResult` itself
+        // to `PyTuple::new` would iterate it, turning an `Err` into an empty
+        // argument tuple and hiding the real error.
+        let batches = self.collect(py)?;
+
+        Python::with_gil(|py| {
+            // Instantiate pyarrow Table object and use its from_batches method
+            let table_class = py.import("pyarrow")?.getattr("Table")?;
+            // All batches are passed as one positional argument
+            let args = PyTuple::new(py, &[batches]);
+            let table: PyObject = table_class.call_method1("from_batches", args)?.into();
+
+            // Use Table.to_pandas() method to convert batches to pandas dataframe
+            // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas
+            let result = table.call_method0(py, "to_pandas")?;
+            Ok(result)
+        })
+    }
+
     // Executes this DataFrame to get the total number of rows.
     fn count(&self, py: Python) -> PyResult<usize> {
         Ok(wait_for_future(py, self.df.as_ref().clone().count())?)
diff --git a/src/expr.rs b/src/expr.rs
index f3695fe..90ce6bf 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -22,10 +22,24 @@ use datafusion::arrow::datatypes::DataType;
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};
 
+use crate::errors::py_runtime_err;
+use crate::expr::aggregate_expr::PyAggregateFunction;
+use crate::expr::binary_expr::PyBinaryExpr;
+use crate::expr::column::PyColumn;
+use crate::expr::literal::PyLiteral;
 use datafusion::scalar::ScalarValue;
 
+pub mod aggregate;
+pub mod aggregate_expr;
+pub mod analyze;
+pub mod binary_expr;
+pub mod column;
+pub mod filter;
+pub mod limit;
+pub mod literal;
 pub mod logical_node;
 pub mod projection;
+pub mod sort;
 pub mod table_scan;
 
 /// A PyExpr that can be used on a DataFrame
@@ -49,6 +63,22 @@ impl From<Expr> for PyExpr {
 
 #[pymethods]
 impl PyExpr {
+    /// Return the specific expression
+    ///
+    /// Only `Column`, `Literal`, `BinaryExpr` and `AggregateFunction` are
+    /// converted; any other variant produces an error via `py_runtime_err`.
+    fn to_variant(&self, py: Python) -> PyResult<PyObject> {
+        // `py` already proves the GIL is held, so `Python::with_gil` is not needed
+        match &self.expr {
+            Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_py(py)),
+            Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_py(py)),
+            Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_py(py)),
+            Expr::AggregateFunction(expr) => {
+                Ok(PyAggregateFunction::from(expr.clone()).into_py(py))
+            }
+            other => Err(py_runtime_err(format!(
+                "Cannot convert this Expr to a Python object: {:?}",
+                other
+            ))),
+        }
+    }
+
     fn __richcmp__(&self, other: PyExpr, op: CompareOp) -> PyExpr {
         let expr = match op {
             CompareOp::Lt => self.expr.clone().lt(other.expr),
@@ -140,8 +170,20 @@ impl PyExpr {
 
 /// Initializes the `expr` module to match the pattern of `datafusion-expr` 
https://docs.rs/datafusion-expr/latest/datafusion_expr/
 pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
+    // expressions (each class registered exactly once)
     m.add_class::<PyExpr>()?;
+    m.add_class::<PyColumn>()?;
+    m.add_class::<PyLiteral>()?;
+    m.add_class::<PyBinaryExpr>()?;
+    m.add_class::<PyAggregateFunction>()?;
+    // operators
     m.add_class::<table_scan::PyTableScan>()?;
     m.add_class::<projection::PyProjection>()?;
+    m.add_class::<filter::PyFilter>()?;
+    m.add_class::<limit::PyLimit>()?;
+    m.add_class::<aggregate::PyAggregate>()?;
+    m.add_class::<sort::PySort>()?;
+    m.add_class::<analyze::PyAnalyze>()?;
     Ok(())
 }
diff --git a/src/expr/aggregate.rs b/src/expr/aggregate.rs
new file mode 100644
index 0000000..98d1f55
--- /dev/null
+++ b/src/expr/aggregate.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::Aggregate;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+/// Python wrapper around a DataFusion `Aggregate` logical plan node.
+#[pyclass(name = "Aggregate", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAggregate {
+    aggregate: Aggregate,
+}
+
+// Wraps an `Aggregate` node in its Python-facing type.
+impl From<Aggregate> for PyAggregate {
+    fn from(aggregate: Aggregate) -> PyAggregate {
+        PyAggregate { aggregate }
+    }
+}
+
+// Unwraps the inner `Aggregate`. As written this conversion always returns `Ok`.
+impl TryFrom<PyAggregate> for Aggregate {
+    type Error = DataFusionError;
+
+    fn try_from(agg: PyAggregate) -> Result<Self, Self::Error> {
+        Ok(agg.aggregate)
+    }
+}
+
+// Renders the node as multi-line text: group-by expressions, aggregate
+// expressions, input plan and schema, each via its `Debug` formatting.
+impl Display for PyAggregate {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        // NOTE: the line breaks and indentation inside the string literal are
+        // part of the output, in addition to the explicit `\n` escapes.
+        write!(
+            f,
+            "Aggregate
+            \nGroupBy(s): {:?}
+            \nAggregates(s): {:?}
+            \nInput: {:?}
+            \nProjected Schema: {:?}",
+            &self.aggregate.group_expr,
+            &self.aggregate.aggr_expr,
+            self.aggregate.input,
+            self.aggregate.schema
+        )
+    }
+}
+
+#[pymethods]
+impl PyAggregate {
+    /// Retrieves the grouping expressions for this `Aggregate`
+    fn group_by_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .aggregate
+            .group_expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    /// Retrieves the aggregate expressions for this `Aggregate`
+    fn aggregate_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .aggregate
+            .aggr_expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Aggregate` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.aggregate.input).clone())
+    }
+
+    /// Resulting Schema for this `Aggregate` node instance
+    fn schema(&self) -> PyDFSchema {
+        (*self.aggregate.schema).clone().into()
+    }
+
+    /// String representation: the `Display` output wrapped in `Aggregate(...)`
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Aggregate({})", self))
+    }
+}
+
+// An `Aggregate` has exactly one input plan, returned as a one-element vector.
+impl LogicalNode for PyAggregate {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
+    }
+}
diff --git a/src/expr/aggregate_expr.rs b/src/expr/aggregate_expr.rs
new file mode 100644
index 0000000..1801051
--- /dev/null
+++ b/src/expr/aggregate_expr.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::PyExpr;
+use datafusion_expr::expr::AggregateFunction;
+use pyo3::prelude::*;
+use std::fmt::{Display, Formatter};
+
+/// Python wrapper around a DataFusion `AggregateFunction` expression.
+#[pyclass(name = "AggregateFunction", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAggregateFunction {
+    aggr: AggregateFunction,
+}
+
+// Unwraps the inner `AggregateFunction`.
+impl From<PyAggregateFunction> for AggregateFunction {
+    fn from(aggr: PyAggregateFunction) -> Self {
+        aggr.aggr
+    }
+}
+
+// Wraps an `AggregateFunction` in its Python-facing type.
+impl From<AggregateFunction> for PyAggregateFunction {
+    fn from(aggr: AggregateFunction) -> PyAggregateFunction {
+        PyAggregateFunction { aggr }
+    }
+}
+
+// Formats as `FUN(arg1, arg2, ...)`, with each argument rendered via `to_string()`.
+impl Display for PyAggregateFunction {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        let args: Vec<String> = self.aggr.args.iter().map(|expr| 
expr.to_string()).collect();
+        write!(f, "{}({})", self.aggr.fun, args.join(", "))
+    }
+}
+
+#[pymethods]
+impl PyAggregateFunction {
+    /// Get the aggregate type, such as "MIN", or "MAX"
+    fn aggregate_type(&self) -> String {
+        self.aggr.fun.to_string()
+    }
+
+    /// is this a distinct aggregate such as `COUNT(DISTINCT expr)`
+    fn is_distinct(&self) -> bool {
+        self.aggr.distinct
+    }
+
+    /// Get the arguments to the aggregate function
+    fn args(&self) -> Vec<PyExpr> {
+        self.aggr
+            .args
+            .iter()
+            .map(|expr| PyExpr::from(expr.clone()))
+            .collect()
+    }
+
+    /// Get a String representation of this aggregate function
+    fn __repr__(&self) -> String {
+        self.to_string()
+    }
+}
diff --git a/src/expr/analyze.rs b/src/expr/analyze.rs
new file mode 100644
index 0000000..095fab0
--- /dev/null
+++ b/src/expr/analyze.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Analyze;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+/// Python wrapper around a DataFusion `Analyze` logical plan node.
+#[pyclass(name = "Analyze", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAnalyze {
+    analyze: Analyze,
+}
+
+impl PyAnalyze {
+    /// Wraps the given `Analyze` node.
+    pub fn new(analyze: Analyze) -> Self {
+        Self { analyze }
+    }
+}
+
+// Wraps an `Analyze` node in its Python-facing type.
+impl From<Analyze> for PyAnalyze {
+    fn from(analyze: Analyze) -> PyAnalyze {
+        PyAnalyze { analyze }
+    }
+}
+
+// Unwraps the inner `Analyze` node.
+impl From<PyAnalyze> for Analyze {
+    fn from(analyze: PyAnalyze) -> Self {
+        analyze.analyze
+    }
+}
+
+// Always prints the fixed label "Analyze Table"; no node details are included.
+impl Display for PyAnalyze {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(f, "Analyze Table")
+    }
+}
+
+#[pymethods]
+impl PyAnalyze {
+    /// Value of the `verbose` flag on the wrapped `Analyze` node
+    fn verbose(&self) -> PyResult<bool> {
+        Ok(self.analyze.verbose)
+    }
+
+    /// Resulting Schema for this `Analyze` node instance
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok((*self.analyze.schema).clone().into())
+    }
+
+    /// String representation: the `Display` output wrapped in `Analyze(...)`
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Analyze({})", self))
+    }
+}
+
+// An `Analyze` has exactly one input plan, returned as a one-element vector.
+impl LogicalNode for PyAnalyze {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.analyze.input).clone())]
+    }
+}
diff --git a/src/expr/binary_expr.rs b/src/expr/binary_expr.rs
new file mode 100644
index 0000000..5f382b7
--- /dev/null
+++ b/src/expr/binary_expr.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::PyExpr;
+use datafusion_expr::BinaryExpr;
+use pyo3::prelude::*;
+
+/// Python wrapper around a DataFusion `BinaryExpr`.
+#[pyclass(name = "BinaryExpr", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyBinaryExpr {
+    expr: BinaryExpr,
+}
+
+// Unwraps the inner `BinaryExpr`.
+impl From<PyBinaryExpr> for BinaryExpr {
+    fn from(expr: PyBinaryExpr) -> Self {
+        expr.expr
+    }
+}
+
+// Wraps a `BinaryExpr` in its Python-facing type.
+impl From<BinaryExpr> for PyBinaryExpr {
+    fn from(expr: BinaryExpr) -> PyBinaryExpr {
+        PyBinaryExpr { expr }
+    }
+}
+
+#[pymethods]
+impl PyBinaryExpr {
+    /// Get the left-hand operand as a generic expression
+    fn left(&self) -> PyExpr {
+        self.expr.left.as_ref().clone().into()
+    }
+
+    /// Get the right-hand operand as a generic expression
+    fn right(&self) -> PyExpr {
+        self.expr.right.as_ref().clone().into()
+    }
+
+    /// Get the operator rendered as a string
+    fn op(&self) -> String {
+        self.expr.op.to_string()
+    }
+
+    /// Get a String representation of this binary expression
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(self.expr.to_string())
+    }
+}
diff --git a/src/expr/column.rs b/src/expr/column.rs
new file mode 100644
index 0000000..16b8bce
--- /dev/null
+++ b/src/expr/column.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::Column;
+use pyo3::prelude::*;
+
+/// Python wrapper around a DataFusion `Column`.
+#[pyclass(name = "Column", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyColumn {
+    pub col: Column,
+}
+
+impl PyColumn {
+    /// Wraps the given `Column`.
+    pub fn new(col: Column) -> Self {
+        Self { col }
+    }
+}
+
+// Wraps a `Column` in its Python-facing type.
+impl From<Column> for PyColumn {
+    fn from(col: Column) -> PyColumn {
+        PyColumn { col }
+    }
+}
+
+#[pymethods]
+impl PyColumn {
+    /// Get the column name
+    fn name(&self) -> String {
+        self.col.name.clone()
+    }
+
+    /// Get the column relation, or `None` when the column has no relation
+    fn relation(&self) -> Option<String> {
+        self.col.relation.clone()
+    }
+
+    /// Get the fully-qualified column name
+    fn qualified_name(&self) -> String {
+        self.col.flat_name()
+    }
+
+    /// Get a String representation of this column (its fully-qualified name)
+    fn __repr__(&self) -> String {
+        self.qualified_name()
+    }
+}
diff --git a/src/expr/filter.rs b/src/expr/filter.rs
new file mode 100644
index 0000000..b7b48b9
--- /dev/null
+++ b/src/expr/filter.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Filter;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Filter", module = "datafusion.expr", subclass)]
+#[derive(Clone)] // cloning copies the wrapped `Filter`
+pub struct PyFilter {
+    filter: Filter, // the wrapped `datafusion_expr` logical-plan `Filter` node
+}
+
+impl From<Filter> for PyFilter {
+    fn from(inner: Filter) -> Self {
+        Self { filter: inner }
+    }
+}
+
+impl From<PyFilter> for Filter {
+    fn from(py_filter: PyFilter) -> Self { // unwraps without cloning
+        py_filter.filter
+    }
+}
+
+impl Display for PyFilter { // renders `Filter`, `Predicate: ..` and `Input: ..` on separate lines
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f, // trailing `\` below keeps the source line breaks and indentation out of the output
+            "Filter\
+            \nPredicate: {:?}\
+            \nInput: {:?}",
+            &self.filter.predicate, &self.filter.input
+        )
+    }
+}
+
+#[pymethods]
+impl PyFilter {
+    /// Predicate expression of the wrapped `Filter`, cloned into a `PyExpr`
+    fn predicate(&self) -> PyExpr {
+        self.filter.predicate.clone().into()
+    }
+
+    /// Clone of the input `LogicalPlan` feeding this `Filter` node
+    fn input(&self) -> PyLogicalPlan {
+        (*self.filter.input).clone().into()
+    }
+
+    /// Schema for this `Filter` node, cloned from the input plan's schema
+    fn schema(&self) -> PyDFSchema {
+        self.filter.input.schema().as_ref().clone().into()
+    }
+
+    fn __repr__(&self) -> String { // the `Display` text wrapped in `Filter(..)`
+        format!("Filter({})", self)
+    }
+}
+
+impl LogicalNode for PyFilter {
+    fn input(&self) -> Vec<PyLogicalPlan> { // exactly one input: a clone of the filtered plan
+        vec![(*self.filter.input).clone().into()]
+    }
+}
diff --git a/src/expr/limit.rs b/src/expr/limit.rs
new file mode 100644
index 0000000..a50e5b8
--- /dev/null
+++ b/src/expr/limit.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Limit;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Limit", module = "datafusion.expr", subclass)]
+#[derive(Clone)] // cloning copies the wrapped `Limit`
+pub struct PyLimit {
+    limit: Limit, // the wrapped `datafusion_expr` logical-plan `Limit` node
+}
+
+impl From<Limit> for PyLimit {
+    fn from(inner: Limit) -> Self {
+        Self { limit: inner }
+    }
+}
+
+impl From<PyLimit> for Limit {
+    fn from(py_limit: PyLimit) -> Self { // unwraps without cloning
+        py_limit.limit
+    }
+}
+
+impl Display for PyLimit { // renders `Limit`, `Skip: ..`, `Fetch: ..` and `Input: ..` on separate lines
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f, // trailing `\` below keeps the source line breaks and indentation out of the output
+            "Limit\
+            \nSkip: {}\
+            \nFetch: {:?}\
+            \nInput: {:?}",
+            &self.limit.skip, &self.limit.fetch, &self.limit.input
+        )
+    }
+}
+
+#[pymethods]
+impl PyLimit {
+    /// The `skip` field of the wrapped `Limit`
+    fn skip(&self) -> usize {
+        self.limit.skip
+    }
+
+    /// The `fetch` field of the wrapped `Limit`; `None` when it is unset
+    fn fetch(&self) -> Option<usize> {
+        self.limit.fetch
+    }
+
+    /// Clone of the input `LogicalPlan` feeding this `Limit` node
+    fn input(&self) -> PyLogicalPlan {
+        (*self.limit.input).clone().into()
+    }
+
+    /// Schema for this `Limit` node, cloned from the input plan's schema
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok(self.limit.input.schema().as_ref().clone().into())
+    }
+
+    fn __repr__(&self) -> PyResult<String> { // the `Display` text wrapped in `Limit(..)`
+        Ok(format!("Limit({})", self))
+    }
+}
+
+impl LogicalNode for PyLimit {
+    fn input(&self) -> Vec<PyLogicalPlan> { // exactly one input: a clone of the limited plan
+        vec![(*self.limit.input).clone().into()]
+    }
+}
diff --git a/src/expr/literal.rs b/src/expr/literal.rs
new file mode 100644
index 0000000..27674ce
--- /dev/null
+++ b/src/expr/literal.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::py_runtime_err;
+use datafusion_common::ScalarValue;
+use pyo3::prelude::*;
+
+#[pyclass(name = "Literal", module = "datafusion.expr", subclass)]
+#[derive(Clone)] // cloning copies the wrapped `ScalarValue`
+pub struct PyLiteral {
+    pub value: ScalarValue, // the wrapped `datafusion_common::ScalarValue`
+}
+
+impl From<PyLiteral> for ScalarValue {
+    fn from(py_lit: PyLiteral) -> Self { // unwraps without cloning
+        py_lit.value
+    }
+}
+
+impl From<ScalarValue> for PyLiteral {
+    fn from(scalar: ScalarValue) -> Self {
+        Self { value: scalar }
+    }
+}
+
+#[pymethods]
+impl PyLiteral {
+    /// Data type of the wrapped scalar, rendered through its `Display` impl
+    fn data_type(&self) -> String {
+        self.value.get_datatype().to_string()
+    }
+
+    /// Inner value of an `Int32(Some(_))` scalar; anything else (including `Int32(None)`) is a runtime error
+    fn value_i32(&self) -> PyResult<i32> {
+        match &self.value {
+            ScalarValue::Int32(Some(v)) => Ok(*v),
+            _ => Err(py_runtime_err("Cannot access value as i32")),
+        }
+    }
+
+    /// Inner value of an `Int64(Some(_))` scalar; anything else (including `Int64(None)`) is a runtime error
+    fn value_i64(&self) -> PyResult<i64> {
+        match &self.value {
+            ScalarValue::Int64(Some(v)) => Ok(*v),
+            _ => Err(py_runtime_err("Cannot access value as i64")),
+        }
+    }
+
+    /// Copy of the text of a `Utf8(Some(_))` scalar; anything else (including `Utf8(None)`) is a runtime error
+    fn value_str(&self) -> PyResult<String> {
+        match &self.value {
+            ScalarValue::Utf8(Some(text)) => Ok(text.clone()),
+            _ => Err(py_runtime_err("Cannot access value as string")),
+        }
+    }
+
+    fn __repr__(&self) -> PyResult<String> { // the scalar's `Display` text
+        Ok(self.value.to_string())
+    }
+}
diff --git a/src/expr/projection.rs b/src/expr/projection.rs
index 6d04e59..4c158f7 100644
--- a/src/expr/projection.rs
+++ b/src/expr/projection.rs
@@ -15,13 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion_common::DataFusionError;
 use datafusion_expr::logical_plan::Projection;
 use pyo3::prelude::*;
 use std::fmt::{self, Display, Formatter};
 
 use crate::common::df_schema::PyDFSchema;
-use crate::errors::py_runtime_err;
 use crate::expr::logical_node::LogicalNode;
 use crate::expr::PyExpr;
 use crate::sql::logical::PyLogicalPlan;
@@ -32,21 +30,21 @@ pub struct PyProjection {
     projection: Projection,
 }
 
+impl PyProjection {
+    pub fn new(projection: Projection) -> Self { // same result as `PyProjection::from(projection)`
+        Self::from(projection)
+    }
+}
+
 impl From<Projection> for PyProjection {
     fn from(projection: Projection) -> PyProjection {
         PyProjection { projection }
     }
 }
 
-impl TryFrom<PyProjection> for Projection {
-    type Error = DataFusionError;
-
-    fn try_from(py_proj: PyProjection) -> Result<Self, Self::Error> {
-        Projection::try_new_with_schema(
-            py_proj.projection.expr,
-            py_proj.projection.input.clone(),
-            py_proj.projection.schema,
-        )
+impl From<PyProjection> for Projection {
+    fn from(py_proj: PyProjection) -> Self { // unwraps without cloning or rebuilding the node
+        py_proj.projection
     }
 }
 
@@ -66,8 +64,7 @@ impl Display for PyProjection {
 #[pymethods]
 impl PyProjection {
     /// Retrieves the expressions for this `Projection`
-    #[pyo3(name = "projections")]
-    fn py_projections(&self) -> PyResult<Vec<PyExpr>> {
+    fn projections(&self) -> PyResult<Vec<PyExpr>> {
         Ok(self
             .projection
             .expr
@@ -76,25 +73,13 @@ impl PyProjection {
             .collect())
     }
 
-    // Retrieves the input `LogicalPlan` to this `Projection` node
-    #[pyo3(name = "input")]
-    fn py_input(&self) -> PyResult<PyLogicalPlan> {
-        // DataFusion make a loose guarantee that each Projection should have an input, however
-        // we check for that hear since we are performing explicit index retrieval
-        let inputs = LogicalNode::input(self);
-        if !inputs.is_empty() {
-            return Ok(inputs[0].clone());
-        }
-
-        Err(py_runtime_err(format!(
-            "Expected `input` field for Projection node: {}",
-            self
-        )))
+    /// Clone of the input `LogicalPlan` feeding this `Projection` node
+    fn input(&self) -> PyLogicalPlan {
+        (*self.projection.input).clone().into()
     }
 
-    // Resulting Schema for this `Projection` node instance
-    #[pyo3(name = "schema")]
-    fn py_schema(&self) -> PyResult<PyDFSchema> {
+    /// Schema stored on this `Projection` node, cloned into a `PyDFSchema`
+    fn schema(&self) -> PyResult<PyDFSchema> {
         Ok((*self.projection.schema).clone().into())
     }
 
diff --git a/src/expr/sort.rs b/src/expr/sort.rs
new file mode 100644
index 0000000..1d0a7f6
--- /dev/null
+++ b/src/expr/sort.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::Sort;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Sort", module = "datafusion.expr", subclass)]
+#[derive(Clone)] // cloning copies the wrapped `Sort`
+pub struct PySort {
+    sort: Sort, // the wrapped `datafusion_expr` logical-plan `Sort` node
+}
+
+impl From<Sort> for PySort {
+    fn from(inner: Sort) -> Self {
+        Self { sort: inner }
+    }
+}
+
+impl TryFrom<PySort> for Sort {
+    type Error = DataFusionError;
+
+    fn try_from(py_sort: PySort) -> Result<Self, Self::Error> { // always `Ok`: just unwraps the node
+        Ok(py_sort.sort)
+    }
+}
+
+impl Display for PySort { // renders `Sort`, `Expr(s): ..`, `Input: ..` and `Schema: ..` on separate lines
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f, // trailing `\` below keeps the source line breaks and indentation out of the output
+            "Sort\
+            \nExpr(s): {:?}\
+            \nInput: {:?}\
+            \nSchema: {:?}",
+            &self.sort.expr,
+            self.sort.input,
+            self.sort.input.schema()
+        )
+    }
+}
+
+#[pymethods]
+impl PySort {
+    /// Retrieves the sort expressions for this `Sort`.
+    ///
+    /// Each stored expression is cloned and converted into a `PyExpr`, in stored order.
+    fn sort_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        let exprs: Vec<PyExpr> = self.sort.expr.iter().cloned().map(PyExpr::from).collect();
+        Ok(exprs)
+    }
+
+    /// Clone of the input `LogicalPlan` feeding this `Sort` node
+    fn input(&self) -> PyLogicalPlan {
+        (*self.sort.input).clone().into()
+    }
+
+    /// Schema for this `Sort` node instance.
+    /// It is cloned from the input plan's schema.
+    fn schema(&self) -> PyDFSchema {
+        self.sort.input.schema().as_ref().clone().into()
+    }
+
+    /// Python `repr()`: the `Display` text wrapped in `Sort(..)`
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Sort({})", self))
+    }
+}
+
+impl LogicalNode for PySort {
+    fn input(&self) -> Vec<PyLogicalPlan> { // exactly one input: a clone of the sorted plan
+        vec![(*self.sort.input).clone().into()]
+    }
+}
diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs
index 00504b9..2784523 100644
--- a/src/expr/table_scan.rs
+++ b/src/expr/table_scan.rs
@@ -19,6 +19,8 @@ use datafusion_expr::logical_plan::TableScan;
 use pyo3::prelude::*;
 use std::fmt::{self, Display, Formatter};
 
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
 use crate::{common::df_schema::PyDFSchema, expr::PyExpr};
 
 #[pyclass(name = "TableScan", module = "datafusion.expr", subclass)]
@@ -27,6 +29,12 @@ pub struct PyTableScan {
     table_scan: TableScan,
 }
 
+impl PyTableScan {
+    pub fn new(table_scan: TableScan) -> Self { // same result as `PyTableScan::from(table_scan)`
+        Self::from(table_scan)
+    }
+}
+
 impl From<PyTableScan> for TableScan {
     fn from(tbl_scan: PyTableScan) -> TableScan {
         tbl_scan.table_scan
@@ -117,3 +125,10 @@ impl PyTableScan {
         Ok(format!("TableScan({})", self))
     }
 }
+
+impl LogicalNode for PyTableScan {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        // a table scan is a leaf of the plan tree, so there is nothing to return
+        Vec::new()
+    }
+}
diff --git a/src/sql/logical.rs b/src/sql/logical.rs
index dcd7baa..ee48f1e 100644
--- a/src/sql/logical.rs
+++ b/src/sql/logical.rs
@@ -17,6 +17,14 @@
 
 use std::sync::Arc;
 
+use crate::errors::py_runtime_err;
+use crate::expr::aggregate::PyAggregate;
+use crate::expr::analyze::PyAnalyze;
+use crate::expr::filter::PyFilter;
+use crate::expr::limit::PyLimit;
+use crate::expr::projection::PyProjection;
+use crate::expr::sort::PySort;
+use crate::expr::table_scan::PyTableScan;
 use datafusion_expr::LogicalPlan;
 use pyo3::prelude::*;
 
@@ -33,12 +41,33 @@ impl PyLogicalPlan {
             plan: Arc::new(plan),
         }
     }
+
+    pub fn plan(&self) -> Arc<LogicalPlan> { // another handle to the same shared plan
+        Arc::clone(&self.plan)
+    }
 }
 
 #[pymethods]
 impl PyLogicalPlan {
+    /// Return the specific logical operator
+    fn to_variant(&self, py: Python) -> PyResult<PyObject> {
+        Python::with_gil(|_| match self.plan.as_ref() {
+            LogicalPlan::Aggregate(plan) => 
Ok(PyAggregate::from(plan.clone()).into_py(py)),
+            LogicalPlan::Analyze(plan) => 
Ok(PyAnalyze::from(plan.clone()).into_py(py)),
+            LogicalPlan::Filter(plan) => 
Ok(PyFilter::from(plan.clone()).into_py(py)),
+            LogicalPlan::Limit(plan) => 
Ok(PyLimit::from(plan.clone()).into_py(py)),
+            LogicalPlan::Projection(plan) => 
Ok(PyProjection::from(plan.clone()).into_py(py)),
+            LogicalPlan::Sort(plan) => 
Ok(PySort::from(plan.clone()).into_py(py)),
+            LogicalPlan::TableScan(plan) => 
Ok(PyTableScan::from(plan.clone()).into_py(py)),
+            other => Err(py_runtime_err(format!(
+                "Cannot convert this plan to a LogicalNode: {:?}",
+                other
+            ))),
+        })
+    }
+
     /// Get the inputs to this plan
-    pub fn inputs(&self) -> Vec<PyLogicalPlan> {
+    fn inputs(&self) -> Vec<PyLogicalPlan> {
         let mut inputs = vec![];
         for input in self.plan.inputs() {
             inputs.push(input.to_owned().into());
@@ -46,19 +75,23 @@ impl PyLogicalPlan {
         inputs
     }
 
-    pub fn display(&self) -> String {
+    fn __repr__(&self) -> PyResult<String> { // Python `repr()`: the `Debug` rendering of the wrapped plan
+        Ok(format!("{:?}", self.plan))
+    }
+
+    fn display(&self) -> String { // text produced by `LogicalPlan::display`
         format!("{}", self.plan.display())
     }
 
-    pub fn display_indent(&self) -> String {
+    fn display_indent(&self) -> String { // text produced by `LogicalPlan::display_indent`
         format!("{}", self.plan.display_indent())
     }
 
-    pub fn display_indent_schema(&self) -> String {
+    fn display_indent_schema(&self) -> String { // text produced by `LogicalPlan::display_indent_schema`
         format!("{}", self.plan.display_indent_schema())
     }
 
-    pub fn display_graphviz(&self) -> String {
+    fn display_graphviz(&self) -> String { // text produced by `LogicalPlan::display_graphviz`
-        format!("{}", self.plan.display_indent_schema())
+        format!("{}", self.plan.display_graphviz())
     }
 }

Reply via email to