This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c9293039b5 ARROW-17610: [C++] Support additional source types in SourceNode (#14207)
c9293039b5 is described below

commit c9293039b5454c99e8a34f8fc3a4602d74874114
Author: rtpsw <[email protected]>
AuthorDate: Mon Nov 21 21:50:03 2022 +0200

    ARROW-17610: [C++] Support additional source types in SourceNode (#14207)
    
    See https://issues.apache.org/jira/browse/ARROW-17610
    
    Authored-by: Yaron Gvili <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/src/arrow/compute/exec/options.h      |  51 +++++++++++
 cpp/src/arrow/compute/exec/plan_test.cc   |  79 +++++++++++++++++
 cpp/src/arrow/compute/exec/source_node.cc | 136 ++++++++++++++++++++++++++++++
 cpp/src/arrow/compute/exec/test_util.cc   |  32 +++++++
 cpp/src/arrow/compute/exec/test_util.h    |  24 ++++++
 5 files changed, 322 insertions(+)

diff --git a/cpp/src/arrow/compute/exec/options.h 
b/cpp/src/arrow/compute/exec/options.h
index ffb9f16983..8600b11348 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -27,12 +27,20 @@
 #include "arrow/compute/api_vector.h"
 #include "arrow/compute/exec.h"
 #include "arrow/compute/exec/expression.h"
+#include "arrow/record_batch.h"
 #include "arrow/result.h"
 #include "arrow/util/async_generator.h"
 #include "arrow/util/async_util.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
+
+namespace internal {
+
+class Executor;
+
+}  // namespace internal
+
 namespace compute {
 
 using AsyncExecBatchGenerator = AsyncGenerator<std::optional<ExecBatch>>;
@@ -77,6 +85,49 @@ class ARROW_EXPORT TableSourceNodeOptions : public 
ExecNodeOptions {
   int64_t max_batch_size;
 };
 
+/// \brief Options shared by source nodes which accept an explicit schema
+///
+/// ItMaker is a callable which, invoked with no arguments, returns an iterator
+/// of tabular data. The source node factory invokes it while building the node.
+template <typename ItMaker>
+class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
+ public:
+  /// \param schema schema of the data produced by the iterator; the source node
+  ///        factory rejects a null schema
+  /// \param it_maker callable producing the iterator which acts as the data source
+  /// \param io_executor executor used for scanning the iterator, or null to let
+  ///        the source node factory pick one
+  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
+                          arrow::internal::Executor* io_executor = NULLPTR)
+      // `schema` is a by-value sink parameter: move it instead of copying the
+      // shared_ptr a second time.
+      : schema(std::move(schema)),
+        it_maker(std::move(it_maker)),
+        io_executor(io_executor) {}
+
+  /// \brief The schema of the data produced by the iterator
+  std::shared_ptr<Schema> schema;
+
+  /// \brief A maker of an iterator which acts as the data source
+  ItMaker it_maker;
+
+  /// \brief The executor to use for scanning the iterator
+  ///
+  /// When null, the source node factory falls back to the executor of the plan's
+  /// exec context and, if that is null as well, to the default I/O executor.
+  arrow::internal::Executor* io_executor;
+};
+
+/// \brief Callable which returns an iterator over shared array-vectors
+using ArrayVectorIteratorMaker =
+    std::function<Iterator<std::shared_ptr<ArrayVector>>()>;
+
+/// \brief Options for a source node fed by a schema and an iterator of array-vectors
+///
+/// All constructors are inherited unchanged from SchemaSourceNodeOptions.
+class ARROW_EXPORT ArrayVectorSourceNodeOptions
+    : public SchemaSourceNodeOptions<ArrayVectorIteratorMaker> {
+  using SchemaSourceNodeOptions<ArrayVectorIteratorMaker>::SchemaSourceNodeOptions;
+};
+
+/// \brief Callable which returns an iterator over shared exec-batches
+using ExecBatchIteratorMaker =
+    std::function<Iterator<std::shared_ptr<ExecBatch>>()>;
+
+/// \brief Options for a source node fed by a schema and an iterator of exec-batches
+///
+/// All constructors are inherited unchanged from SchemaSourceNodeOptions.
+class ARROW_EXPORT ExecBatchSourceNodeOptions
+    : public SchemaSourceNodeOptions<ExecBatchIteratorMaker> {
+  using SchemaSourceNodeOptions<ExecBatchIteratorMaker>::SchemaSourceNodeOptions;
+};
+
+/// \brief Callable which returns an iterator over shared record-batches
+using RecordBatchIteratorMaker =
+    std::function<Iterator<std::shared_ptr<RecordBatch>>()>;
+
+/// \brief Options for a source node fed by a schema and an iterator of record-batches
+///
+/// All constructors are inherited unchanged from SchemaSourceNodeOptions.
+class ARROW_EXPORT RecordBatchSourceNodeOptions
+    : public SchemaSourceNodeOptions<RecordBatchIteratorMaker> {
+  using SchemaSourceNodeOptions<RecordBatchIteratorMaker>::SchemaSourceNodeOptions;
+};
+
 /// \brief Make a node which excludes some rows from batches passed through it
 ///
 /// filter_expression will be evaluated against each batch which is pushed to
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc 
b/cpp/src/arrow/compute/exec/plan_test.cc
index a33337fcfe..6c8d497a1d 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -295,6 +295,85 @@ TEST(ExecPlanExecution, TableSourceSinkError) {
               Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
 }
 
+/// Checks the construction-time behavior of the source factory registered as
+/// `source_factory_name`: options carrying a schema but no explicit executor must
+/// be accepted, while options carrying a null schema must fail with Invalid and
+/// a message containing "not null".
+///
+/// `to_elements` converts the basic test batches into the element type consumed
+/// by the factory under test.
+template <typename ElementType, typename OptionsType>
+void TestSourceSinkError(
+    std::string source_factory_name,
+    std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
+        to_elements) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  std::shared_ptr<Schema> missing_schema;
+
+  auto basic_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto elements, to_elements(basic_batches));
+  auto make_iterator = [&elements]() {
+    return MakeVectorIterator<ElementType>(elements);
+  };
+
+  // Valid schema, executor left at its default (null): node creation succeeds.
+  auto default_executor_options = OptionsType{basic_batches.schema, make_iterator};
+  ASSERT_OK(
+      MakeExecNode(source_factory_name, plan.get(), {}, default_executor_options));
+
+  // Null schema: node creation is rejected.
+  auto schemaless_options = OptionsType{missing_schema, make_iterator};
+  ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, schemaless_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+}
+
+/// Builds a plan made of the source registered as `source_factory_name` followed
+/// by a "sink" node, feeds it the basic test batches (converted by
+/// `to_elements`), and checks that the sink yields exactly those batches in any
+/// order.
+template <typename ElementType, typename OptionsType>
+void TestSourceSink(
+    std::string source_factory_name,
+    std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
+        to_elements) {
+  // A dedicated single-thread pool is handed to the exec context as its executor.
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
+  ExecContext context(default_memory_pool(), thread_pool.get());
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&context));
+  AsyncGenerator<std::optional<ExecBatch>> sink_output;
+
+  auto expected = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto elements, to_elements(expected));
+  auto make_iterator = [&elements]() {
+    return MakeVectorIterator<ElementType>(elements);
+  };
+
+  ASSERT_OK(
+      Declaration::Sequence({
+                                {source_factory_name,
+                                 OptionsType{expected.schema, make_iterator}},
+                                {"sink", SinkNodeOptions{&sink_output}},
+                            })
+          .AddToPlan(plan.get()));
+
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_output),
+              Finishes(ResultWith(UnorderedElementsAreArray(expected.batches))));
+}
+
+// Source/sink round trip through the "array_vector_source" factory.
+TEST(ExecPlanExecution, ArrayVectorSourceSink) {
+  using Element = std::shared_ptr<ArrayVector>;
+  TestSourceSink<Element, ArrayVectorSourceNodeOptions>("array_vector_source",
+                                                        ToArrayVectors);
+}
+
+// Construction-time checks for the "array_vector_source" factory.
+TEST(ExecPlanExecution, ArrayVectorSourceSinkError) {
+  using Element = std::shared_ptr<ArrayVector>;
+  TestSourceSinkError<Element, ArrayVectorSourceNodeOptions>("array_vector_source",
+                                                             ToArrayVectors);
+}
+
+// Source/sink round trip through the "exec_batch_source" factory.
+TEST(ExecPlanExecution, ExecBatchSourceSink) {
+  using Element = std::shared_ptr<ExecBatch>;
+  TestSourceSink<Element, ExecBatchSourceNodeOptions>("exec_batch_source",
+                                                      ToExecBatches);
+}
+
+// Construction-time checks for the "exec_batch_source" factory.
+TEST(ExecPlanExecution, ExecBatchSourceSinkError) {
+  using Element = std::shared_ptr<ExecBatch>;
+  TestSourceSinkError<Element, ExecBatchSourceNodeOptions>("exec_batch_source",
+                                                           ToExecBatches);
+}
+
+// Source/sink round trip through the "record_batch_source" factory.
+TEST(ExecPlanExecution, RecordBatchSourceSink) {
+  using Element = std::shared_ptr<RecordBatch>;
+  TestSourceSink<Element, RecordBatchSourceNodeOptions>("record_batch_source",
+                                                        ToRecordBatches);
+}
+
+// Construction-time checks for the "record_batch_source" factory.
+TEST(ExecPlanExecution, RecordBatchSourceSinkError) {
+  using Element = std::shared_ptr<RecordBatch>;
+  TestSourceSinkError<Element, RecordBatchSourceNodeOptions>("record_batch_source",
+                                                             ToRecordBatches);
+}
+
 TEST(ExecPlanExecution, SinkNodeBackpressure) {
   std::optional<ExecBatch> batch =
       ExecBatchFromJSON({int32(), boolean()},
diff --git a/cpp/src/arrow/compute/exec/source_node.cc 
b/cpp/src/arrow/compute/exec/source_node.cc
index 59a287b0c4..3fd4c6fd5b 100644
--- a/cpp/src/arrow/compute/exec/source_node.cc
+++ b/cpp/src/arrow/compute/exec/source_node.cc
@@ -26,6 +26,7 @@
 #include "arrow/compute/exec/util.h"
 #include "arrow/compute/exec_internal.h"
 #include "arrow/datum.h"
+#include "arrow/io/util_internal.h"
 #include "arrow/result.h"
 #include "arrow/table.h"
 #include "arrow/util/async_generator.h"
@@ -293,6 +294,138 @@ struct TableSourceNode : public SourceNode {
   }
 };
 
+/// \brief CRTP base for source nodes configured with a schema and an iterator maker
+///
+/// This is the concrete node type; it must provide a static `kKindName` and a
+/// static `MakeGenerator(iterator, io_executor, schema)`. Options is the options
+/// class expected by the factory; it must expose `schema`, `it_maker` and
+/// `io_executor`.
+template <typename This, typename Options>
+struct SchemaSourceNode : public SourceNode {
+  SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+                   arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
+      : SourceNode(plan, std::move(schema), std::move(generator)) {}
+
+  /// \brief Factory: validate the options, build the generator and add the node
+  ///
+  /// Fails with Invalid when the options carry a null schema, and forwards any
+  /// error reported by input validation or by This::MakeGenerator.
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName));
+    const auto& cast_options = checked_cast<const Options&>(options);
+
+    // Validate the schema before doing any work, so that a rejected request
+    // never invokes the user-supplied iterator maker.
+    const auto& schema = cast_options.schema;
+    if (schema == NULLPTR) {
+      return Status::Invalid(This::kKindName, " requires schema which is not null");
+    }
+
+    // Executor preference: explicit option, then the plan's executor, then the
+    // global I/O thread pool.
+    auto io_executor = cast_options.io_executor;
+    if (io_executor == NULLPTR) {
+      io_executor = plan->exec_context()->executor();
+    }
+    if (io_executor == NULLPTR) {
+      io_executor = io::internal::GetIOThreadPool();
+    }
+
+    auto it = cast_options.it_maker();
+    ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor, schema));
+    return plan->EmplaceNode<This>(plan, schema, generator);
+  }
+};
+
+/// \brief Source node fed by an iterator of record batches
+struct RecordBatchSourceNode
+    : public SchemaSourceNode<RecordBatchSourceNode, RecordBatchSourceNodeOptions> {
+  using RecordBatchSchemaSourceNode =
+      SchemaSourceNode<RecordBatchSourceNode, RecordBatchSourceNodeOptions>;
+
+  using RecordBatchSchemaSourceNode::RecordBatchSchemaSourceNode;
+
+  /// Factory entry point; defers entirely to the shared base implementation.
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    return RecordBatchSchemaSourceNode::Make(plan, inputs, options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+
+  /// \brief Wrap `batch_it` into an async generator of exec batches
+  ///
+  /// Each record batch is converted to an ExecBatch and the resulting iterator
+  /// is drained in the background on `io_executor`. `batch_it` is moved from.
+  static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
+      Iterator<std::shared_ptr<RecordBatch>>& batch_it,
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& schema) {
+    // The schema is captured by value: the mapping function outlives this call.
+    auto convert =
+        [schema](const std::shared_ptr<RecordBatch>& batch) -> std::optional<ExecBatch> {
+      if (batch == NULLPTR) {
+        return std::nullopt;
+      }
+      // NOTE(review): a batch whose schema differs from the declared one yields
+      // nullopt rather than an error; presumably downstream treats that as the
+      // end of the stream, which would silently drop later batches -- confirm.
+      if (*batch->schema() != *schema) {
+        return std::nullopt;
+      }
+      return std::optional<ExecBatch>(ExecBatch(*batch));
+    };
+    auto converted_it = MakeMapIterator(convert, std::move(batch_it));
+    return MakeBackgroundGenerator(std::move(converted_it), io_executor);
+  }
+
+  static const char kKindName[];
+};
+
+const char RecordBatchSourceNode::kKindName[] = "RecordBatchSourceNode";
+
+/// \brief Source node fed by an iterator of exec batches
+struct ExecBatchSourceNode
+    : public SchemaSourceNode<ExecBatchSourceNode, ExecBatchSourceNodeOptions> {
+  using ExecBatchSchemaSourceNode =
+      SchemaSourceNode<ExecBatchSourceNode, ExecBatchSourceNodeOptions>;
+
+  using ExecBatchSchemaSourceNode::ExecBatchSchemaSourceNode;
+
+  /// Factory entry point; defers entirely to the shared base implementation.
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    return ExecBatchSchemaSourceNode::Make(plan, inputs, options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+
+  /// \brief Wrap `batch_it` into an async generator of exec batches
+  ///
+  /// Each non-null element is copied into the output and the resulting iterator
+  /// is drained in the background on `io_executor`. `batch_it` is moved from.
+  /// `schema` is accepted for signature parity with the sibling nodes and is
+  /// not consulted here.
+  static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
+      Iterator<std::shared_ptr<ExecBatch>>& batch_it,
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& schema) {
+    auto unwrap =
+        [](const std::shared_ptr<ExecBatch>& batch) -> std::optional<ExecBatch> {
+      if (batch == NULLPTR) {
+        return std::nullopt;
+      }
+      return std::optional<ExecBatch>(*batch);
+    };
+    auto unwrapped_it = MakeMapIterator(unwrap, std::move(batch_it));
+    return MakeBackgroundGenerator(std::move(unwrapped_it), io_executor);
+  }
+
+  static const char kKindName[];
+};
+
+const char ExecBatchSourceNode::kKindName[] = "ExecBatchSourceNode";
+
+/// \brief Source node fed by an iterator of array-vectors
+struct ArrayVectorSourceNode
+    : public SchemaSourceNode<ArrayVectorSourceNode, ArrayVectorSourceNodeOptions> {
+  using ArrayVectorSchemaSourceNode =
+      SchemaSourceNode<ArrayVectorSourceNode, ArrayVectorSourceNodeOptions>;
+
+  using ArrayVectorSchemaSourceNode::ArrayVectorSchemaSourceNode;
+
+  /// Factory entry point; defers entirely to the shared base implementation.
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    return ArrayVectorSchemaSourceNode::Make(plan, inputs, options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+
+  /// \brief Wrap `arrayvec_it` into an async generator of exec batches
+  ///
+  /// Each array-vector becomes one ExecBatch holding one Datum per array, with
+  /// the batch length taken from the first array. The resulting iterator is
+  /// drained in the background on `io_executor`. `arrayvec_it` is moved from.
+  /// `schema` is not consulted here.
+  static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
+      Iterator<std::shared_ptr<ArrayVector>>& arrayvec_it,
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& schema) {
+    auto to_exec_batch =
+        [](const std::shared_ptr<ArrayVector>& arrayvec) -> std::optional<ExecBatch> {
+      // NOTE(review): a null or empty array-vector yields nullopt; presumably
+      // downstream treats that as the end of the stream -- confirm.
+      if (arrayvec == NULLPTR || arrayvec->size() == 0) {
+        return std::nullopt;
+      }
+      std::vector<Datum> datumvec;
+      // The final size is known up front: allocate once.
+      datumvec.reserve(arrayvec->size());
+      for (const auto& array : *arrayvec) {
+        datumvec.emplace_back(array);
+      }
+      // NOTE(review): the arrays are not checked for equal length here.
+      return std::optional<ExecBatch>(
+          ExecBatch(std::move(datumvec), (*arrayvec)[0]->length()));
+    };
+    auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(arrayvec_it));
+    return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor);
+  }
+
+  static const char kKindName[];
+};
+
+const char ArrayVectorSourceNode::kKindName[] = "ArrayVectorSourceNode";
+
 }  // namespace
 
 namespace internal {
@@ -300,6 +433,9 @@ namespace internal {
 void RegisterSourceNode(ExecFactoryRegistry* registry) {
+  // Registers every source node factory defined in this file under its string
+  // name.
   DCHECK_OK(registry->AddFactory("source", SourceNode::Make));
   DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make));
+  DCHECK_OK(registry->AddFactory("record_batch_source", 
RecordBatchSourceNode::Make));
+  DCHECK_OK(registry->AddFactory("exec_batch_source", 
ExecBatchSourceNode::Make));
+  DCHECK_OK(registry->AddFactory("array_vector_source", 
ArrayVectorSourceNode::Make));
 }
 
 }  // namespace internal
diff --git a/cpp/src/arrow/compute/exec/test_util.cc 
b/cpp/src/arrow/compute/exec/test_util.cc
index 13c07d540c..189310fb93 100644
--- a/cpp/src/arrow/compute/exec/test_util.cc
+++ b/cpp/src/arrow/compute/exec/test_util.cc
@@ -258,6 +258,38 @@ BatchesWithSchema MakeBatchesFromString(const 
std::shared_ptr<Schema>& schema,
   return out_batches;
 }
 
+/// Converts every batch of `batches_with_schema` into a shared vector of its
+/// column arrays, going through a RecordBatch built with the accompanying schema.
+/// Returns the first conversion error, if any.
+Result<std::vector<std::shared_ptr<ArrayVector>>> ToArrayVectors(
+    const BatchesWithSchema& batches_with_schema) {
+  std::vector<std::shared_ptr<ArrayVector>> arrayvecs;
+  arrayvecs.reserve(batches_with_schema.batches.size());
+  // Iterate by const reference: copying each ExecBatch is unnecessary.
+  for (const auto& batch : batches_with_schema.batches) {
+    ARROW_ASSIGN_OR_RAISE(auto record_batch,
+                          batch.ToRecordBatch(batches_with_schema.schema));
+    arrayvecs.push_back(std::make_shared<ArrayVector>(record_batch->columns()));
+  }
+  return arrayvecs;
+}
+
+/// Wraps a copy of every batch of `batches_with_schema` in a shared_ptr.
+Result<std::vector<std::shared_ptr<ExecBatch>>> ToExecBatches(
+    const BatchesWithSchema& batches_with_schema) {
+  std::vector<std::shared_ptr<ExecBatch>> exec_batches;
+  exec_batches.reserve(batches_with_schema.batches.size());
+  // Iterate by const reference so each batch is copied once (into the shared
+  // object) instead of twice.
+  for (const auto& batch : batches_with_schema.batches) {
+    exec_batches.push_back(std::make_shared<ExecBatch>(batch));
+  }
+  return exec_batches;
+}
+
+/// Converts every batch of `batches_with_schema` into a RecordBatch built with
+/// the accompanying schema. Returns the first conversion error, if any.
+Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
+    const BatchesWithSchema& batches_with_schema) {
+  std::vector<std::shared_ptr<RecordBatch>> record_batches;
+  record_batches.reserve(batches_with_schema.batches.size());
+  // Iterate by const reference: copying each ExecBatch is unnecessary.
+  for (const auto& batch : batches_with_schema.batches) {
+    ARROW_ASSIGN_OR_RAISE(auto record_batch,
+                          batch.ToRecordBatch(batches_with_schema.schema));
+    record_batches.push_back(std::move(record_batch));
+  }
+  return record_batches;
+}
+
 Result<std::shared_ptr<Table>> SortTableOnAllFields(const 
std::shared_ptr<Table>& tab) {
   std::vector<SortKey> sort_keys;
   for (auto&& f : tab->schema()->fields()) {
diff --git a/cpp/src/arrow/compute/exec/test_util.h 
b/cpp/src/arrow/compute/exec/test_util.h
index ae7eac61e9..2984ef6a56 100644
--- a/cpp/src/arrow/compute/exec/test_util.h
+++ b/cpp/src/arrow/compute/exec/test_util.h
@@ -113,6 +113,30 @@ BatchesWithSchema MakeBatchesFromString(const 
std::shared_ptr<Schema>& schema,
                                         const std::vector<std::string_view>& 
json_strings,
                                         int multiplicity = 1);
 
+/// \brief Convert each batch into a shared vector of its column arrays
+ARROW_TESTING_EXPORT
+Result<std::vector<std::shared_ptr<ArrayVector>>> ToArrayVectors(
+    const BatchesWithSchema& batches_with_schema);
+
+/// \brief Wrap a copy of each batch in a shared_ptr
+ARROW_TESTING_EXPORT
+Result<std::vector<std::shared_ptr<ExecBatch>>> ToExecBatches(
+    const BatchesWithSchema& batches_with_schema);
+
+/// \brief Convert each batch into a record batch using the accompanying schema
+ARROW_TESTING_EXPORT
+Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
+    const BatchesWithSchema& batches_with_schema);
+
 ARROW_TESTING_EXPORT
 Result<std::shared_ptr<Table>> SortTableOnAllFields(const 
std::shared_ptr<Table>& tab);
 

Reply via email to