pitrou commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r913843039


##########
docs/source/python/compute.rst:
##########
@@ -368,5 +368,19 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter:
    nums: [[6,8,10]]
    chars: [["f","h","l"]]
 
-:class:`.Dataset` currently can be filtered using :meth:`.Dataset.to_table` method
-passing a ``filter`` argument. See :ref:`py-filter-dataset` in Dataset documentation.
+:class:`.Dataset` can be filtered with :meth:`.Dataset.filter` method too.

Review Comment:
   Wording suggestion
   ```suggestion
   :class:`.Dataset` can similarly be filtered with the :meth:`.Dataset.filter` method.
   ```



##########
cpp/src/arrow/compute/exec/options.cc:
##########
@@ -49,17 +49,28 @@ std::string ToString(JoinType t) {
 }
 
 Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromTable(
-    const Table& table, arrow::internal::Executor* exc) {
+    const Table& table, arrow::internal::Executor* executor) {
   std::shared_ptr<RecordBatchReader> reader = std::make_shared<TableBatchReader>(table);
 
-  if (exc == nullptr) return Status::TypeError("No executor provided.");
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
 
   // Map the RecordBatchReader to a SourceNode
-  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), exc));
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));
 
   return std::shared_ptr<SourceNodeOptions>(
       new SourceNodeOptions(table.schema(), batch_gen));
 }
 
+Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromRecordBatchReader(
+    std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
+    arrow::internal::Executor* executor) {
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
+
+  // Map the RecordBatchReader to a SourceNode
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));
+
+  return std::shared_ptr<SourceNodeOptions>(new SourceNodeOptions(schema, batch_gen));

Review Comment:
   Nit: move the `schema` parameter to avoid copying the `shared_ptr` (you may need to reformat after that :-))
   ```suggestion
     return std::shared_ptr<SourceNodeOptions>(new SourceNodeOptions(std::move(schema), batch_gen));
   ```



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):

Review Comment:
   I wonder why the existing code for `Dataset` below wouldn't work here.
   In particular, one branch creates a Source node while the other creates a Scan node.
   Is there a functional or performance difference? cc @westonpace for insight.



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):
+            node_factory = "source"
+            c_in_dataset = (<Dataset>ipt).unwrap()
+            c_dataset_scanner = <shared_ptr[CScanner]>(
+                (<Scanner>(<FilteredDataset>ipt)._make_scanner({})).unwrap()
+            )
+            c_recordbatchreader_in = <shared_ptr[CRecordBatchReader]>(

Review Comment:
   Same question here: why the explicit cast to 
`<shared_ptr[CRecordBatchReader]>`?



##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +443,46 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):

Review Comment:
   Can you add a docstring here? It won't be inherited automatically from the 
parent, unfortunately.
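
A minimal sketch of the issue, using plain Python stand-ins rather than the Cython classes from this PR (the class and method bodies below are hypothetical). An overriding method that has no docstring of its own ends up with `__doc__` set to `None`, even though the parent method is documented:

```python
class Dataset:
    def filter(self, expression):
        """Apply a row filter to the dataset."""
        return self


class FilteredDataset(Dataset):
    # Override without a docstring: __doc__ is not copied from the parent.
    def filter(self, expression):
        return self


class DocumentedFilteredDataset(Dataset):
    # Override with its own docstring, as requested in the review.
    def filter(self, expression):
        """Apply an additional row filter on top of the existing one."""
        return self


parent_doc = Dataset.filter.__doc__
override_doc = FilteredDataset.filter.__doc__
documented_doc = DocumentedFilteredDataset.filter.__doc__
```

Here `override_doc` is `None` while `parent_doc` and `documented_doc` are strings, so documentation generated from `__doc__` would show nothing for the undocumented override.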



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):
+            node_factory = "source"
+            c_in_dataset = (<Dataset>ipt).unwrap()
+            c_dataset_scanner = <shared_ptr[CScanner]>(

Review Comment:
   I'm curious: why do you need the explicit cast to `shared_ptr[CScanner]` 
here?



##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +443,46 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:
+            new_filter = self._filter & expression
+        else:
+            new_filter = expression
+        filtered_dataset = self.__class__.__new__(self.__class__)
+        filtered_dataset.init(self.wrapped)
+        filtered_dataset._filter = new_filter
+        return filtered_dataset
+
+    cdef Scanner _make_scanner(self, options):
+        scanner_options = dict(options, filter=self._filter)
+        return Scanner.from_dataset(self, **scanner_options)
+
+    def scanner(self, **kwargs):

Review Comment:
   Here as well, we should probably add a docstring.
   (Or arrange the code so that overriding isn't necessary, perhaps by delegating to `_make_scanner` both in the base class and here?)
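
One way the delegation idea could look, sketched in plain Python with stand-in classes (the `Scanner` stub and the class bodies are assumptions for illustration, not the code in this PR). The public `scanner()` method and its docstring live only in the base class, and subclasses customize the private `_make_scanner` hook:

```python
class Scanner:
    """Hypothetical stand-in for the real Scanner; it only records its inputs."""

    def __init__(self, dataset, options):
        self.dataset = dataset
        self.options = options


class Dataset:
    def _make_scanner(self, options):
        # Private hook that subclasses may override.
        return Scanner(self, dict(options))

    def scanner(self, **kwargs):
        """Build a Scanner for this dataset."""
        # The public entry point is defined and documented once, here.
        return self._make_scanner(kwargs)


class FilteredDataset(Dataset):
    def __init__(self, expression):
        self._filter = expression

    def _make_scanner(self, options):
        # Only the hook is overridden; scanner() is inherited unchanged.
        return Scanner(self, dict(options, filter=self._filter))


plain = Dataset().scanner(batch_size=10)
filtered = FilteredDataset("nums > 5").scanner(batch_size=10)
```

With this arrangement `FilteredDataset` does not define `scanner` at all, so there is no second docstring to maintain.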



##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +443,46 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:
+            new_filter = self._filter & expression
+        else:
+            new_filter = expression
+        filtered_dataset = self.__class__.__new__(self.__class__)
+        filtered_dataset.init(self.wrapped)
+        filtered_dataset._filter = new_filter
+        return filtered_dataset
+
+    cdef Scanner _make_scanner(self, options):
+        scanner_options = dict(options, filter=self._filter)
+        return Scanner.from_dataset(self, **scanner_options)

Review Comment:
   Simpler wording:
   ```suggestion
           return Scanner.from_dataset(self, filter=self._filter, **options)
   ```
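
The two spellings behave the same unless `options` already contains a `filter` key. A small sketch with a hypothetical stand-in for `Scanner.from_dataset` (the diff does not show whether a caller-supplied `filter` can reach `_make_scanner`, so the second case is an assumption):

```python
def from_dataset(dataset, filter=None, **kwargs):
    # Hypothetical stand-in for Scanner.from_dataset: returns what it received.
    return dict(kwargs, filter=filter)


OWN_FILTER = "nums > 5"


def make_scanner_merged(options):
    # Original form: the dataset's own filter silently replaces any 'filter' key.
    scanner_options = dict(options, filter=OWN_FILTER)
    return from_dataset(None, **scanner_options)


def make_scanner_keyword(options):
    # Suggested form: a 'filter' key in options is passed twice and raises TypeError.
    return from_dataset(None, filter=OWN_FILTER, **options)


same_a = make_scanner_merged({"batch_size": 10})
same_b = make_scanner_keyword({"batch_size": 10})

overridden = make_scanner_merged({"filter": "chars == 'f'"})
try:
    make_scanner_keyword({"filter": "chars == 'f'"})
    duplicate_raised = False
except TypeError:
    duplicate_raised = True
```

If callers are never expected to pass `filter` through `options`, the keyword form is shorter and fails loudly on misuse; otherwise the merged form keeps the override behaviour.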


