vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r796245719



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   
generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const 
TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, 
table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override 
{
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { 
SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }
+
+  arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+      std::vector<ExecBatch> batches) {
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, 
batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table& 
table) {
+    std::shared_ptr<TableBatchReader> reader = 
std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<arrow::RecordBatch> batch;
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batch_vector;

Review comment:
       No, these were supposed to be used, but I haven't actually used them 
for anything.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to