westonpace commented on a change in pull request #12275:
URL: https://github.com/apache/arrow/pull/12275#discussion_r793908808
##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -232,6 +232,77 @@ class ConsumingSinkNode : public ExecNode {
std::shared_ptr<SinkNodeConsumer> consumer_;
};
+/**
+ * @brief This node is an extension on ConsumingSinkNode
+ * to facilitate to get the output from an execution plan
+ * as a table. We define a custom SinkNodeConsumer to
+ * enable this functionality.
+ */
+
+struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
+ public:
+ TableSinkNodeConsumer(std::shared_ptr<Table>* out,
+ std::shared_ptr<Schema> output_schema, MemoryPool* pool,
+ Future<> finish)
+ : out_(out),
+ output_schema_(output_schema),
+ pool_(pool),
+ finish_(std::move(finish)) {}
+
+ Status Consume(ExecBatch batch) override {
+ ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_));
+ if (rb) {
+ batch_vector.push_back(rb);
+ } else {
+ return Status::Invalid("Invalid ExecBatch consumed");
+ }
+ return Status::OK();
+ }
+
+ Future<> Finish() override {
+ ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(batch_vector));
+ *out_ = table;
+ return finish_;
Review comment:
So the purpose of this callback is for consuming sink implementations
that need to do some kind of (potentially asynchronous) cleanup after the last
batch has arrived. For example, in the dataset writer, after the last batch
has arrived, we need to close all open files. This is an asynchronous
operation and so we return a future that will complete when all those files are
closed.
In the ToTable case we only need to create a table from all the batches we
have collected. This does not involve any I/O and so it does not need to be
asynchronous. So you can just return `Future<>::MakeFinished()`.
However, because this happens rather often, we have a [convenience
constructor](https://github.com/apache/arrow/blob/03f3cf986314654e932587d01df59ad145faf5b9/cpp/src/arrow/util/future.h#L686)
which can implicitly create a finished future from a status. So you should be
able to just return `Status::OK()` (which will get implicitly converted to a
completed `Future<>`).
Then you should get rid of `TableSinkNodeOptions::finish`.
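
To illustrate the implicit conversion, here is a minimal, self-contained sketch. Note that `Status`, `Future<>`, and `CollectingConsumer` below are simplified stand-ins written for this example, not Arrow's real classes, and the `int` "batches" are placeholders for `ExecBatch`. The only point is that a non-explicit `Future(Status)` constructor lets a synchronous `Finish()` return `Status::OK()` directly, without a stored future.

```cpp
#include <string>
#include <utility>
#include <vector>

// Stand-in for arrow::Status (assumption: only the parts needed here).
class Status {
 public:
  static Status OK() { return Status(true, ""); }
  static Status Invalid(std::string msg) { return Status(false, std::move(msg)); }
  bool ok() const { return ok_; }
  const std::string& message() const { return msg_; }

 private:
  Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
  bool ok_;
  std::string msg_;
};

struct Empty {};

// Stand-in for arrow::Future<>: always already finished in this sketch.
template <typename T = Empty>
class Future {
 public:
  // Deliberately non-explicit, so `return Status::OK();` compiles in a
  // function whose return type is Future<>.
  Future(Status s) : status_(std::move(s)) {}

  static Future MakeFinished(Status s = Status::OK()) { return Future(std::move(s)); }

  bool is_finished() const { return true; }
  const Status& status() const { return status_; }

 private:
  Status status_;
};

// Hypothetical consumer: collects "batches" and publishes them on Finish().
class CollectingConsumer {
 public:
  explicit CollectingConsumer(std::vector<int>* out) : out_(out) {}

  Status Consume(int batch) {
    batches_.push_back(batch);
    return Status::OK();
  }

  Future<> Finish() {
    *out_ = batches_;     // purely in-memory work, no I/O to wait on
    return Status::OK();  // implicitly converted to a finished Future<>
  }

 private:
  std::vector<int>* out_;
  std::vector<int> batches_;
};
```

With this shape the consumer no longer needs a `finish` future passed in from the options, since nothing asynchronous is left to signal.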