westonpace commented on code in PR #15183:
URL: https://github.com/apache/arrow/pull/15183#discussion_r1061651944
##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -344,6 +344,47 @@ void TestSourceSink(
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
}
+void TestRecordBatchReaderSourceSink(
+    std::function<Result<std::shared_ptr<RecordBatchReader>>(const BatchesWithSchema&)>
+        to_reader) {
+  ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1));
+  ExecContext exec_context(default_memory_pool(), executor.get());
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context));
+  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatchReader> reader, to_reader(exp_batches));
+
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"record_batch_reader_source",
+                     RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
+}
Review Comment:
You are specifying an executor, but you aren't verifying that it is actually used. Also, there is no test where we do not specify an executor.
Testing that the executor gets used would be a little tricky (maybe someday we can add a `TotalTasksRun` method to the executor). You would probably need to wrap the batch generator with something that verifies it is running on the I/O executor (e.g. using `OwnsThisThread` or something similar); see the sketch below.
I am probably being a bit harsh here, as I suspect we test some of the other sources in a similar fashion. If we're only going to have one simple test, I would prefer one that uses the default executor rather than a custom one, since that is the more common case.
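For illustration, a minimal sketch (not part of the PR; the helper name is hypothetical, and where exactly it would be hooked in is left open) of how a generator could be wrapped so that every pull asserts it happens on the expected I/O executor:
```cpp
// Hypothetical test helper: wraps an ExecBatch generator and checks, on every
// pull, that the calling thread is owned by the expected I/O executor.
AsyncGenerator<std::optional<ExecBatch>> AssertPulledOnExecutor(
    AsyncGenerator<std::optional<ExecBatch>> source,
    arrow::internal::Executor* io_executor) {
  return [source = std::move(source), io_executor]() {
    EXPECT_TRUE(io_executor->OwnsThisThread());
    return source();
  };
}
```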
##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -327,6 +327,55 @@ struct SchemaSourceNode : public SourceNode {
}
};
+struct RecordBatchReaderSourceNode : public SourceNode {
+  RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+                              arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
+      : SourceNode(plan, schema, generator) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, kKindName));
+    const auto& cast_options =
+        checked_cast<const RecordBatchReaderSourceNodeOptions&>(options);
+    auto& reader = cast_options.reader;
+    auto& schema = cast_options.schema;
+    auto io_executor = cast_options.io_executor;
+
+    if (io_executor == NULLPTR) {
+      io_executor = plan->query_context()->exec_context()->executor();
+    }
+
Review Comment:
```suggestion
```
This will initialize `io_executor` to the CPU executor, which we do not want. There is already an `if (io_executor == NULLPTR)` block below, so this one can simply be removed.
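For reference, a sketch of the alternative default, assuming the intent is to fall back to the I/O thread pool rather than the plan's CPU executor (the exact fallback used further down is up to this PR):
```cpp
// Sketch only: default to the I/O context's executor instead of the CPU executor.
if (io_executor == NULLPTR) {
  io_executor = arrow::io::default_io_context().executor();
}
```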
##########
cpp/src/arrow/compute/exec/test_util.cc:
##########
@@ -290,6 +290,18 @@ Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
return record_batches;
}
+Result<std::shared_ptr<RecordBatchReader>> ToRecordBatcheReader(
Review Comment:
```suggestion
Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader(
```
##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -344,6 +344,47 @@ void TestSourceSink(
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
}
+void TestRecordBatchReaderSourceSink(
+    std::function<Result<std::shared_ptr<RecordBatchReader>>(const BatchesWithSchema&)>
+        to_reader) {
+  ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1));
+  ExecContext exec_context(default_memory_pool(), executor.get());
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context));
+  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatchReader> reader, to_reader(exp_batches));
+
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"record_batch_reader_source",
+                     RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+              Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
Review Comment:
Let's use `DeclarationToExecBatches` for new tests (feel free to fix
existing tests too).
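For example, a sketch of what the test body could look like with `DeclarationToExecBatches` (assuming the helper that runs a `Declaration`, adds its own sink, and returns the collected batches, accessed here as `actual.batches`):
```cpp
// Sketch: let DeclarationToExecBatches build the plan and collect the output,
// instead of wiring up SinkNodeOptions and StartAndCollect by hand.
Declaration plan("record_batch_reader_source",
                 RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader});
ASSERT_OK_AND_ASSIGN(auto actual, DeclarationToExecBatches(std::move(plan)));
ASSERT_THAT(actual.batches, UnorderedElementsAreArray(exp_batches.batches));
```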
##########
cpp/src/arrow/compute/exec/test_util.h:
##########
@@ -127,6 +127,10 @@ ARROW_TESTING_EXPORT
Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
const BatchesWithSchema& batches);
+ARROW_TESTING_EXPORT
+Result<std::shared_ptr<RecordBatchReader>> ToRecordBatcheReader(
+ const BatchesWithSchema& batches_with_schema);
+
Review Comment:
```suggestion
ARROW_TESTING_EXPORT
Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader(
const BatchesWithSchema& batches_with_schema);
```
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -126,6 +126,25 @@ class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
arrow::internal::Executor* io_executor;
};
+class ARROW_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions {
+ public:
+  RecordBatchReaderSourceNodeOptions(std::shared_ptr<Schema> schema,
+                                     std::shared_ptr<RecordBatchReader> reader,
+                                     arrow::internal::Executor* io_executor = NULLPTR)
+      : schema(schema), reader(std::move(reader)), io_executor(io_executor) {}
+
+  /// \brief The schema of the record batches from the iterator
+  std::shared_ptr<Schema> schema;
Review Comment:
`RecordBatchReader` has a `schema()` method. We should use that instead of
requiring it to be specified separately.
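A rough sketch of that direction (not the PR's code; the member layout is kept from the PR, only the schema parameter is dropped):
```cpp
// Sketch: the options no longer carry a schema; the node derives it from the reader.
class ARROW_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions {
 public:
  explicit RecordBatchReaderSourceNodeOptions(
      std::shared_ptr<RecordBatchReader> reader,
      arrow::internal::Executor* io_executor = NULLPTR)
      : reader(std::move(reader)), io_executor(io_executor) {}

  /// \brief The reader that the source node will drain
  std::shared_ptr<RecordBatchReader> reader;
  arrow::internal::Executor* io_executor;
};

// ...and inside RecordBatchReaderSourceNode::Make():
//   std::shared_ptr<Schema> schema = cast_options.reader->schema();
```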
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]