vibhatha commented on a change in pull request #12033:
URL: https://github.com/apache/arrow/pull/12033#discussion_r781945495



##########
File path: cpp/examples/arrow/execution_plan_documentation_examples.cc
##########
@@ -0,0 +1,1125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <utility>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+
+#include <arrow/compute/api.h>
+#include <arrow/compute/api_scalar.h>
+#include <arrow/compute/api_vector.h>
+#include <arrow/compute/cast.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/ir_consumer.h>
+#include <arrow/compute/exec/test_util.h>
+
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/dataset_writer.h>
+#include <arrow/dataset/file_base.h>
+#include <arrow/dataset/file_parquet.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/slow.h>
+#include <arrow/io/stdio.h>
+#include <arrow/io/transform.h>
+
+#include "arrow/json/api.h"
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <arrow/table.h>
+
+#include <arrow/ipc/api.h>
+
+#include <arrow/util/future.h>
+#include <arrow/util/range.h>
+#include <arrow/util/thread_pool.h>
+#include <arrow/util/vector.h>
+
+// Demonstrate various operators in Arrow Streaming Execution Engine
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+constexpr char kSep[] = "******";
+
+#define PRINT_BLOCK(msg)                                               \
+  std::cout << "" << std::endl;                                        \
+  std::cout << "\t" << kSep << " " << msg << " " << kSep << std::endl; \
+  std::cout << "" << std::endl;
+
+namespace cp = ::arrow::compute;
+
+std::string GetDataAsCsvString() {
+  std::string data_str = R"csv(a,b
+1,null
+2,true
+null,true
+3,false
+null,true
+4,false
+5,null
+6,false
+7,false
+8,true
+)csv";
+  return data_str;
+}
+
+template <typename TYPE,
+          typename = typename 
std::enable_if<arrow::is_number_type<TYPE>::value |
+                                             
arrow::is_boolean_type<TYPE>::value |
+                                             
arrow::is_temporal_type<TYPE>::value>::type>
+arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
+    const std::vector<typename TYPE::c_type>& values) {
+  using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
+  using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
+  ARROW_BUILDER_TYPE builder;
+  ABORT_ON_FAILURE(builder.Reserve(values.size()));
+  std::shared_ptr<ARROW_ARRAY_TYPE> array;
+  ABORT_ON_FAILURE(builder.AppendValues(values));
+  ABORT_ON_FAILURE(builder.Finish(&array));
+  arrow::Result<std::shared_ptr<ARROW_ARRAY_TYPE>> result(std::move(array));
+  return result;
+}
+
+template <class TYPE>
+arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
+    const std::vector<std::string>& values) {
+  using ARROW_ARRAY_TYPE = typename arrow::TypeTraits<TYPE>::ArrayType;
+  using ARROW_BUILDER_TYPE = typename arrow::TypeTraits<TYPE>::BuilderType;
+  ARROW_BUILDER_TYPE builder;
+  ABORT_ON_FAILURE(builder.Reserve(values.size()));
+  std::shared_ptr<ARROW_ARRAY_TYPE> array;
+  ABORT_ON_FAILURE(builder.AppendValues(values));
+  ABORT_ON_FAILURE(builder.Finish(&array));
+  arrow::Result<std::shared_ptr<ARROW_ARRAY_TYPE>> result(std::move(array));
+  return result;
+}
+
+arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
+    const arrow::ArrayVector array_vector, const arrow::FieldVector& 
field_vector) {
+  std::shared_ptr<arrow::RecordBatch> record_batch;
+  ARROW_ASSIGN_OR_RAISE(auto struct_result,
+                        arrow::StructArray::Make(array_vector, field_vector));
+  return record_batch->FromStructArray(struct_result);
+}
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> 
CreateDataSetFromCSVData() {
+  arrow::io::IOContext io_context = arrow::io::default_io_context();
+  std::shared_ptr<arrow::io::InputStream> input;
+  std::string csv_data = GetDataAsCsvString();
+  arrow::util::string_view sv = csv_data;
+  input = std::make_shared<arrow::io::BufferReader>(sv);
+
+  auto read_options = arrow::csv::ReadOptions::Defaults();
+  auto parse_options = arrow::csv::ParseOptions::Defaults();
+  auto convert_options = arrow::csv::ConvertOptions::Defaults();
+
+  // Instantiate TableReader from input stream and options
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::csv::TableReader> table_reader,
+                        arrow::csv::TableReader::Make(io_context, input, 
read_options,
+                                                      parse_options, 
convert_options));
+
+  std::shared_ptr<arrow::csv::TableReader> reader = table_reader;
+
+  // Read table from CSV file
+  ARROW_ASSIGN_OR_RAISE(auto maybe_table, reader->Read());
+  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(maybe_table);
+  arrow::Result<std::shared_ptr<arrow::dataset::InMemoryDataset>> 
result(std::move(ds));
+  return result;
+}
+
+// (Doc section: Materialize)
+cp::Expression Materialize(std::vector<std::string> names,
+                           bool include_aug_fields = false) {
+  if (include_aug_fields) {
+    for (auto aug_name : {"__fragment_index", "__batch_index", 
"__last_in_fragment"}) {
+      names.emplace_back(aug_name);
+    }
+  }
+
+  std::vector<cp::Expression> exprs;
+  for (const auto& name : names) {
+    exprs.push_back(cp::field_ref(name));
+  }
+
+  return cp::project(exprs, names);
+}
+// (Doc section: Materialize)
+
+// (Doc section: Scan Example)
+/**
+ * \brief
+ * Scan-Sink
+ * This example shows how scan operation can be applied on a dataset.
+ * There are operations that can be applied on the scan (project, filter)
+ * and the input data can be processed. THe output is obtained as a table
+ * via the sink node.
+ * \param exec_context : execution context
+ * \return arrow::Status
+ */
+arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
+  // Execution plan created
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset,
+                        CreateDataSetFromCSVData());
+
+  auto options = std::make_shared<arrow::dataset::ScanOptions>();
+  // sync scanning is not supported by ScanNode
+  options->use_async = true;
+  options->projection = Materialize({});  // create empty projection
+
+  // construct the scan node
+  cp::ExecNode* scan;
+  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
+
+  ARROW_ASSIGN_OR_RAISE(scan,
+                        cp::MakeExecNode("scan", plan.get(), {}, 
scan_node_options));
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  ARROW_ASSIGN_OR_RAISE(std::ignore, cp::MakeExecNode("sink", plan.get(), 
{scan},
+                                                      
cp::SinkNodeOptions{&sink_gen}));
+
+  // // translate sink_gen (async) to sink_reader (sync)
+  std::shared_ptr<arrow::RecordBatchReader> sink_reader = 
cp::MakeGeneratorReader(
+      dataset->schema(), std::move(sink_gen), exec_context.memory_pool());
+
+  // validate the ExecPlan
+  ARROW_RETURN_NOT_OK(plan->Validate());
+  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
+  // // start the ExecPlan
+  ARROW_RETURN_NOT_OK(plan->StartProducing());
+
+  // // collect sink_reader into a Table
+  std::shared_ptr<arrow::Table> response_table;
+
+  ARROW_ASSIGN_OR_RAISE(response_table,
+                        
arrow::Table::FromRecordBatchReader(sink_reader.get()));
+
+  std::cout << "Results : " << response_table->ToString() << std::endl;
+
+  // // stop producing
+  plan->StopProducing();
+  // // plan mark finished
+  auto futures = plan->finished();
+  ARROW_RETURN_NOT_OK(futures.status());
+  futures.Wait();
+  return arrow::Status::OK();
+}
+// (Doc section: Scan Example)
+
+arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
+    const arrow::FieldVector& field_vector, const arrow::ArrayVector& 
array_vector) {
+  std::shared_ptr<arrow::RecordBatch> record_batch;
+  ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, 
field_vector));
+  cp::ExecBatch batch{*res_batch};
+  arrow::Result<cp::ExecBatch> result(batch);
+  return result;
+}
+
+// (Doc section: BatchesWithSchema Definition)
+struct BatchesWithSchema {
+  std::vector<cp::ExecBatch> batches;
+  std::shared_ptr<arrow::Schema> schema;
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() const {
+    auto opt_batches = ::arrow::internal::MapVector(
+        [](cp::ExecBatch batch) { return 
arrow::util::make_optional(std::move(batch)); },
+        batches);
+    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen;
+    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+};
+// (Doc section: BatchesWithSchema Definition)
+
+// (Doc section: MakeBasicBatches Definition)
+arrow::Result<BatchesWithSchema> MakeBasicBatches() {
+  BatchesWithSchema out;
+  auto field_vector = {arrow::field("a", arrow::int32()),
+                       arrow::field("b", arrow::boolean())};
+  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 
4}));
+  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 
6, 7}));
+  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 
9, 10}));
+
+  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
+                        GetArrayDataSample<arrow::BooleanType>({false, true}));
+  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
+                        GetArrayDataSample<arrow::BooleanType>({true, false, 
true}));
+  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
+                        GetArrayDataSample<arrow::BooleanType>({false, true, 
false}));
+
+  ARROW_ASSIGN_OR_RAISE(auto b1,
+                        GetExecBatchFromVectors(field_vector, {b1_int, 
b1_bool}));
+  ARROW_ASSIGN_OR_RAISE(auto b2,
+                        GetExecBatchFromVectors(field_vector, {b2_int, 
b2_bool}));
+  ARROW_ASSIGN_OR_RAISE(auto b3,
+                        GetExecBatchFromVectors(field_vector, {b3_int, 
b3_bool}));
+
+  out.batches = {b1, b2, b3};
+  out.schema = arrow::schema(field_vector);
+  arrow::Result<BatchesWithSchema> result(std::move(out));
+  return result;
+}
+// (Doc section: MakeBasicBatches Definition)
+
+arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
+  BatchesWithSchema out;
+  auto field = arrow::field("a", arrow::int32());
+  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 
3, 0, 2}));
+  ARROW_ASSIGN_OR_RAISE(auto b2_int,
+                        GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 
12}));
+  ARROW_ASSIGN_OR_RAISE(auto b3_int,
+                        GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 
121}));
+  ARROW_ASSIGN_OR_RAISE(auto b4_int,
+                        GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 
34}));
+  ARROW_ASSIGN_OR_RAISE(auto b5_int,
+                        GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
+  ARROW_ASSIGN_OR_RAISE(auto b6_int,
+                        GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 
12}));
+  ARROW_ASSIGN_OR_RAISE(auto b7_int,
+                        GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 
11}));
+  ARROW_ASSIGN_OR_RAISE(auto b8_int,
+                        GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));
+
+  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
+  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
+  ARROW_ASSIGN_OR_RAISE(auto b3,
+                        GetExecBatchFromVectors({field, field}, {b3_int, 
b8_int}));
+  ARROW_ASSIGN_OR_RAISE(auto b4,
+                        GetExecBatchFromVectors({field, field, field, field},
+                                                {b4_int, b5_int, b6_int, 
b7_int}));
+  out.batches = {b1, b2, b3, b4};
+  out.schema = arrow::schema({field});
+  arrow::Result<BatchesWithSchema> result(std::move(out));
+  return result;
+}
+
+arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
+  BatchesWithSchema out;
+  auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", 
arrow::utf8())};
+  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 
7, 3}));
+  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, 
-1, 3}));
+  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 
3, -8}));
+  ARROW_ASSIGN_OR_RAISE(auto b1_str, 
GetBinaryArrayDataSample<arrow::StringType>(
+                                         {"alpha", "beta", "alpha"}));
+  ARROW_ASSIGN_OR_RAISE(auto b2_str, 
GetBinaryArrayDataSample<arrow::StringType>(
+                                         {"alpha", "gamma", "alpha"}));
+  ARROW_ASSIGN_OR_RAISE(auto b3_str, 
GetBinaryArrayDataSample<arrow::StringType>(
+                                         {"gamma", "beta", "alpha"}));
+  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, 
b1_str}));
+  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, 
b2_str}));
+  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, 
b3_str}));
+  out.batches = {b1, b2, b3};
+
+  size_t batch_count = out.batches.size();
+  for (int repeat = 1; repeat < multiplicity; ++repeat) {
+    for (size_t i = 0; i < batch_count; ++i) {
+      out.batches.push_back(out.batches[i]);
+    }
+  }
+
+  out.schema = arrow::schema(fields);
+  arrow::Result<BatchesWithSchema> result(out);
+  return result;
+}
+
+// (Doc section: Source Example)
+/**
+ * \brief
+ * Source-Sink Example
+ * This example shows how a source and sink can be used
+ * in an execution plan. This includes source node receiving data
+ * and the sink node emits the data as an output represented in
+ * a table.
+ * \param exec_context : execution context
+ * \return arrow::Status
+ */
+arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, 
basic_data.gen()};
+
+  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
+                        cp::MakeExecNode("source", plan.get(), {}, 
source_node_options));
+
+  ARROW_ASSIGN_OR_RAISE(std::ignore, cp::MakeExecNode("sink", plan.get(), 
{source},
+                                                      
cp::SinkNodeOptions{&sink_gen}));
+
+  // // // translate sink_gen (async) to sink_reader (sync)
+  std::shared_ptr<arrow::RecordBatchReader> sink_reader = 
cp::MakeGeneratorReader(
+      basic_data.schema, std::move(sink_gen), exec_context.memory_pool());
+
+  // // validate the ExecPlan
+  ABORT_ON_FAILURE(plan->Validate());
+  std::cout << "Exec Plan Created: " << plan->ToString() << std::endl;
+  // // // start the ExecPlan
+  ARROW_RETURN_NOT_OK(plan->StartProducing());
+
+  // // collect sink_reader into a Table
+  std::shared_ptr<arrow::Table> response_table;
+
+  ARROW_ASSIGN_OR_RAISE(response_table,
+                        
arrow::Table::FromRecordBatchReader(sink_reader.get()));
+
+  std::cout << "Results : " << response_table->ToString() << std::endl;
+
+  // // plan stop producing
+  plan->StopProducing();
+  // // plan mark finished
+  auto futures = plan->finished();
+  ARROW_RETURN_NOT_OK(futures.status());
+  futures.Wait();
+
+  return arrow::Status::OK();
+}
+// (Doc section: Source Example)
+
+// (Doc section: Filter Example)
+/**
+ * \brief
+ * Source-Filter-Sink
+ * This example shows how a filter can be used in an execution plan,
+ * along with the source and sink operations. The output from the
+ * exeuction plan is obtained as a table via the sink node.
+ * \param exec_context : execution context
+ * \return arrow::Status
+ */
+arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset,
+                        CreateDataSetFromCSVData());
+
+  auto options = std::make_shared<arrow::dataset::ScanOptions>();
+  // sync scanning is not supported by ScanNode
+  options->use_async = true;

Review comment:
       Updated and removed. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to