This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 3a6fc1f9ee GH-33976: [Python] Add scan ExecNode options (#34530)
3a6fc1f9ee is described below
commit 3a6fc1f9eedd41df2d8ffbcbdfbdab911ff6d82e
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Tue Mar 14 09:41:02 2023 +0100
GH-33976: [Python] Add scan ExecNode options (#34530)
Continuing GH-34102, this adds the exec node options classes defined in the
dataset module (scan, not yet write).
* Issue: #33976
Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
---
python/pyarrow/_dataset.pyx | 54 +++++++++++++++++++++++++++++++++++---
python/pyarrow/tests/test_acero.py | 37 ++++++++++++++++++++++++++
2 files changed, 88 insertions(+), 3 deletions(-)
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 16158f6749..67b82c8c87 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -29,6 +29,7 @@ import pyarrow as pa
from pyarrow.lib cimport *
from pyarrow.lib import ArrowTypeError, frombytes, tobytes, _pc
from pyarrow.includes.libarrow_dataset cimport *
+from pyarrow._acero cimport ExecNodeOptions
from pyarrow._compute cimport Expression, _bind
from pyarrow._compute import _forbid_instantiation
from pyarrow._fs cimport FileSystem, FileSelector
@@ -37,14 +38,19 @@ from pyarrow._csv cimport (
from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
-_orc_fileformat = None
-_orc_imported = False
-
_DEFAULT_BATCH_SIZE = 2**17
_DEFAULT_BATCH_READAHEAD = 16
_DEFAULT_FRAGMENT_READAHEAD = 4
+# Initialise support for Datasets in ExecPlan
+Initialize()
+
+
+_orc_fileformat = None
+_orc_imported = False
+
+
def _get_orc_fileformat():
"""
Import OrcFileFormat on first usage (to avoid circular import issue
@@ -3634,3 +3640,45 @@ def _filesystemdataset_write(
c_scanner = data.unwrap()
with nogil:
check_status(CFileSystemDataset.Write(c_options, c_scanner))
+
+
+cdef class _ScanNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, Dataset dataset, dict scan_options):
+ cdef:
+ shared_ptr[CScanOptions] c_scan_options
+
+ c_scan_options = Scanner._make_scan_options(dataset, scan_options)
+
+ self.wrapped.reset(
+ new CScanNodeOptions(dataset.unwrap(), c_scan_options)
+ )
+
+
+class ScanNodeOptions(_ScanNodeOptions):
+ """
+ A Source node which yields batches from a Dataset scan.
+
+ This is the option class for the "scan" node factory.
+
+ This node is capable of applying pushdown projections or filters
+ to the file readers which reduce the amount of data that needs to
+ be read (if supported by the file format). But note that this does not
+ construct associated filter or project nodes to perform the final
+ filtering or projection. Rather, you may supply the same filter
+ expression or projection to the scan node that you also supply
+ to the filter or project node.
+
+ Yielded batches will be augmented with fragment/batch indices to
+ enable stable ordering for simple ExecPlans.
+
+ Parameters
+ ----------
+ dataset : pyarrow.dataset.Dataset
+ The table which acts as the data source.
+ **kwargs : dict, optional
+ Scan options. See `Scanner.from_dataset` for possible arguments.
+ """
+
+ def __init__(self, Dataset dataset, **kwargs):
+ self._set_options(dataset, kwargs)
diff --git a/python/pyarrow/tests/test_acero.py b/python/pyarrow/tests/test_acero.py
index 4cba55f7e4..9ac54824e1 100644
--- a/python/pyarrow/tests/test_acero.py
+++ b/python/pyarrow/tests/test_acero.py
@@ -20,6 +20,7 @@ import pytest
import pyarrow as pa
import pyarrow.compute as pc
from pyarrow.compute import field
+import pyarrow.dataset as ds
from pyarrow._acero import (
TableSourceNodeOptions,
@@ -29,6 +30,7 @@ from pyarrow._acero import (
HashJoinNodeOptions,
Declaration,
)
+from pyarrow._dataset import ScanNodeOptions
@pytest.fixture
@@ -291,3 +293,38 @@ def test_hash_join():
names=["key", "a", "b"]
)
assert result.sort_by("a").equals(expected)
+
+
+def test_scan(tempdir):
+ table = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})
+ ds.write_dataset(table, tempdir / "dataset", format="parquet")
+ dataset = ds.dataset(tempdir / "dataset", format="parquet")
+ decl = Declaration("scan", ScanNodeOptions(dataset))
+ result = decl.to_table()
+ assert result.schema.names == [
+ "a", "b", "__fragment_index", "__batch_index",
+ "__last_in_fragment", "__filename"
+ ]
+ assert result.select(["a", "b"]).equals(table)
+
+ # using a filter only does pushdown (depending on file format), not actual filter
+
+ scan_opts = ScanNodeOptions(dataset, filter=field('a') > 1)
+ decl = Declaration("scan", scan_opts)
+ # fragment not filtered based on min/max statistics
+ assert decl.to_table().num_rows == 3
+
+ scan_opts = ScanNodeOptions(dataset, filter=field('a') > 4)
+ decl = Declaration("scan", scan_opts)
+ # full fragment filtered based on min/max statistics
+ assert decl.to_table().num_rows == 0
+
+ # projection scan option
+
+ scan_opts = ScanNodeOptions(dataset, columns={"a2": pc.multiply(field("a"), 2)})
+ decl = Declaration("scan", scan_opts)
+ result = decl.to_table()
+ # "a" is included in the result (needed later on for the actual projection)
+ assert result["a"].to_pylist() == [1, 2, 3]
+ # "b" is still included, but without data as it will be removed by the
projection
+ assert pc.all(result["b"].is_null()).as_py()