This is an automated email from the ASF dual-hosted git repository.

zanmato1984 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c9d88028ae GH-47481: [C++][Acero] record_batch_reader_source specify 
Ordering::Implicit to support `select * limit k` (#47482)
7c9d88028ae is described below

commit 7c9d88028aee7d040185fac8c4b71b28029366be
Author: egolearner <[email protected]>
AuthorDate: Sun May 24 12:48:40 2026 +0800

    GH-47481: [C++][Acero] record_batch_reader_source specify 
Ordering::Implicit to support `select * limit k` (#47482)
    
    
    ### Rationale for this change
    record_batch_reader_source does not support `select * limit k` as described 
in #47481
    
    ### What changes are included in this PR?
    record_batch_reader_source specify Ordering::Implicit
    
    ### Are these changes tested?
    yes
    
    ### Are there any user-facing changes?
    no
    
    * GitHub Issue: #47481
    
    Lead-authored-by: egolearner <[email protected]>
    Co-authored-by: egolearner <[email protected]>
    Co-authored-by: Rossi Sun <[email protected]>
    Signed-off-by: Rossi Sun <[email protected]>
---
 cpp/src/arrow/acero/fetch_node_test.cc | 22 ++++++++++++++++++++++
 cpp/src/arrow/acero/source_node.cc     |  2 +-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/cpp/src/arrow/acero/fetch_node_test.cc 
b/cpp/src/arrow/acero/fetch_node_test.cc
index 3ffd48038d2..e1e49354fd8 100644
--- a/cpp/src/arrow/acero/fetch_node_test.cc
+++ b/cpp/src/arrow/acero/fetch_node_test.cc
@@ -80,6 +80,28 @@ TEST(FetchNode, Basic) {
   CheckFetch({0, 0});
 }
 
+TEST(FetchNode, RecordBatchReaderSource) {
+  constexpr random::SeedType kSeed = 42;
+  constexpr int kJitterMod = 4;
+  FetchNodeOptions options{20, 50};
+  RegisterTestNodes();
+  std::shared_ptr<Table> input = TestTable();
+  for (bool use_threads : {false, true}) {
+    SCOPED_TRACE(use_threads ? "threaded" : "serial");
+    auto reader = std::make_shared<TableBatchReader>(input);
+    Declaration plan = Declaration::Sequence(
+        {{"record_batch_reader_source", 
RecordBatchReaderSourceNodeOptions{reader}},
+         {"jitter", JitterNodeOptions(kSeed, kJitterMod)},
+         {"fetch", options}});
+    QueryOptions query_options;
+    query_options.use_threads = use_threads;
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> actual,
+                         DeclarationToTable(plan, query_options));
+    std::shared_ptr<Table> expected = input->Slice(options.offset, 
options.count);
+    AssertTablesEqual(*expected, *actual);
+  }
+}
+
 TEST(FetchNode, Invalid) {
   CheckFetchInvalid({-1, 10}, "`offset` must be non-negative");
   CheckFetchInvalid({10, -1}, "`count` must be non-negative");
diff --git a/cpp/src/arrow/acero/source_node.cc 
b/cpp/src/arrow/acero/source_node.cc
index 888f6e23c13..ed3723d249d 100644
--- a/cpp/src/arrow/acero/source_node.cc
+++ b/cpp/src/arrow/acero/source_node.cc
@@ -408,7 +408,7 @@ struct SchemaSourceNode : public SourceNode {
 struct RecordBatchReaderSourceNode : public SourceNode {
   RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
                               arrow::AsyncGenerator<std::optional<ExecBatch>> 
generator)
-      : SourceNode(plan, schema, generator) {}
+      : SourceNode(plan, schema, generator, Ordering::Implicit()) {}
 
   static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                 const ExecNodeOptions& options) {

Reply via email to