westonpace commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r914281708


##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +447,73 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, 
coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        """Apply an additional row filter to the filtered dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        FilteredDataset
+        """
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:

Review Comment:
   If `self._filter` can be `None` then what is the advantage of creating a 
separate `FilteredDataset` instead of just adding `_filter` to the existing 
`Dataset`?



##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +447,73 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, 
coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        """Apply an additional row filter to the filtered dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        FilteredDataset
+        """
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:
+            new_filter = self._filter & expression
+        else:
+            new_filter = expression
+        filtered_dataset = self.__class__.__new__(self.__class__)
+        filtered_dataset.init(self.wrapped)
+        filtered_dataset._filter = new_filter
+        return filtered_dataset
+
+    cdef Scanner _make_scanner(self, options):
+        if "filter" in options:
+            raise ValueError(
+                "Passing filter in scanner option is not valid for 
FilteredDataset."

Review Comment:
   Why wouldn't it just AND the incoming filter with the existing filter?



##########
cpp/src/arrow/compute/exec/options.cc:
##########
@@ -49,17 +49,29 @@ std::string ToString(JoinType t) {
 }
 
 Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromTable(
-    const Table& table, arrow::internal::Executor* exc) {
+    const Table& table, arrow::internal::Executor* executor) {
   std::shared_ptr<RecordBatchReader> reader = 
std::make_shared<TableBatchReader>(table);
 
-  if (exc == nullptr) return Status::TypeError("No executor provided.");
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
 
   // Map the RecordBatchReader to a SourceNode
-  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), 
exc));
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), 
executor));
 
   return std::shared_ptr<SourceNodeOptions>(
       new SourceNodeOptions(table.schema(), batch_gen));
 }
 
+Result<std::shared_ptr<SourceNodeOptions>> 
SourceNodeOptions::FromRecordBatchReader(
+    std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
+    arrow::internal::Executor* executor) {
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
+
+  // Map the RecordBatchReader to a SourceNode
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), 
executor));
+
+  return std::shared_ptr<SourceNodeOptions>(
+      new SourceNodeOptions(std::move(schema), batch_gen));

Review Comment:
   ```suggestion
     return std::make_shared<SourceNodeOptions>(std::move(schema), 
std::move(batch_gen));
   ```



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] 
plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, 
CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):

Review Comment:
   While there shouldn't be a significant performance difference this does seem 
a little less than ideal.  Could the `_make_scanner` method be changed to a 
method that creates scan options instead?
   
   Also, if the user only asks for some columns does the projection get passed 
down into the `ToRecordBatchReader` call?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to