This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 2a0f06c4a4 GH-41706: [C++][Acero] Enhance asof_join to work in
multi-threaded execution by sequencing input (#44083)
2a0f06c4a4 is described below
commit 2a0f06c4a40ec64e1f4f046a7a8aa165329c66d6
Author: mroz45 <[email protected]>
AuthorDate: Tue Oct 29 22:00:56 2024 +0100
GH-41706: [C++][Acero] Enhance asof_join to work in multi-threaded
execution by sequencing input (#44083)
### Rationale for this change
This is an initial PR. I found that the asof_join test fails with specific parameters (multi-threaded execution with many input batches).
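### What changes are included in this PR?
In this PR I provoke a failure of asof_join_node_test by running it multi-threaded with 1000 batches. I then fix it by sequencing each input's batches (by batch index) before the as-of join node processes them.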
* GitHub Issue: #41706
Authored-by: kamilt <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/acero/accumulation_queue.h | 1 +
cpp/src/arrow/acero/asof_join_node.cc | 27 ++++++++++++++++++++-------
cpp/src/arrow/acero/asof_join_node_test.cc | 8 ++++++--
cpp/src/arrow/acero/options.h | 9 +++++++--
cpp/src/arrow/acero/source_node.cc | 5 +++--
cpp/src/arrow/acero/test_util_internal.cc | 4 ++++
cpp/src/arrow/dataset/scanner.cc | 11 +++++++----
python/pyarrow/_dataset.pyx | 9 +++++++--
python/pyarrow/acero.py | 15 +++++++++++----
python/pyarrow/includes/libarrow_dataset.pxd | 2 +-
10 files changed, 67 insertions(+), 24 deletions(-)
diff --git a/cpp/src/arrow/acero/accumulation_queue.h
b/cpp/src/arrow/acero/accumulation_queue.h
index a27b8b399c..a173f98403 100644
--- a/cpp/src/arrow/acero/accumulation_queue.h
+++ b/cpp/src/arrow/acero/accumulation_queue.h
@@ -128,6 +128,7 @@ class SerialSequencingQueue {
/// Strategy that describes how to handle items
class Processor {
public:
+ virtual ~Processor() = default;
/// Process the batch
///
/// This method will be called on each batch in order. Calls to this
method
diff --git a/cpp/src/arrow/acero/asof_join_node.cc
b/cpp/src/arrow/acero/asof_join_node.cc
index c4f11d01f3..a5a80c8805 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -16,6 +16,7 @@
// under the License.
#include "arrow/acero/asof_join_node.h"
+#include "arrow/acero/accumulation_queue.h"
#include "arrow/acero/backpressure_handler.h"
#include "arrow/acero/concurrent_queue_internal.h"
@@ -471,7 +472,7 @@ class BackpressureController : public BackpressureControl {
std::atomic<int32_t>& backpressure_counter_;
};
-class InputState {
+class InputState : public util::SerialSequencingQueue::Processor {
// InputState corresponds to an input
// Input record batches are queued up in InputState until processed and
// turned into output record batches.
@@ -482,7 +483,8 @@ class InputState {
const std::shared_ptr<arrow::Schema>& schema,
const col_index_t time_col_index,
const std::vector<col_index_t>& key_col_index)
- : queue_(std::move(handler)),
+ : sequencer_(util::SerialSequencingQueue::Make(this)),
+ queue_(std::move(handler)),
schema_(schema),
time_col_index_(time_col_index),
key_col_index_(key_col_index),
@@ -699,7 +701,16 @@ class InputState {
DEBUG_MANIP(std::endl));
return updated;
}
+ Status InsertBatch(ExecBatch batch) {
+ return sequencer_->InsertBatch(std::move(batch));
+ }
+ Status Process(ExecBatch batch) override {
+ auto rb = *batch.ToRecordBatch(schema_);
+ DEBUG_SYNC(node_, "received batch from input ", index_, ":",
DEBUG_MANIP(std::endl),
+ rb->ToString(), DEBUG_MANIP(std::endl));
+ return Push(rb);
+ }
void Rehash() {
DEBUG_SYNC(node_, "rehashing for input ", index_, ":",
DEBUG_MANIP(std::endl));
MemoStore new_memo(DEBUG_ADD(memo_.no_future_, node_, index_));
@@ -760,6 +771,8 @@ class InputState {
}
private:
+ // ExecBatch Sequencer
+ std::unique_ptr<util::SerialSequencingQueue> sequencer_;
// Pending record batches. The latest is the front. Batches cannot be empty.
BackpressureConcurrentQueue<std::shared_ptr<RecordBatch>> queue_;
// Schema associated with the input
@@ -1399,6 +1412,9 @@ class AsofJoinNode : public ExecNode {
// InputReceived may be called after execution was finished. Pushing it to
the
// InputState is unnecessary since we're done (and anyway may cause the
// BackPressureController to pause the input, causing a deadlock), so drop
it.
+ if (::arrow::compute::kUnsequencedIndex == batch.index)
+ return Status::Invalid("AsofJoin requires sequenced input");
+
if (process_task_.is_finished()) {
DEBUG_SYNC(this, "Input received while done. Short circuiting.",
DEBUG_MANIP(std::endl));
@@ -1409,12 +1425,9 @@ class AsofJoinNode : public ExecNode {
ARROW_DCHECK(std_has(inputs_, input));
size_t k = std_find(inputs_, input) - inputs_.begin();
- // Put into the queue
- auto rb = *batch.ToRecordBatch(input->output_schema());
- DEBUG_SYNC(this, "received batch from input ", k, ":",
DEBUG_MANIP(std::endl),
- rb->ToString(), DEBUG_MANIP(std::endl));
+ // Put into the sequencing queue
+ ARROW_RETURN_NOT_OK(state_.at(k)->InsertBatch(std::move(batch)));
- ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb));
PushProcess(true);
return Status::OK();
diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc
b/cpp/src/arrow/acero/asof_join_node_test.cc
index 5d3e9fba08..2983888183 100644
--- a/cpp/src/arrow/acero/asof_join_node_test.cc
+++ b/cpp/src/arrow/acero/asof_join_node_test.cc
@@ -101,6 +101,7 @@ Result<BatchesWithSchema> MakeBatchesFromNumString(
BatchesWithSchema batches;
batches.schema = schema;
int n_fields = schema->num_fields();
+ size_t batch_index = 0;
for (auto num_batch : num_batches.batches) {
Datum two(Int32Scalar(2));
std::vector<Datum> values;
@@ -128,6 +129,7 @@ Result<BatchesWithSchema> MakeBatchesFromNumString(
}
}
ExecBatch batch(values, num_batch.length);
+ batch.index = batch_index++;
batches.batches.push_back(batch);
}
return batches;
@@ -185,6 +187,7 @@ Result<BatchesWithSchema> MutateByKey(BatchesWithSchema&
batches, std::string fr
replace_key ? batches.schema->SetField(from_index,
new_field)
: batches.schema->AddField(from_index,
new_field));
}
+ size_t batch_index = 0;
for (const ExecBatch& batch : batches.batches) {
std::vector<Datum> new_values;
for (int i = 0; i < n_fields; i++) {
@@ -233,6 +236,7 @@ Result<BatchesWithSchema> MutateByKey(BatchesWithSchema&
batches, std::string fr
new_values.push_back(value);
}
new_batches.batches.emplace_back(new_values, batch.length);
+ new_batches.batches.back().index = batch_index++;
}
return new_batches;
}
@@ -1571,7 +1575,7 @@ void TestSequencing(BatchesMaker maker, int num_batches,
int batch_size) {
"asofjoin", {l_src, r_src}, GetRepeatedOptions(2, "time", {"key"},
1000)};
QueryOptions query_options;
- query_options.use_threads = false;
+ query_options.use_threads = true;
ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema batches,
DeclarationToExecBatches(asofjoin, query_options));
@@ -1579,7 +1583,7 @@ void TestSequencing(BatchesMaker maker, int num_batches,
int batch_size) {
}
TEST(AsofJoinTest, BatchSequencing) {
- return TestSequencing(MakeIntegerBatches, /*num_batches=*/32,
/*batch_size=*/1);
+ return TestSequencing(MakeIntegerBatches, /*num_batches=*/1000,
/*batch_size=*/1);
}
template <typename BatchesMaker>
diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h
index 4447e9c67a..2beacfe26b 100644
--- a/cpp/src/arrow/acero/options.h
+++ b/cpp/src/arrow/acero/options.h
@@ -93,13 +93,18 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public
ExecNodeOptions {
public:
/// Create an instance from values
SourceNodeOptions(std::shared_ptr<Schema> output_schema,
- std::function<Future<std::optional<ExecBatch>>()>
generator)
- : output_schema(std::move(output_schema)),
generator(std::move(generator)) {}
+ std::function<Future<std::optional<ExecBatch>>()>
generator,
+ Ordering ordering = Ordering::Unordered())
+ : output_schema(std::move(output_schema)),
+ generator(std::move(generator)),
+ ordering(std::move(ordering)) {}
/// \brief the schema for batches that will be generated by this source
std::shared_ptr<Schema> output_schema;
/// \brief an asynchronous stream of batches ending with std::nullopt
std::function<Future<std::optional<ExecBatch>>()> generator;
+
+ Ordering ordering = Ordering::Unordered();
};
/// \brief a node that generates data from a table already loaded in memory
diff --git a/cpp/src/arrow/acero/source_node.cc
b/cpp/src/arrow/acero/source_node.cc
index 8060e01f07..ac34e4b6a0 100644
--- a/cpp/src/arrow/acero/source_node.cc
+++ b/cpp/src/arrow/acero/source_node.cc
@@ -106,7 +106,8 @@ struct SourceNode : ExecNode, public TracedNode {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "SourceNode"));
const auto& source_options = checked_cast<const
SourceNodeOptions&>(options);
return plan->EmplaceNode<SourceNode>(plan, source_options.output_schema,
- source_options.generator);
+ source_options.generator,
+ source_options.ordering);
}
const char* kind_name() const override { return "SourceNode"; }
@@ -406,7 +407,7 @@ struct SchemaSourceNode : public SourceNode {
struct RecordBatchReaderSourceNode : public SourceNode {
RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
arrow::AsyncGenerator<std::optional<ExecBatch>>
generator)
- : SourceNode(plan, schema, generator) {}
+ : SourceNode(plan, schema, generator, Ordering::Implicit()) {}
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
diff --git a/cpp/src/arrow/acero/test_util_internal.cc
b/cpp/src/arrow/acero/test_util_internal.cc
index f50ca92238..107a20354c 100644
--- a/cpp/src/arrow/acero/test_util_internal.cc
+++ b/cpp/src/arrow/acero/test_util_internal.cc
@@ -384,6 +384,7 @@ Result<BatchesWithSchema> MakeIntegerBatches(
int row = 0;
for (int i = 0; i < num_batches; i++) {
ARROW_ASSIGN_OR_RAISE(auto batch, MakeIntegerBatch(gens, schema, row,
batch_size));
+ batch.index = i;
out.batches.push_back(std::move(batch));
row += batch_size;
}
@@ -410,6 +411,9 @@ BatchesWithSchema MakeBatchesFromString(const
std::shared_ptr<Schema>& schema,
out_batches.batches.push_back(out_batches.batches[i]);
}
}
+ for (size_t batch_index = 0; batch_index < out_batches.batches.size();
++batch_index) {
+ out_batches.batches[batch_index].index = batch_index;
+ }
return out_batches;
}
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index a856a792a2..0df8fd8026 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -1032,11 +1032,11 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan*
plan,
} else {
batch_gen = std::move(merged_batch_gen);
}
-
+ int64_t index = require_sequenced_output ? 0 : compute::kUnsequencedIndex;
auto gen = MakeMappedGenerator(
std::move(batch_gen),
- [scan_options](const EnumeratedRecordBatch& partial)
- -> Result<std::optional<compute::ExecBatch>> {
+ [scan_options, index](const EnumeratedRecordBatch& partial) mutable
+ -> Result<std::optional<compute::ExecBatch>> {
// TODO(ARROW-13263) fragments may be able to attach more guarantees
to batches
// than this, for example parquet's row group stats. Failing to do
this leaves
// perf on the table because row group stats could be used to skip
kernel execs in
@@ -1057,9 +1057,12 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan*
plan,
batch->values.emplace_back(partial.record_batch.index);
batch->values.emplace_back(partial.record_batch.last);
batch->values.emplace_back(partial.fragment.value->ToString());
+ if (index != compute::kUnsequencedIndex) batch->index = index++;
return batch;
});
+ auto ordering = require_sequenced_output ? Ordering::Implicit() :
Ordering::Unordered();
+
auto fields = scan_options->dataset_schema->fields();
if (scan_options->add_augmented_fields) {
for (const auto& aug_field : kAugmentedFields) {
@@ -1069,7 +1072,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan*
plan,
return acero::MakeExecNode(
"source", plan, {},
- acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen)});
+ acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen),
ordering});
}
Result<acero::ExecNode*> MakeAugmentedProjectNode(acero::ExecPlan* plan,
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 39e3f4d665..3a4fa1ab61 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -4067,11 +4067,14 @@ cdef class _ScanNodeOptions(ExecNodeOptions):
def _set_options(self, Dataset dataset, dict scan_options):
cdef:
shared_ptr[CScanOptions] c_scan_options
+ bint require_sequenced_output=False
c_scan_options = Scanner._make_scan_options(dataset, scan_options)
+ require_sequenced_output=scan_options.get("require_sequenced_output",
False)
+
self.wrapped.reset(
- new CScanNodeOptions(dataset.unwrap(), c_scan_options)
+ new CScanNodeOptions(dataset.unwrap(), c_scan_options,
require_sequenced_output)
)
@@ -4097,7 +4100,9 @@ class ScanNodeOptions(_ScanNodeOptions):
dataset : pyarrow.dataset.Dataset
The table which acts as the data source.
**kwargs : dict, optional
- Scan options. See `Scanner.from_dataset` for possible arguments.
+ Scan options. See `Scanner.from_dataset` for possible arguments.
+ require_sequenced_output : bool, default False
+ Assert implicit ordering on data.
"""
def __init__(self, Dataset dataset, **kwargs):
diff --git a/python/pyarrow/acero.py b/python/pyarrow/acero.py
index 77ba3ab1ce..706338bd8c 100644
--- a/python/pyarrow/acero.py
+++ b/python/pyarrow/acero.py
@@ -56,8 +56,10 @@ except ImportError:
ds = DatasetModuleStub
-def _dataset_to_decl(dataset, use_threads=True):
- decl = Declaration("scan", ScanNodeOptions(dataset,
use_threads=use_threads))
+def _dataset_to_decl(dataset, use_threads=True,
require_sequenced_output=False):
+ decl = Declaration("scan", ScanNodeOptions(
+ dataset, use_threads=use_threads,
+ require_sequenced_output=require_sequenced_output))
# Get rid of special dataset columns
# "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
@@ -311,13 +313,18 @@ def _perform_join_asof(left_operand, left_on, left_by,
# Add the join node to the execplan
if isinstance(left_operand, ds.Dataset):
- left_source = _dataset_to_decl(left_operand, use_threads=use_threads)
+ left_source = _dataset_to_decl(
+ left_operand,
+ use_threads=use_threads,
+ require_sequenced_output=True)
else:
left_source = Declaration(
"table_source", TableSourceNodeOptions(left_operand),
)
if isinstance(right_operand, ds.Dataset):
- right_source = _dataset_to_decl(right_operand, use_threads=use_threads)
+ right_source = _dataset_to_decl(
+ right_operand, use_threads=use_threads,
+ require_sequenced_output=True)
else:
right_source = Declaration(
"table_source", TableSourceNodeOptions(right_operand)
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd
b/python/pyarrow/includes/libarrow_dataset.pxd
index ef1238e415..d2fbcd0ee4 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -51,7 +51,7 @@ cdef extern from "arrow/dataset/api.h" namespace
"arrow::dataset" nogil:
CExpression filter
cdef cppclass CScanNodeOptions
"arrow::dataset::ScanNodeOptions"(CExecNodeOptions):
- CScanNodeOptions(shared_ptr[CDataset] dataset,
shared_ptr[CScanOptions] scan_options)
+ CScanNodeOptions(shared_ptr[CDataset] dataset,
shared_ptr[CScanOptions] scan_options, bint require_sequenced_output)
shared_ptr[CScanOptions] scan_options