vibhatha commented on a change in pull request #12033:
URL: https://github.com/apache/arrow/pull/12033#discussion_r778234433
##########
File path: docs/source/cpp/streaming_execution.rst
##########
@@ -175,9 +175,607 @@ their completion::
// alive until this future is marked finished.
Future<> complete = plan->finished();
+Constructing ``ExecNode`` using Options
+=======================================
+
+Using the execution plan we can construct various execution queries.
+To construct such queries, we provide a set of building blocks
+referred to as :class:`ExecNode` s. These nodes provide the ability to
+construct operations like filtering, projection, join, etc.
+
+This is the list of :class:`ExecNode` s exposed:
+
+1. :class:`SourceNode`
+2. :class:`FilterNode`
+3. :class:`ProjectNode`
+4. :class:`ScalarAggregateNode`
+5. :class:`SinkNode`
+6. :class:`ConsumingSinkNode`
+7. :struct:`OrderBySinkNode`
+8. SelectK-SinkNode
+9. Scan-Node
+10. :class:`HashJoinNode`
+11. Write-Node
+12. :class:`UnionNode`
+
+These :class:`ExecNode` s provide the operations required
+to design a streaming execution plan.
+
+``SourceNode``
+--------------
+
+:struct:`arrow::compute::SourceNode` can be considered as an entry point to create a streaming execution plan.
+A source node can be constructed as follows.
+
+:class:`arrow::compute::SourceNodeOptions` are used to create the :struct:`arrow::compute::SourceNode`.
+The :class:`Schema` of the data passing through and a function to generate data
+`std::function<arrow::Future<arrow::util::optional<arrow::compute::ExecBatch>>()>`
+are required to create this option::
+
+ // data generator
+ arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() { ... }
+
+ // data schema
+ auto schema = arrow::schema({...});
+
+ // source node options
+ auto source_node_options = arrow::compute::SourceNodeOptions{schema, gen};
+
+ // create a source node
+ ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecNode * source,
+ arrow::compute::MakeExecNode("source", plan.get(), {},
+ source_node_options));
+
+``FilterNode``
+--------------
+
+:class:`FilterNode`, as the name suggests, provides a container to define a data filtering criterion.
+A filter can be written using :class:`arrow::compute::Expression`. For instance, if the row values
+of a particular column need to be filtered by a boundary value, e.g. all values of column a
+greater than 3, the filter can be written using :class:`arrow::compute::FilterNodeOptions` as follows::
+
+ // a > 3
+ arrow::compute::Expression filter_opt = arrow::compute::greater(
+ arrow::compute::field_ref("a"),
+ arrow::compute::literal(3));
+
+Using this option, the filter node can be constructed as follows::
+
+ // creating filter node
+ arrow::compute::ExecNode* filter;
+ ARROW_ASSIGN_OR_RAISE(filter, arrow::compute::MakeExecNode("filter",
+ // plan
+ plan.get(),
+ // previous input
+ {scan},
+ //filter node options
+ arrow::compute::FilterNodeOptions{filter_opt}));
+
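To make the semantics of the ``a > 3`` filter concrete, here is a minimal sketch in plain standard C++ that does not use Arrow at all. The ``Row`` struct and the ``FilterAGreaterThan`` helper are hypothetical names introduced only for illustration; they model a batch as a vector of rows and keep the rows whose ``a`` value exceeds the bound.

```cpp
#include <algorithm>
#include <iterator>
#include <vector>

// Hypothetical row type: stands in for one row of a batch with columns a and b.
struct Row {
  int a;
  int b;
};

// Keeps only the rows whose column `a` is strictly greater than `bound`,
// preserving the input order (the row-level meaning of the filter `a > bound`).
std::vector<Row> FilterAGreaterThan(const std::vector<Row>& batch, int bound) {
  std::vector<Row> out;
  std::copy_if(batch.begin(), batch.end(), std::back_inserter(out),
               [bound](const Row& r) { return r.a > bound; });
  return out;
}
```

For a batch with ``a`` values 1, 4, 3, 7 and a bound of 3, only the rows with 4 and 7 survive; the row with ``a == 3`` is dropped because the comparison is strict.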
+``ProjectNode``
+---------------
+
+:class:`ProjectNode` executes expressions on input batches and produces new batches.
+Each expression will be evaluated against each batch which is pushed to this
+node to produce a corresponding output column. This is exposed via
+:class:`arrow::compute::ProjectNodeOptions`, which requires
+a list of :class:`arrow::compute::Expression` s, names for the projected columns (if names are not provided,
+the string representations of the expressions will be used) and a boolean flag to determine
+synchronous/asynchronous execution (by default the asynchronous option is set to `true`).
+
+Sample Expression for projection::
+
+ // a * 2 (multiply values in a column by 2)
+ arrow::compute::Expression a_times_2 = arrow::compute::call("multiply",
+ {arrow::compute::field_ref("a"), arrow::compute::literal(2)});
+
+
+Creating a project node::
+
+ arrow::compute::ExecNode *project;
+ ARROW_ASSIGN_OR_RAISE(project,
+ arrow::compute::MakeExecNode("project",
+ // plan
+ plan.get(),
+ // previous node
+ {scan},
+ // project node options
+ arrow::compute::ProjectNodeOptions{{a_times_2}}));
+
+``ScalarAggregateNode``
+-----------------------
+
+:class:`ScalarAggregateNode` is an :class:`ExecNode` which provides various
+aggregation options. :class:`arrow::compute::AggregateNodeOptions` provides the
+container to define the aggregation criterion. These options can be
+selected from the `arrow::compute` options.
+
+1. `ScalarAggregateOptions`
+
+In this aggregation mode, null values are ignored when the `skip_nulls` option is set.
+The `min_count` option sets a threshold: if fewer than this many non-null values
+are observed, null is emitted.
+
+Example::
+
+ cp::ScalarAggregateOptions agg_opt(/*skip_nulls=*/false, /*min_count=*/2);
+
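The interaction of `skip_nulls` and `min_count` can be sketched in plain standard C++. This is an illustration of the semantics described above, not Arrow's implementation: ``Sum`` is a hypothetical helper, and ``std::nullopt`` stands in for both a null input value and a null result.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Sums the non-null entries of `values`.
// - If `skip_nulls` is false and any entry is null, the result is null.
// - If fewer than `min_count` non-null entries are seen, the result is null.
std::optional<int64_t> Sum(const std::vector<std::optional<int64_t>>& values,
                           bool skip_nulls, uint32_t min_count) {
  int64_t total = 0;
  uint32_t valid = 0;
  for (const auto& v : values) {
    if (!v.has_value()) {
      if (!skip_nulls) return std::nullopt;  // a single null poisons the result
      continue;                              // otherwise nulls are ignored
    }
    total += *v;
    ++valid;
  }
  if (valid < min_count) return std::nullopt;  // not enough non-null values
  return total;
}
```

With the input ``{1, null, 3}``, the sketch yields 4 for ``skip_nulls = true, min_count = 1``, and null for either ``skip_nulls = false`` or ``min_count = 3``.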
+2. `CountOptions`
+
+:class:`arrow::compute::CountOptions` aggregation option provides three sub-options to
+determine the counting approach.
+
+a. `ONLY_VALID` : Count only non-null values
+b. `ONLY_NULL` : Count only null values
+c. `ALL` : Count both non-null and null values
+
+Example::
+
+ arrow::compute::CountOptions options(cp::CountOptions::ONLY_VALID);
+
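The three counting modes can be illustrated with a small sketch in plain standard C++. The ``CountMode`` enum and ``Count`` function here are hypothetical stand-ins written for this example (they only mirror the names of the Arrow options), and ``std::nullopt`` models a null value.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for the counting modes listed above.
enum class CountMode { ONLY_VALID, ONLY_NULL, ALL };

// Counts the entries of `values` selected by `mode`; std::nullopt models a null.
int64_t Count(const std::vector<std::optional<int>>& values, CountMode mode) {
  int64_t valid = 0;
  int64_t nulls = 0;
  for (const auto& v : values) {
    if (v.has_value()) {
      ++valid;
    } else {
      ++nulls;
    }
  }
  switch (mode) {
    case CountMode::ONLY_VALID:
      return valid;
    case CountMode::ONLY_NULL:
      return nulls;
    case CountMode::ALL:
      return valid + nulls;
  }
  return 0;  // not reached for the three enumerators above
}
```

For the input ``{1, null, 3, null, 5}`` the three modes give 3, 2 and 5 respectively.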
+3. `ModeOptions`
+
+:class:`arrow::compute::ModeOptions` aggregation option computes the mode of a distribution
+by returning the top-n most common values and their counts.
+By default, it returns the most common value and its count.
+
+Example::
+
+ // n: return the top `n` most common values
+ // skip_nulls: if true (the default), null values are ignored.
+ // Otherwise, if any value is null, emit null.
+ // min_count: if less than this many non-null values are observed, emit null.
+ arrow::compute::ModeOptions mode_option(/*n*/5, /*skip_nulls*/true, /*min_count*/2);
+
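What "top-n common values and counts" means can be sketched in plain standard C++. ``TopNModes`` is a hypothetical helper, not an Arrow function, and the tie-breaking rule (the smaller value comes first when counts are equal) is an assumption made for this sketch rather than a statement about the kernel.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Returns up to `n` (value, count) pairs, most frequent first.
// Assumption of this sketch: ties in count are ordered by ascending value.
std::vector<std::pair<int, int64_t>> TopNModes(const std::vector<int>& values,
                                               std::size_t n) {
  assert(n >= 1 && "at least one mode must be requested");
  std::map<int, int64_t> counts;  // iterates in ascending value order
  for (int v : values) ++counts[v];

  std::vector<std::pair<int, int64_t>> out(counts.begin(), counts.end());
  // A stable sort by descending count keeps ascending value order among ties.
  std::stable_sort(out.begin(), out.end(),
                   [](const std::pair<int, int64_t>& x,
                      const std::pair<int, int64_t>& y) {
                     return x.second > y.second;
                   });
  if (out.size() > n) out.resize(n);
  return out;
}
```

For the input ``{1, 2, 2, 3, 3, 3, 4}`` and ``n = 2`` the sketch returns the value 3 with count 3, followed by the value 2 with count 2.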
+4. `VarianceOptions`
+
+:class:`arrow::compute::VarianceOptions` option controls the Delta Degrees of Freedom
+(ddof) of the Variance and Stddev kernels. The divisor used in calculations is N - ddof,
+where N is the number of elements. By default, ddof is zero, and population variance
+or stddev is returned.
+
+Example::
+
+ // ddof: delta degrees of freedom; the divisor used is N - ddof
+ // skip_nulls: If true (the default), null values are ignored.
+ // Otherwise, if any value is null, emit null.
+ // min_count: If less than this many non-null values are observed, emit null.
+ arrow::compute::VarianceOptions variance_option(/*ddof*/1,
+ /*skip_nulls*/true,
+ /*min_count*/3);
+
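The effect of ``ddof`` on the divisor can be shown with a worked sketch in plain standard C++. ``Variance`` and ``Stddev`` are hypothetical helpers written for this example using a straightforward two-pass formula; they are not the Arrow kernels and make no claim about how those are implemented.

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// Two-pass variance with divisor N - ddof.
// ddof = 0 gives the population variance, ddof = 1 the sample variance.
double Variance(const std::vector<double>& values, int ddof) {
  const double n = static_cast<double>(values.size());
  assert(n - ddof > 0 && "need more than ddof values");
  double mean = 0.0;
  for (double v : values) mean += v;
  mean /= n;
  double sum_sq = 0.0;  // sum of squared deviations from the mean
  for (double v : values) sum_sq += (v - mean) * (v - mean);
  return sum_sq / (n - ddof);
}

// Standard deviation is the square root of the variance with the same ddof.
double Stddev(const std::vector<double>& values, int ddof) {
  return std::sqrt(Variance(values, ddof));
}
```

For the values ``{1, 2, 3, 4}`` the mean is 2.5 and the sum of squared deviations is 5, so the variance is 5 / 4 = 1.25 with ``ddof = 0`` and 5 / 3 with ``ddof = 1``.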
+5. `QuantileOptions`
+
+:class:`arrow::compute::QuantileOptions` option controls the Quantile kernel behavior.
+By default, it returns the median value. An interpolation method is used when the quantile
+lies between two data points. The provided options for interpolation are: `LINEAR`, `LOWER`, `HIGHER`,
+`NEAREST` and `MIDPOINT`.
+
+Example::
+
+ // q: quantile must be between 0 and 1 inclusive
+ // (scalar value or a std::vector as input)
+ // interpolation: one of `LINEAR`, `LOWER`, `HIGHER`,
+ // `NEAREST`, `MIDPOINT`
+ // skip_nulls: If true (the default), null values are ignored. Otherwise,
+ // if any value is null, emit null.
+ // min_count: If less than this many non-null values are observed, emit null.
+ arrow::compute::QuantileOptions quantile_options(/*q*/0.50,
+ /*interpolation*/cp::QuantileOptions::Interpolation::LINEAR,
+ /*skip_nulls*/true,
+ /*min_count*/3);
+
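How the five interpolation choices differ when a quantile falls between two data points can be sketched in plain standard C++. The ``Interpolation`` enum and ``Quantile`` function below are hypothetical and only mirror the option names. The sketch assumes the quantile position is ``q * (N - 1)`` in the sorted data and that ``NEAREST`` picks the higher neighbour on an exact tie; both are assumptions of this example, not statements about the Arrow kernel.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Hypothetical stand-in mirroring the interpolation option names.
enum class Interpolation { LINEAR, LOWER, HIGHER, NEAREST, MIDPOINT };

// Exact quantile of `values` (taken by copy so it can be sorted).
// Assumption: the quantile sits at fractional index q * (N - 1).
double Quantile(std::vector<double> values, double q, Interpolation interp) {
  assert(!values.empty() && q >= 0.0 && q <= 1.0);
  std::sort(values.begin(), values.end());
  const double pos = q * static_cast<double>(values.size() - 1);
  const std::size_t lo = static_cast<std::size_t>(std::floor(pos));
  const std::size_t hi = static_cast<std::size_t>(std::ceil(pos));
  const double frac = pos - static_cast<double>(lo);  // distance past `lo`
  switch (interp) {
    case Interpolation::LINEAR:
      return values[lo] + frac * (values[hi] - values[lo]);
    case Interpolation::LOWER:
      return values[lo];
    case Interpolation::HIGHER:
      return values[hi];
    case Interpolation::NEAREST:
      return frac < 0.5 ? values[lo] : values[hi];  // tie goes to `hi` here
    case Interpolation::MIDPOINT:
      return (values[lo] + values[hi]) / 2.0;
  }
  return values[lo];  // not reached for the enumerators above
}
```

For the values ``{1, 2, 3, 4}`` and ``q = 0.5`` the position is 1.5, between the data points 2 and 3: ``LOWER`` gives 2, ``HIGHER`` gives 3, and both ``LINEAR`` and ``MIDPOINT`` give 2.5. With ``q = 0.25`` the position is 0.75, so ``LINEAR`` gives 1.75 while ``MIDPOINT`` gives 1.5.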
+6. `TDigestOptions`
+
+:class:`arrow::compute::TDigestOptions` option controls the TDigest approximate quantile kernel behavior.
+By default, it returns the median value.
+
+Example::
+
+ // q: quantile must be between 0 and 1 inclusive
+ // delta: compression parameter, default 100
+ // buffer_size: input buffer size, default 500
+ // skip_nulls: if true (the default), null values are ignored. Otherwise,
+ // if any value is null, emit null.
+ // min_count: If less than this many non-null values are observed, emit null.
+ arrow::compute::TDigestOptions tdigest_option(/*q*/0.5,
+ /*delta*/200,
+ /*buffer_size*/600,
+ /*skip_nulls*/true,
+ /*min_count*/5);
+
+7. `IndexOptions`
+
+:class:`arrow::compute::IndexOptions` option controls the Index kernel behavior.
+This is used to find the index of a particular scalar value.
+
+Example::
+
+ arrow::compute::IndexOptions index_options(arrow::MakeScalar("1"));
+
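Finding the index of a value can be pictured with a short sketch in plain standard C++. ``IndexOf`` is a hypothetical helper, not the Arrow kernel; returning the first match and using -1 for "not found" are conventions assumed for this example.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Returns the position of the first element equal to `target`,
// or -1 if no element matches (convention assumed by this sketch).
int64_t IndexOf(const std::vector<int>& values, int target) {
  const auto it = std::find(values.begin(), values.end(), target);
  return it == values.end() ? -1 : static_cast<int64_t>(it - values.begin());
}
```

For the input ``{4, 1, 7, 1}`` the sketch returns 1 for the target 1 (the first occurrence) and -1 for the target 9.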
+An example for creating an aggregate node::
+
+ arrow::compute::CountOptions options(arrow::compute::CountOptions::ONLY_VALID);
+
+ auto aggregate_options = arrow::compute::AggregateNodeOptions{
+ /*aggregates=*/{{"hash_count", &options}},
+ /*targets=*/{"a"},
+ /*names=*/{"count(a)"},
+ /*keys=*/{"b"}};
+
+ ARROW_ASSIGN_OR_RAISE(cp::ExecNode * aggregate,
+ cp::MakeExecNode("aggregate", plan.get(), {source},
+ aggregate_options));
+
+
+Scan-Node
Review comment:
Replaced the nodes with `scan`, `project`, etc. operations and didn't
explicitly discuss the nodes.