vibhatha commented on a change in pull request #12033:
URL: https://github.com/apache/arrow/pull/12033#discussion_r778215575
##########
File path: docs/source/cpp/streaming_execution.rst
##########
@@ -305,3 +305,601 @@ Datasets may be scanned multiple times; just make multiple scan
nodes from that dataset. (Useful for a self-join, for example.)
Note that producing two scan nodes like this will perform all
reads and decodes twice.
+
+Constructing ``ExecNode`` using Options
+=======================================
+
+Using an execution plan, we can construct various queries.
+To construct such queries, a set of building blocks referred to as
+:class:`ExecNode` s is provided. These nodes provide the ability to
+construct operations like filtering, projection, join, etc.
+(The examples below assume the namespace alias ``namespace cp = arrow::compute;``.)
+
+This is the list of :class:`ExecNode` s exposed:
+
+1. :class:`SourceNode`
+2. :class:`FilterNode`
+3. :class:`ProjectNode`
+4. :class:`ScalarAggregateNode`
+5. :class:`SinkNode`
+6. :class:`ConsumingSinkNode`
+7. :struct:`OrderBySinkNode`
+8. SelectK-SinkNode
+9. Scan-Node
+10. :class:`HashJoinNode`
+11. Write-Node
+12. :class:`UnionNode`
+
+These :class:`ExecNode` s are designed to provide the various operations
+required to design a streaming execution plan.
+
+``SourceNode``
+--------------
+
+:struct:`arrow::compute::SourceNode` can be considered the entry point of
+a streaming execution plan.
+A source node can be constructed as follows.
+
+:class:`arrow::compute::SourceNodeOptions` are used to create the
+:struct:`arrow::compute::SourceNode`. Creating these options requires the
+:class:`Schema` of the data passing through and a function to generate data of type
+`std::function<arrow::Future<arrow::util::optional<arrow::compute::ExecBatch>>()>`::
+
+    // data generator
+    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen() { ... }
+
+    // data schema
+    auto schema = arrow::schema({...});
+
+    // source node options
+    auto source_node_options = arrow::compute::SourceNodeOptions{schema, gen};
+
+    // create a source node
+    ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecNode * source,
+                          arrow::compute::MakeExecNode("source", plan.get(), {},
+                                                       source_node_options));
+
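+For illustration, a minimal sketch of such a generator can be built from an
+in-memory vector of batches with `arrow::MakeVectorGenerator` (declared in
+``arrow/util/async_generator.h``); this assumes `batches`, a pre-built
+`std::vector<cp::ExecBatch>`, is available::
+
+    // wrap each batch in util::optional, as the source node expects
+    std::vector<arrow::util::optional<cp::ExecBatch>> opt_batches;
+    for (auto& batch : batches) {
+      opt_batches.emplace_back(std::move(batch));
+    }
+    // the generator yields the batches one by one, then an empty optional
+    // to signal the end of the stream
+    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> gen =
+        arrow::MakeVectorGenerator(std::move(opt_batches));
+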
+``FilterNode``
+--------------
+
+:class:`FilterNode`, as the name suggests, provides the ability to define
+data filtering criteria. Filters can be written using :class:`arrow::compute::Expression`.
+For instance, if the row values of a particular column need to be filtered by a
+boundary value (e.g. all values of column ``a`` greater than 3), the filter
+expression can be written as follows::
+
+    // a > 3
+    arrow::compute::Expression filter_opt = arrow::compute::greater(
+        arrow::compute::field_ref("a"),
+        arrow::compute::literal(3));
+
+Using this expression, the filter node can be constructed via
+:class:`arrow::compute::FilterNodeOptions` as follows::
+
+    // creating filter node
+    arrow::compute::ExecNode* filter;
+    ARROW_ASSIGN_OR_RAISE(filter,
+                          arrow::compute::MakeExecNode("filter",
+                                                       // plan
+                                                       plan.get(),
+                                                       // previous input
+                                                       {scan},
+                                                       // filter node options
+                                                       arrow::compute::FilterNodeOptions{filter_opt}));
+
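+Filter expressions can also be composed. As a sketch (assuming the same
+column ``a``), a conjunction of two bounds can be built with
+`arrow::compute::and_`::
+
+    // (a > 3) and (a < 10)
+    arrow::compute::Expression compound_filter = arrow::compute::and_(
+        arrow::compute::greater(arrow::compute::field_ref("a"),
+                                arrow::compute::literal(3)),
+        arrow::compute::less(arrow::compute::field_ref("a"),
+                             arrow::compute::literal(10)));
+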
+``ProjectNode``
+---------------
+
+:class:`ProjectNode` executes expressions on input batches and produces new
+batches. Each expression is evaluated against each batch pushed to this
+node to produce a corresponding output column. This is exposed via
+:class:`arrow::compute::ProjectNodeOptions`, which requires a vector of
+:class:`arrow::compute::Expression` s and, optionally, names for the output
+columns (if names are not provided, the string representations of the
+expressions will be used).
+
+Sample expression for projection::
+
+    // a * 2 (multiply values in column a by 2)
+    arrow::compute::Expression a_times_2 = arrow::compute::call(
+        "multiply", {arrow::compute::field_ref("a"), arrow::compute::literal(2)});
+
+
+Creating a project node::
+
+ arrow::compute::ExecNode *project;
+ ARROW_ASSIGN_OR_RAISE(project,
+ arrow::compute::MakeExecNode("project",
+ // plan
+ plan.get(),
+ // previous node
+ {scan},
+ // project node options
+ arrow::compute::ProjectNodeOptions{{a_times_2}}));
+
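+Output column names can be given explicitly as the second argument of
+:class:`arrow::compute::ProjectNodeOptions`; a short sketch reusing the
+expression above (the name ``"a_times_2"`` is just an illustrative choice)::
+
+    // name the output column "a_times_2" instead of the expression's
+    // string representation
+    arrow::compute::ProjectNodeOptions named_options{{a_times_2}, {"a_times_2"}};
+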
+``ScalarAggregateNode``
+-----------------------
+
+:class:`ScalarAggregateNode` is an :class:`ExecNode` which provides various
+aggregation operations. :class:`arrow::compute::AggregateNodeOptions` provides
+the container to define the aggregation criteria. The per-aggregate options can
+be selected from the following `arrow::compute` options.
+
+1. `ScalarAggregateOptions`
+
+In this aggregation mode, the `skip_nulls` option determines whether null
+values are ignored. A second field, `min_count`, emits null if fewer than
+this many non-null values are observed.
+
+Example::
+
+    arrow::compute::ScalarAggregateOptions agg_options(/*skip_nulls=*/false,
+                                                       /*min_count=*/2);
+
+2. `CountOptions`
+
+:class:`arrow::compute::CountOptions` provides three modes to
+determine the counting approach:
+
+a. `ONLY_VALID` : Count only non-null values
+b. `ONLY_NULL` : Count only null values
+c. `ALL` : Count both non-null and null values
+
+Example::
+
+    arrow::compute::CountOptions options(arrow::compute::CountOptions::ONLY_VALID);
+
+3. `ModeOptions`
+
+:class:`arrow::compute::ModeOptions` computes the mode of a distribution,
+returning the top-n most common values and their counts.
+By default, the single most common value and its count are returned.
+
+Example::
+
+    // n: number of top values to return
+    // skip_nulls: if true (the default), null values are ignored.
+    //             Otherwise, if any value is null, emit null.
+    // min_count: if fewer than this many non-null values are observed, emit null.
+    arrow::compute::ModeOptions mode_options(/*n=*/5, /*skip_nulls=*/true,
+                                             /*min_count=*/2);
+
+4. `VarianceOptions`
+
+:class:`arrow::compute::VarianceOptions` controls the Delta Degrees of Freedom
+(ddof) of the Variance and Stddev kernels. The divisor used in calculations is
+N - ddof, where N is the number of elements. By default, ddof is zero, and the
+population variance or stddev is returned.
+
+Example::
+
+    // ddof: delta degrees of freedom
+    // skip_nulls: if true (the default), null values are ignored.
+    //             Otherwise, if any value is null, emit null.
+    // min_count: if fewer than this many non-null values are observed, emit null.
+    arrow::compute::VarianceOptions variance_options(/*ddof=*/1,
+                                                     /*skip_nulls=*/true,
+                                                     /*min_count=*/3);
+
+5. `QuantileOptions`
+
+:class:`arrow::compute::QuantileOptions` controls the Quantile kernel behavior.
+By default, the median value is returned. An interpolation method is used when
+the quantile lies between two data points. The provided interpolation options
+are `LINEAR`, `LOWER`, `HIGHER`, `NEAREST` and `MIDPOINT`.
+
+Example::
+
+    // q: quantile, must be between 0 and 1 inclusive
+    //    (a scalar value or a std::vector as input)
+    // interpolation: one of `LINEAR`, `LOWER`, `HIGHER`, `NEAREST`, `MIDPOINT`
+    // skip_nulls: if true (the default), null values are ignored.
+    //             Otherwise, if any value is null, emit null.
+    // min_count: if fewer than this many non-null values are observed, emit null.
+    arrow::compute::QuantileOptions quantile_options(
+        /*q=*/0.5,
+        /*interpolation=*/arrow::compute::QuantileOptions::Interpolation::LINEAR,
+        /*skip_nulls=*/true,
+        /*min_count=*/3);
+
+6. `TDigestOptions`
+
+:class:`arrow::compute::TDigestOptions` controls the TDigest approximate
+quantile kernel behavior. By default, the median value is returned.
+
+Example::
+
+    // q: quantile, must be between 0 and 1 inclusive
+    // delta: compression parameter, default 100
+    // buffer_size: input buffer size, default 500
+    // skip_nulls: if true (the default), null values are ignored.
+    //             Otherwise, if any value is null, emit null.
+    // min_count: if fewer than this many non-null values are observed, emit null.
+    arrow::compute::TDigestOptions tdigest_options(/*q=*/0.5,
+                                                   /*delta=*/200,
+                                                   /*buffer_size=*/600,
+                                                   /*skip_nulls=*/true,
+                                                   /*min_count=*/5);
+
+7. `IndexOptions`
+
+:class:`arrow::compute::IndexOptions` controls the Index kernel behavior.
+It is used to find the index of a particular scalar value.
+
+Example::
+
+ arrow::compute::IndexOptions index_options(arrow::MakeScalar("1"));
+
+An example of creating an aggregate node::
+
+    arrow::compute::CountOptions options(arrow::compute::CountOptions::ONLY_VALID);
+
+    auto aggregate_options = arrow::compute::AggregateNodeOptions{
+        /*aggregates=*/{{"hash_count", &options}},
+        /*targets=*/{"a"},
+        /*names=*/{"count(a)"},
+        /*keys=*/{"b"}};
+
+    ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecNode * aggregate,
+                          arrow::compute::MakeExecNode("aggregate", plan.get(), {source},
+                                                       aggregate_options));
+
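+For a scalar (non-grouped) aggregation, the keys are simply omitted. A minimal
+sketch, assuming the scalar `sum` aggregate function applied to column ``a``::
+
+    arrow::compute::ScalarAggregateOptions sum_options(/*skip_nulls=*/true,
+                                                       /*min_count=*/1);
+    auto scalar_agg_options = arrow::compute::AggregateNodeOptions{
+        /*aggregates=*/{{"sum", &sum_options}},
+        /*targets=*/{"a"},
+        /*names=*/{"sum(a)"}};  // no keys: one result row for the whole stream
+
+    ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecNode * scalar_aggregate,
+                          arrow::compute::MakeExecNode("aggregate", plan.get(), {source},
+                                                       scalar_agg_options));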
+
+Scan-Node
+---------
+
+There is no class or struct defined as ScanNode in the source,
+but the :class:`arrow::dataset::ScanNodeOptions` container includes the options
+passed to the internal `MakeScanNode` function, which creates an :class:`ExecNode`
+performing the defined task. Creating these options requires a
+`std::shared_ptr<arrow::dataset::Dataset>`, a
+`std::shared_ptr<arrow::dataset::ScanOptions>`, and a
+`std::shared_ptr<arrow::util::AsyncToggle>`.
+
+The :class:`arrow::dataset::ScanOptions` includes the scanning options::
+
+ arrow::compute::Expression Materialize(std::vector<std::string> names,
+ bool include_aug_fields = false) {
+ if (include_aug_fields) {
+ for (auto aug_name : {"__fragment_index",
+ "__batch_index", "__last_in_fragment"}) {
+ names.emplace_back(aug_name);
+ }
+ }
+
+ std::vector<arrow::compute::Expression> exprs;
+ for (const auto& name : names) {
+ exprs.push_back(arrow::compute::field_ref(name));
+ }
+
+ return arrow::compute::project(exprs, names);
+ }
+
+ auto options = std::make_shared<arrow::dataset::ScanOptions>();
+ // sync scanning is not supported by ScanNode
+ options->use_async = true;
+ options->projection = Materialize({}); // create empty projection
+
+ // construct the scan node
+ cp::ExecNode* scan;
+ auto scan_node_options = arrow::dataset::ScanNodeOptions{
+ /*dataset*/dataset,
+ /*scan_options*/options};
+
+ ARROW_ASSIGN_OR_RAISE(scan,
+ arrow::compute::MakeExecNode("scan",
+ plan.get(),
+ {},
+ scan_node_options));
+
+``SinkNode``
+------------
+
+:class:`SinkNode` can be considered the output or final node of a streaming
+execution plan. The :class:`arrow::compute::SinkNodeOptions` interface is used
+to pass the required options. It requires a
+`std::function<arrow::Future<arrow::util::optional<arrow::compute::ExecBatch>>()>* generator`
+and `arrow::util::BackpressureOptions backpressure`.
+
+Example::
+
+ arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+ arrow::compute::ExecNode* sink;
+
+    ARROW_ASSIGN_OR_RAISE(sink, arrow::compute::MakeExecNode("sink", plan.get(), {source},
+                          arrow::compute::SinkNodeOptions{&sink_gen}));
+
+
+The output can be obtained as a table::
+
+    // translate sink_gen (async) to sink_reader (sync)
+    std::shared_ptr<arrow::RecordBatchReader> sink_reader = cp::MakeGeneratorReader(
+        basic_data.schema, std::move(sink_gen), exec_context.memory_pool());
+
+    // validate the ExecPlan
+    ABORT_ON_FAILURE(plan->Validate());
+    std::cout << "Exec Plan Created: " << plan->ToString() << std::endl;
+    // start the ExecPlan
+    ABORT_ON_FAILURE(plan->StartProducing());
+
+    // collect sink_reader into a Table
+    std::shared_ptr<arrow::Table> response_table;
+
+    ARROW_ASSIGN_OR_RAISE(response_table,
+                          arrow::Table::FromRecordBatchReader(sink_reader.get()));
+
+    std::cout << "Results : " << response_table->ToString() << std::endl;
+
+
+``ConsumingSinkNode``
+---------------------
+
+:class:`arrow::compute::ConsumingSinkNode` is a sink node that consumes the
+data within the plan and will not finish until the consumption is complete.
+Use :class:`SinkNode` if you are transferring ownership of the data to another
+system. Use :class:`arrow::compute::ConsumingSinkNode` if the data is being
+consumed within the exec plan (i.e. the exec plan should not complete until
+the consumption has completed).
+
+Example::
+
+    // define a custom SinkNodeConsumer
+    std::atomic<uint32_t> batches_seen{0};
+    arrow::Future<> finish = arrow::Future<>::Make();
+    struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
+
+      CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
+          : batches_seen(batches_seen), finish(std::move(finish)) {}
+      // Consumption logic can be written here
+      arrow::Status Consume(cp::ExecBatch batch) override {
+        // data can be consumed in the expected way
+        // transfer to another system or just do some work
+        // and write to disk
+        (*batches_seen)++;
+        return arrow::Status::OK();
+      }
+
+      arrow::Future<> Finish() override { return finish; }
+
+      std::atomic<uint32_t>* batches_seen;
+      arrow::Future<> finish;
+    };
+
+    std::shared_ptr<CustomSinkNodeConsumer> consumer =
+        std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
+
+    arrow::compute::ExecNode* consuming_sink;
+
+    ARROW_ASSIGN_OR_RAISE(consuming_sink, cp::MakeExecNode("consuming_sink", plan.get(),
+                          {source}, cp::ConsumingSinkNodeOptions(consumer)));
+
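+Note that with a consumer written this way, the application has to mark the
+``finish`` future as complete once consumption is done, or the plan will never
+finish. A minimal sketch of that control flow (marked immediately here, purely
+for illustration)::
+
+    ABORT_ON_FAILURE(plan->StartProducing());
+    // ... once all data has been consumed:
+    finish.MarkFinished(arrow::Status::OK());
+    plan->finished().Wait();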
+
+``OrderBySinkNode``
+-------------------
+
+This is an extension of the :class:`SinkNode` and provides the ability
+to guarantee the ordering of the stream by supplying
+:class:`arrow::compute::OrderBySinkNodeOptions`.
+Here, :class:`arrow::compute::SortOptions` defines which columns
+are used for sorting and in which order.
+
+Example::
+
+    arrow::compute::ExecNode* sink;
+
+    ARROW_ASSIGN_OR_RAISE(sink,
+        arrow::compute::MakeExecNode("order_by_sink", plan.get(), {source},
+            arrow::compute::OrderBySinkNodeOptions{
+                // column key(s) to order by and how to order by these sort keys
+                /*sort_options=*/arrow::compute::SortOptions{
+                    {arrow::compute::SortKey{"a", arrow::compute::SortOrder::Descending}}},
+                &sink_gen}));
+
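+Multiple sort keys can be supplied in the same way; a short sketch, assuming
+columns ``a`` and ``b`` both exist in the input::
+
+    // order by a descending, breaking ties by b ascending
+    arrow::compute::SortOptions sort_options(
+        {arrow::compute::SortKey{"a", arrow::compute::SortOrder::Descending},
+         arrow::compute::SortKey{"b", arrow::compute::SortOrder::Ascending}});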
+
+SelectK-Node
+------------
+
+There is no SelectK-SinkNode available as an entity within the source, but the
+behavior is defined with :class:`arrow::compute::SelectKOptions`, which builds
+on the :struct:`OrderBySinkNode` definition. This creates a sink node that
+receives inputs and then computes the top_k/bottom_k rows.
+
+Example::
+
+    arrow::compute::SelectKOptions options = arrow::compute::SelectKOptions::TopKDefault(
+        /*k=*/2, {"i32"});
+
+    ARROW_ASSIGN_OR_RAISE(
+        arrow::compute::ExecNode * k_sink_node,
+        arrow::compute::MakeExecNode("select_k_sink", plan.get(), {source},
+            arrow::compute::SelectKSinkNodeOptions{options, &sink_gen}));
+
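+A bottom-k selection can be built the same way; a one-line sketch using the
+corresponding `SelectKOptions::BottomKDefault` factory::
+
+    arrow::compute::SelectKOptions bottom_k_options =
+        arrow::compute::SelectKOptions::BottomKDefault(/*k=*/2, {"i32"});
+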
+Scan-Node Options
+-----------------
+
+As noted above, there is no ScanNode definition in the source; the behavior is
+defined using :class:`arrow::dataset::ScanNodeOptions`. The scanning behavior
+itself is configured via the fields of :class:`arrow::dataset::ScanOptions`.
+
+Field definitions for :class:`arrow::dataset::ScanOptions`::
+
+
+    /// A row filter (which will be pushed down to partitioning/reading if supported).
+    arrow::compute::Expression filter;
+    /// A projection expression (which can add/remove/rename columns).
+    arrow::compute::Expression projection;
+    /// Schema with which batches will be read from fragments. This is also known as the
+    /// "reader schema"; it will be used (for example) in constructing CSV file readers to
+    /// identify column types for parsing. Usually only a subset of its fields (see
+    /// MaterializedFields) will be materialized during a scan.
+    std::shared_ptr<arrow::Schema> dataset_schema;
+    /// Schema of projected record batches. This is independent of dataset_schema as its
+    /// fields are derived from the projection. For example, let
+    ///
+    ///   dataset_schema = {"a": int32, "b": int32, "id": utf8}
+    ///   projection = project({equal(field_ref("a"), field_ref("b"))}, {"a_plus_b"})
+    ///
+    /// (no filter specified). In this case, the projected_schema would be
+    ///
+    ///   {"a_plus_b": int32}
+    std::shared_ptr<arrow::Schema> projected_schema;
+
+    /// Maximum row count for scanned batches.
+    int64_t batch_size;  // default 1024 * 1024
+
+    /// How many batches to read ahead within a file.
+    ///
+    /// Set to 0 to disable batch readahead.
+    ///
+    /// Note: May not be supported by all formats.
+    /// Note: May not be supported by all scanners.
+    /// Note: Will be ignored if use_threads is set to false.
+    int32_t batch_readahead;  // default 32
+
+    /// How many files to read ahead.
+    ///
+    /// Set to 0 to disable fragment readahead.
+    ///
+    /// Note: May not be enforced by all scanners.
+    /// Note: Will be ignored if use_threads is set to false.
+    int32_t fragment_readahead;  // default 8
+
+    /// If true the scanner will scan in parallel.
+    ///
+    /// Note: If true, this will use threads from both the cpu_executor and the
+    /// io_context.executor.
+    /// Note: This must be true in order for any readahead to happen.
+    bool use_threads = false;
+
+    /// If true then an asynchronous implementation of the scanner will be used.
+    /// This implementation is newer and generally performs better. However, it
+    /// makes extensive use of threading and is still considered experimental.
+    bool use_async = false;
+
+    /// Fragment-specific scan options.
+    /// Some implemented FragmentScanOptions are:
+    /// CsvFragmentScanOptions, IpcFragmentScanOptions, ParquetFragmentScanOptions.
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> fragment_scan_options;
+
+
+An example of creating a scan :class:`ExecNode` with these options is shown in
+the ``Scan-Node`` section above.
+
+
+Write-Node
+----------
+
+This execution node type provides the ability to write results to a file
+format. A definition doesn't exist as an :class:`ExecNode`, but the write
+options are provided via :class:`arrow::dataset::WriteNodeOptions`, composed of
+:class:`arrow::dataset::FileSystemDatasetWriteOptions`, a
+`std::shared_ptr<arrow::Schema>`, and a
+`std::shared_ptr<arrow::util::AsyncToggle> backpressure_toggle`. Here
+:class:`arrow::dataset::FileSystemDatasetWriteOptions` contains the metadata
+required to write the data.
+
+Creating `WriteNodeOptions`::
+
+ std::string root_path = "";
+    std::string uri = "file:///path/to/file";
+ std::shared_ptr<arrow::fs::FileSystem> filesystem =
+ arrow::fs::FileSystemFromUri(uri, &root_path).ValueOrDie();
+
+ auto base_path = root_path + "/parquet_dataset";
+ ABORT_ON_FAILURE(filesystem->DeleteDir(base_path));
+ ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
+
+    // The partition schema determines which fields are part of the partitioning.
+ auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
+ // We'll use Hive-style partitioning,
+ // which creates directories with "key=value" pairs.
+
+ auto partitioning =
+ std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
+ // We'll write Parquet files.
+ auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+
+ arrow::dataset::FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.filesystem = filesystem;
+ write_options.base_dir = base_path;
+ write_options.partitioning = partitioning;
+ write_options.basename_template = "part{i}.parquet";
+
+    arrow::dataset::WriteNodeOptions write_node_options{write_options,
+                                                        dataset->schema()};
+
+Creating a `write` :class:`ExecNode`::
+
+    ARROW_ASSIGN_OR_RAISE(cp::ExecNode * wr,
+                          cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));
+
+    ABORT_ON_FAILURE(wr->Validate());
+    ABORT_ON_FAILURE(plan->Validate());
+    // start the ExecPlan
+    ABORT_ON_FAILURE(plan->StartProducing());
+    // make sure to wait for the plan to finish
+    plan->finished().Wait();
+
+``UnionNode``
+-------------
+
+:class:`UnionNode` is the :class:`ExecNode` used to perform a union
+operation on two data streams. The union operation can be executed
+on multiple data sources (:class:`ExecNode` s).
+
+The following sketch demonstrates how this can be achieved using
+two data sources. Following the union operation, the output is obtained
+using an aggregation operation.
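+
+A minimal sketch, assuming two source nodes `left_source` and `right_source`
+with the same schema (columns ``a`` and ``b``) have already been created on
+the plan::
+
+    // combine the two input streams; the union node takes no dedicated options
+    ARROW_ASSIGN_OR_RAISE(cp::ExecNode * union_node,
+                          cp::MakeExecNode("union", plan.get(),
+                                           {left_source, right_source},
+                                           cp::ExecNodeOptions{}));
+
+    // aggregate over the combined stream
+    arrow::compute::CountOptions count_options(arrow::compute::CountOptions::ONLY_VALID);
+    auto union_agg_options = arrow::compute::AggregateNodeOptions{
+        /*aggregates=*/{{"hash_count", &count_options}},
+        /*targets=*/{"a"},
+        /*names=*/{"count(a)"},
+        /*keys=*/{"b"}};
+
+    ARROW_ASSIGN_OR_RAISE(cp::ExecNode * union_aggregate,
+                          cp::MakeExecNode("aggregate", plan.get(), {union_node},
+                                           union_agg_options));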
Review comment:
Replaced by referring to the existing docs.