This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a6fc1f9ee GH-33976: [Python] Add scan ExecNode options (#34530)
3a6fc1f9ee is described below

commit 3a6fc1f9eedd41df2d8ffbcbdfbdab911ff6d82e
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Tue Mar 14 09:41:02 2023 +0100

    GH-33976: [Python] Add scan ExecNode options (#34530)
    
    Continuing GH-34102, this adds the exec node options classes defined in the dataset module (scan, not yet write).
    
    * Issue: #33976
    
    Authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: Joris Van den Bossche <[email protected]>
---
 python/pyarrow/_dataset.pyx        | 54 +++++++++++++++++++++++++++++++++++---
 python/pyarrow/tests/test_acero.py | 37 ++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 16158f6749..67b82c8c87 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -29,6 +29,7 @@ import pyarrow as pa
 from pyarrow.lib cimport *
 from pyarrow.lib import ArrowTypeError, frombytes, tobytes, _pc
 from pyarrow.includes.libarrow_dataset cimport *
+from pyarrow._acero cimport ExecNodeOptions
 from pyarrow._compute cimport Expression, _bind
 from pyarrow._compute import _forbid_instantiation
 from pyarrow._fs cimport FileSystem, FileSelector
@@ -37,14 +38,19 @@ from pyarrow._csv cimport (
 from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
 
 
-_orc_fileformat = None
-_orc_imported = False
-
 _DEFAULT_BATCH_SIZE = 2**17
 _DEFAULT_BATCH_READAHEAD = 16
 _DEFAULT_FRAGMENT_READAHEAD = 4
 
 
+# Initialise support for Datasets in ExecPlan
+Initialize()
+
+
+_orc_fileformat = None
+_orc_imported = False
+
+
 def _get_orc_fileformat():
     """
     Import OrcFileFormat on first usage (to avoid circular import issue
@@ -3634,3 +3640,45 @@ def _filesystemdataset_write(
     c_scanner = data.unwrap()
     with nogil:
         check_status(CFileSystemDataset.Write(c_options, c_scanner))
+
+
+cdef class _ScanNodeOptions(ExecNodeOptions):
+
+    def _set_options(self, Dataset dataset, dict scan_options):
+        cdef:
+            shared_ptr[CScanOptions] c_scan_options
+
+        c_scan_options = Scanner._make_scan_options(dataset, scan_options)
+
+        self.wrapped.reset(
+            new CScanNodeOptions(dataset.unwrap(), c_scan_options)
+        )
+
+
+class ScanNodeOptions(_ScanNodeOptions):
+    """
+    A Source node which yields batches from a Dataset scan.
+
+    This is the option class for the "scan" node factory.
+
+    This node is capable of applying pushdown projections or filters
+    to the file readers which reduce the amount of data that needs to
+    be read (if supported by the file format). But note that this does not
+    construct associated filter or project nodes to perform the final
+    filtering or projection. Rather, you may supply the same filter
+    expression or projection to the scan node that you also supply
+    to the filter or project node.
+
+    Yielded batches will be augmented with fragment/batch indices to
+    enable stable ordering for simple ExecPlans.
+
+    Parameters
+    ----------
+    dataset : pyarrow.dataset.Dataset
+        The table which acts as the data source.
+    **kwargs : dict, optional
+        Scan options. See `Scanner.from_dataset` for possible arguments.
+    """
+
+    def __init__(self, Dataset dataset, **kwargs):
+        self._set_options(dataset, kwargs)
diff --git a/python/pyarrow/tests/test_acero.py b/python/pyarrow/tests/test_acero.py
index 4cba55f7e4..9ac54824e1 100644
--- a/python/pyarrow/tests/test_acero.py
+++ b/python/pyarrow/tests/test_acero.py
@@ -20,6 +20,7 @@ import pytest
 import pyarrow as pa
 import pyarrow.compute as pc
 from pyarrow.compute import field
+import pyarrow.dataset as ds
 
 from pyarrow._acero import (
     TableSourceNodeOptions,
@@ -29,6 +30,7 @@ from pyarrow._acero import (
     HashJoinNodeOptions,
     Declaration,
 )
+from pyarrow._dataset import ScanNodeOptions
 
 
 @pytest.fixture
@@ -291,3 +293,38 @@ def test_hash_join():
         names=["key", "a", "b"]
     )
     assert result.sort_by("a").equals(expected)
+
+
+def test_scan(tempdir):
+    table = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})
+    ds.write_dataset(table, tempdir / "dataset", format="parquet")
+    dataset = ds.dataset(tempdir / "dataset", format="parquet")
+    decl = Declaration("scan", ScanNodeOptions(dataset))
+    result = decl.to_table()
+    assert result.schema.names == [
+        "a", "b", "__fragment_index", "__batch_index",
+        "__last_in_fragment", "__filename"
+    ]
+    assert result.select(["a", "b"]).equals(table)
+
+    # using a filter only does pushdown (depending on file format), not actual filter
+
+    scan_opts = ScanNodeOptions(dataset, filter=field('a') > 1)
+    decl = Declaration("scan", scan_opts)
+    # fragment not filtered based on min/max statistics
+    assert decl.to_table().num_rows == 3
+
+    scan_opts = ScanNodeOptions(dataset, filter=field('a') > 4)
+    decl = Declaration("scan", scan_opts)
+    # full fragment filtered based on min/max statistics
+    assert decl.to_table().num_rows == 0
+
+    # projection scan option
+
+    scan_opts = ScanNodeOptions(dataset, columns={"a2": pc.multiply(field("a"), 2)})
+    decl = Declaration("scan", scan_opts)
+    result = decl.to_table()
+    # "a" is included in the result (needed later on for the actual projection)
+    assert result["a"].to_pylist() == [1, 2, 3]
+    # "b" is still included, but without data as it will be removed by the projection
+    assert pc.all(result["b"].is_null()).as_py()

Reply via email to