westonpace commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r794164590
##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -52,6 +52,19 @@ class ARROW_EXPORT SourceNodeOptions : public
ExecNodeOptions {
std::function<Future<util::optional<ExecBatch>>()> generator;
};
+/// \brief Adapt an Table as a source node
+///
+/// plan->exec_context()->executor() will be used to parallelize pushing to
+/// outputs, if provided.
+class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
+ public:
+ TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t batch_size)
+ : table(table), batch_size(batch_size) {}
+
+ std::shared_ptr<Table> table;
+ int64_t batch_size;
Review comment:
Can you add docstrings for these fields? They should be fairly obvious, so
I don't think we need much detail, but we should add them for consistency
since we are pretty good about this elsewhere in this file.
##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
AsyncGenerator<util::optional<ExecBatch>> generator_;
};
+struct TableSourceNode : public SourceNode {
+ TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+ std::shared_ptr<Table> table, int64_t batch_size)
+ : SourceNode(plan, output_schema,
+
generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+ batch_size(batch_size) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+ const auto& table_options = checked_cast<const
TableSourceNodeOptions&>(options);
+ return plan->EmplaceNode<TableSourceNode>(plan,
table_options.table->schema(),
+ table_options.table,
+ table_options.batch_size);
+ }
+ const char* kind_name() const override { return "TableSourceNode"; }
+
+ [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+ SourceNode::InputReceived(input, batch);
+ }
+ [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+ SourceNode::ErrorReceived(input, status);
+ }
+ [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override
{
+ SourceNode::InputFinished(input, total_batches);
+ }
+
+ Status StartProducing() override { return SourceNode::StartProducing(); }
+
+ void PauseProducing(ExecNode* output) override {
SourceNode::PauseProducing(output); }
+
+ void StopProducing() override { SourceNode::StopProducing(); }
+
+ Future<> finished() override { return SourceNode::finished(); }
Review comment:
Do these methods need to be overridden? Will it work if we omit this?
##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
AsyncGenerator<util::optional<ExecBatch>> generator_;
};
+struct TableSourceNode : public SourceNode {
+ TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+ std::shared_ptr<Table> table, int64_t batch_size)
+ : SourceNode(plan, output_schema,
+
generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+ batch_size(batch_size) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+ const auto& table_options = checked_cast<const
TableSourceNodeOptions&>(options);
+ return plan->EmplaceNode<TableSourceNode>(plan,
table_options.table->schema(),
+ table_options.table,
+ table_options.batch_size);
+ }
+ const char* kind_name() const override { return "TableSourceNode"; }
+
+ [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+ SourceNode::InputReceived(input, batch);
+ }
+ [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+ SourceNode::ErrorReceived(input, status);
+ }
+ [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override
{
+ SourceNode::InputFinished(input, total_batches);
+ }
+
+ Status StartProducing() override { return SourceNode::StartProducing(); }
+
+ void PauseProducing(ExecNode* output) override {
SourceNode::PauseProducing(output); }
+
+ void StopProducing() override { SourceNode::StopProducing(); }
+
+ Future<> finished() override { return SourceNode::finished(); }
+
+ arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+ std::vector<ExecBatch> batches) {
+ auto opt_batches = MapVector(
+ [](ExecBatch batch) { return util::make_optional(std::move(batch)); },
batches);
+ AsyncGenerator<util::optional<ExecBatch>> gen;
+ gen = MakeVectorGenerator(std::move(opt_batches));
+ return gen;
+ }
+
+ arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table&
table) {
+ std::shared_ptr<TableBatchReader> reader =
std::make_shared<TableBatchReader>(table);
+
+ // setting chunksize for the batch reader
+ if (batch_size > 0) {
+ reader->set_chunksize(batch_size);
+ }
+
+ std::shared_ptr<arrow::RecordBatch> batch;
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batch_vector;
+ std::vector<ExecBatch> exec_batches;
+ while (true) {
+ ARROW_ASSIGN_OR_RAISE(batch, reader->Next());
Review comment:
Can you use `ASSIGN_OR_ABORT` and add a comment explaining that a
TableBatchReader should not be able to fail; it only returns a status (which
is always OK) to comply with the RecordBatchReader interface?
Then you can change the return type to `std::vector<ExecBatch>` and get rid
of the `ValueOrDie`.
Otherwise I don't think it would be safe to be doing this kind of work in a
constructor (and we should be doing it in the `Make` method instead so we can
propagate the failure).
##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -238,6 +238,34 @@ TEST(ExecPlanExecution, SourceSink) {
}
}
+TEST(ExecPlanExecution, TableSourceSink) {
+ for (bool slow : {false, true}) {
+ SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+ for (bool parallel : {false, true}) {
+ SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+ AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+ auto exp_batches = MakeBasicBatches();
+ ASSERT_OK_AND_ASSIGN(auto table,
+ TableFromExecBatches(exp_batches.schema,
exp_batches.batches));
+
+ ASSERT_OK(
+ Declaration::Sequence({
+ {"table_source",
TableSourceNodeOptions{table, 1}},
Review comment:
Can you test two different batch sizes? I like 1 as a test case, but can
you also test 4? There are no batches of size 4 in the table, but this should
still be a valid setting: it won't concatenate batches and will just emit the
undersized batches, which is OK.
##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
AsyncGenerator<util::optional<ExecBatch>> generator_;
};
+struct TableSourceNode : public SourceNode {
+ TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+ std::shared_ptr<Table> table, int64_t batch_size)
+ : SourceNode(plan, output_schema,
+
generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+ batch_size(batch_size) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+ const auto& table_options = checked_cast<const
TableSourceNodeOptions&>(options);
+ return plan->EmplaceNode<TableSourceNode>(plan,
table_options.table->schema(),
+ table_options.table,
+ table_options.batch_size);
+ }
+ const char* kind_name() const override { return "TableSourceNode"; }
+
+ [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+ SourceNode::InputReceived(input, batch);
+ }
+ [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+ SourceNode::ErrorReceived(input, status);
+ }
+ [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override
{
+ SourceNode::InputFinished(input, total_batches);
+ }
+
+ Status StartProducing() override { return SourceNode::StartProducing(); }
+
+ void PauseProducing(ExecNode* output) override {
SourceNode::PauseProducing(output); }
+
+ void StopProducing() override { SourceNode::StopProducing(); }
+
+ Future<> finished() override { return SourceNode::finished(); }
+
+ arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
Review comment:
Can we make this a static method named something like
`TableToGenerator`? It looks like a property accessor here but it isn't
accessing the instance state and it is doing too much work.
##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
AsyncGenerator<util::optional<ExecBatch>> generator_;
};
+struct TableSourceNode : public SourceNode {
+ TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+ std::shared_ptr<Table> table, int64_t batch_size)
+ : SourceNode(plan, output_schema,
+
generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
Review comment:
Nit: Rather than calling `ConvertTableToExecBatches` here, can we call it in
the `generator` method? That will be slightly more readable.
It would be good if we could get rid of the `ValueOrDie` too.
##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -52,6 +52,19 @@ class ARROW_EXPORT SourceNodeOptions : public
ExecNodeOptions {
std::function<Future<util::optional<ExecBatch>>()> generator;
};
+/// \brief Adapt an Table as a source node
+///
+/// plan->exec_context()->executor() will be used to parallelize pushing to
+/// outputs, if provided.
Review comment:
I'm not sure how much value this comment adds. Can we reword it? Maybe
something like:
The table will be sent through the exec plan in batches. Each batch will be
submitted as a new thread task if plan->exec_context()->executor() is not null.
##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
AsyncGenerator<util::optional<ExecBatch>> generator_;
};
+struct TableSourceNode : public SourceNode {
+ TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+ std::shared_ptr<Table> table, int64_t batch_size)
+ : SourceNode(plan, output_schema,
+
generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+ batch_size(batch_size) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+ const auto& table_options = checked_cast<const
TableSourceNodeOptions&>(options);
+ return plan->EmplaceNode<TableSourceNode>(plan,
table_options.table->schema(),
+ table_options.table,
+ table_options.batch_size);
+ }
+ const char* kind_name() const override { return "TableSourceNode"; }
+
+ [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+ SourceNode::InputReceived(input, batch);
+ }
+ [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+ SourceNode::ErrorReceived(input, status);
+ }
+ [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override
{
+ SourceNode::InputFinished(input, total_batches);
+ }
+
+ Status StartProducing() override { return SourceNode::StartProducing(); }
+
+ void PauseProducing(ExecNode* output) override {
SourceNode::PauseProducing(output); }
+
+ void StopProducing() override { SourceNode::StopProducing(); }
+
+ Future<> finished() override { return SourceNode::finished(); }
+
+ arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+ std::vector<ExecBatch> batches) {
+ auto opt_batches = MapVector(
+ [](ExecBatch batch) { return util::make_optional(std::move(batch)); },
batches);
+ AsyncGenerator<util::optional<ExecBatch>> gen;
+ gen = MakeVectorGenerator(std::move(opt_batches));
+ return gen;
+ }
+
+ arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table&
table) {
+ std::shared_ptr<TableBatchReader> reader =
std::make_shared<TableBatchReader>(table);
+
+ // setting chunksize for the batch reader
+ if (batch_size > 0) {
+ reader->set_chunksize(batch_size);
+ }
+
+ std::shared_ptr<arrow::RecordBatch> batch;
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batch_vector;
Review comment:
Is this used?
##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
AsyncGenerator<util::optional<ExecBatch>> generator_;
};
+struct TableSourceNode : public SourceNode {
+ TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+ std::shared_ptr<Table> table, int64_t batch_size)
+ : SourceNode(plan, output_schema,
+
generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+ batch_size(batch_size) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+ const auto& table_options = checked_cast<const
TableSourceNodeOptions&>(options);
+ return plan->EmplaceNode<TableSourceNode>(plan,
table_options.table->schema(),
Review comment:
We aren't great about validation in our existing nodes, but since this is
essentially a "public API", I think we can start doing better here.
* What happens if `table` is `nullptr`?
* What happens if `batch_size <= 0`?
##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
AsyncGenerator<util::optional<ExecBatch>> generator_;
};
+struct TableSourceNode : public SourceNode {
+ TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+ std::shared_ptr<Table> table, int64_t batch_size)
+ : SourceNode(plan, output_schema,
+
generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+ batch_size(batch_size) {}
Review comment:
```suggestion
TableSourceNode(ExecPlan* plan,
std::shared_ptr<Table> table, int64_t batch_size)
: SourceNode(plan, table->schema(),
generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
batch_size(batch_size) {}
```
The `output_schema` should always be the schema of the `table`, so we
shouldn't need to take it in as a separate argument. I see that `Make` already
passes `table->schema()` for it, but I think it would be a bit cleaner for the
constructor to take three arguments instead of four.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]