westonpace commented on a change in pull request #12275:
URL: https://github.com/apache/arrow/pull/12275#discussion_r793908808



##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -232,6 +232,77 @@ class ConsumingSinkNode : public ExecNode {
   std::shared_ptr<SinkNodeConsumer> consumer_;
 };
 
+/**
+ * @brief This node is an extension of ConsumingSinkNode
+ * that makes it possible to get the output of an execution
+ * plan as a table. We define a custom SinkNodeConsumer to
+ * enable this functionality.
+ */
+
+struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
+ public:
+  TableSinkNodeConsumer(std::shared_ptr<Table>* out,
+                        std::shared_ptr<Schema> output_schema, MemoryPool* pool,
+                        Future<> finish)
+      : out_(out),
+        output_schema_(output_schema),
+        pool_(pool),
+        finish_(std::move(finish)) {}
+
+  Status Consume(ExecBatch batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_));
+    if (rb) {
+      batch_vector.push_back(rb);
+    } else {
+      return Status::Invalid("Invalid ExecBatch consumed");
+    }
+    return Status::OK();
+  }
+
+  Future<> Finish() override {
+    ARROW_ASSIGN_OR_RAISE(auto table,
+                          Table::FromRecordBatches(output_schema_, batch_vector));
+    *out_ = table;
+    return finish_;

Review comment:
       So the purpose of this callback is to let consuming sink implementations
perform (potentially asynchronous) cleanup after the last batch has arrived.
For example, the dataset writer must close all of its open files once the last
batch has arrived.  Closing files is an asynchronous operation, so `Finish()`
returns a future that completes when all of those files are closed.
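To make the asynchronous case concrete, here is a small sketch that uses only the C++ standard library. It is an illustration under stated assumptions, not Arrow code: `std::promise`/`std::future` stand in for `arrow::Future<>`, and `FileClosingConsumer`, `OnFileClosed`, and `IsReady` are hypothetical names. What it shows is that `Finish()` returns while the cleanup is still in flight, and the future only completes once the last file has been closed.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <string>
#include <utility>
#include <vector>

// Hypothetical consumer loosely modeled on the dataset-writer case.
// std::promise/std::future stand in for arrow::Future<>; the "files" are
// just names, and closing them is simulated by OnFileClosed().
class FileClosingConsumer {
 public:
  explicit FileClosingConsumer(std::vector<std::string> open_files)
      : open_files_(std::move(open_files)) {}

  // Called after the last batch. Closing files is asynchronous I/O, so
  // completion cannot be reported yet: hand back a still-pending future.
  std::future<void> Finish() {
    remaining_ = open_files_.size();
    if (remaining_ == 0) all_closed_.set_value();  // nothing to clean up
    return all_closed_.get_future();
  }

  // Simulated I/O completion callback, invoked once per file as each close
  // finishes. The future completes only when the last file is closed.
  void OnFileClosed() {
    if (remaining_ > 0 && --remaining_ == 0) all_closed_.set_value();
  }

 private:
  std::vector<std::string> open_files_;
  std::size_t remaining_ = 0;
  std::promise<void> all_closed_;
};

// Returns true if the future has already completed, without blocking.
inline bool IsReady(const std::future<void>& f) {
  return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

A caller that needs the cleanup to be done would wait on (or attach a continuation to) the returned future rather than assuming it is already complete.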
   
   In the ToTable case we only need to create a table from all the batches we
have collected.  This does not involve any I/O, so it does not need to be
asynchronous, and you can just return `Future<>::MakeFinished()`.
   
   Because this case is so common, `Future<>` has a
[convenience constructor](https://github.com/apache/arrow/blob/03f3cf986314654e932587d01df59ad145faf5b9/cpp/src/arrow/util/future.h#L686)
that implicitly creates a finished future from a `Status`.  So you should be
able to just return `Status::OK()`, which will be implicitly converted to a
completed `Future<>`.
   
   Then you should get rid of `TableSinkNodeOptions::finish` (and the
corresponding `finish` argument and `finish_` member of `TableSinkNodeConsumer`).
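For the synchronous case, here is a self-contained sketch that uses toy stand-ins instead of the real Arrow headers. `toy::Status`, `toy::Future`, and `toy::TableSinkConsumer` are hypothetical simplifications written for illustration only (batches are `std::vector<int>` and the "table" is their concatenation). The pattern described above is what it is meant to show: because `toy::Future` has a non-explicit constructor taking a `Status`, `Finish()` can simply `return Status::OK();`, and the consumer needs no `finish` future of its own.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

namespace toy {

// Simplified stand-in for arrow::Status (hypothetical, not the real class).
class Status {
 public:
  static Status OK() { return Status(true, ""); }
  static Status Invalid(std::string msg) { return Status(false, std::move(msg)); }
  bool ok() const { return ok_; }
  const std::string& message() const { return msg_; }

 private:
  Status(bool is_ok, std::string msg) : ok_(is_ok), msg_(std::move(msg)) {}
  bool ok_;
  std::string msg_;
};

// Simplified stand-in for arrow::Future<>. This toy only models futures that
// are already finished, so it just carries the final Status.
class Future {
 public:
  // Factory for an already-completed future.
  static Future MakeFinished(Status s = Status::OK()) { return Future(std::move(s)); }

  // Deliberately NOT explicit: a function declared to return Future can
  // `return Status::OK();` and the caller receives a finished future.
  Future(Status s) : finished_(true), status_(std::move(s)) {}

  bool is_finished() const { return finished_; }
  const Status& status() const { return status_; }

 private:
  bool finished_;
  Status status_;
};

// Toy consumer: collects batches and builds the "table" when the plan ends.
class TableSinkConsumer {
 public:
  explicit TableSinkConsumer(std::vector<int>* out) : out_(out) {}

  Status Consume(std::vector<int> batch) {
    batches_.push_back(std::move(batch));
    return Status::OK();
  }

  // No I/O happens here, so there is nothing to wait for: returning a Status
  // is enough, and it converts implicitly to a finished Future.
  Future Finish() {
    if (out_ == nullptr) return Status::Invalid("no output table");
    out_->clear();
    for (const auto& b : batches_) out_->insert(out_->end(), b.begin(), b.end());
    return Status::OK();
  }

 private:
  std::vector<int>* out_;
  std::vector<std::vector<int>> batches_;
};

}  // namespace toy
```

The non-explicit constructor is a deliberate trade-off: it removes boilerplate in the very common synchronous case, at the cost of making the conversion easy to overlook when reading the code.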




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
