rtpsw commented on code in PR #14041:
URL: https://github.com/apache/arrow/pull/14041#discussion_r966134778


##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -291,13 +291,194 @@ struct TableSourceNode : public SourceNode {
   }
 };
 
+template <typename This, typename Options>
+struct SchemaSourceNode : public SourceNode {
+  SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+                   arrow::AsyncGenerator<util::optional<ExecBatch>> generator)
+      : SourceNode(plan, schema, generator) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName));
+    const auto& cast_options = checked_cast<const Options&>(options);
+    auto& it_maker = cast_options.it_maker;
+    auto& schema = cast_options.schema;
+
+    auto io_executor = plan->exec_context()->executor();
+    auto it = it_maker();
+
+    RETURN_NOT_OK(ValidateSchemaSourceNodeInput(io_executor, schema, 
This::kKindName));
+    ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor, 
schema));
+    return plan->EmplaceNode<This>(plan, schema, generator);
+  }
+
+  static arrow::Status ValidateSchemaSourceNodeInput(
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& 
schema,
+      const char* kKindName) {
+    if (schema == NULLPTR) {
+      return Status::Invalid(kKindName, " requires schema which is not null");
+    }
+    if (io_executor == NULLPTR) {
+      return Status::Invalid(kKindName, " requires IO-executor which is not 
null");
+    }
+
+    return Status::OK();
+  }
+
+  template <typename Item>
+  static Iterator<Enumerated<Item>> MakeEnumeratedIterator(Iterator<Item> it) {
+    // TODO: Should Enumerated<>.index be changed to int64_t? Currently, this 
change
+    // causes dataset unit-test failures
+    using index_t = decltype(Enumerated<Item>{}.index);
+    struct {
+      index_t index = 0;
+      Enumerated<Item> operator()(const Item& item) {
+        return Enumerated<Item>{item, index++, false};
+      }
+    } enumerator;
+    return MakeMapIterator(std::move(enumerator), std::move(it));
+  }
+
+  template <typename Item>
+  static arrow::AsyncGenerator<Item> MakeUnenumeratedGenerator(
+      const arrow::AsyncGenerator<Enumerated<Item>>& enum_gen) {
+    using Enum = Enumerated<Item>;
+    return MakeMappedGenerator(enum_gen, [](const Enum& e) { return e.value; 
});
+  }
+
+  template <typename Item>
+  static arrow::AsyncGenerator<Item> MakeOrderedGenerator(
+      const arrow::AsyncGenerator<Enumerated<Item>>& unordered_gen) {
+    using Enum = Enumerated<Item>;
+    auto enum_gen = MakeSequencingGenerator(
+        unordered_gen,
+        /*compare=*/[](const Enum& a, const Enum& b) { return a.index > 
b.index; },
+        /*is_next=*/[](const Enum& a, const Enum& b) { return a.index + 1 == 
b.index; },
+        /*initial_value=*/Enum{{}, 0});
+    return MakeUnenumeratedGenerator(enum_gen);
+  }
+};
+
+struct RecordBatchSourceNode
+    : public SchemaSourceNode<RecordBatchSourceNode, 
RecordBatchSourceNodeOptions> {
+  using RecordBatchSchemaSourceNode =
+      SchemaSourceNode<RecordBatchSourceNode, RecordBatchSourceNodeOptions>;
+
+  using RecordBatchSchemaSourceNode::RecordBatchSchemaSourceNode;
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    return RecordBatchSchemaSourceNode::Make(plan, inputs, options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+
+  static Result<arrow::AsyncGenerator<util::optional<ExecBatch>>> 
MakeGenerator(
+      Iterator<std::shared_ptr<RecordBatch>>& batch_it,
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& 
schema) {
+    auto to_exec_batch =
+        [schema](const std::shared_ptr<RecordBatch>& batch) -> 
util::optional<ExecBatch> {
+      if (batch == NULLPTR || *batch->schema() != *schema) {
+        return util::nullopt;
+      }
+      return util::optional<ExecBatch>(ExecBatch(*batch));
+    };
+    auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it));
+    auto enum_it = MakeEnumeratedIterator(std::move(exec_batch_it));
+    ARROW_ASSIGN_OR_RAISE(auto enum_gen,
+                          MakeBackgroundGenerator(std::move(enum_it), 
io_executor));
+    return MakeUnenumeratedGenerator(std::move(enum_gen));

Review Comment:
   The problem is that a background generator, which is useful for overlapped 
IO, does not guarantee in-order delivery. My solution is to enumerate at the 
iterator, which does guarantee order, then apply the background generator, and 
finally sort by the enumeration. That said, there might be a better way to do 
overlapped IO; I'd be interested in alternatives.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to