westonpace commented on code in PR #14041:
URL: https://github.com/apache/arrow/pull/14041#discussion_r976781115


##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -296,6 +296,148 @@ TEST(ExecPlanExecution, TableSourceSinkError) {
               Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
 }
 
+TEST(ExecPlanExecution, ArrayVectorSourceSink) {
+  for (int num_threads : {1, 4}) {
+    ASSERT_OK_AND_ASSIGN(auto io_executor,
+                         arrow::internal::ThreadPool::Make(num_threads));
+    ExecContext exec_context(default_memory_pool(), io_executor.get());
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_context));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    auto exp_batches = MakeBasicBatches();
+    ASSERT_OK_AND_ASSIGN(auto arrayvecs, ToArrayVectors(exp_batches));
+    auto arrayvec_it_maker = [&arrayvecs]() {
+      return MakeVectorIterator<std::shared_ptr<ArrayVector>>(arrayvecs);
+    };
+
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"array_source", 
ArrayVectorSourceNodeOptions{exp_batches.schema,
+                                                                    
arrayvec_it_maker}},
+                      {"sink", SinkNodeOptions{&sink_gen}},
+                  })
+                  .AddToPlan(plan.get()));
+
+    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
+  }
+}
+
+TEST(ExecPlanExecution, ArrayVectorSourceSinkError) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  std::shared_ptr<Schema> no_schema;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto arrayvecs, ToArrayVectors(exp_batches));
+  auto arrayvec_it_maker = [&arrayvecs]() {
+    return MakeVectorIterator<std::shared_ptr<ArrayVector>>(arrayvecs);
+  };
+
+  auto null_executor_options =
+      ArrayVectorSourceNodeOptions{exp_batches.schema, arrayvec_it_maker};
+  ASSERT_THAT(MakeExecNode("array_source", plan.get(), {}, 
null_executor_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+
+  auto null_schema_options = ArrayVectorSourceNodeOptions{no_schema, 
arrayvec_it_maker};
+  ASSERT_THAT(MakeExecNode("array_source", plan.get(), {}, 
null_schema_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+}
+
+TEST(ExecPlanExecution, ExecBatchSourceSink) {
+  for (int num_threads : {1, 4}) {
+    ASSERT_OK_AND_ASSIGN(auto io_executor,
+                         arrow::internal::ThreadPool::Make(num_threads));
+    ExecContext exec_context(default_memory_pool(), io_executor.get());
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_context));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    auto exp_batches = MakeBasicBatches();
+    ASSERT_OK_AND_ASSIGN(auto exec_batches, ToExecBatches(exp_batches));
+    auto exec_batch_it_maker = [&exec_batches]() {
+      return MakeVectorIterator<std::shared_ptr<ExecBatch>>(exec_batches);
+    };
+
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"exec_source", 
ExecBatchSourceNodeOptions{exp_batches.schema,

Review Comment:
   Hmm, I'm don't know that `exec_source` or `record_source` are very clear out 
of context.  How about `batch_source` and `exec_batch_source`?



##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -296,6 +296,148 @@ TEST(ExecPlanExecution, TableSourceSinkError) {
               Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
 }
 
+TEST(ExecPlanExecution, ArrayVectorSourceSink) {
+  for (int num_threads : {1, 4}) {

Review Comment:
   Testing with a different number of threads is probably overkill for this 
feature.  I'm guessing this was from testing ordered sources?  It's a nit but 
it might be nice to remove this excess complexity.



##########
cpp/src/arrow/compute/exec/plan_test.cc:
##########
@@ -296,6 +296,148 @@ TEST(ExecPlanExecution, TableSourceSinkError) {
               Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
 }
 
+TEST(ExecPlanExecution, ArrayVectorSourceSink) {
+  for (int num_threads : {1, 4}) {
+    ASSERT_OK_AND_ASSIGN(auto io_executor,
+                         arrow::internal::ThreadPool::Make(num_threads));
+    ExecContext exec_context(default_memory_pool(), io_executor.get());
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_context));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    auto exp_batches = MakeBasicBatches();
+    ASSERT_OK_AND_ASSIGN(auto arrayvecs, ToArrayVectors(exp_batches));
+    auto arrayvec_it_maker = [&arrayvecs]() {
+      return MakeVectorIterator<std::shared_ptr<ArrayVector>>(arrayvecs);
+    };
+
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"array_source", 
ArrayVectorSourceNodeOptions{exp_batches.schema,
+                                                                    
arrayvec_it_maker}},
+                      {"sink", SinkNodeOptions{&sink_gen}},
+                  })
+                  .AddToPlan(plan.get()));
+
+    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
+  }
+}
+
+TEST(ExecPlanExecution, ArrayVectorSourceSinkError) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  std::shared_ptr<Schema> no_schema;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto arrayvecs, ToArrayVectors(exp_batches));
+  auto arrayvec_it_maker = [&arrayvecs]() {
+    return MakeVectorIterator<std::shared_ptr<ArrayVector>>(arrayvecs);
+  };
+
+  auto null_executor_options =
+      ArrayVectorSourceNodeOptions{exp_batches.schema, arrayvec_it_maker};
+  ASSERT_THAT(MakeExecNode("array_source", plan.get(), {}, 
null_executor_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+
+  auto null_schema_options = ArrayVectorSourceNodeOptions{no_schema, 
arrayvec_it_maker};
+  ASSERT_THAT(MakeExecNode("array_source", plan.get(), {}, 
null_schema_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+}
+
+TEST(ExecPlanExecution, ExecBatchSourceSink) {
+  for (int num_threads : {1, 4}) {
+    ASSERT_OK_AND_ASSIGN(auto io_executor,
+                         arrow::internal::ThreadPool::Make(num_threads));
+    ExecContext exec_context(default_memory_pool(), io_executor.get());
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_context));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    auto exp_batches = MakeBasicBatches();
+    ASSERT_OK_AND_ASSIGN(auto exec_batches, ToExecBatches(exp_batches));
+    auto exec_batch_it_maker = [&exec_batches]() {
+      return MakeVectorIterator<std::shared_ptr<ExecBatch>>(exec_batches);
+    };
+
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"exec_source", 
ExecBatchSourceNodeOptions{exp_batches.schema,
+                                                                 
exec_batch_it_maker}},
+                      {"sink", SinkNodeOptions{&sink_gen}},
+                  })
+                  .AddToPlan(plan.get()));
+
+    ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
+  }
+}
+
+TEST(ExecPlanExecution, ExecBatchSourceSinkError) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  std::shared_ptr<Schema> no_schema;
+
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto exec_batches, ToExecBatches(exp_batches));
+  auto exec_batch_it_maker = [&exec_batches]() {
+    return MakeVectorIterator<std::shared_ptr<ExecBatch>>(exec_batches);
+  };
+
+  auto null_executor_options =
+      ExecBatchSourceNodeOptions{exp_batches.schema, exec_batch_it_maker};
+  ASSERT_THAT(MakeExecNode("exec_source", plan.get(), {}, 
null_executor_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+
+  auto null_schema_options = ExecBatchSourceNodeOptions{no_schema, 
exec_batch_it_maker};
+  ASSERT_THAT(MakeExecNode("exec_source", plan.get(), {}, null_schema_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+}
+
+TEST(ExecPlanExecution, RecordBatchSourceSink) {

Review Comment:
   I think there's a lot of duplication between these three sources.  Would it 
be possible to turn these into some kind of parameterized test?



##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -291,13 +291,194 @@ struct TableSourceNode : public SourceNode {
   }
 };
 
+template <typename This, typename Options>
+struct SchemaSourceNode : public SourceNode {
+  SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+                   arrow::AsyncGenerator<util::optional<ExecBatch>> generator)
+      : SourceNode(plan, schema, generator) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName));
+    const auto& cast_options = checked_cast<const Options&>(options);
+    auto& it_maker = cast_options.it_maker;
+    auto& schema = cast_options.schema;
+
+    auto io_executor = plan->exec_context()->executor();
+    auto it = it_maker();
+
+    RETURN_NOT_OK(ValidateSchemaSourceNodeInput(io_executor, schema, 
This::kKindName));
+    ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor, 
schema));
+    return plan->EmplaceNode<This>(plan, schema, generator);
+  }
+
+  static arrow::Status ValidateSchemaSourceNodeInput(
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& 
schema,
+      const char* kKindName) {
+    if (schema == NULLPTR) {
+      return Status::Invalid(kKindName, " requires schema which is not null");
+    }
+    if (io_executor == NULLPTR) {
+      return Status::Invalid(kKindName, " requires IO-executor which is not 
null");
+    }

Review Comment:
   `default_io_context().executor()` should work.  Or, more directly, 
`io::internal::GetIOThreadPool()`



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -77,6 +78,34 @@ class ARROW_EXPORT TableSourceNodeOptions : public 
ExecNodeOptions {
   int64_t max_batch_size;
 };
 
+/// \brief An extended Source node which accepts a schema
+///
+/// ItMaker is a maker of an iterator of tabular data.
+template <typename ItMaker>
+class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
+ public:
+  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker)
+      : schema(schema), it_maker(std::move(it_maker)) {}
+
+  // the schema of the record batches from the iterator
+  std::shared_ptr<Schema> schema;
+
+  // maker of an iterator which acts as the data source
+  ItMaker it_maker;
+};
+
+/// \brief An extended Source node which accepts a schema and array-vectors
+using ArrayVectorIteratorMaker = 
std::function<Iterator<std::shared_ptr<ArrayVector>>()>;
+using ArrayVectorSourceNodeOptions = 
SchemaSourceNodeOptions<ArrayVectorIteratorMaker>;
+
+using ExecBatchIteratorMaker = 
std::function<Iterator<std::shared_ptr<ExecBatch>>()>;

Review Comment:
   It might be nice to have just `Iterator<ExecBatch>` and drop the 
`shared_ptr` but I suppose we would need to introduce an "end batch" concept.  
Mostly an observation.  No action required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to