vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r796379045
##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
AsyncGenerator<util::optional<ExecBatch>> generator_;
};
+struct TableSourceNode : public SourceNode {
+ TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+ std::shared_ptr<Table> table, int64_t batch_size)
+ : SourceNode(plan, output_schema,
+
generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+ batch_size(batch_size) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+ const auto& table_options = checked_cast<const
TableSourceNodeOptions&>(options);
+ return plan->EmplaceNode<TableSourceNode>(plan,
table_options.table->schema(),
+ table_options.table,
+ table_options.batch_size);
+ }
+ const char* kind_name() const override { return "TableSourceNode"; }
+
+ [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+ SourceNode::InputReceived(input, batch);
+ }
+ [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+ SourceNode::ErrorReceived(input, status);
+ }
+ [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override
{
+ SourceNode::InputFinished(input, total_batches);
+ }
+
+ Status StartProducing() override { return SourceNode::StartProducing(); }
+
+ void PauseProducing(ExecNode* output) override {
SourceNode::PauseProducing(output); }
+
+ void StopProducing() override { SourceNode::StopProducing(); }
+
+ Future<> finished() override { return SourceNode::finished(); }
+
+ arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+ std::vector<ExecBatch> batches) {
+ auto opt_batches = MapVector(
+ [](ExecBatch batch) { return util::make_optional(std::move(batch)); },
batches);
+ AsyncGenerator<util::optional<ExecBatch>> gen;
+ gen = MakeVectorGenerator(std::move(opt_batches));
+ return gen;
+ }
+
+ arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table&
table) {
+ std::shared_ptr<TableBatchReader> reader =
std::make_shared<TableBatchReader>(table);
+
+ // setting chunksize for the batch reader
+ if (batch_size > 0) {
+ reader->set_chunksize(batch_size);
+ }
+
+ std::shared_ptr<arrow::RecordBatch> batch;
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batch_vector;
+ std::vector<ExecBatch> exec_batches;
+ while (true) {
+ ARROW_ASSIGN_OR_RAISE(batch, reader->Next());
Review comment:
Isn't this part of `gtest_util.h`? If so, should we include that header here? As
far as I can see, it is only used in the tests and benchmarks.
Did I interpret this correctly? Please correct me if I am wrong.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]