This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new c9293039b5 ARROW-17610: [C++] Support additional source types in SourceNode (#14207)
c9293039b5 is described below
commit c9293039b5454c99e8a34f8fc3a4602d74874114
Author: rtpsw <[email protected]>
AuthorDate: Mon Nov 21 21:50:03 2022 +0200
ARROW-17610: [C++] Support additional source types in SourceNode (#14207)
See https://issues.apache.org/jira/browse/ARROW-17610
Authored-by: Yaron Gvili <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/src/arrow/compute/exec/options.h | 51 +++++++++++
cpp/src/arrow/compute/exec/plan_test.cc | 79 +++++++++++++++++
cpp/src/arrow/compute/exec/source_node.cc | 136 ++++++++++++++++++++++++++++++
cpp/src/arrow/compute/exec/test_util.cc | 32 +++++++
cpp/src/arrow/compute/exec/test_util.h | 24 ++++++
5 files changed, 322 insertions(+)
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index ffb9f16983..8600b11348 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -27,12 +27,20 @@
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/exec/expression.h"
+#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_util.h"
#include "arrow/util/visibility.h"
namespace arrow {
+
+// Only Executor pointers appear in this header, so a forward declaration of the
+// class is sufficient here.
+namespace internal {
+class Executor;
+}  // namespace internal
+
namespace compute {
using AsyncExecBatchGenerator = AsyncGenerator<std::optional<ExecBatch>>;
@@ -77,6 +85,49 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
int64_t max_batch_size;
};
+/// \brief An extended Source node which accepts a schema
+///
+/// ItMaker is a maker of an iterator of tabular data: a callable taking no
+/// arguments that returns the Iterator feeding the source node. The node
+/// invokes it once, while the node is being created.
+template <typename ItMaker>
+class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
+ public:
+  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
+                          arrow::internal::Executor* io_executor = NULLPTR)
+      : schema(std::move(schema)),
+        it_maker(std::move(it_maker)),
+        io_executor(io_executor) {}
+
+  /// \brief The schema of the data produced by the iterator
+  ///
+  /// Must not be null; node creation fails with Status::Invalid otherwise.
+  std::shared_ptr<Schema> schema;
+
+  /// \brief A maker of an iterator which acts as the data source
+  ItMaker it_maker;
+
+  /// \brief The executor to use for scanning the iterator
+  ///
+  /// If null, the plan's executor is used; if that is null as well, the
+  /// default I/O executor is used.
+  arrow::internal::Executor* io_executor;
+};
+
+/// \brief Callable producing an iterator over array-vectors
+using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>;
+/// \brief An extended Source node which accepts a schema and array-vectors
+class ARROW_EXPORT ArrayVectorSourceNodeOptions
+    : public SchemaSourceNodeOptions<ArrayVectorIteratorMaker> {
+ public:
+  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
+};
+
+/// \brief Callable producing an iterator over exec-batches
+using ExecBatchIteratorMaker = std::function<Iterator<std::shared_ptr<ExecBatch>>()>;
+/// \brief An extended Source node which accepts a schema and exec-batches
+class ARROW_EXPORT ExecBatchSourceNodeOptions
+    : public SchemaSourceNodeOptions<ExecBatchIteratorMaker> {
+ public:
+  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
+};
+
+/// \brief Callable producing an iterator over record-batches
+using RecordBatchIteratorMaker = std::function<Iterator<std::shared_ptr<RecordBatch>>()>;
+/// \brief An extended Source node which accepts a schema and record-batches
+class ARROW_EXPORT RecordBatchSourceNodeOptions
+    : public SchemaSourceNodeOptions<RecordBatchIteratorMaker> {
+ public:
+  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
+};
+
/// \brief Make a node which excludes some rows from batches passed through it
///
/// filter_expression will be evaluated against each batch which is pushed to
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index a33337fcfe..6c8d497a1d 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -295,6 +295,85 @@ TEST(ExecPlanExecution, TableSourceSinkError) {
Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
}
+/// Checks that `source_factory_name` accepts options without an explicit
+/// executor and rejects options whose schema is null.
+template <typename ElementType, typename OptionsType>
+void TestSourceSinkError(
+    std::string source_factory_name,
+    std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
+        to_elements) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  const std::shared_ptr<Schema> null_schema;
+
+  auto basic_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto elements, to_elements(basic_batches));
+  auto make_element_it = [&elements]() {
+    return MakeVectorIterator<ElementType>(elements);
+  };
+
+  // io_executor is left unset: node creation must still succeed.
+  const OptionsType options_without_executor{basic_batches.schema, make_element_it};
+  ASSERT_OK(
+      MakeExecNode(source_factory_name, plan.get(), {}, options_without_executor));
+
+  // A null schema must be rejected at node creation.
+  const OptionsType options_without_schema{null_schema, make_element_it};
+  ASSERT_THAT(
+      MakeExecNode(source_factory_name, plan.get(), {}, options_without_schema),
+      Raises(StatusCode::Invalid, HasSubstr("not null")));
+}
+
+/// Runs `source_factory_name` -> "sink" over the basic batches, converted to
+/// ElementType by `to_elements`, and checks every batch comes back out.
+template <typename ElementType, typename OptionsType>
+void TestSourceSink(
+    std::string source_factory_name,
+    std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
+        to_elements) {
+  // The plan runs on a dedicated single-thread pool. The source options below
+  // leave io_executor unset, so the source node scans on the plan's executor.
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
+  ExecContext exec_context(default_memory_pool(), thread_pool.get());
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_context));
+  AsyncGenerator<std::optional<ExecBatch>> sink_gen;
+
+  auto basic_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(auto elements, to_elements(basic_batches));
+  auto make_element_it = [&elements]() {
+    return MakeVectorIterator<ElementType>(elements);
+  };
+
+  Declaration source_decl{source_factory_name,
+                          OptionsType{basic_batches.schema, make_element_it}};
+  Declaration sink_decl{"sink", SinkNodeOptions{&sink_gen}};
+  ASSERT_OK(Declaration::Sequence({source_decl, sink_decl}).AddToPlan(plan.get()));
+
+  auto expected = UnorderedElementsAreArray(basic_batches.batches);
+  ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), Finishes(ResultWith(expected)));
+}
+
+TEST(ExecPlanExecution, ArrayVectorSourceSink) {
+  using Element = std::shared_ptr<ArrayVector>;
+  TestSourceSink<Element, ArrayVectorSourceNodeOptions>("array_vector_source",
+                                                        ToArrayVectors);
+}
+
+TEST(ExecPlanExecution, ArrayVectorSourceSinkError) {
+  using Element = std::shared_ptr<ArrayVector>;
+  TestSourceSinkError<Element, ArrayVectorSourceNodeOptions>("array_vector_source",
+                                                             ToArrayVectors);
+}
+
+TEST(ExecPlanExecution, ExecBatchSourceSink) {
+  using Element = std::shared_ptr<ExecBatch>;
+  TestSourceSink<Element, ExecBatchSourceNodeOptions>("exec_batch_source",
+                                                      ToExecBatches);
+}
+
+TEST(ExecPlanExecution, ExecBatchSourceSinkError) {
+  using Element = std::shared_ptr<ExecBatch>;
+  TestSourceSinkError<Element, ExecBatchSourceNodeOptions>("exec_batch_source",
+                                                           ToExecBatches);
+}
+
+TEST(ExecPlanExecution, RecordBatchSourceSink) {
+  using Element = std::shared_ptr<RecordBatch>;
+  TestSourceSink<Element, RecordBatchSourceNodeOptions>("record_batch_source",
+                                                        ToRecordBatches);
+}
+
+TEST(ExecPlanExecution, RecordBatchSourceSinkError) {
+  using Element = std::shared_ptr<RecordBatch>;
+  TestSourceSinkError<Element, RecordBatchSourceNodeOptions>("record_batch_source",
+                                                             ToRecordBatches);
+}
+
TEST(ExecPlanExecution, SinkNodeBackpressure) {
std::optional<ExecBatch> batch =
ExecBatchFromJSON({int32(), boolean()},
diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc
index 59a287b0c4..3fd4c6fd5b 100644
--- a/cpp/src/arrow/compute/exec/source_node.cc
+++ b/cpp/src/arrow/compute/exec/source_node.cc
@@ -26,6 +26,7 @@
#include "arrow/compute/exec/util.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/datum.h"
+#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
@@ -293,6 +294,138 @@ struct TableSourceNode : public SourceNode {
}
};
+/// \brief CRTP base for source nodes fed by a user-supplied iterator maker
+///
+/// `This` is the concrete node type. It must provide a static `kKindName` and a
+/// static `MakeGenerator(it, io_executor, schema)` that turns the iterator into
+/// an ExecBatch generator. `Options` is the SchemaSourceNodeOptions subclass
+/// accepted by the node.
+template <typename This, typename Options>
+struct SchemaSourceNode : public SourceNode {
+  SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+                   arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
+      : SourceNode(plan, std::move(schema), std::move(generator)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName));
+    const auto& cast_options = checked_cast<const Options&>(options);
+    const auto& schema = cast_options.schema;
+
+    // Validate before invoking it_maker so that no iterator is created for a
+    // node that is going to be rejected anyway.
+    if (schema == NULLPTR) {
+      return Status::Invalid(This::kKindName, " requires schema which is not null");
+    }
+
+    // Executor preference: explicitly requested, then the plan's, then the
+    // default I/O thread pool.
+    arrow::internal::Executor* io_executor = cast_options.io_executor;
+    if (io_executor == NULLPTR) {
+      io_executor = plan->exec_context()->executor();
+    }
+    if (io_executor == NULLPTR) {
+      io_executor = io::internal::GetIOThreadPool();
+    }
+
+    auto it = cast_options.it_maker();
+    ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor, schema));
+    return plan->EmplaceNode<This>(plan, schema, std::move(generator));
+  }
+};
+
+/// \brief Source node reading RecordBatches from a user-supplied iterator
+struct RecordBatchSourceNode
+    : public SchemaSourceNode<RecordBatchSourceNode, RecordBatchSourceNodeOptions> {
+  using RecordBatchSchemaSourceNode =
+      SchemaSourceNode<RecordBatchSourceNode, RecordBatchSourceNodeOptions>;
+
+  using RecordBatchSchemaSourceNode::RecordBatchSchemaSourceNode;
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    return RecordBatchSchemaSourceNode::Make(plan, inputs, options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+
+  /// \brief Wrap the batch iterator in a background ExecBatch generator
+  ///
+  /// Every batch must carry exactly the node's schema. A mismatch is reported
+  /// as Status::Invalid. It is not mapped to std::nullopt, because std::nullopt
+  /// marks end-of-stream for this element type and would silently drop the
+  /// remaining batches.
+  static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
+      Iterator<std::shared_ptr<RecordBatch>>& batch_it,
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& schema) {
+    auto to_exec_batch = [schema](const std::shared_ptr<RecordBatch>& batch)
+        -> Result<std::optional<ExecBatch>> {
+      if (batch == NULLPTR) {
+        return std::optional<ExecBatch>();  // end of stream
+      }
+      if (*batch->schema() != *schema) {
+        return Status::Invalid(kKindName, " received a record batch with schema ",
+                               batch->schema()->ToString(),
+                               " which does not match the node's schema ",
+                               schema->ToString());
+      }
+      return std::optional<ExecBatch>(ExecBatch(*batch));
+    };
+    auto exec_batch_it = MakeMaybeMapIterator(to_exec_batch, std::move(batch_it));
+    return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor);
+  }
+
+  static const char kKindName[];
+};
+
+const char RecordBatchSourceNode::kKindName[] = "RecordBatchSourceNode";
+
+/// \brief Source node reading ExecBatches from a user-supplied iterator
+struct ExecBatchSourceNode
+    : public SchemaSourceNode<ExecBatchSourceNode, ExecBatchSourceNodeOptions> {
+  using ExecBatchSchemaSourceNode =
+      SchemaSourceNode<ExecBatchSourceNode, ExecBatchSourceNodeOptions>;
+
+  using ExecBatchSchemaSourceNode::ExecBatchSchemaSourceNode;
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    return ExecBatchSchemaSourceNode::Make(plan, inputs, options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+
+  /// \brief Wrap the batch iterator in a background ExecBatch generator
+  ///
+  /// An ExecBatch carries no schema of its own, so at least its value count
+  /// is checked against the node's schema. A mismatch is reported as
+  /// Status::Invalid instead of being passed downstream.
+  static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
+      Iterator<std::shared_ptr<ExecBatch>>& batch_it,
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& schema) {
+    auto to_exec_batch = [schema](const std::shared_ptr<ExecBatch>& batch)
+        -> Result<std::optional<ExecBatch>> {
+      if (batch == NULLPTR) {
+        return std::optional<ExecBatch>();  // end of stream
+      }
+      if (batch->num_values() != schema->num_fields()) {
+        return Status::Invalid(kKindName, " received an exec batch with ",
+                               batch->num_values(), " values but the schema has ",
+                               schema->num_fields(), " fields");
+      }
+      return std::optional<ExecBatch>(*batch);
+    };
+    auto exec_batch_it = MakeMaybeMapIterator(to_exec_batch, std::move(batch_it));
+    return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor);
+  }
+
+  static const char kKindName[];
+};
+
+const char ExecBatchSourceNode::kKindName[] = "ExecBatchSourceNode";
+
+/// \brief Source node reading array-vectors from a user-supplied iterator
+///
+/// Each array-vector becomes one ExecBatch holding its arrays as values.
+struct ArrayVectorSourceNode
+    : public SchemaSourceNode<ArrayVectorSourceNode, ArrayVectorSourceNodeOptions> {
+  using ArrayVectorSchemaSourceNode =
+      SchemaSourceNode<ArrayVectorSourceNode, ArrayVectorSourceNodeOptions>;
+
+  using ArrayVectorSchemaSourceNode::ArrayVectorSchemaSourceNode;
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    return ArrayVectorSchemaSourceNode::Make(plan, inputs, options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+
+  /// \brief Wrap the array-vector iterator in a background ExecBatch generator
+  ///
+  /// Each array-vector must hold one non-null array per schema field, all of
+  /// the same length; that length becomes the ExecBatch length. A violation is
+  /// reported as Status::Invalid. It is not mapped to std::nullopt, which marks
+  /// end-of-stream and would silently drop the remaining input. An empty
+  /// array-vector is rejected because no batch length can be inferred from it.
+  static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
+      Iterator<std::shared_ptr<ArrayVector>>& arrayvec_it,
+      arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& schema) {
+    auto to_exec_batch = [schema](const std::shared_ptr<ArrayVector>& arrayvec)
+        -> Result<std::optional<ExecBatch>> {
+      if (arrayvec == NULLPTR) {
+        return std::optional<ExecBatch>();  // end of stream
+      }
+      if (static_cast<int64_t>(arrayvec->size()) != schema->num_fields()) {
+        return Status::Invalid(kKindName, " received an array-vector of size ",
+                               arrayvec->size(), " but the schema has ",
+                               schema->num_fields(), " fields");
+      }
+      if (arrayvec->empty()) {
+        return Status::Invalid(kKindName,
+                               " cannot infer a batch length from an empty array-vector");
+      }
+      for (const auto& array : *arrayvec) {
+        if (array == NULLPTR) {
+          return Status::Invalid(kKindName,
+                                 " received an array-vector containing a null array");
+        }
+      }
+      const int64_t length = arrayvec->front()->length();
+      std::vector<Datum> values;
+      values.reserve(arrayvec->size());
+      for (const auto& array : *arrayvec) {
+        if (array->length() != length) {
+          return Status::Invalid(kKindName, " received arrays of unequal lengths ",
+                                 length, " and ", array->length());
+        }
+        values.emplace_back(array);
+      }
+      return std::optional<ExecBatch>(ExecBatch(std::move(values), length));
+    };
+    auto exec_batch_it = MakeMaybeMapIterator(to_exec_batch, std::move(arrayvec_it));
+    return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor);
+  }
+
+  static const char kKindName[];
+};
+
+const char ArrayVectorSourceNode::kKindName[] = "ArrayVectorSourceNode";
+
} // namespace
namespace internal {
@@ -300,6 +433,9 @@ namespace internal {
+// Registers the "source" and "table_source" factories plus the iterator-backed
+// "record_batch_source", "exec_batch_source" and "array_vector_source" factories.
void RegisterSourceNode(ExecFactoryRegistry* registry) {
DCHECK_OK(registry->AddFactory("source", SourceNode::Make));
DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make));
+ // Iterator-backed sources (SchemaSourceNode subclasses); each requires a
+ // non-null schema in its options.
+ DCHECK_OK(registry->AddFactory("record_batch_source",
RecordBatchSourceNode::Make));
+ DCHECK_OK(registry->AddFactory("exec_batch_source",
ExecBatchSourceNode::Make));
+ DCHECK_OK(registry->AddFactory("array_vector_source",
ArrayVectorSourceNode::Make));
}
} // namespace internal
diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc
index 13c07d540c..189310fb93 100644
--- a/cpp/src/arrow/compute/exec/test_util.cc
+++ b/cpp/src/arrow/compute/exec/test_util.cc
@@ -258,6 +258,38 @@ BatchesWithSchema MakeBatchesFromString(const std::shared_ptr<Schema>& schema,
return out_batches;
}
+// Converts each batch (interpreted with the shared schema) into the vector of
+// its column arrays. Fails if any batch cannot be converted to a RecordBatch.
+Result<std::vector<std::shared_ptr<ArrayVector>>> ToArrayVectors(
+    const BatchesWithSchema& batches_with_schema) {
+  std::vector<std::shared_ptr<ArrayVector>> arrayvecs;
+  arrayvecs.reserve(batches_with_schema.batches.size());
+  for (const auto& batch : batches_with_schema.batches) {
+    ARROW_ASSIGN_OR_RAISE(auto record_batch,
+                          batch.ToRecordBatch(batches_with_schema.schema));
+    arrayvecs.push_back(std::make_shared<ArrayVector>(record_batch->columns()));
+  }
+  return arrayvecs;
+}
+
+// Wraps a copy of each batch in a shared_ptr. This never fails; the Result
+// return type matches the sibling To* converters.
+Result<std::vector<std::shared_ptr<ExecBatch>>> ToExecBatches(
+    const BatchesWithSchema& batches_with_schema) {
+  std::vector<std::shared_ptr<ExecBatch>> exec_batches;
+  exec_batches.reserve(batches_with_schema.batches.size());
+  for (const auto& batch : batches_with_schema.batches) {
+    exec_batches.push_back(std::make_shared<ExecBatch>(batch));
+  }
+  return exec_batches;
+}
+
+// Converts each batch into a RecordBatch carrying the shared schema. Fails if
+// any batch cannot be converted.
+Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
+    const BatchesWithSchema& batches_with_schema) {
+  std::vector<std::shared_ptr<RecordBatch>> record_batches;
+  record_batches.reserve(batches_with_schema.batches.size());
+  for (const auto& batch : batches_with_schema.batches) {
+    ARROW_ASSIGN_OR_RAISE(auto record_batch,
+                          batch.ToRecordBatch(batches_with_schema.schema));
+    record_batches.push_back(std::move(record_batch));
+  }
+  return record_batches;
+}
+
Result<std::shared_ptr<Table>> SortTableOnAllFields(const
std::shared_ptr<Table>& tab) {
std::vector<SortKey> sort_keys;
for (auto&& f : tab->schema()->fields()) {
diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h
index ae7eac61e9..2984ef6a56 100644
--- a/cpp/src/arrow/compute/exec/test_util.h
+++ b/cpp/src/arrow/compute/exec/test_util.h
@@ -113,6 +113,30 @@ BatchesWithSchema MakeBatchesFromString(const std::shared_ptr<Schema>& schema,
const std::vector<std::string_view>&
json_strings,
int multiplicity = 1);
+/// \brief Convert each batch, using the shared schema, into its column arrays
+ARROW_TESTING_EXPORT
+Result<std::vector<std::shared_ptr<ArrayVector>>> ToArrayVectors(
+    const BatchesWithSchema& batches_with_schema);
+
+/// \brief Wrap a copy of each batch in a shared_ptr
+ARROW_TESTING_EXPORT
+Result<std::vector<std::shared_ptr<ExecBatch>>> ToExecBatches(
+    const BatchesWithSchema& batches_with_schema);
+
+/// \brief Convert each batch into a RecordBatch carrying the shared schema
+ARROW_TESTING_EXPORT
+Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
+    const BatchesWithSchema& batches_with_schema);
+
ARROW_TESTING_EXPORT
Result<std::shared_ptr<Table>> SortTableOnAllFields(const
std::shared_ptr<Table>& tab);