westonpace commented on code in PR #15183:
URL: https://github.com/apache/arrow/pull/15183#discussion_r1061651944


##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -344,6 +344,47 @@ void TestSourceSink(
               
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
 }
 
+void TestRecordBatchReaderSourceSink(
+    std::function<Result<std::shared_ptr<RecordBatchReader>>(const 
BatchesWithSchema&)>
+        to_reader) {
+  ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1));
+  ExecContext exec_context(default_memory_pool(), executor.get());
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context));
+  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatchReader> reader, 
to_reader(exp_batches));
+
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"record_batch_reader_source",
+                     RecordBatchReaderSourceNodeOptions{exp_batches.schema, 
reader}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
+}

Review Comment:
   You are specifying an executor but you aren't verifying that it is being 
used.  Also, there is no test where we do not specify an executor.
   
   Testing that the executor gets used would be a little tricky (maybe someday 
we can add a `TotalTasksRun` method to the executor).  You would probably need 
to wrap the batch generator with something that verifies it is running on the 
I/O executor (e.g., using `OwnsThisThread` or something similar).
   
   I am probably being a bit harsh here, as I suspect we test some of the other 
sources in a similar fashion.  If we're only going to have one simple test, I 
would prefer a test that uses the default executor instead of one specifying a 
custom executor, as that is the more common case.



##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -327,6 +327,55 @@ struct SchemaSourceNode : public SourceNode {
   }
 };
 
+struct RecordBatchReaderSourceNode : public SourceNode {
+  RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+                              arrow::AsyncGenerator<std::optional<ExecBatch>> 
generator)
+      : SourceNode(plan, schema, generator) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, kKindName));
+    const auto& cast_options =
+        checked_cast<const RecordBatchReaderSourceNodeOptions&>(options);
+    auto& reader = cast_options.reader;
+    auto& schema = cast_options.schema;
+    auto io_executor = cast_options.io_executor;
+
+    if (io_executor == NULLPTR) {
+      io_executor = plan->query_context()->exec_context()->executor();
+    }
+

Review Comment:
   ```suggestion
   ```
   This will initialize `io_executor` to the CPU executor, which we do not 
want.  You already have an `if (io_executor == NULLPTR)` block below.



##########
cpp/src/arrow/compute/exec/test_util.cc:
##########
@@ -290,6 +290,18 @@ Result<std::vector<std::shared_ptr<RecordBatch>>> 
ToRecordBatches(
   return record_batches;
 }
 
+Result<std::shared_ptr<RecordBatchReader>> ToRecordBatcheReader(

Review Comment:
   ```suggestion
   Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader(
   ```



##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -344,6 +344,47 @@ void TestSourceSink(
               
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
 }
 
+void TestRecordBatchReaderSourceSink(
+    std::function<Result<std::shared_ptr<RecordBatchReader>>(const 
BatchesWithSchema&)>
+        to_reader) {
+  ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1));
+  ExecContext exec_context(default_memory_pool(), executor.get());
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context));
+  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatchReader> reader, 
to_reader(exp_batches));
+
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"record_batch_reader_source",
+                     RecordBatchReaderSourceNodeOptions{exp_batches.schema, 
reader}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));

Review Comment:
   Let's use `DeclarationToExecBatches` for new tests (feel free to fix 
existing tests too).



##########
cpp/src/arrow/compute/exec/test_util.h:
##########
@@ -127,6 +127,10 @@ ARROW_TESTING_EXPORT
 Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
     const BatchesWithSchema& batches);
 
+ARROW_TESTING_EXPORT
+Result<std::shared_ptr<RecordBatchReader>> ToRecordBatcheReader(
+    const BatchesWithSchema& batches_with_schema);
+

Review Comment:
   ```suggestion
   ARROW_TESTING_EXPORT
   Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader(
       const BatchesWithSchema& batches_with_schema);
   
   ```



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -126,6 +126,25 @@ class ARROW_EXPORT SchemaSourceNodeOptions : public 
ExecNodeOptions {
   arrow::internal::Executor* io_executor;
 };
 
+class ARROW_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions 
{
+ public:
+  RecordBatchReaderSourceNodeOptions(std::shared_ptr<Schema> schema,
+                                     std::shared_ptr<RecordBatchReader> reader,
+                                     arrow::internal::Executor* io_executor = 
NULLPTR)
+      : schema(schema), reader(std::move(reader)), io_executor(io_executor) {}
+
+  /// \brief The schema of the record batches from the iterator
+  std::shared_ptr<Schema> schema;

Review Comment:
   `RecordBatchReader` has a `schema()` method. We should use that instead of 
requiring it to be specified separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to