westonpace commented on a change in pull request #10397:
URL: https://github.com/apache/arrow/pull/10397#discussion_r643353334



##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -225,22 +212,43 @@ class ARROW_EXPORT ExecNode {
   virtual void StopProducing() = 0;
 
  protected:
-  ExecNode(ExecPlan* plan, std::string label, std::vector<BatchDescr> 
input_descrs,
+  ExecNode(ExecPlan*, std::string label, NodeVector inputs,

Review comment:
       Nit: Seems odd to have no name for `ExecPlan` but then have a name for 
everything else.

##########
File path: cpp/src/arrow/compute/exec/expression.cc
##########
@@ -61,13 +61,22 @@ Expression call(std::string function, 
std::vector<Expression> arguments,
   call.function_name = std::move(function);
   call.arguments = std::move(arguments);
   call.options = std::move(options);
+
+  call.hash = std::hash<std::string>{}(call.function_name);
+  for (const auto& arg : call.arguments) {
+    call.hash ^= arg.hash();
+  }
   return Expression(std::move(call));
 }
 
 const Datum* Expression::literal() const { return 
util::get_if<Datum>(impl_.get()); }
 
+const Expression::Parameter* Expression::parameter() const {

Review comment:
       Checking my knowledge.  A parameter is an index + an expected type?

##########
File path: cpp/src/arrow/compute/exec/expression.cc
##########
@@ -613,6 +639,22 @@ std::vector<FieldRef> FieldsInExpression(const Expression& 
expr) {
   return fields;
 }
 
+std::vector<int> ParametersInExpression(const Expression& expr) {

Review comment:
       It seems this could return duplicate indices (e.g. in something like `x 
< 5 && x > 0`).  Is that a problem?

##########
File path: cpp/src/arrow/dataset/dataset_internal.h
##########
@@ -204,5 +204,35 @@ arrow::Result<std::shared_ptr<T>> GetFragmentScanOptions(
   return internal::checked_pointer_cast<T>(source);
 }
 
+class FragmentDataset : public Dataset {

Review comment:
       Should this be a base type of `InMemoryDataset`?

##########
File path: cpp/src/arrow/compute/exec/test_util.cc
##########
@@ -124,277 +130,42 @@ struct DummyNode : ExecNode {
   bool started_ = false;
 };
 
-struct RecordBatchReaderNode : ExecNode {
-  RecordBatchReaderNode(ExecPlan* plan, std::string label,
-                        std::shared_ptr<RecordBatchReader> reader, Executor* 
io_executor)
-      : ExecNode(plan, std::move(label), {}, {},
-                 DescrFromSchemaColumns(*reader->schema()), /*num_outputs=*/1),
-        schema_(reader->schema()),
-        reader_(std::move(reader)),
-        io_executor_(io_executor) {}
-
-  RecordBatchReaderNode(ExecPlan* plan, std::string label, 
std::shared_ptr<Schema> schema,
-                        RecordBatchGenerator generator, Executor* io_executor)
-      : ExecNode(plan, std::move(label), {}, {}, 
DescrFromSchemaColumns(*schema),
-                 /*num_outputs=*/1),
-        schema_(std::move(schema)),
-        generator_(std::move(generator)),
-        io_executor_(io_executor) {}
-
-  const char* kind_name() override { return "RecordBatchReader"; }
-
-  void InputReceived(ExecNode* input, int seq_num, compute::ExecBatch batch) 
override {}
-
-  void ErrorReceived(ExecNode* input, Status error) override {}
-
-  void InputFinished(ExecNode* input, int seq_stop) override {}
-
-  Status StartProducing() override {
-    next_batch_index_ = 0;
-    if (!generator_) {
-      auto it = MakeIteratorFromReader(reader_);
-      ARROW_ASSIGN_OR_RAISE(generator_,
-                            MakeBackgroundGenerator(std::move(it), 
io_executor_));
-    }
-    GenerateOne(std::unique_lock<std::mutex>{mutex_});
-    return Status::OK();
-  }
-
-  void PauseProducing(ExecNode* output) override {}
-
-  void ResumeProducing(ExecNode* output) override {}
-
-  void StopProducing(ExecNode* output) override {
-    ASSERT_EQ(output, outputs_[0]);
-    std::unique_lock<std::mutex> lock(mutex_);
-    generator_ = nullptr;  // null function
-  }
-
-  void StopProducing() override { StopProducing(outputs_[0]); }
-
- private:
-  void GenerateOne(std::unique_lock<std::mutex>&& lock) {
-    if (!generator_) {
-      // Stopped
-      return;
-    }
-    auto plan = this->plan()->shared_from_this();
-    auto fut = generator_();
-    const auto batch_index = next_batch_index_++;
-
-    lock.unlock();
-    // TODO we want to transfer always here
-    io_executor_->Transfer(std::move(fut))
-        .AddCallback(
-            [plan, batch_index, this](const 
Result<std::shared_ptr<RecordBatch>>& res) {
-              std::unique_lock<std::mutex> lock(mutex_);
-              if (!res.ok()) {
-                for (auto out : outputs_) {
-                  out->ErrorReceived(this, res.status());
-                }
-                return;
-              }
-              const auto& batch = *res;
-              if (IsIterationEnd(batch)) {
-                lock.unlock();
-                for (auto out : outputs_) {
-                  out->InputFinished(this, batch_index);
-                }
-              } else {
-                lock.unlock();
-                for (auto out : outputs_) {
-                  out->InputReceived(this, batch_index, 
compute::ExecBatch(*batch));
-                }
-                lock.lock();
-                GenerateOne(std::move(lock));
-              }
-            });
-  }
-
-  std::mutex mutex_;
-  const std::shared_ptr<Schema> schema_;
-  const std::shared_ptr<RecordBatchReader> reader_;
-  RecordBatchGenerator generator_;
-  int next_batch_index_;
-
-  Executor* const io_executor_;
-};
-
-struct RecordBatchCollectNodeImpl : public RecordBatchCollectNode {
-  RecordBatchCollectNodeImpl(ExecPlan* plan, std::string label,
-                             std::shared_ptr<Schema> schema)
-      : RecordBatchCollectNode(plan, std::move(label), 
{DescrFromSchemaColumns(*schema)},
-                               {"batches_to_collect"}, {}, 0),
-        schema_(std::move(schema)) {}
-
-  RecordBatchGenerator generator() override { return generator_; }
-
-  const char* kind_name() override { return "RecordBatchReader"; }
-
-  Status StartProducing() override {
-    num_received_ = 0;
-    num_emitted_ = 0;
-    emit_stop_ = -1;
-    stopped_ = false;
-    producer_.emplace(generator_.producer());
-    return Status::OK();
-  }
-
-  // sink nodes have no outputs from which to feel backpressure
-  void ResumeProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-  void PauseProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-  void StopProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-
-  void StopProducing() override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    StopProducingUnlocked();
-  }
-
-  void InputReceived(ExecNode* input, int seq_num,
-                     compute::ExecBatch exec_batch) override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    if (stopped_) {
-      return;
-    }
-    auto maybe_batch = MakeBatch(std::move(exec_batch));
-    if (!maybe_batch.ok()) {
-      lock.unlock();
-      producer_->Push(std::move(maybe_batch));
-      return;
-    }
-
-    // TODO would be nice to factor this out in a ReorderQueue
-    auto batch = *std::move(maybe_batch);
-    if (seq_num <= static_cast<int>(received_batches_.size())) {
-      received_batches_.resize(seq_num + 1, nullptr);
-    }
-    DCHECK_EQ(received_batches_[seq_num], nullptr);
-    received_batches_[seq_num] = std::move(batch);
-    ++num_received_;
-
-    if (seq_num != num_emitted_) {
-      // Cannot emit yet as there is a hole at `num_emitted_`
-      DCHECK_GT(seq_num, num_emitted_);
-      DCHECK_EQ(received_batches_[num_emitted_], nullptr);
-      return;
-    }
-    if (num_received_ == emit_stop_) {
-      StopProducingUnlocked();
-    }
-
-    // Emit batches in order as far as possible
-    // First collect these batches, then unlock before producing.
-    const auto seq_start = seq_num;
-    while (seq_num < static_cast<int>(received_batches_.size()) &&
-           received_batches_[seq_num] != nullptr) {
-      ++seq_num;
-    }
-    DCHECK_GT(seq_num, seq_start);
-    // By moving the values now, we make sure another thread won't emit the 
same values
-    // below
-    RecordBatchVector to_emit(
-        std::make_move_iterator(received_batches_.begin() + seq_start),
-        std::make_move_iterator(received_batches_.begin() + seq_num));
-
-    lock.unlock();
-    for (auto&& batch : to_emit) {
-      producer_->Push(std::move(batch));
-    }
-    lock.lock();
-
-    DCHECK_EQ(seq_start, num_emitted_);  // num_emitted_ wasn't bumped in the 
meantime
-    num_emitted_ = seq_num;
-  }
-
-  void ErrorReceived(ExecNode* input, Status error) override {
-    // XXX do we care about properly sequencing the error?
-    producer_->Push(std::move(error));
-    std::unique_lock<std::mutex> lock(mutex_);
-    StopProducingUnlocked();
-  }
-
-  void InputFinished(ExecNode* input, int seq_stop) override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    DCHECK_GE(seq_stop, static_cast<int>(received_batches_.size()));
-    received_batches_.reserve(seq_stop);
-    emit_stop_ = seq_stop;
-    if (emit_stop_ == num_received_) {
-      DCHECK_EQ(emit_stop_, num_emitted_);
-      StopProducingUnlocked();
-    }
-  }
-
- private:
-  void StopProducingUnlocked() {
-    if (!stopped_) {
-      stopped_ = true;
-      producer_->Close();
-      inputs_[0]->StopProducing(this);
-    }
-  }
-
-  // TODO factor this out as ExecBatch::ToRecordBatch()?
-  Result<std::shared_ptr<RecordBatch>> MakeBatch(compute::ExecBatch&& 
exec_batch) {
-    ArrayDataVector columns;
-    columns.reserve(exec_batch.values.size());
-    for (auto&& value : exec_batch.values) {
-      if (!value.is_array()) {
-        return Status::TypeError("Expected array input");
-      }
-      columns.push_back(std::move(value).array());
-    }
-    return RecordBatch::Make(schema_, exec_batch.length, std::move(columns));
-  }
-
-  const std::shared_ptr<Schema> schema_;
-
-  std::mutex mutex_;
-  RecordBatchVector received_batches_;
-  int num_received_;
-  int num_emitted_;
-  int emit_stop_;
-  bool stopped_;
-
-  PushGenerator<std::shared_ptr<RecordBatch>> generator_;
-  util::optional<PushGenerator<std::shared_ptr<RecordBatch>>::Producer> 
producer_;
-};
+AsyncGenerator<util::optional<ExecBatch>> Wrap(RecordBatchGenerator gen,
+                                               ::arrow::internal::Executor* 
io_executor) {
+  return MakeMappedGenerator(
+      MakeTransferredGenerator(std::move(gen), io_executor),
+      [](const std::shared_ptr<RecordBatch>& batch) -> 
util::optional<ExecBatch> {
+        return ExecBatch(*batch);
+      });
+}
 
 }  // namespace
 
 ExecNode* MakeRecordBatchReaderNode(ExecPlan* plan, std::string label,
-                                    std::shared_ptr<RecordBatchReader> reader,
-                                    Executor* io_executor) {
-  return plan->EmplaceNode<RecordBatchReaderNode>(plan, std::move(label),
-                                                  std::move(reader), 
io_executor);
+                                    const std::shared_ptr<Schema>& schema,
+                                    RecordBatchGenerator generator,
+                                    ::arrow::internal::Executor* io_executor) {
+  return MakeSourceNode(plan, std::move(label), 
DescrFromSchemaColumns(*schema),
+                        Wrap(std::move(generator), io_executor));
 }
 
 ExecNode* MakeRecordBatchReaderNode(ExecPlan* plan, std::string label,
-                                    std::shared_ptr<Schema> schema,
-                                    RecordBatchGenerator generator,
-                                    ::arrow::internal::Executor* io_executor) {
-  return plan->EmplaceNode<RecordBatchReaderNode>(
-      plan, std::move(label), std::move(schema), std::move(generator), 
io_executor);
+                                    const std::shared_ptr<RecordBatchReader>& 
reader,
+                                    Executor* io_executor) {
+  auto gen =
+      MakeBackgroundGenerator(MakeIteratorFromReader(reader), 
io_executor).ValueOrDie();

Review comment:
       Same question, why `io_executor`?

##########
File path: cpp/src/arrow/compute/exec/expression.cc
##########
@@ -510,7 +475,67 @@ Result<Expression> Expression::Bind(const Schema& 
in_schema,
   return Bind(ValueDescr::Array(struct_(in_schema.fields())), exec_context);
 }
 
-Result<Datum> ExecuteScalarExpression(const Expression& expr, const Datum& 
input,
+Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& 
partial) {
+  ExecBatch out;
+
+  if (partial.kind() == Datum::RECORD_BATCH) {
+    const auto& partial_batch = *partial.record_batch();
+    out.length = partial_batch.num_rows();
+
+    for (const auto& field : full_schema.fields()) {
+      ARROW_ASSIGN_OR_RAISE(auto column,
+                            
FieldRef(field->name()).GetOneOrNone(partial_batch));
+
+      if (column) {
+        if (!column->type()->Equals(field->type())) {
+          // Referenced field was present but didn't have the expected type.
+          // This *should* be handled by readers, and will just be an error in 
the future.
+          ARROW_ASSIGN_OR_RAISE(
+              auto converted,
+              compute::Cast(column, field->type(), 
compute::CastOptions::Safe()));
+          column = converted.make_array();
+        }
+        out.values.emplace_back(std::move(column));
+      } else {
+        out.values.emplace_back(MakeNullScalar(field->type()));
+      }
+    }
+    return out;
+  }
+
+  // wasteful but useful for testing:
+  if (partial.type()->id() == Type::STRUCT) {
+    if (partial.is_array()) {
+      ARROW_ASSIGN_OR_RAISE(auto partial_batch,
+                            
RecordBatch::FromStructArray(partial.make_array()));
+
+      return MakeExecBatch(full_schema, partial_batch);
+    }
+
+    if (partial.is_scalar()) {
+      ARROW_ASSIGN_OR_RAISE(auto partial_array,
+                            MakeArrayFromScalar(*partial.scalar(), 1));
+      ARROW_ASSIGN_OR_RAISE(auto out, MakeExecBatch(full_schema, 
partial_array));
+
+      for (Datum& value : out.values) {
+        if (value.is_scalar()) continue;
+        ARROW_ASSIGN_OR_RAISE(value, value.make_array()->GetScalar(0));
+      }

Review comment:
       I'm not sure what is going on here (though that is likely my own 
problem).  If the value is a scalar record batch you want to end up with one 
each value being a scalar.  Can you not just grab the first item from each 
column of `partial_array`?  Why do you need to go back in and patch things up?

##########
File path: cpp/src/arrow/compute/exec/expression.cc
##########
@@ -510,7 +475,67 @@ Result<Expression> Expression::Bind(const Schema& 
in_schema,
   return Bind(ValueDescr::Array(struct_(in_schema.fields())), exec_context);
 }
 
-Result<Datum> ExecuteScalarExpression(const Expression& expr, const Datum& 
input,
+Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& 
partial) {
+  ExecBatch out;
+
+  if (partial.kind() == Datum::RECORD_BATCH) {
+    const auto& partial_batch = *partial.record_batch();
+    out.length = partial_batch.num_rows();
+
+    for (const auto& field : full_schema.fields()) {
+      ARROW_ASSIGN_OR_RAISE(auto column,
+                            
FieldRef(field->name()).GetOneOrNone(partial_batch));

Review comment:
       Will this fail if the column names are not unique?

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -258,43 +258,27 @@ class MappingGenerator {
 /// Note: Errors returned from the `map` function will be propagated
 ///
 /// If the source generator is async-reentrant then this generator will be also
-template <typename T, typename V>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
-                                      std::function<Result<V>(const T&)> map) {
-  std::function<Future<V>(const T&)> future_map = [map](const T& val) -> 
Future<V> {
-    return Future<V>::MakeFinished(map(val));
-  };
-  return MappingGenerator<T, V>(std::move(source_generator), 
std::move(future_map));
-}
-template <typename T, typename V>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
-                                      std::function<V(const T&)> map) {
-  std::function<Future<V>(const T&)> maybe_future_map = [map](const T& val) -> 
Future<V> {
-    return Future<V>::MakeFinished(map(val));
-  };
-  return MappingGenerator<T, V>(std::move(source_generator), 
std::move(maybe_future_map));
-}
-template <typename T, typename V>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator,
-                                      std::function<Future<V>(const T&)> map) {
-  return MappingGenerator<T, V>(std::move(source_generator), std::move(map));
-}
-
-template <typename V, typename T, typename MapFunc>
-AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, 
MapFunc map) {
+template <typename T, typename MapFn,

Review comment:
       Thanks for figuring this out!

##########
File path: cpp/src/arrow/compute/exec/expression.h
##########
@@ -207,11 +218,22 @@ Result<Expression> SimplifyWithGuarantee(Expression,
 
 // Execution
 
-/// Execute a scalar expression against the provided state and input Datum. 
This
+/// Ensure that a RecordBatch (which may have missing or incorrectly ordered 
columns)

Review comment:
       This reads like an explanatory NB more than a description of what the 
function does.  Maybe...
   
   ```
   Converts a RecordBatch to an ExecBatch
   
   Arrays will be reordered according to schema ordering.  Missing fields will 
be replaced with null scalars.  This is necessary when executing expressions 
since we look up fields by index.
   ```
   
   Also, it feels a bit like a catch-22, though this is more of an observation 
than a complaint.  "We need to change expressions to use indices so they will 
work on exec batches." and "We need to convert record batches to exec batches 
because expressions work on indices."
   
   Maybe just remove the `This is necessary...` statement.  By this point the 
user is already making an exec batch so presumably they have a reason for it.

##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -225,22 +212,43 @@ class ARROW_EXPORT ExecNode {
   virtual void StopProducing() = 0;
 
  protected:
-  ExecNode(ExecPlan* plan, std::string label, std::vector<BatchDescr> 
input_descrs,
+  ExecNode(ExecPlan*, std::string label, NodeVector inputs,
            std::vector<std::string> input_labels, BatchDescr output_descr,
            int num_outputs);
 
   ExecPlan* plan_;
-
   std::string label_;
 
-  std::vector<BatchDescr> input_descrs_;
-  std::vector<std::string> input_labels_;
   NodeVector inputs_;
+  std::vector<std::string> input_labels_;
 
   BatchDescr output_descr_;
   int num_outputs_;
   NodeVector outputs_;
 };
 
+/// \brief Adapt an AsyncGenerator<ExecBatch> as a source node
+ARROW_EXPORT
+ExecNode* MakeSourceNode(ExecPlan*, std::string label, ExecNode::BatchDescr 
output_descr,
+                         AsyncGenerator<util::optional<ExecBatch>>);
+
+/// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
+ARROW_EXPORT
+AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input,
+                                                       std::string label);
+
+/// \brief Make a node which excludes some rows from batches passed through it
+///
+/// filter Expression must be bound; no field references will be looked up by 
name
+ARROW_EXPORT
+ExecNode* MakeFilterNode(ExecNode* input, std::string label, Expression 
filter);
+
+/// \brief Make a node which executes expressions on input batches, producing 
new batches.
+///
+/// Expressions must be bound; no field references will be looked up by name
+ARROW_EXPORT
+ExecNode* MakeProjectNode(ExecNode* input, std::string label,

Review comment:
       I maybe understand from the meaning of `Project` what is going on here 
but if someone were not as familiar with the domain they might not.  It's not 
clear that the output will have one output column per expression.  From the 
description this sounds like it might be more of a generic "map" operation that 
maps input batch to output batch.

##########
File path: cpp/src/arrow/compute/exec/expression_test.cc
##########
@@ -165,6 +165,56 @@ TEST(ExpressionUtils, StripOrderPreservingCasts) {
   Expect(cast(field_ref("i32"), uint64()), no_change);
 }
 
+TEST(ExpressionUtils, MakeExecBatch) {
+  auto Expect = [](std::shared_ptr<RecordBatch> partial_batch) {
+    SCOPED_TRACE(partial_batch->ToString());
+    ASSERT_OK_AND_ASSIGN(auto batch, MakeExecBatch(*kBoringSchema, 
partial_batch));
+
+    ASSERT_EQ(batch.num_values(), kBoringSchema->num_fields());
+    for (int i = 0; i < kBoringSchema->num_fields(); ++i) {
+      const auto& field = *kBoringSchema->field(i);
+
+      SCOPED_TRACE("Field#" + std::to_string(i) + " " + field.ToString());
+
+      EXPECT_TRUE(batch[i].type()->Equals(field.type()))
+          << "Incorrect type " << batch[i].type()->ToString();
+
+      ASSERT_OK_AND_ASSIGN(auto col, 
FieldRef(field.name()).GetOneOrNone(*partial_batch));

Review comment:
       Why not `partial_batch->GetFieldByName(field.name())`?

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -66,6 +66,12 @@ bool IsIterationEnd(const T& val) {
   return IterationTraits<T>::IsEnd(val);
 }
 
+template <typename T>
+bool IsIterationEnd(const Result<T>& maybe_val) {

Review comment:
       Where is this used?

##########
File path: cpp/src/arrow/compute/exec/test_util.cc
##########
@@ -124,277 +130,42 @@ struct DummyNode : ExecNode {
   bool started_ = false;
 };
 
-struct RecordBatchReaderNode : ExecNode {
-  RecordBatchReaderNode(ExecPlan* plan, std::string label,
-                        std::shared_ptr<RecordBatchReader> reader, Executor* 
io_executor)
-      : ExecNode(plan, std::move(label), {}, {},
-                 DescrFromSchemaColumns(*reader->schema()), /*num_outputs=*/1),
-        schema_(reader->schema()),
-        reader_(std::move(reader)),
-        io_executor_(io_executor) {}
-
-  RecordBatchReaderNode(ExecPlan* plan, std::string label, 
std::shared_ptr<Schema> schema,
-                        RecordBatchGenerator generator, Executor* io_executor)
-      : ExecNode(plan, std::move(label), {}, {}, 
DescrFromSchemaColumns(*schema),
-                 /*num_outputs=*/1),
-        schema_(std::move(schema)),
-        generator_(std::move(generator)),
-        io_executor_(io_executor) {}
-
-  const char* kind_name() override { return "RecordBatchReader"; }
-
-  void InputReceived(ExecNode* input, int seq_num, compute::ExecBatch batch) 
override {}
-
-  void ErrorReceived(ExecNode* input, Status error) override {}
-
-  void InputFinished(ExecNode* input, int seq_stop) override {}
-
-  Status StartProducing() override {
-    next_batch_index_ = 0;
-    if (!generator_) {
-      auto it = MakeIteratorFromReader(reader_);
-      ARROW_ASSIGN_OR_RAISE(generator_,
-                            MakeBackgroundGenerator(std::move(it), 
io_executor_));
-    }
-    GenerateOne(std::unique_lock<std::mutex>{mutex_});
-    return Status::OK();
-  }
-
-  void PauseProducing(ExecNode* output) override {}
-
-  void ResumeProducing(ExecNode* output) override {}
-
-  void StopProducing(ExecNode* output) override {
-    ASSERT_EQ(output, outputs_[0]);
-    std::unique_lock<std::mutex> lock(mutex_);
-    generator_ = nullptr;  // null function
-  }
-
-  void StopProducing() override { StopProducing(outputs_[0]); }
-
- private:
-  void GenerateOne(std::unique_lock<std::mutex>&& lock) {
-    if (!generator_) {
-      // Stopped
-      return;
-    }
-    auto plan = this->plan()->shared_from_this();
-    auto fut = generator_();
-    const auto batch_index = next_batch_index_++;
-
-    lock.unlock();
-    // TODO we want to transfer always here
-    io_executor_->Transfer(std::move(fut))
-        .AddCallback(
-            [plan, batch_index, this](const 
Result<std::shared_ptr<RecordBatch>>& res) {
-              std::unique_lock<std::mutex> lock(mutex_);
-              if (!res.ok()) {
-                for (auto out : outputs_) {
-                  out->ErrorReceived(this, res.status());
-                }
-                return;
-              }
-              const auto& batch = *res;
-              if (IsIterationEnd(batch)) {
-                lock.unlock();
-                for (auto out : outputs_) {
-                  out->InputFinished(this, batch_index);
-                }
-              } else {
-                lock.unlock();
-                for (auto out : outputs_) {
-                  out->InputReceived(this, batch_index, 
compute::ExecBatch(*batch));
-                }
-                lock.lock();
-                GenerateOne(std::move(lock));
-              }
-            });
-  }
-
-  std::mutex mutex_;
-  const std::shared_ptr<Schema> schema_;
-  const std::shared_ptr<RecordBatchReader> reader_;
-  RecordBatchGenerator generator_;
-  int next_batch_index_;
-
-  Executor* const io_executor_;
-};
-
-struct RecordBatchCollectNodeImpl : public RecordBatchCollectNode {
-  RecordBatchCollectNodeImpl(ExecPlan* plan, std::string label,
-                             std::shared_ptr<Schema> schema)
-      : RecordBatchCollectNode(plan, std::move(label), 
{DescrFromSchemaColumns(*schema)},
-                               {"batches_to_collect"}, {}, 0),
-        schema_(std::move(schema)) {}
-
-  RecordBatchGenerator generator() override { return generator_; }
-
-  const char* kind_name() override { return "RecordBatchReader"; }
-
-  Status StartProducing() override {
-    num_received_ = 0;
-    num_emitted_ = 0;
-    emit_stop_ = -1;
-    stopped_ = false;
-    producer_.emplace(generator_.producer());
-    return Status::OK();
-  }
-
-  // sink nodes have no outputs from which to feel backpressure
-  void ResumeProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-  void PauseProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-  void StopProducing(ExecNode* output) override {
-    FAIL() << "no outputs; this should never be called";
-  }
-
-  void StopProducing() override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    StopProducingUnlocked();
-  }
-
-  void InputReceived(ExecNode* input, int seq_num,
-                     compute::ExecBatch exec_batch) override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    if (stopped_) {
-      return;
-    }
-    auto maybe_batch = MakeBatch(std::move(exec_batch));
-    if (!maybe_batch.ok()) {
-      lock.unlock();
-      producer_->Push(std::move(maybe_batch));
-      return;
-    }
-
-    // TODO would be nice to factor this out in a ReorderQueue
-    auto batch = *std::move(maybe_batch);
-    if (seq_num <= static_cast<int>(received_batches_.size())) {
-      received_batches_.resize(seq_num + 1, nullptr);
-    }
-    DCHECK_EQ(received_batches_[seq_num], nullptr);
-    received_batches_[seq_num] = std::move(batch);
-    ++num_received_;
-
-    if (seq_num != num_emitted_) {
-      // Cannot emit yet as there is a hole at `num_emitted_`
-      DCHECK_GT(seq_num, num_emitted_);
-      DCHECK_EQ(received_batches_[num_emitted_], nullptr);
-      return;
-    }
-    if (num_received_ == emit_stop_) {
-      StopProducingUnlocked();
-    }
-
-    // Emit batches in order as far as possible
-    // First collect these batches, then unlock before producing.
-    const auto seq_start = seq_num;
-    while (seq_num < static_cast<int>(received_batches_.size()) &&
-           received_batches_[seq_num] != nullptr) {
-      ++seq_num;
-    }
-    DCHECK_GT(seq_num, seq_start);
-    // By moving the values now, we make sure another thread won't emit the 
same values
-    // below
-    RecordBatchVector to_emit(
-        std::make_move_iterator(received_batches_.begin() + seq_start),
-        std::make_move_iterator(received_batches_.begin() + seq_num));
-
-    lock.unlock();
-    for (auto&& batch : to_emit) {
-      producer_->Push(std::move(batch));
-    }
-    lock.lock();
-
-    DCHECK_EQ(seq_start, num_emitted_);  // num_emitted_ wasn't bumped in the 
meantime
-    num_emitted_ = seq_num;
-  }
-
-  void ErrorReceived(ExecNode* input, Status error) override {
-    // XXX do we care about properly sequencing the error?
-    producer_->Push(std::move(error));
-    std::unique_lock<std::mutex> lock(mutex_);
-    StopProducingUnlocked();
-  }
-
-  void InputFinished(ExecNode* input, int seq_stop) override {
-    std::unique_lock<std::mutex> lock(mutex_);
-    DCHECK_GE(seq_stop, static_cast<int>(received_batches_.size()));
-    received_batches_.reserve(seq_stop);
-    emit_stop_ = seq_stop;
-    if (emit_stop_ == num_received_) {
-      DCHECK_EQ(emit_stop_, num_emitted_);
-      StopProducingUnlocked();
-    }
-  }
-
- private:
-  void StopProducingUnlocked() {
-    if (!stopped_) {
-      stopped_ = true;
-      producer_->Close();
-      inputs_[0]->StopProducing(this);
-    }
-  }
-
-  // TODO factor this out as ExecBatch::ToRecordBatch()?
-  Result<std::shared_ptr<RecordBatch>> MakeBatch(compute::ExecBatch&& 
exec_batch) {
-    ArrayDataVector columns;
-    columns.reserve(exec_batch.values.size());
-    for (auto&& value : exec_batch.values) {
-      if (!value.is_array()) {
-        return Status::TypeError("Expected array input");
-      }
-      columns.push_back(std::move(value).array());
-    }
-    return RecordBatch::Make(schema_, exec_batch.length, std::move(columns));
-  }
-
-  const std::shared_ptr<Schema> schema_;
-
-  std::mutex mutex_;
-  RecordBatchVector received_batches_;
-  int num_received_;
-  int num_emitted_;
-  int emit_stop_;
-  bool stopped_;
-
-  PushGenerator<std::shared_ptr<RecordBatch>> generator_;
-  util::optional<PushGenerator<std::shared_ptr<RecordBatch>>::Producer> 
producer_;
-};
+AsyncGenerator<util::optional<ExecBatch>> Wrap(RecordBatchGenerator gen,
+                                               ::arrow::internal::Executor* 
io_executor) {
+  return MakeMappedGenerator(
+      MakeTransferredGenerator(std::move(gen), io_executor),

Review comment:
       Why are you transferring onto the `io_executor` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to