This is an automated email from the ASF dual-hosted git repository.
zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 94103279db GH-44629: [C++][Acero] Use `implicit_ordering` for
`asof_join` rather than `require_sequenced_output` (#44616)
94103279db is described below
commit 94103279db56459338efd84a0450d34134fcf865
Author: Enrico Minack <[email protected]>
AuthorDate: Tue Feb 11 03:17:39 2025 +0100
GH-44629: [C++][Acero] Use `implicit_ordering` for `asof_join` rather than
`require_sequenced_output` (#44616)
### Rationale for this change
Changes in #44083 (GH-41706) unnecessarily sequence batches retrieved from the
scanner, where all that is required is that the batches provide an index
according to the implicit input order.
### What changes are included in this PR?
Setting `implicit_ordering` causes existing code to set the batch index, which
is then available to the `asof_join` node to sequence the batches into input
order. This replaces some of the #44083 changes.
Some code introduced by #44083 turns out to not be required and has
therefore been reverted.
### Are these changes tested?
Existing unit tests still pass.
### Are there any user-facing changes?
Reverts some breaking user-facing changes done by #44083.
* GitHub Issue: #41706
* GitHub Issue: #44629
Authored-by: Enrico Minack <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
---
cpp/src/arrow/acero/options.h | 4 ++--
cpp/src/arrow/acero/source_node.cc | 2 +-
cpp/src/arrow/dataset/scanner.cc | 10 +++++-----
cpp/src/arrow/dataset/scanner.h | 15 +++++++++++----
python/pyarrow/_dataset.pyx | 12 ++++++++----
python/pyarrow/acero.py | 8 ++++----
python/pyarrow/includes/libarrow_dataset.pxd | 2 +-
7 files changed, 32 insertions(+), 21 deletions(-)
diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h
index 2beacfe26b..2eb7df0085 100644
--- a/cpp/src/arrow/acero/options.h
+++ b/cpp/src/arrow/acero/options.h
@@ -103,8 +103,8 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public
ExecNodeOptions {
std::shared_ptr<Schema> output_schema;
/// \brief an asynchronous stream of batches ending with std::nullopt
std::function<Future<std::optional<ExecBatch>>()> generator;
-
- Ordering ordering = Ordering::Unordered();
+ /// \brief the order of the data, defaults to Ordering::Unordered
+ Ordering ordering;
};
/// \brief a node that generates data from a table already loaded in memory
diff --git a/cpp/src/arrow/acero/source_node.cc
b/cpp/src/arrow/acero/source_node.cc
index ac34e4b6a0..2d3e2a1da1 100644
--- a/cpp/src/arrow/acero/source_node.cc
+++ b/cpp/src/arrow/acero/source_node.cc
@@ -407,7 +407,7 @@ struct SchemaSourceNode : public SourceNode {
struct RecordBatchReaderSourceNode : public SourceNode {
RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
arrow::AsyncGenerator<std::optional<ExecBatch>>
generator)
- : SourceNode(plan, schema, generator, Ordering::Implicit()) {}
+ : SourceNode(plan, schema, generator) {}
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 0df8fd8026..e66fed3f06 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -1000,6 +1000,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan*
plan,
auto scan_options = scan_node_options.scan_options;
auto dataset = scan_node_options.dataset;
bool require_sequenced_output = scan_node_options.require_sequenced_output;
+ bool implicit_ordering = scan_node_options.implicit_ordering;
RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset->schema()));
@@ -1032,11 +1033,11 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan*
plan,
} else {
batch_gen = std::move(merged_batch_gen);
}
- int64_t index = require_sequenced_output ? 0 : compute::kUnsequencedIndex;
+
auto gen = MakeMappedGenerator(
std::move(batch_gen),
- [scan_options, index](const EnumeratedRecordBatch& partial) mutable
- -> Result<std::optional<compute::ExecBatch>> {
+ [scan_options](const EnumeratedRecordBatch& partial)
+ -> Result<std::optional<compute::ExecBatch>> {
// TODO(ARROW-13263) fragments may be able to attach more guarantees
to batches
// than this, for example parquet's row group stats. Failing to do
this leaves
// perf on the table because row group stats could be used to skip
kernel execs in
@@ -1057,11 +1058,10 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan*
plan,
batch->values.emplace_back(partial.record_batch.index);
batch->values.emplace_back(partial.record_batch.last);
batch->values.emplace_back(partial.fragment.value->ToString());
- if (index != compute::kUnsequencedIndex) batch->index = index++;
return batch;
});
- auto ordering = require_sequenced_output ? Ordering::Implicit() :
Ordering::Unordered();
+ auto ordering = implicit_ordering ? Ordering::Implicit() :
Ordering::Unordered();
auto fields = scan_options->dataset_schema->fields();
if (scan_options->add_augmented_fields) {
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index d2de267897..af8888ad87 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -557,20 +557,27 @@ class ARROW_DS_EXPORT ScannerBuilder {
/// \brief Construct a source ExecNode which yields batches from a dataset
scan.
///
/// Does not construct associated filter or project nodes.
-/// Yielded batches will be augmented with fragment/batch indices to enable
stable
-/// ordering for simple ExecPlans.
+///
+/// Batches are yielded sequentially, like single-threaded,
+/// when require_sequenced_output=true.
+///
+/// Yielded batches will be augmented with fragment/batch indices when
+/// implicit_ordering=true to enable stable ordering for simple ExecPlans.
class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
public:
explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
- bool require_sequenced_output = false)
+ bool require_sequenced_output = false,
+ bool implicit_ordering = false)
: dataset(std::move(dataset)),
scan_options(std::move(scan_options)),
- require_sequenced_output(require_sequenced_output) {}
+ require_sequenced_output(require_sequenced_output),
+ implicit_ordering(implicit_ordering) {}
std::shared_ptr<Dataset> dataset;
std::shared_ptr<ScanOptions> scan_options;
bool require_sequenced_output;
+ bool implicit_ordering;
};
/// @}
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index fd50215cee..f8c8b9bc11 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -4077,13 +4077,15 @@ cdef class _ScanNodeOptions(ExecNodeOptions):
cdef:
shared_ptr[CScanOptions] c_scan_options
bint require_sequenced_output=False
+ bint implicit_ordering=False
c_scan_options = Scanner._make_scan_options(dataset, scan_options)
require_sequenced_output=scan_options.get("require_sequenced_output",
False)
+ implicit_ordering=scan_options.get("implicit_ordering", False)
self.wrapped.reset(
- new CScanNodeOptions(dataset.unwrap(), c_scan_options,
require_sequenced_output)
+ new CScanNodeOptions(dataset.unwrap(), c_scan_options,
require_sequenced_output, implicit_ordering)
)
@@ -4101,8 +4103,8 @@ class ScanNodeOptions(_ScanNodeOptions):
expression or projection to the scan node that you also supply
to the filter or project node.
- Yielded batches will be augmented with fragment/batch indices to
- enable stable ordering for simple ExecPlans.
+ Yielded batches will be augmented with fragment/batch indices when
+ implicit_ordering=True to enable stable ordering for simple ExecPlans.
Parameters
----------
@@ -4111,7 +4113,9 @@ class ScanNodeOptions(_ScanNodeOptions):
**kwargs : dict, optional
Scan options. See `Scanner.from_dataset` for possible arguments.
require_sequenced_output : bool, default False
- Assert implicit ordering on data.
+ Batches are yielded sequentially, like single-threaded
+ implicit_ordering : bool, default False
+ Preserve implicit ordering of data.
"""
def __init__(self, Dataset dataset, **kwargs):
diff --git a/python/pyarrow/acero.py b/python/pyarrow/acero.py
index 706338bd8c..86bf7cbf4d 100644
--- a/python/pyarrow/acero.py
+++ b/python/pyarrow/acero.py
@@ -56,10 +56,10 @@ except ImportError:
ds = DatasetModuleStub
-def _dataset_to_decl(dataset, use_threads=True,
require_sequenced_output=False):
+def _dataset_to_decl(dataset, use_threads=True, implicit_ordering=False):
decl = Declaration("scan", ScanNodeOptions(
dataset, use_threads=use_threads,
- require_sequenced_output=require_sequenced_output))
+ implicit_ordering=implicit_ordering))
# Get rid of special dataset columns
# "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
@@ -316,7 +316,7 @@ def _perform_join_asof(left_operand, left_on, left_by,
left_source = _dataset_to_decl(
left_operand,
use_threads=use_threads,
- require_sequenced_output=True)
+ implicit_ordering=True)
else:
left_source = Declaration(
"table_source", TableSourceNodeOptions(left_operand),
@@ -324,7 +324,7 @@ def _perform_join_asof(left_operand, left_on, left_by,
if isinstance(right_operand, ds.Dataset):
right_source = _dataset_to_decl(
right_operand, use_threads=use_threads,
- require_sequenced_output=True)
+ implicit_ordering=True)
else:
right_source = Declaration(
"table_source", TableSourceNodeOptions(right_operand)
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd
b/python/pyarrow/includes/libarrow_dataset.pxd
index d2fbcd0ee4..403e306e13 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -51,7 +51,7 @@ cdef extern from "arrow/dataset/api.h" namespace
"arrow::dataset" nogil:
CExpression filter
cdef cppclass CScanNodeOptions
"arrow::dataset::ScanNodeOptions"(CExecNodeOptions):
- CScanNodeOptions(shared_ptr[CDataset] dataset,
shared_ptr[CScanOptions] scan_options, bint require_sequenced_output)
+ CScanNodeOptions(shared_ptr[CDataset] dataset,
shared_ptr[CScanOptions] scan_options, bint require_sequenced_output, bint
implicit_ordering)
shared_ptr[CScanOptions] scan_options