westonpace commented on code in PR #14041:
URL: https://github.com/apache/arrow/pull/14041#discussion_r965898567
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -77,6 +77,34 @@ class ARROW_EXPORT TableSourceNodeOptions : public
ExecNodeOptions {
int64_t max_batch_size;
};
+/// \brief An extended Source node which accepts a schema
+///
+/// ItMaker is a maker of an iterator of tabular data.
Review Comment:
Why are we taking in a factory? It seems that `it_maker` is called exactly
once, when the node is created. So what is the advantage to accepting, for
example, `std::function<Iterator<std::shared_ptr<ExecBatch>>()>` instead of
`Iterator<std::shared_ptr<ExecBatch>>`?
##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -291,13 +291,194 @@ struct TableSourceNode : public SourceNode {
}
};
+template <typename This, typename Options>
+struct SchemaSourceNode : public SourceNode {
+ SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+ arrow::AsyncGenerator<util::optional<ExecBatch>> generator)
+ : SourceNode(plan, schema, generator) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName));
+ const auto& cast_options = checked_cast<const Options&>(options);
+ auto& it_maker = cast_options.it_maker;
+ auto& schema = cast_options.schema;
+
+ auto io_executor = plan->exec_context()->executor();
+ auto it = it_maker();
+
+ RETURN_NOT_OK(ValidateSchemaSourceNodeInput(io_executor, schema,
This::kKindName));
+ ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor,
schema));
+ return plan->EmplaceNode<This>(plan, schema, generator);
+ }
+
+ static arrow::Status ValidateSchemaSourceNodeInput(
+ arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>&
schema,
+ const char* kKindName) {
+ if (schema == NULLPTR) {
+ return Status::Invalid(kKindName, " requires schema which is not null");
+ }
+ if (io_executor == NULLPTR) {
+ return Status::Invalid(kKindName, " requires IO-executor which is not
null");
+ }
+
+ return Status::OK();
+ }
+
+ template <typename Item>
+ static Iterator<Enumerated<Item>> MakeEnumeratedIterator(Iterator<Item> it) {
+ // TODO: Should Enumerated<>.index be changed to int64_t? Currently, this
change
+ // causes dataset unit-test failures
+ using index_t = decltype(Enumerated<Item>{}.index);
+ struct {
+ index_t index = 0;
+ Enumerated<Item> operator()(const Item& item) {
+ return Enumerated<Item>{item, index++, false};
+ }
+ } enumerator;
+ return MakeMapIterator(std::move(enumerator), std::move(it));
+ }
+
+ template <typename Item>
+ static arrow::AsyncGenerator<Item> MakeUnenumeratedGenerator(
+ const arrow::AsyncGenerator<Enumerated<Item>>& enum_gen) {
+ using Enum = Enumerated<Item>;
+ return MakeMappedGenerator(enum_gen, [](const Enum& e) { return e.value;
});
+ }
+
+ template <typename Item>
+ static arrow::AsyncGenerator<Item> MakeOrderedGenerator(
+ const arrow::AsyncGenerator<Enumerated<Item>>& unordered_gen) {
+ using Enum = Enumerated<Item>;
+ auto enum_gen = MakeSequencingGenerator(
+ unordered_gen,
+ /*compare=*/[](const Enum& a, const Enum& b) { return a.index >
b.index; },
+ /*is_next=*/[](const Enum& a, const Enum& b) { return a.index + 1 ==
b.index; },
+ /*initial_value=*/Enum{{}, 0});
+ return MakeUnenumeratedGenerator(enum_gen);
+ }
+};
+
+struct RecordBatchSourceNode
+ : public SchemaSourceNode<RecordBatchSourceNode,
RecordBatchSourceNodeOptions> {
+ using RecordBatchSchemaSourceNode =
+ SchemaSourceNode<RecordBatchSourceNode, RecordBatchSourceNodeOptions>;
+
+ using RecordBatchSchemaSourceNode::RecordBatchSchemaSourceNode;
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ return RecordBatchSchemaSourceNode::Make(plan, inputs, options);
+ }
+
+ const char* kind_name() const override { return kKindName; }
+
+ static Result<arrow::AsyncGenerator<util::optional<ExecBatch>>>
MakeGenerator(
+ Iterator<std::shared_ptr<RecordBatch>>& batch_it,
+ arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>&
schema) {
+ auto to_exec_batch =
+ [schema](const std::shared_ptr<RecordBatch>& batch) ->
util::optional<ExecBatch> {
+ if (batch == NULLPTR || *batch->schema() != *schema) {
+ return util::nullopt;
+ }
+ return util::optional<ExecBatch>(ExecBatch(*batch));
+ };
+ auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it));
+ auto enum_it = MakeEnumeratedIterator(std::move(exec_batch_it));
+ ARROW_ASSIGN_OR_RAISE(auto enum_gen,
+ MakeBackgroundGenerator(std::move(enum_it),
io_executor));
+ return MakeUnenumeratedGenerator(std::move(enum_gen));
Review Comment:
Why are we enumerating and then unenumerating all these generators?
##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -291,13 +291,194 @@ struct TableSourceNode : public SourceNode {
}
};
+template <typename This, typename Options>
+struct SchemaSourceNode : public SourceNode {
+ SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+ arrow::AsyncGenerator<util::optional<ExecBatch>> generator)
+ : SourceNode(plan, schema, generator) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName));
+ const auto& cast_options = checked_cast<const Options&>(options);
+ auto& it_maker = cast_options.it_maker;
+ auto& schema = cast_options.schema;
+
+ auto io_executor = plan->exec_context()->executor();
+ auto it = it_maker();
+
+ RETURN_NOT_OK(ValidateSchemaSourceNodeInput(io_executor, schema,
This::kKindName));
+ ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor,
schema));
+ return plan->EmplaceNode<This>(plan, schema, generator);
+ }
+
+ static arrow::Status ValidateSchemaSourceNodeInput(
+ arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>&
schema,
+ const char* kKindName) {
+ if (schema == NULLPTR) {
+ return Status::Invalid(kKindName, " requires schema which is not null");
+ }
+ if (io_executor == NULLPTR) {
+ return Status::Invalid(kKindName, " requires IO-executor which is not
null");
+ }
Review Comment:
Couldn't we fall back to some kind of default IO executor here?
##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -291,13 +291,194 @@ struct TableSourceNode : public SourceNode {
}
};
+template <typename This, typename Options>
+struct SchemaSourceNode : public SourceNode {
+ SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+ arrow::AsyncGenerator<util::optional<ExecBatch>> generator)
+ : SourceNode(plan, schema, generator) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName));
+ const auto& cast_options = checked_cast<const Options&>(options);
+ auto& it_maker = cast_options.it_maker;
+ auto& schema = cast_options.schema;
+
+ auto io_executor = plan->exec_context()->executor();
+ auto it = it_maker();
+
+ RETURN_NOT_OK(ValidateSchemaSourceNodeInput(io_executor, schema,
This::kKindName));
+ ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor,
schema));
+ return plan->EmplaceNode<This>(plan, schema, generator);
+ }
+
+ static arrow::Status ValidateSchemaSourceNodeInput(
+ arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>&
schema,
+ const char* kKindName) {
+ if (schema == NULLPTR) {
+ return Status::Invalid(kKindName, " requires schema which is not null");
+ }
+ if (io_executor == NULLPTR) {
+ return Status::Invalid(kKindName, " requires IO-executor which is not
null");
+ }
+
+ return Status::OK();
+ }
+
+ template <typename Item>
+ static Iterator<Enumerated<Item>> MakeEnumeratedIterator(Iterator<Item> it) {
+ // TODO: Should Enumerated<>.index be changed to int64_t? Currently, this
change
+ // causes dataset unit-test failures
Review Comment:
I think there are probably many places that would fail today if we had more
than 2 billion (roughly 2^31) batches. That sort of scale is usually larger
than what a single node handles.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]