This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a0f06c4a4 GH-41706: [C++][Acero] Enhance asof_join to work in 
multi-threaded execution by sequencing input (#44083)
2a0f06c4a4 is described below

commit 2a0f06c4a40ec64e1f4f046a7a8aa165329c66d6
Author: mroz45 <[email protected]>
AuthorDate: Tue Oct 29 22:00:56 2024 +0100

    GH-41706: [C++][Acero] Enhance asof_join to work in multi-threaded 
execution by sequencing input (#44083)
    
    
    
    ### Rationale for this change
    This is the initial PR. I found that the test fails with specific parameters.
    
    ### What changes are included in this PR?
    In this PR I provoke a failure of asof_join_node_test.
    
    * GitHub Issue: #41706
    
    Authored-by: kamilt <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/acero/accumulation_queue.h     |  1 +
 cpp/src/arrow/acero/asof_join_node.cc        | 27 ++++++++++++++++++++-------
 cpp/src/arrow/acero/asof_join_node_test.cc   |  8 ++++++--
 cpp/src/arrow/acero/options.h                |  9 +++++++--
 cpp/src/arrow/acero/source_node.cc           |  5 +++--
 cpp/src/arrow/acero/test_util_internal.cc    |  4 ++++
 cpp/src/arrow/dataset/scanner.cc             | 11 +++++++----
 python/pyarrow/_dataset.pyx                  |  9 +++++++--
 python/pyarrow/acero.py                      | 15 +++++++++++----
 python/pyarrow/includes/libarrow_dataset.pxd |  2 +-
 10 files changed, 67 insertions(+), 24 deletions(-)

diff --git a/cpp/src/arrow/acero/accumulation_queue.h 
b/cpp/src/arrow/acero/accumulation_queue.h
index a27b8b399c..a173f98403 100644
--- a/cpp/src/arrow/acero/accumulation_queue.h
+++ b/cpp/src/arrow/acero/accumulation_queue.h
@@ -128,6 +128,7 @@ class SerialSequencingQueue {
   /// Strategy that describes how to handle items
   class Processor {
    public:
+    virtual ~Processor() = default;
     /// Process the batch
     ///
     /// This method will be called on each batch in order.  Calls to this 
method
diff --git a/cpp/src/arrow/acero/asof_join_node.cc 
b/cpp/src/arrow/acero/asof_join_node.cc
index c4f11d01f3..a5a80c8805 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "arrow/acero/asof_join_node.h"
+#include "arrow/acero/accumulation_queue.h"
 #include "arrow/acero/backpressure_handler.h"
 #include "arrow/acero/concurrent_queue_internal.h"
 
@@ -471,7 +472,7 @@ class BackpressureController : public BackpressureControl {
   std::atomic<int32_t>& backpressure_counter_;
 };
 
-class InputState {
+class InputState : public util::SerialSequencingQueue::Processor {
   // InputState corresponds to an input
   // Input record batches are queued up in InputState until processed and
   // turned into output record batches.
@@ -482,7 +483,8 @@ class InputState {
              const std::shared_ptr<arrow::Schema>& schema,
              const col_index_t time_col_index,
              const std::vector<col_index_t>& key_col_index)
-      : queue_(std::move(handler)),
+      : sequencer_(util::SerialSequencingQueue::Make(this)),
+        queue_(std::move(handler)),
         schema_(schema),
         time_col_index_(time_col_index),
         key_col_index_(key_col_index),
@@ -699,7 +701,16 @@ class InputState {
                DEBUG_MANIP(std::endl));
     return updated;
   }
+  Status InsertBatch(ExecBatch batch) {
+    return sequencer_->InsertBatch(std::move(batch));
+  }
 
+  Status Process(ExecBatch batch) override {
+    auto rb = *batch.ToRecordBatch(schema_);
+    DEBUG_SYNC(node_, "received batch from input ", index_, ":", 
DEBUG_MANIP(std::endl),
+               rb->ToString(), DEBUG_MANIP(std::endl));
+    return Push(rb);
+  }
   void Rehash() {
     DEBUG_SYNC(node_, "rehashing for input ", index_, ":", 
DEBUG_MANIP(std::endl));
     MemoStore new_memo(DEBUG_ADD(memo_.no_future_, node_, index_));
@@ -760,6 +771,8 @@ class InputState {
   }
 
  private:
+  // ExecBatch Sequencer
+  std::unique_ptr<util::SerialSequencingQueue> sequencer_;
   // Pending record batches. The latest is the front. Batches cannot be empty.
   BackpressureConcurrentQueue<std::shared_ptr<RecordBatch>> queue_;
   // Schema associated with the input
@@ -1399,6 +1412,9 @@ class AsofJoinNode : public ExecNode {
     // InputReceived may be called after execution was finished. Pushing it to 
the
     // InputState is unnecessary since we're done (and anyway may cause the
     // BackPressureController to pause the input, causing a deadlock), so drop 
it.
+    if (::arrow::compute::kUnsequencedIndex == batch.index)
+      return Status::Invalid("AsofJoin requires sequenced input");
+
     if (process_task_.is_finished()) {
       DEBUG_SYNC(this, "Input received while done. Short circuiting.",
                  DEBUG_MANIP(std::endl));
@@ -1409,12 +1425,9 @@ class AsofJoinNode : public ExecNode {
     ARROW_DCHECK(std_has(inputs_, input));
     size_t k = std_find(inputs_, input) - inputs_.begin();
 
-    // Put into the queue
-    auto rb = *batch.ToRecordBatch(input->output_schema());
-    DEBUG_SYNC(this, "received batch from input ", k, ":", 
DEBUG_MANIP(std::endl),
-               rb->ToString(), DEBUG_MANIP(std::endl));
+    // Put into the sequencing queue
+    ARROW_RETURN_NOT_OK(state_.at(k)->InsertBatch(std::move(batch)));
 
-    ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb));
     PushProcess(true);
 
     return Status::OK();
diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc 
b/cpp/src/arrow/acero/asof_join_node_test.cc
index 5d3e9fba08..2983888183 100644
--- a/cpp/src/arrow/acero/asof_join_node_test.cc
+++ b/cpp/src/arrow/acero/asof_join_node_test.cc
@@ -101,6 +101,7 @@ Result<BatchesWithSchema> MakeBatchesFromNumString(
   BatchesWithSchema batches;
   batches.schema = schema;
   int n_fields = schema->num_fields();
+  size_t batch_index = 0;
   for (auto num_batch : num_batches.batches) {
     Datum two(Int32Scalar(2));
     std::vector<Datum> values;
@@ -128,6 +129,7 @@ Result<BatchesWithSchema> MakeBatchesFromNumString(
       }
     }
     ExecBatch batch(values, num_batch.length);
+    batch.index = batch_index++;
     batches.batches.push_back(batch);
   }
   return batches;
@@ -185,6 +187,7 @@ Result<BatchesWithSchema> MutateByKey(BatchesWithSchema& 
batches, std::string fr
                           replace_key ? batches.schema->SetField(from_index, 
new_field)
                                       : batches.schema->AddField(from_index, 
new_field));
   }
+  size_t batch_index = 0;
   for (const ExecBatch& batch : batches.batches) {
     std::vector<Datum> new_values;
     for (int i = 0; i < n_fields; i++) {
@@ -233,6 +236,7 @@ Result<BatchesWithSchema> MutateByKey(BatchesWithSchema& 
batches, std::string fr
       new_values.push_back(value);
     }
     new_batches.batches.emplace_back(new_values, batch.length);
+    new_batches.batches.back().index = batch_index++;
   }
   return new_batches;
 }
@@ -1571,7 +1575,7 @@ void TestSequencing(BatchesMaker maker, int num_batches, 
int batch_size) {
       "asofjoin", {l_src, r_src}, GetRepeatedOptions(2, "time", {"key"}, 
1000)};
 
   QueryOptions query_options;
-  query_options.use_threads = false;
+  query_options.use_threads = true;
   ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema batches,
                        DeclarationToExecBatches(asofjoin, query_options));
 
@@ -1579,7 +1583,7 @@ void TestSequencing(BatchesMaker maker, int num_batches, 
int batch_size) {
 }
 
 TEST(AsofJoinTest, BatchSequencing) {
-  return TestSequencing(MakeIntegerBatches, /*num_batches=*/32, 
/*batch_size=*/1);
+  return TestSequencing(MakeIntegerBatches, /*num_batches=*/1000, 
/*batch_size=*/1);
 }
 
 template <typename BatchesMaker>
diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h
index 4447e9c67a..2beacfe26b 100644
--- a/cpp/src/arrow/acero/options.h
+++ b/cpp/src/arrow/acero/options.h
@@ -93,13 +93,18 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public 
ExecNodeOptions {
  public:
   /// Create an instance from values
   SourceNodeOptions(std::shared_ptr<Schema> output_schema,
-                    std::function<Future<std::optional<ExecBatch>>()> 
generator)
-      : output_schema(std::move(output_schema)), 
generator(std::move(generator)) {}
+                    std::function<Future<std::optional<ExecBatch>>()> 
generator,
+                    Ordering ordering = Ordering::Unordered())
+      : output_schema(std::move(output_schema)),
+        generator(std::move(generator)),
+        ordering(std::move(ordering)) {}
 
   /// \brief the schema for batches that will be generated by this source
   std::shared_ptr<Schema> output_schema;
   /// \brief an asynchronous stream of batches ending with std::nullopt
   std::function<Future<std::optional<ExecBatch>>()> generator;
+
+  Ordering ordering = Ordering::Unordered();
 };
 
 /// \brief a node that generates data from a table already loaded in memory
diff --git a/cpp/src/arrow/acero/source_node.cc 
b/cpp/src/arrow/acero/source_node.cc
index 8060e01f07..ac34e4b6a0 100644
--- a/cpp/src/arrow/acero/source_node.cc
+++ b/cpp/src/arrow/acero/source_node.cc
@@ -106,7 +106,8 @@ struct SourceNode : ExecNode, public TracedNode {
     RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "SourceNode"));
     const auto& source_options = checked_cast<const 
SourceNodeOptions&>(options);
     return plan->EmplaceNode<SourceNode>(plan, source_options.output_schema,
-                                         source_options.generator);
+                                         source_options.generator,
+                                         source_options.ordering);
   }
 
   const char* kind_name() const override { return "SourceNode"; }
@@ -406,7 +407,7 @@ struct SchemaSourceNode : public SourceNode {
 struct RecordBatchReaderSourceNode : public SourceNode {
   RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
                               arrow::AsyncGenerator<std::optional<ExecBatch>> 
generator)
-      : SourceNode(plan, schema, generator) {}
+      : SourceNode(plan, schema, generator, Ordering::Implicit()) {}
 
   static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                 const ExecNodeOptions& options) {
diff --git a/cpp/src/arrow/acero/test_util_internal.cc 
b/cpp/src/arrow/acero/test_util_internal.cc
index f50ca92238..107a20354c 100644
--- a/cpp/src/arrow/acero/test_util_internal.cc
+++ b/cpp/src/arrow/acero/test_util_internal.cc
@@ -384,6 +384,7 @@ Result<BatchesWithSchema> MakeIntegerBatches(
   int row = 0;
   for (int i = 0; i < num_batches; i++) {
     ARROW_ASSIGN_OR_RAISE(auto batch, MakeIntegerBatch(gens, schema, row, 
batch_size));
+    batch.index = i;
     out.batches.push_back(std::move(batch));
     row += batch_size;
   }
@@ -410,6 +411,9 @@ BatchesWithSchema MakeBatchesFromString(const 
std::shared_ptr<Schema>& schema,
       out_batches.batches.push_back(out_batches.batches[i]);
     }
   }
+  for (size_t batch_index = 0; batch_index < out_batches.batches.size(); 
++batch_index) {
+    out_batches.batches[batch_index].index = batch_index;
+  }
 
   return out_batches;
 }
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index a856a792a2..0df8fd8026 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -1032,11 +1032,11 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* 
plan,
   } else {
     batch_gen = std::move(merged_batch_gen);
   }
-
+  int64_t index = require_sequenced_output ? 0 : compute::kUnsequencedIndex;
   auto gen = MakeMappedGenerator(
       std::move(batch_gen),
-      [scan_options](const EnumeratedRecordBatch& partial)
-          -> Result<std::optional<compute::ExecBatch>> {
+      [scan_options, index](const EnumeratedRecordBatch& partial) mutable
+      -> Result<std::optional<compute::ExecBatch>> {
         // TODO(ARROW-13263) fragments may be able to attach more guarantees 
to batches
         // than this, for example parquet's row group stats. Failing to do 
this leaves
         // perf on the table because row group stats could be used to skip 
kernel execs in
@@ -1057,9 +1057,12 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* 
plan,
         batch->values.emplace_back(partial.record_batch.index);
         batch->values.emplace_back(partial.record_batch.last);
         batch->values.emplace_back(partial.fragment.value->ToString());
+        if (index != compute::kUnsequencedIndex) batch->index = index++;
         return batch;
       });
 
+  auto ordering = require_sequenced_output ? Ordering::Implicit() : 
Ordering::Unordered();
+
   auto fields = scan_options->dataset_schema->fields();
   if (scan_options->add_augmented_fields) {
     for (const auto& aug_field : kAugmentedFields) {
@@ -1069,7 +1072,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* 
plan,
 
   return acero::MakeExecNode(
       "source", plan, {},
-      acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen)});
+      acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen), 
ordering});
 }
 
 Result<acero::ExecNode*> MakeAugmentedProjectNode(acero::ExecPlan* plan,
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 39e3f4d665..3a4fa1ab61 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -4067,11 +4067,14 @@ cdef class _ScanNodeOptions(ExecNodeOptions):
     def _set_options(self, Dataset dataset, dict scan_options):
         cdef:
             shared_ptr[CScanOptions] c_scan_options
+            bint require_sequenced_output=False
 
         c_scan_options = Scanner._make_scan_options(dataset, scan_options)
 
+        require_sequenced_output=scan_options.get("require_sequenced_output", 
False)
+
         self.wrapped.reset(
-            new CScanNodeOptions(dataset.unwrap(), c_scan_options)
+            new CScanNodeOptions(dataset.unwrap(), c_scan_options, 
require_sequenced_output)
         )
 
 
@@ -4097,7 +4100,9 @@ class ScanNodeOptions(_ScanNodeOptions):
     dataset : pyarrow.dataset.Dataset
         The table which acts as the data source.
     **kwargs : dict, optional
-        Scan options. See `Scanner.from_dataset` for possible arguments.
+        Scan options. See `Scanner.from_dataset` for possible arguments.       
 
+    require_sequenced_output : bool, default False
+        Assert implicit ordering on data.
     """
 
     def __init__(self, Dataset dataset, **kwargs):
diff --git a/python/pyarrow/acero.py b/python/pyarrow/acero.py
index 77ba3ab1ce..706338bd8c 100644
--- a/python/pyarrow/acero.py
+++ b/python/pyarrow/acero.py
@@ -56,8 +56,10 @@ except ImportError:
     ds = DatasetModuleStub
 
 
-def _dataset_to_decl(dataset, use_threads=True):
-    decl = Declaration("scan", ScanNodeOptions(dataset, 
use_threads=use_threads))
+def _dataset_to_decl(dataset, use_threads=True, 
require_sequenced_output=False):
+    decl = Declaration("scan", ScanNodeOptions(
+        dataset, use_threads=use_threads,
+        require_sequenced_output=require_sequenced_output))
 
     # Get rid of special dataset columns
     # "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
@@ -311,13 +313,18 @@ def _perform_join_asof(left_operand, left_on, left_by,
 
     # Add the join node to the execplan
     if isinstance(left_operand, ds.Dataset):
-        left_source = _dataset_to_decl(left_operand, use_threads=use_threads)
+        left_source = _dataset_to_decl(
+            left_operand,
+            use_threads=use_threads,
+            require_sequenced_output=True)
     else:
         left_source = Declaration(
             "table_source", TableSourceNodeOptions(left_operand),
         )
     if isinstance(right_operand, ds.Dataset):
-        right_source = _dataset_to_decl(right_operand, use_threads=use_threads)
+        right_source = _dataset_to_decl(
+            right_operand, use_threads=use_threads,
+            require_sequenced_output=True)
     else:
         right_source = Declaration(
             "table_source", TableSourceNodeOptions(right_operand)
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd 
b/python/pyarrow/includes/libarrow_dataset.pxd
index ef1238e415..d2fbcd0ee4 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -51,7 +51,7 @@ cdef extern from "arrow/dataset/api.h" namespace 
"arrow::dataset" nogil:
         CExpression filter
 
     cdef cppclass CScanNodeOptions 
"arrow::dataset::ScanNodeOptions"(CExecNodeOptions):
-        CScanNodeOptions(shared_ptr[CDataset] dataset, 
shared_ptr[CScanOptions] scan_options)
+        CScanNodeOptions(shared_ptr[CDataset] dataset, 
shared_ptr[CScanOptions] scan_options, bint require_sequenced_output)
 
         shared_ptr[CScanOptions] scan_options
 

Reply via email to