This is an automated email from the ASF dual-hosted git repository.

zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 94103279db GH-44629: [C++][Acero] Use `implicit_ordering` for 
`asof_join` rather than `require_sequenced_output` (#44616)
94103279db is described below

commit 94103279db56459338efd84a0450d34134fcf865
Author: Enrico Minack <[email protected]>
AuthorDate: Tue Feb 11 03:17:39 2025 +0100

    GH-44629: [C++][Acero] Use `implicit_ordering` for `asof_join` rather than 
`require_sequenced_output` (#44616)
    
    ### Rationale for this change
    Changes in #44083 (GH-41706) unnecessarily sequences batches retrieved from 
scanner where it only requires the batches to provide index according to 
implicit input order.
    
    ### What changes are included in this PR?
    Setting `implicit_ordering` causes existing code to set the batch index, 
which is then available to the `asof_join` node to sequence the batches into 
input order. This replaces some of the #44083 changes.
    
    Some code introduced by #44083 turns out not to be required and has 
therefore been reverted.
    
    ### Are these changes tested?
    Existing unit tests still pass.
    
    ### Are there any user-facing changes?
    Reverts some breaking user-facing changes done by #44083.
    * GitHub Issue: #41706
    * GitHub Issue: #44629
    
    Authored-by: Enrico Minack <[email protected]>
    Signed-off-by: Rossi Sun <[email protected]>
---
 cpp/src/arrow/acero/options.h                |  4 ++--
 cpp/src/arrow/acero/source_node.cc           |  2 +-
 cpp/src/arrow/dataset/scanner.cc             | 10 +++++-----
 cpp/src/arrow/dataset/scanner.h              | 15 +++++++++++----
 python/pyarrow/_dataset.pyx                  | 12 ++++++++----
 python/pyarrow/acero.py                      |  8 ++++----
 python/pyarrow/includes/libarrow_dataset.pxd |  2 +-
 7 files changed, 32 insertions(+), 21 deletions(-)

diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h
index 2beacfe26b..2eb7df0085 100644
--- a/cpp/src/arrow/acero/options.h
+++ b/cpp/src/arrow/acero/options.h
@@ -103,8 +103,8 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public 
ExecNodeOptions {
   std::shared_ptr<Schema> output_schema;
   /// \brief an asynchronous stream of batches ending with std::nullopt
   std::function<Future<std::optional<ExecBatch>>()> generator;
-
-  Ordering ordering = Ordering::Unordered();
+  /// \brief the order of the data, defaults to Ordering::Unordered
+  Ordering ordering;
 };
 
 /// \brief a node that generates data from a table already loaded in memory
diff --git a/cpp/src/arrow/acero/source_node.cc 
b/cpp/src/arrow/acero/source_node.cc
index ac34e4b6a0..2d3e2a1da1 100644
--- a/cpp/src/arrow/acero/source_node.cc
+++ b/cpp/src/arrow/acero/source_node.cc
@@ -407,7 +407,7 @@ struct SchemaSourceNode : public SourceNode {
 struct RecordBatchReaderSourceNode : public SourceNode {
   RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
                               arrow::AsyncGenerator<std::optional<ExecBatch>> 
generator)
-      : SourceNode(plan, schema, generator, Ordering::Implicit()) {}
+      : SourceNode(plan, schema, generator) {}
 
   static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                 const ExecNodeOptions& options) {
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 0df8fd8026..e66fed3f06 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -1000,6 +1000,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* 
plan,
   auto scan_options = scan_node_options.scan_options;
   auto dataset = scan_node_options.dataset;
   bool require_sequenced_output = scan_node_options.require_sequenced_output;
+  bool implicit_ordering = scan_node_options.implicit_ordering;
 
   RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset->schema()));
 
@@ -1032,11 +1033,11 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* 
plan,
   } else {
     batch_gen = std::move(merged_batch_gen);
   }
-  int64_t index = require_sequenced_output ? 0 : compute::kUnsequencedIndex;
+
   auto gen = MakeMappedGenerator(
       std::move(batch_gen),
-      [scan_options, index](const EnumeratedRecordBatch& partial) mutable
-      -> Result<std::optional<compute::ExecBatch>> {
+      [scan_options](const EnumeratedRecordBatch& partial)
+          -> Result<std::optional<compute::ExecBatch>> {
         // TODO(ARROW-13263) fragments may be able to attach more guarantees 
to batches
         // than this, for example parquet's row group stats. Failing to do 
this leaves
         // perf on the table because row group stats could be used to skip 
kernel execs in
@@ -1057,11 +1058,10 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* 
plan,
         batch->values.emplace_back(partial.record_batch.index);
         batch->values.emplace_back(partial.record_batch.last);
         batch->values.emplace_back(partial.fragment.value->ToString());
-        if (index != compute::kUnsequencedIndex) batch->index = index++;
         return batch;
       });
 
-  auto ordering = require_sequenced_output ? Ordering::Implicit() : 
Ordering::Unordered();
+  auto ordering = implicit_ordering ? Ordering::Implicit() : 
Ordering::Unordered();
 
   auto fields = scan_options->dataset_schema->fields();
   if (scan_options->add_augmented_fields) {
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index d2de267897..af8888ad87 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -557,20 +557,27 @@ class ARROW_DS_EXPORT ScannerBuilder {
 /// \brief Construct a source ExecNode which yields batches from a dataset 
scan.
 ///
 /// Does not construct associated filter or project nodes.
-/// Yielded batches will be augmented with fragment/batch indices to enable 
stable
-/// ordering for simple ExecPlans.
+///
+/// Batches are yielded sequentially, like single-threaded,
+/// when require_sequenced_output=true.
+///
+/// Yielded batches will be augmented with fragment/batch indices when
+/// implicit_ordering=true to enable stable ordering for simple ExecPlans.
 class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
  public:
   explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
                            std::shared_ptr<ScanOptions> scan_options,
-                           bool require_sequenced_output = false)
+                           bool require_sequenced_output = false,
+                           bool implicit_ordering = false)
       : dataset(std::move(dataset)),
         scan_options(std::move(scan_options)),
-        require_sequenced_output(require_sequenced_output) {}
+        require_sequenced_output(require_sequenced_output),
+        implicit_ordering(implicit_ordering) {}
 
   std::shared_ptr<Dataset> dataset;
   std::shared_ptr<ScanOptions> scan_options;
   bool require_sequenced_output;
+  bool implicit_ordering;
 };
 
 /// @}
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index fd50215cee..f8c8b9bc11 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -4077,13 +4077,15 @@ cdef class _ScanNodeOptions(ExecNodeOptions):
         cdef:
             shared_ptr[CScanOptions] c_scan_options
             bint require_sequenced_output=False
+            bint implicit_ordering=False
 
         c_scan_options = Scanner._make_scan_options(dataset, scan_options)
 
         require_sequenced_output=scan_options.get("require_sequenced_output", 
False)
+        implicit_ordering=scan_options.get("implicit_ordering", False)
 
         self.wrapped.reset(
-            new CScanNodeOptions(dataset.unwrap(), c_scan_options, 
require_sequenced_output)
+            new CScanNodeOptions(dataset.unwrap(), c_scan_options, 
require_sequenced_output, implicit_ordering)
         )
 
 
@@ -4101,8 +4103,8 @@ class ScanNodeOptions(_ScanNodeOptions):
     expression or projection to the scan node that you also supply
     to the filter or project node.
 
-    Yielded batches will be augmented with fragment/batch indices to
-    enable stable ordering for simple ExecPlans.
+    Yielded batches will be augmented with fragment/batch indices when
+    implicit_ordering=True to enable stable ordering for simple ExecPlans.
 
     Parameters
     ----------
@@ -4111,7 +4113,9 @@ class ScanNodeOptions(_ScanNodeOptions):
     **kwargs : dict, optional
         Scan options. See `Scanner.from_dataset` for possible arguments.       
 
     require_sequenced_output : bool, default False
-        Assert implicit ordering on data.
+        Batches are yielded sequentially, like single-threaded
+    implicit_ordering : bool, default False
+        Preserve implicit ordering of data.
     """
 
     def __init__(self, Dataset dataset, **kwargs):
diff --git a/python/pyarrow/acero.py b/python/pyarrow/acero.py
index 706338bd8c..86bf7cbf4d 100644
--- a/python/pyarrow/acero.py
+++ b/python/pyarrow/acero.py
@@ -56,10 +56,10 @@ except ImportError:
     ds = DatasetModuleStub
 
 
-def _dataset_to_decl(dataset, use_threads=True, 
require_sequenced_output=False):
+def _dataset_to_decl(dataset, use_threads=True, implicit_ordering=False):
     decl = Declaration("scan", ScanNodeOptions(
         dataset, use_threads=use_threads,
-        require_sequenced_output=require_sequenced_output))
+        implicit_ordering=implicit_ordering))
 
     # Get rid of special dataset columns
     # "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
@@ -316,7 +316,7 @@ def _perform_join_asof(left_operand, left_on, left_by,
         left_source = _dataset_to_decl(
             left_operand,
             use_threads=use_threads,
-            require_sequenced_output=True)
+            implicit_ordering=True)
     else:
         left_source = Declaration(
             "table_source", TableSourceNodeOptions(left_operand),
@@ -324,7 +324,7 @@ def _perform_join_asof(left_operand, left_on, left_by,
     if isinstance(right_operand, ds.Dataset):
         right_source = _dataset_to_decl(
             right_operand, use_threads=use_threads,
-            require_sequenced_output=True)
+            implicit_ordering=True)
     else:
         right_source = Declaration(
             "table_source", TableSourceNodeOptions(right_operand)
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd 
b/python/pyarrow/includes/libarrow_dataset.pxd
index d2fbcd0ee4..403e306e13 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -51,7 +51,7 @@ cdef extern from "arrow/dataset/api.h" namespace 
"arrow::dataset" nogil:
         CExpression filter
 
     cdef cppclass CScanNodeOptions 
"arrow::dataset::ScanNodeOptions"(CExecNodeOptions):
-        CScanNodeOptions(shared_ptr[CDataset] dataset, 
shared_ptr[CScanOptions] scan_options, bint require_sequenced_output)
+        CScanNodeOptions(shared_ptr[CDataset] dataset, 
shared_ptr[CScanOptions] scan_options, bint require_sequenced_output, bint 
implicit_ordering)
 
         shared_ptr[CScanOptions] scan_options
 

Reply via email to