This is an automated email from the ASF dual-hosted git repository.
zanmato1984 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 7c9d88028ae GH-47481: [C++][Acero] record_batch_reader_source specify
Ordering::Implicit to support `select * limit k` (#47482)
7c9d88028ae is described below
commit 7c9d88028aee7d040185fac8c4b71b28029366be
Author: egolearner <[email protected]>
AuthorDate: Sun May 24 12:48:40 2026 +0800
GH-47481: [C++][Acero] record_batch_reader_source specify
Ordering::Implicit to support `select * limit k` (#47482)
### Rationale for this change
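`record_batch_reader_source` did not declare an output ordering. As a result, order-sensitive
nodes downstream of it, such as the `fetch` node used to implement `select * limit k`,
could not be used, as described in #47481.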
### What changes are included in this PR?
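`RecordBatchReaderSourceNode` now specifies `Ordering::Implicit()` when constructing its `SourceNode` base. This declares that its output batches follow the order in which the reader produces them, so downstream nodes such as `fetch` can consume it.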
### Are these changes tested?
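Yes. A new test, `FetchNode.RecordBatchReaderSource`, runs a `record_batch_reader_source` -> `jitter` -> `fetch` plan in both serial and threaded mode. It checks that the result equals the expected slice of the input table.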
### Are there any user-facing changes?
no
* GitHub Issue: #47481
Lead-authored-by: egolearner <[email protected]>
Co-authored-by: egolearner <[email protected]>
Co-authored-by: Rossi Sun <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
---
cpp/src/arrow/acero/fetch_node_test.cc | 22 ++++++++++++++++++++++
cpp/src/arrow/acero/source_node.cc | 2 +-
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/cpp/src/arrow/acero/fetch_node_test.cc
b/cpp/src/arrow/acero/fetch_node_test.cc
index 3ffd48038d2..e1e49354fd8 100644
--- a/cpp/src/arrow/acero/fetch_node_test.cc
+++ b/cpp/src/arrow/acero/fetch_node_test.cc
@@ -80,6 +80,28 @@ TEST(FetchNode, Basic) {
CheckFetch({0, 0});
}
+TEST(FetchNode, RecordBatchReaderSource) {  // GH-47481: fetch (offset/limit) over a record_batch_reader_source
+ constexpr random::SeedType kSeed = 42;  // fixed seed, presumably so the jitter node is reproducible
+ constexpr int kJitterMod = 4;  // NOTE(review): jitter parameter as interpreted by JitterNodeOptions — confirm meaning
+ FetchNodeOptions options{20, 50};  // offset = 20, count = 50 (read back as options.offset / options.count below)
+ RegisterTestNodes();  // presumably registers test-only node factories such as "jitter"
+ std::shared_ptr<Table> input = TestTable();
+ for (bool use_threads : {false, true}) {  // run the same plan serially and with threads
+ SCOPED_TRACE(use_threads ? "threaded" : "serial");
+ auto reader = std::make_shared<TableBatchReader>(input);  // new reader each iteration; presumably a drained reader cannot be reused
+ Declaration plan = Declaration::Sequence(
+ {{"record_batch_reader_source",
RecordBatchReaderSourceNodeOptions{reader}},  // the source node under test
+ {"jitter", JitterNodeOptions(kSeed, kJitterMod)},  // presumably perturbs batch delivery so fetch must rely on the declared ordering
+ {"fetch", options}});  // applies offset/count from `options`
+ QueryOptions query_options;
+ query_options.use_threads = use_threads;
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> actual,
+ DeclarationToTable(plan, query_options));
+ std::shared_ptr<Table> expected = input->Slice(options.offset,
options.count);  // expected rows: input[offset, offset + count) in original order
+ AssertTablesEqual(*expected, *actual);  // exact table comparison, so row order matters
+ }
+}
+
TEST(FetchNode, Invalid) {
CheckFetchInvalid({-1, 10}, "`offset` must be non-negative");
CheckFetchInvalid({10, -1}, "`count` must be non-negative");
diff --git a/cpp/src/arrow/acero/source_node.cc
b/cpp/src/arrow/acero/source_node.cc
index 888f6e23c13..ed3723d249d 100644
--- a/cpp/src/arrow/acero/source_node.cc
+++ b/cpp/src/arrow/acero/source_node.cc
@@ -408,7 +408,7 @@ struct SchemaSourceNode : public SourceNode {
struct RecordBatchReaderSourceNode : public SourceNode {
RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
arrow::AsyncGenerator<std::optional<ExecBatch>>
generator)
- : SourceNode(plan, schema, generator) {}
+ : SourceNode(plan, schema, generator, Ordering::Implicit()) {}
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {