vibhatha commented on a change in pull request #12033:
URL: https://github.com/apache/arrow/pull/12033#discussion_r781989744
##########
File path: docs/source/cpp/streaming_execution.rst
##########
@@ -305,3 +305,451 @@ Datasets may be scanned multiple times; just make
multiple scan
nodes from that dataset. (Useful for a self-join, for example.)
Note that producing two scan nodes like this will perform all
reads and decodes twice.
+
+Constructing ``ExecNode`` using Options
+=======================================
+
+Using the execution plan we can construct various queries.
+To construct such queries, we have provided a set of building blocks
+referred to as :class:`ExecNode` s. These nodes provide the ability to
+construct operations like filtering, projection, join, etc.
+
+This is the list of operations associated with the execution plan:
+
+.. list-table:: Operations and Options
+ :widths: 50 50
+ :header-rows: 1
+
+ * - Operation
+ - Options
+ * - ``source``
+ - :class:`arrow::compute::SourceNodeOptions`
+ * - ``filter``
+ - :class:`arrow::compute::FilterNodeOptions`
+ * - ``project``
+ - :class:`arrow::compute::ProjectNodeOptions`
+ * - ``aggregate``
+ - :class:`arrow::compute::AggregateNodeOptions`
+ * - ``sink``
+ - :class:`arrow::compute::SinkNodeOptions`
+ * - ``consuming_sink``
+ - :class:`arrow::compute::ConsumingSinkNodeOptions`
+ * - ``order_by_sink``
+ - :class:`arrow::compute::OrderBySinkNodeOptions`
+ * - ``select_k_sink``
+ - :class:`arrow::compute::SelectKSinkNodeOptions`
+ * - ``scan``
+ - :class:`arrow::dataset::ScanNodeOptions`
+ * - ``hash_join``
+ - :class:`arrow::compute::HashJoinNodeOptions`
+ * - ``write``
+ - :class:`arrow::dataset::WriteNodeOptions`
+ * - ``union``
+ - N/A
+
+
+.. _stream_execution_source_docs:
+
+``source``
+----------
+
+The ``source`` operation can be considered the entry point for creating a streaming execution plan.
+:class:`arrow::compute::SourceNodeOptions` are used to create the ``source`` operation.
+These options require the :class:`Schema` of the data passing through and a function that generates the data,
+``arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>>``.
+Additionally, when using the ``source`` operator,
+data scanning operations like filter and project may need to be applied
+in a later part of the execution plan.
+
+Struct to hold the data generator definition:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: BatchesWithSchema Definition)
+ :end-before: (Doc section: BatchesWithSchema Definition)
+ :linenos:
+ :lineno-match:
+
+Generating batches for computation:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: MakeBasicBatches Definition)
+ :end-before: (Doc section: MakeBasicBatches Definition)
+ :linenos:
+ :lineno-match:
+
+Example of using ``source`` (usage of sink is explained in detail in :ref:`sink<stream_execution_sink_docs>`):
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: Source Example)
+ :end-before: (Doc section: Source Example)
+ :linenos:
+ :lineno-match:
+
+.. _stream_execution_filter_docs:
+
+``filter``
+----------
+
+The ``filter`` operation, as the name suggests, provides an option to define data filtering
+criteria. It keeps only the rows that match a given expression.
+Filters can be written using :class:`arrow::compute::Expression`.
+For example, to keep only the rows where the value of column ``a`` is greater than 3,
+we can use the following expression, which is later passed to
+:class:`arrow::compute::FilterNodeOptions`::
+
+ // a > 3
+ arrow::compute::Expression filter_opt = arrow::compute::greater(
+ arrow::compute::field_ref("a"),
+ arrow::compute::literal(3));
+
+Using this option, the filter node can be constructed as follows::
+
+ // creating filter node
+ arrow::compute::ExecNode* filter;
+ ARROW_ASSIGN_OR_RAISE(filter, arrow::compute::MakeExecNode("filter",
+ // plan
+ plan.get(),
+ // previous node
+ {scan},
+ //filter node options
+ arrow::compute::FilterNodeOptions{filter_opt}));
+
+Filter Example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: Filter Example)
+ :end-before: (Doc section: Filter Example)
+ :linenos:
+ :lineno-match:
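+
To make the row-level effect of the ``a > 3`` filter concrete, here is a minimal sketch that uses only the C++ standard library. It does not use the Arrow API: ``ToyFilterRow`` and ``FilterAGreaterThan`` are hypothetical names invented for illustration, and the sketch works row by row purely for readability.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-in for one row with columns "a" and "b"; not an Arrow type.
struct ToyFilterRow {
  int64_t a;
  int64_t b;
};

// Keeps only the rows whose "a" value is strictly greater than the threshold,
// which mirrors the predicate greater(field_ref("a"), literal(3)).
std::vector<ToyFilterRow> FilterAGreaterThan(const std::vector<ToyFilterRow>& rows,
                                             int64_t threshold) {
  std::vector<ToyFilterRow> kept;
  for (const ToyFilterRow& row : rows) {
    if (row.a > threshold) {
      kept.push_back(row);
    }
  }
  return kept;
}
```

Calling ``FilterAGreaterThan(rows, 3)`` on rows whose ``a`` values are 1, 4, 3 and 7 keeps the rows with 4 and 7; the row with exactly 3 is dropped because the comparison is strict.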
+
+.. _stream_execution_project_docs:
+
+``project``
+-----------
+
+The ``project`` operation rearranges, deletes, transforms, and creates columns.
+Each output column is computed by evaluating an expression
+against the source record batch. This is exposed via the
+:class:`arrow::compute::ProjectNodeOptions` class, which requires
+a list of :class:`arrow::compute::Expression` s and, optionally, names for the output columns
+(if names are not provided, the string representations of the expressions are used).
+
+Sample Expression for projection::
+
+ // a * 2 (multiply values in a column by 2)
+ arrow::compute::Expression a_times_2 = arrow::compute::call("multiply",
+ {arrow::compute::field_ref("a"), arrow::compute::literal(2)});
+
+
+Creating a project node::
+
+ arrow::compute::ExecNode* project;
+ ARROW_ASSIGN_OR_RAISE(project,
+ arrow::compute::MakeExecNode("project",
+ // plan
+ plan.get(),
+ // previous node
+ {scan},
+ // project node options
+ arrow::compute::ProjectNodeOptions{{a_times_2}}));
+
+Project Example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: Project Example)
+ :end-before: (Doc section: Project Example)
+ :linenos:
+ :lineno-match:
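+
The projection rules described above (one output column per expression, with the expression's string form as the fallback name) can be sketched with the standard library alone. This is a toy model, not the Arrow API: ``ToyExpr``, ``ToyProjection`` and ``ToyProject`` are hypothetical, and the ``repr`` string used below is made up rather than the text Arrow would actually print.

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical expression over a single input column "a".
struct ToyExpr {
  std::string repr;                      // made-up textual form of the expression
  std::function<int64_t(int64_t)> eval;  // computes one output value from one "a" value
};

struct ToyProjection {
  std::vector<std::string> names;             // one name per output column
  std::vector<std::vector<int64_t>> columns;  // one column per expression
};

// Evaluates every expression against column "a". When no names are given,
// each output column is named after its expression's string form.
ToyProjection ToyProject(const std::vector<int64_t>& a,
                         const std::vector<ToyExpr>& exprs,
                         std::vector<std::string> names = {}) {
  ToyProjection out;
  if (names.empty()) {
    for (const ToyExpr& expr : exprs) {
      names.push_back(expr.repr);
    }
  }
  out.names = std::move(names);
  for (const ToyExpr& expr : exprs) {
    std::vector<int64_t> column;
    for (int64_t value : a) {
      column.push_back(expr.eval(value));
    }
    out.columns.push_back(std::move(column));
  }
  return out;
}
```

With a single ``a * 2`` expression and no explicit names, the result has one column holding the doubled values, named after the expression's ``repr``.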
+
+.. _stream_execution_aggregate_docs:
+
+``aggregate``
+-------------
+
+The ``aggregate`` operation provides various data aggregation options.
+The :class:`arrow::compute::AggregateNodeOptions` is used to
+define the aggregation criterion. These options can be
+selected from :ref:`aggregation options <aggregation-option-list>`.
+
+Example::
+
+ arrow::compute::IndexOptions index_options(arrow::MakeScalar("1"));
+
+An example for creating an aggregate node::
+
+ arrow::compute::CountOptions options(arrow::compute::CountOptions::ONLY_VALID);
+
+ auto aggregate_options = arrow::compute::AggregateNodeOptions{
+ /*aggregates=*/{{"hash_count", &options}},
+ /*targets=*/{"a"},
+ /*names=*/{"count(a)"},
+ /*keys=*/{"b"}};
+
+ ARROW_ASSIGN_OR_RAISE(cp::ExecNode * aggregate,
+ cp::MakeExecNode("aggregate", plan.get(), {source},
+ aggregate_options));
+
+Aggregate Example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: Aggregate Example)
+ :end-before: (Doc section: Aggregate Example)
+ :linenos:
+ :lineno-match:
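+
As a rough illustration of what the grouped count above computes (count the valid values of ``a`` for each distinct key ``b``), here is a standard-library-only sketch. It is a toy model and not the Arrow implementation: ``ToyGroupedRow`` and ``CountValidAByB`` are hypothetical names, and null handling is modeled with a plain validity flag.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical row: "a" may be null (a_is_valid == false), "b" is the group key.
struct ToyGroupedRow {
  int64_t a;
  bool a_is_valid;
  std::string b;
};

// For every distinct key "b", counts the rows whose "a" value is valid,
// loosely mirroring a count restricted to non-null values.
std::map<std::string, int64_t> CountValidAByB(const std::vector<ToyGroupedRow>& rows) {
  std::map<std::string, int64_t> counts;
  for (const ToyGroupedRow& row : rows) {
    int64_t& count = counts[row.b];  // a newly seen key starts at zero
    if (row.a_is_valid) {
      ++count;
    }
  }
  return counts;
}
```

A key whose rows all have a null ``a`` still appears in the result of this sketch, with a count of zero.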
+
+.. _stream_execution_sink_docs:
+
+``sink``
+--------
+
+The ``sink`` operation provides the output and is the final node of a streaming
+execution definition. The :class:`arrow::compute::SinkNodeOptions` interface is used to pass
+the required options. It requires
+``arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* generator``
+and ``arrow::util::BackpressureOptions backpressure``.
+An execution plan should only have one "terminal" node (one sink node).
+
+Example::
+
+ arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+ arrow::compute::ExecNode* sink;
+
+ ARROW_ASSIGN_OR_RAISE(sink, arrow::compute::MakeExecNode("sink", plan.get(), {source},
+ arrow::compute::SinkNodeOptions{&sink_gen}));
+
+The Source Example also includes the sink operation:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: Source Example)
+ :end-before: (Doc section: Source Example)
+ :linenos:
+ :lineno-match:
+
+.. _stream_execution_consuming_sink_docs:
+
+``consuming_sink``
+------------------
+
+The ``consuming_sink`` operator is a sink operation that consumes the data within the
+execution plan (i.e. the exec plan should not complete until the consumption has completed).
+
+Example::
+
+ // define a Custom SinkNodeConsumer
+ std::atomic<uint32_t> batches_seen{0};
+ arrow::Future<> finish = arrow::Future<>::Make();
+ struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
+
+ CustomSinkNodeConsumer(std::atomic<uint32_t> *batches_seen, arrow::Future<> finish):
+ batches_seen(batches_seen), finish(std::move(finish)) {}
+ // Consumption logic can be written here
+ arrow::Status Consume(cp::ExecBatch batch) override {
+ // data can be consumed in the expected way
+ // transfer to another system or just do some work
+ // and write to disk
+ (*batches_seen)++;
+ return arrow::Status::OK();
+ }
+
+ arrow::Future<> Finish() override { return finish; }
+
+ std::atomic<uint32_t> *batches_seen;
+ arrow::Future<> finish;
+
+ };
+
+ std::shared_ptr<CustomSinkNodeConsumer> consumer =
+ std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
+
+ arrow::compute::ExecNode *consuming_sink;
+
+ ARROW_ASSIGN_OR_RAISE(consuming_sink, MakeExecNode("consuming_sink", plan.get(),
+ {source}, cp::ConsumingSinkNodeOptions(consumer)));
+
+
+Consuming-Sink Example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: ConsumingSink Example)
+ :end-before: (Doc section: ConsumingSink Example)
+ :linenos:
+ :lineno-match:
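+
The contract described above (every batch is handed to the consumer, and the plan is only done once the consumer has finished) can be modeled in a few lines of standard C++. This is a synchronous toy, not the Arrow interface: ``ToySinkConsumer``, ``CountingConsumer`` and ``RunToyPlan`` are hypothetical names, and the real ``Finish`` returns a future instead of completing immediately.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

using ToyBatch = std::vector<int64_t>;  // hypothetical stand-in for an ExecBatch

// Hypothetical consumer interface: Consume is called once per batch,
// Finish once after the last batch.
class ToySinkConsumer {
 public:
  virtual ~ToySinkConsumer() = default;
  virtual bool Consume(const ToyBatch& batch) = 0;  // false signals an error
  virtual void Finish() = 0;
};

// Counts batches and rows, similar in spirit to CustomSinkNodeConsumer above.
class CountingConsumer : public ToySinkConsumer {
 public:
  bool Consume(const ToyBatch& batch) override {
    ++batches_seen;
    rows_seen += batch.size();
    return true;
  }
  void Finish() override { finished = true; }

  uint32_t batches_seen = 0;
  std::size_t rows_seen = 0;
  bool finished = false;
};

// Drives the consumer; the toy plan only reports success after Finish has run.
bool RunToyPlan(const std::vector<ToyBatch>& batches, ToySinkConsumer& consumer) {
  for (const ToyBatch& batch : batches) {
    if (!consumer.Consume(batch)) {
      return false;  // stop early on a consumer error; Finish is not called
    }
  }
  consumer.Finish();
  return true;
}
```

The design point the sketch tries to capture is that consumption happens inside the plan's lifetime, so the caller never has to drain a generator after the plan completes.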
+
+.. _stream_execution_order_by_sink_docs:
+
+``order_by_sink``
+-----------------
+
+The ``order_by_sink`` operation is an extension of the ``sink`` operation.
+It guarantees the ordering of the stream through
+:class:`arrow::compute::OrderBySinkNodeOptions`.
+Here :class:`arrow::compute::SortOptions` are provided to define which columns
+are used for sorting and whether to sort by ascending or descending values.
+
+Example::
+
+ arrow::compute::ExecNode *sink;
+
+ ARROW_ASSIGN_OR_RAISE(sink,
+ arrow::compute::MakeExecNode("order_by_sink", plan.get(),
+ {source},
+ arrow::compute::OrderBySinkNodeOptions{
+ /*sort_options*/arrow::compute::SortOptions{
+ { arrow::compute::SortKey{
+ //Column key(s) to order by and how to order by these sort keys.
+ "a",
+ // Sort Order
+ arrow::compute::SortOrder::Descending
+ }}},&sink_gen}));
+
+
+Order-By-Sink Example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: OrderBySink Example)
+ :end-before: (Doc section: OrderBySink Example)
+ :linenos:
+ :lineno-match:
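+
An ordering guarantee over a stream applies across batches, not just within each one. The sketch below models that with the standard library only, under the assumption that the sink collects every batch before sorting. ``ToySortOrder`` and ``CollectAndOrder`` are hypothetical names and are not part of Arrow.

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

enum class ToySortOrder { Ascending, Descending };  // hypothetical, not arrow::compute::SortOrder

// Gathers all batches of a single column, then sorts the combined values.
std::vector<int64_t> CollectAndOrder(const std::vector<std::vector<int64_t>>& batches,
                                     ToySortOrder order) {
  std::vector<int64_t> all;
  for (const std::vector<int64_t>& batch : batches) {
    all.insert(all.end(), batch.begin(), batch.end());
  }
  if (order == ToySortOrder::Descending) {
    std::stable_sort(all.begin(), all.end(), std::greater<int64_t>());
  } else {
    std::stable_sort(all.begin(), all.end());
  }
  return all;
}
```

Sorting each batch independently would not be enough here: with the batches ``{3, 1}``, ``{7}`` and ``{2, 5}``, the largest value arrives in the second batch but has to be emitted first in descending order.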
+
+
+.. _stream_execution_select_k_docs:
+
+``select_k_sink``
+-----------------
+
+The ``select_k_sink`` option enables selecting the top or bottom k elements.
+It is configured with :class:`arrow::compute::SelectKOptions` and is defined
+using the :struct:`OrderBySinkNode` definition. This option returns a sink node that receives
+inputs and then computes top_k/bottom_k.
+
+Create SelectK Option::
+
+ arrow::compute::SelectKOptions options = arrow::compute::SelectKOptions::TopKDefault(
+ /*k=*/2, {"i32"});
+
+ ARROW_ASSIGN_OR_RAISE(
+ arrow::compute::ExecNode * k_sink_node,
+ arrow::compute::MakeExecNode("select_k_sink",
+ plan.get(), {source},
+ arrow::compute::SelectKSinkNodeOptions{options, &sink_gen}));
+
+
+SelectK Example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: KSelect Example)
+ :end-before: (Doc section: KSelect Example)
+ :linenos:
+ :lineno-match:
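+
For intuition about what a top-k selection returns, here is a small standard-library sketch for a single integer column. It is only an illustration of the semantics: ``ToyTopK`` is a hypothetical helper, and using ``std::partial_sort`` is a choice made for this sketch, not a statement about how Arrow implements ``select_k_sink``.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Returns the k largest values in descending order. If fewer than k values
// are available, all of them are returned.
std::vector<int64_t> ToyTopK(std::vector<int64_t> values, std::size_t k) {
  k = std::min(k, values.size());
  auto middle = values.begin() + static_cast<std::ptrdiff_t>(k);
  // Only the first k positions end up sorted; the rest is discarded below.
  std::partial_sort(values.begin(), middle, values.end(), std::greater<int64_t>());
  values.resize(k);
  return values;
}
```

A bottom-k variant would use the default ascending comparison instead of ``std::greater``.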
+
+.. _stream_execution_scan_docs:
+
+``scan``
+---------
+
+``scan`` is an operation used to load and process data, and its behavior is defined using
+:class:`arrow::dataset::ScanNodeOptions`. This option contains a set of definitions.
+The :ref:`dataset<cpp-dataset-reading>` API also uses the scanner for processing data.
+In contrast to the ``source`` operation, the ``scan`` operation can load the data and apply scanning
+operations like filter and project to the loaded data.
+
+Creating a Scan `ExecNode`::
+
+ auto options = std::make_shared<arrow::dataset::ScanOptions>();
+ options->use_async = true;
+ options->projection = Materialize({}); // create empty projection
+
+ // construct the scan node
+ cp::ExecNode* scan;
+ auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
+
+ ARROW_ASSIGN_OR_RAISE(scan,
+ cp::MakeExecNode("scan", plan.get(), {},
+ scan_node_options));
+
+Scan Example:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+ :language: cpp
+ :start-after: (Doc section: Scan Example)
+ :end-before: (Doc section: Scan Example)
+ :linenos:
+ :lineno-match:
+
+.. _stream_execution_write_docs:
+
+``write``
+---------
+
+The ``write`` option enables writing a result to supported file formats (for example ``parquet``,
+``feather``, ``csv``, etc.).
+The write options are provided via :class:`arrow::dataset::WriteNodeOptions`, which are
+defined using :class:`arrow::dataset::FileSystemDatasetWriteOptions`,
+``std::shared_ptr<arrow::Schema>``, and
+``std::shared_ptr<arrow::util::AsyncToggle> backpressure_toggle``. Here the
+:class:`arrow::dataset::FileSystemDatasetWriteOptions` contains the metadata required
+to write the data.
Review comment:
Got it, updated the doc.