paul-rogers opened a new pull request, #13187:
URL: https://github.com/apache/druid/pull/13187

   Druid recently introduced the new MSQ framework for batch queries (which we 
will informally call MSQb here). We would like to bring the MSQ goodness to 
Druid's interactive queries, to provide an MSQ-interactive (or MSQi). The needs 
of a batch engine are vastly different from the needs of an interactive engine: 
this is why both Spark and Presto (or Drill or Impala) exist. It is why MSQb 
runs via the Overlord, while interactive queries run via the Broker. This PR is 
the first of many steps to evolve Druid's low-latency query path toward being 
multi-stage. That is, this is the first step toward MSQi.
   
   This PR uses the industry-standard notion of an operator DAG, as discussed 
in [Issue #11933](https://github.com/apache/druid/issues/11933). Please see 
that issue for the motivation. Please see the `README.md` file for a technical 
overview. The basic idea (thanks to @imply-cheddar) is to retain (for now) the 
existing `QueryRunner` structure, but "shim in" operators in place of 
`Sequence`s. This then sets us up to evolve the `QueryRunner` classes in later 
PRs. The result is that we move from one fully-working state to another, with 
each step moving us toward the MSQi goal.
   
   This PR is large and complex, but it does present the entire operator 
architecture. We may find that we wish to divide this PR into smaller chunks. 
Those smaller PRs can refer back to this one for the "big picture."
   
   This PR replaces [PR #12641](https://github.com/apache/druid/pull/12641), 
which presented an earlier version of this work and was closed at the time so 
that we could focus on MSQb.
   
   The PR is currently a draft to allow us to cross-check that each converted 
operator incorporates any recent changes from the corresponding `QueryRunner` 
and `Sequence` classes.
   
   ## Highlights of This PR
   
   Included here:
   
   * Operator definition and associated "helper" classes.
   * Query and fragment structure to run a DAG of operators.
   * A "generic" set of operators for common operations (limit, merge, etc.)
   * Conversion of the Scan query to use operators in place of `Sequence`s.
   * Conversion of the TimeSeries query as above.
   * A configuration layer that allows the new path to be enabled. (It is 
disabled by default.)
   * Revised `QueryLifecycle` and `QueryResponse` classes that can run the 
query either as old-school `Sequence`s or as the new `Fragment` structure.
   * Unit tests for all of the new code.
   * Rerun of all "Calcite query" tests with the operator path enabled.
   
   The primary goal of this PR is to introduce the basics of the operator 
approach. Functionally, the new and old approaches produce identical results. 
In a few places, the new approach exploits optimizations which skip 
unnecessary steps. Extreme stress tests (reading 5 million generated rows) 
show that the operator path has lower overhead, but that gain is unlikely to 
be visible except for very large result sets.
   
   ### Operators
   
   This PR is based on the operator concept discussed in detail in the issue 
cited above. It may be helpful to summarize the key ideas. An operator does 
one task in a data pipeline. Calcite converts a SQL query to a tree of "logical 
operators". In most engines, a rewrite step then converts the logical plan to a 
physical plan, complete with distribution decisions. The physical plan is 
executed as a DAG of physical operators.
   
   Historically, Druid converts the Calcite logical plan tree to a native 
query, then executes the native query as a set of `QueryRunner/Sequence` pairs. 
At a very high conceptual level, a "fragment" is roughly analogous to the 
execution of a native query, while an operator is roughly analogous to a 
`Sequence`. The design, however, is 
much different at the next level down.
   
   An operator does one thing, and is independent of the rest of the code 
except at three interface points:
   
   * The "parameters" given to a specific operator instance (what to sort, the 
limit to apply, what to merge, etc.)
   * The shape of the incoming (upstream) rows.
   * The shape of the outgoing (downstream) rows.
   
   Operators are simple, stateful classes that can be composed in any number of 
DAG structures, and can be unit tested in isolation. Later in the MSQi project, 
we will convert directly from the Calcite logical operators to a description 
(plan) of the physical operators, which is then used to create operator 
instances. For now, we stick with the detour to native queries so we can make 
small, incremental steps that move from one working state to the next.
   
   The key operator abstractions include:
   
   * `Operator`: an interface for a data pipeline component. An operator can be 
opened to provide an iterator over results, then closed. An operator can have 
zero inputs (a leaf operator), one input (a filter, limit or projection 
operator) or multiple inputs (join, merge, union, etc.)
   * `ResultIterator`: a super-simple iterator over the results (rows, batches) 
which an operator produces. Uses an exception to signal EOF, which reduces the 
code needed in a data pipeline relative to the Java iterator protocol.
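To make the protocol concrete, here is a minimal Java sketch of the two abstractions. It assumes a checked `EofException` as the end-of-data signal; `EofException`, `ListOperator` and `Drain` are hypothetical names chosen for illustration, and the PR's actual interfaces may differ in their signatures. What it shows is that a consumer needs only one `next()` call per row inside a single `try`, with no `hasNext()`/`next()` pairing.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical end-of-data signal, thrown by ResultIterator.next(). */
class EofException extends Exception {
}

/** Returns the next row, or throws EofException when there are no more rows. */
interface ResultIterator<T> {
  T next() throws EofException;
}

/** A pipeline component: open() starts it and returns its results; close() releases resources. */
interface Operator<T> {
  ResultIterator<T> open();

  void close();
}

/** A leaf operator (zero inputs) that emits the rows of an in-memory list. */
class ListOperator<T> implements Operator<T>, ResultIterator<T> {
  private final List<T> rows;
  private Iterator<T> iter;

  ListOperator(List<T> rows) {
    this.rows = rows;
  }

  @Override
  public ResultIterator<T> open() {
    iter = rows.iterator();
    return this; // the operator acts as its own iterator
  }

  @Override
  public T next() throws EofException {
    if (!iter.hasNext()) {
      throw new EofException();
    }
    return iter.next();
  }

  @Override
  public void close() {
    iter = null;
  }
}

class Drain {
  /** Reads every row: one next() per row, with EOF handled once, outside the loop. */
  static <T> List<T> drain(Operator<T> op) {
    List<T> out = new ArrayList<>();
    ResultIterator<T> it = op.open();
    try {
      while (true) {
        out.add(it.next());
      }
    } catch (EofException e) {
      return out; // normal end of results
    } finally {
      op.close();
    }
  }
}
```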
   
   Multiple variations of operators are provided in this PR. All of these 
operators are simple in the sense that they only refer to other operators, but 
not to any of Druid's query infrastructure.
   
   * `LimitOperator`: applies a limit to a result set.
   * `NullOperator`: does nothing, like an empty list or empty iterator.
   * `MappingOperator`: takes one input and applies some form of mapping as 
defined by a derived class.
   * `ConcatOperator`: performs a union of its inputs, emitting each one after 
the other.
   * `WrappingOperator`: similar to "baggage" on sequences; an operator that 
does tasks at the start and end of the result set, but imposes no per-row overhead.
   * And many more.
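The sketch below illustrates two of these simpler operators using hypothetical minimal interfaces (redeclared so the block compiles on its own); it illustrates the ideas in the list and is not the PR's code. The wrapping operator hands back its input's own iterator from `open()`, so rows never pass through the wrapper: that is one way to get start/end behavior with no per-row overhead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class EofException extends Exception {}

interface ResultIterator<T> { T next() throws EofException; }

interface Operator<T> { ResultIterator<T> open(); void close(); }

/** Hypothetical leaf operator over an in-memory list, used as input. */
class ListOperator<T> implements Operator<T>, ResultIterator<T> {
  private final List<T> rows;
  private Iterator<T> iter;
  ListOperator(List<T> rows) { this.rows = rows; }
  public ResultIterator<T> open() { iter = rows.iterator(); return this; }
  public T next() throws EofException {
    if (!iter.hasNext()) throw new EofException();
    return iter.next();
  }
  public void close() { }
}

/** Emits every row of each input in turn, opening each input only when it is reached. */
class ConcatOperator<T> implements Operator<T>, ResultIterator<T> {
  private final List<Operator<T>> inputs;
  private int current;
  private ResultIterator<T> currentIter;

  ConcatOperator(List<Operator<T>> inputs) { this.inputs = inputs; }

  public ResultIterator<T> open() {
    current = 0;
    currentIter = inputs.isEmpty() ? null : inputs.get(0).open();
    return this;
  }

  public T next() throws EofException {
    while (currentIter != null) {
      try {
        return currentIter.next();
      } catch (EofException e) {
        inputs.get(current).close(); // this input is exhausted; move to the next one
        current++;
        currentIter = current < inputs.size() ? inputs.get(current).open() : null;
      }
    }
    throw new EofException();
  }

  public void close() {
    if (currentIter != null) { // consumer stopped early: close the input still open
      inputs.get(current).close();
      currentIter = null;
    }
  }
}

/** Runs hooks at the start and end; rows flow straight from the input's iterator. */
class WrappingOperator<T> implements Operator<T> {
  private final Operator<T> input;
  private final Runnable onStart;
  private final Runnable onEnd;

  WrappingOperator(Operator<T> input, Runnable onStart, Runnable onEnd) {
    this.input = input;
    this.onStart = onStart;
    this.onEnd = onEnd;
  }

  public ResultIterator<T> open() {
    onStart.run();
    return input.open(); // no wrapper call per row
  }

  public void close() {
    try {
      input.close();
    } finally {
      onEnd.run();
    }
  }
}

class Drain {
  static <T> List<T> drain(Operator<T> op) {
    List<T> out = new ArrayList<>();
    ResultIterator<T> it = op.open();
    try {
      while (true) out.add(it.next());
    } catch (EofException e) {
      return out; // normal end of results
    } finally {
      op.close();
    }
  }
}
```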
   
   There are also native-query-specific operators for the Scan and Timeseries 
queries. As it turns out, there are many code paths common to all native 
queries. Operators exist for this path as well so that for Scan and Timeseries, 
it is "operators all the way down" while for other query types it is a mix of 
operators and `Sequence`s -- until we convert those other native queries later.
   
   ### General Operators
   
   Another group of operators are those common to all native queries, and are 
specific to Druid's implementation:
   
   * `CpuMetricOperator`: a "wrapper" operator that gathers CPU metrics.
   * `SegmentLockOperator`: pins each segment while the query runs.
   * `MergeOperator`: a generic priority-queue ordered merge.
   * And many more.
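A priority-queue ordered merge can be sketched as a k-way merge: the queue holds the current head row of each sorted input, and each `next()` pops the smallest row and refills the queue from the input that row came from. The interfaces and class names below are hypothetical, and the real `MergeOperator` deals with Druid's result types and ordering rules, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class EofException extends Exception {}

interface ResultIterator<T> { T next() throws EofException; }

interface Operator<T> { ResultIterator<T> open(); void close(); }

/** Hypothetical leaf operator over an in-memory list, used as input. */
class ListOperator<T> implements Operator<T>, ResultIterator<T> {
  private final List<T> rows;
  private Iterator<T> iter;
  ListOperator(List<T> rows) { this.rows = rows; }
  public ResultIterator<T> open() { iter = rows.iterator(); return this; }
  public T next() throws EofException {
    if (!iter.hasNext()) throw new EofException();
    return iter.next();
  }
  public void close() { }
}

/** K-way merge of inputs that are each already sorted by `order`. */
class MergeOperator<T> implements Operator<T>, ResultIterator<T> {
  /** The current head row of one input, plus the iterator it came from. */
  private static class Head<R> {
    final R row;
    final ResultIterator<R> source;
    Head(R row, ResultIterator<R> source) { this.row = row; this.source = source; }
  }

  private final List<Operator<T>> inputs;
  private final Comparator<T> order;
  private PriorityQueue<Head<T>> queue;

  MergeOperator(List<Operator<T>> inputs, Comparator<T> order) {
    this.inputs = inputs;
    this.order = order;
  }

  public ResultIterator<T> open() {
    queue = new PriorityQueue<Head<T>>((a, b) -> order.compare(a.row, b.row));
    for (Operator<T> input : inputs) {
      pull(input.open());
    }
    return this;
  }

  /** Reads the next row of one input; an exhausted input simply drops out of the merge. */
  private void pull(ResultIterator<T> source) {
    try {
      queue.add(new Head<>(source.next(), source));
    } catch (EofException e) {
      // nothing left in this input
    }
  }

  public T next() throws EofException {
    Head<T> smallest = queue.poll();
    if (smallest == null) {
      throw new EofException(); // every input is exhausted
    }
    pull(smallest.source); // refill from the input we just consumed
    return smallest.row;
  }

  public void close() {
    for (Operator<T> input : inputs) {
      input.close();
    }
  }
}

class Drain {
  static <T> List<T> drain(Operator<T> op) {
    List<T> out = new ArrayList<>();
    ResultIterator<T> it = op.open();
    try {
      while (true) out.add(it.next());
    } catch (EofException e) {
      return out; // normal end of results
    } finally {
      op.close();
    }
  }
}
```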
   
   ### Scan Operators
   
   Operators for the scan query:
   
   * `CursorReader`: reads from one or more `Cursor`s.
   * `ScanQueryOperator`: replaces the `ScanQueryEngine`, which pretty much 
reads from a set of cursors.
   * `GroupedScanResultLimitOperator`, `UngroupedScanResultLimitOperator`: 
limit operators for scans.
   * `ScanResultOffsetOperator`: offset operator for scans.
   * `ScanListToArrayOperator`, `ScanCompactListToArrayOperator`: unpacks scan 
"batches" to individual rows.
   
   Surprisingly, these are the only scan-specific operators required.
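Unpacking batches into rows is a small but typical operator. The sketch below is a hypothetical stand-in for the batch-to-row operators listed above, with a plain `List<T>` playing the role of a scan batch; the real operators work with Druid's scan result types and row layouts, which are not modeled here. Empty batches are skipped, and EOF from the input simply propagates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class EofException extends Exception {}

interface ResultIterator<T> { T next() throws EofException; }

interface Operator<T> { ResultIterator<T> open(); void close(); }

/** Hypothetical leaf operator over an in-memory list, used as input. */
class ListOperator<T> implements Operator<T>, ResultIterator<T> {
  private final List<T> rows;
  private Iterator<T> iter;
  ListOperator(List<T> rows) { this.rows = rows; }
  public ResultIterator<T> open() { iter = rows.iterator(); return this; }
  public T next() throws EofException {
    if (!iter.hasNext()) throw new EofException();
    return iter.next();
  }
  public void close() { }
}

/** Turns a stream of batches into a stream of individual rows. */
class UnbatchOperator<T> implements Operator<T>, ResultIterator<T> {
  private final Operator<List<T>> input;
  private ResultIterator<List<T>> inputIter;
  private Iterator<T> batch;

  UnbatchOperator(Operator<List<T>> input) { this.input = input; }

  public ResultIterator<T> open() {
    inputIter = input.open();
    batch = Collections.emptyIterator();
    return this;
  }

  public T next() throws EofException {
    while (!batch.hasNext()) {
      batch = inputIter.next().iterator(); // EOF from the input ends this operator too
    }
    return batch.next();
  }

  public void close() { input.close(); }
}

class Drain {
  static <T> List<T> drain(Operator<T> op) {
    List<T> out = new ArrayList<>();
    ResultIterator<T> it = op.open();
    try {
      while (true) out.add(it.next());
    } catch (EofException e) {
      return out; // normal end of results
    } finally {
      op.close();
    }
  }
}
```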
   
   ### TimeSeries Operators
   
   For the Time Series query:
   
   * `TimeseriesEngineOperator`: replaces the `TimeseriesQueryEngine` to read 
from one or more cursors (vectorized or non-vectorized).
   * `IntermediateAggOperator`: performs streaming aggregation of time groups.
   * `GrandTotalOperator`: performs the second-stage aggregation for time 
series, including a "grand total" row.
   * Misc. plumbing classes, including one that tames the otherwise-wild 
`VectorCursorGranularizer` class.
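Because a timeseries engine emits rows ordered by time, aggregation can be streamed: only the current group needs to be held in memory. The hypothetical `StreamingSumOperator` below illustrates that idea with a single summed `long` metric and a one-row read-ahead. It is a simplification of what `IntermediateAggOperator` is described as doing, not its implementation, and it assumes time-ordered input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class EofException extends Exception {}

interface ResultIterator<T> { T next() throws EofException; }

interface Operator<T> { ResultIterator<T> open(); void close(); }

/** Hypothetical leaf operator over an in-memory list, used as input. */
class ListOperator<T> implements Operator<T>, ResultIterator<T> {
  private final List<T> rows;
  private Iterator<T> iter;
  ListOperator(List<T> rows) { this.rows = rows; }
  public ResultIterator<T> open() { iter = rows.iterator(); return this; }
  public T next() throws EofException {
    if (!iter.hasNext()) throw new EofException();
    return iter.next();
  }
  public void close() { }
}

/** Hypothetical row: a time bucket and one long metric. */
class Row {
  final long time;
  final long value;
  Row(long time, long value) { this.time = time; this.value = value; }
  @Override public String toString() { return time + ":" + value; }
}

/** Sums `value` per time bucket; assumes the input arrives ordered by time. */
class StreamingSumOperator implements Operator<Row>, ResultIterator<Row> {
  private final Operator<Row> input;
  private ResultIterator<Row> inputIter;
  private Row pending; // first row of the next group, already read
  private boolean eof;

  StreamingSumOperator(Operator<Row> input) { this.input = input; }

  public ResultIterator<Row> open() {
    inputIter = input.open();
    pending = null;
    eof = false;
    return this;
  }

  /** Returns the next input row, or null once the input is exhausted. */
  private Row read() {
    if (eof) return null;
    try {
      return inputIter.next();
    } catch (EofException e) {
      eof = true;
      return null;
    }
  }

  public Row next() throws EofException {
    if (pending == null) pending = read();
    if (pending == null) throw new EofException();
    long time = pending.time;
    long sum = pending.value;
    Row row;
    while ((row = read()) != null && row.time == time) {
      sum += row.value;
    }
    pending = row; // null at EOF, otherwise the first row of the next group
    return new Row(time, sum);
  }

  public void close() { input.close(); }
}

class Drain {
  static <T> List<T> drain(Operator<T> op) {
    List<T> out = new ArrayList<>();
    ResultIterator<T> it = op.open();
    try {
      while (true) out.add(it.next());
    } catch (EofException e) {
      return out; // normal end of results
    } finally {
      op.close();
    }
  }
}
```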
   
   ## Fragments
   
   Operators combine to form a data pipeline. Data pipelines are distributed, 
as in Druid's scatter/gather architecture. A common terminology is to say that 
the entire query forms a DAG. The DAG is "sliced" at node boundaries, with 
exchanges between slices. At runtime, a *slice* is replicated across many 
nodes. Each instance of a slice is a *fragment*.
   
   This PR provides the basics of the query and fragment structure. We 
discussed above how, in most engines, a planner converts SQL into a logical 
plan, then into a physical plan that describes the operator DAG. Slices of that 
plan are sent to nodes, which then execute the fragments. Druid, however, 
already has a mature scatter/gather structure based around `QueryRunner`s, and, 
as mentioned, our goal is to reuse what exists, making incremental changes 
along the way. In this PR, we retain the majority of the native query 
structure. Fragments are introduced as a way of managing the group of operators 
needed for a single native query on one node. When queries run in the "Calcite 
test" framework, we simulate distribution via worker threads. In this case, we 
have a query with multiple fragments (one per thread). A query-level structure 
ties the fragments together. This PR does not introduce new exchange methods: 
it simply fits itself into the existing scatter/gather structure. Again, all 
this will evolve later, but we have to start simple.
   
   * `FragmentContext`: the state shared by all operators in a fragment. For 
now, this state includes the `ResponseContext` and, internally, the collection 
of all operators that form the fragment.
   * `FragmentManager`: orchestrates the steps to build and run an individual 
fragment.
   * `QueryManager`: the "container" for all fragments in a query.
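One plausible shape for the fragment-level bookkeeping is sketched below: operators register with the context as they are created, and the fragment closes all of them at the end, so no individual operator has to handle cleanup on every exit path. This `FragmentContext` is a hypothetical simplification (its response context is a plain `Map`, and the newest-first close order is a choice made for this sketch), not the class in the PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Only the close() half of the operator lifecycle matters for this sketch. */
interface Operator {
  void close();
}

/** Hypothetical fragment state: a response context plus every operator in the fragment. */
class FragmentContext {
  private final Map<String, Object> responseContext = new HashMap<>();
  private final Deque<Operator> operators = new ArrayDeque<>();
  private boolean closed;

  Map<String, Object> responseContext() {
    return responseContext;
  }

  /** Each operator registers itself when it is created. */
  void register(Operator op) {
    operators.push(op);
  }

  /** Closes registered operators newest first, even if one fails; later calls do nothing. */
  void close() {
    if (closed) {
      return;
    }
    closed = true;
    RuntimeException first = null;
    while (!operators.isEmpty()) {
      try {
        operators.pop().close();
      } catch (RuntimeException e) {
        if (first == null) {
          first = e; // remember the first failure, keep closing the rest
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```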
   
   We will need a way to pass fragment information to `QueryRunner`s so that 
they can create operators for a fragment. It turns out that `QueryPlus` is a 
handy way to accomplish this: it now holds a `FragmentManager` used by query 
runners to build their operators. A simplified form, `FragmentContext`, makes 
fragment-level resources (timeout, response context, etc.) available to 
operators.
   
   ## Query Runners
   
   We've noted that Druid already has an existing `QueryRunner` based structure 
which we reuse in this PR. The `QueryRunner.run()` method can actually be seen 
as being a `QueryPlanner.plan()` method: the method decides which sequences are 
needed to run a query. The sequences do the actual running. In this light, it 
is easy to see how we convert to operators: the `QueryRunner.run()` method 
creates an operator instead.
   
   Our long-term goal is to retire the `QueryRunner` classes. Thinking ahead, 
the "planning" code can be gathered up in a "native query planner" (akin to 
`QueryKit` in MSQb). In anticipation, each converted `QueryRunner` calls into a 
"query planner" class to decide which operator(s) to create. In some cases, the 
operator is a 1:1 replacement for a sequence. In other cases, it turns out we 
can optimize the query by omitting unneeded operators, or by using one of 
several finely tuned operators in place of the generalized sequence in the 
existing code. It should be clear how, in a later step, a new native query 
planner will call these methods directly without the need to first pass 
through a `QueryRunner`.
   
   The general pattern is:
   
   * Accept a `QueryPlus` and an upstream `QueryRunner`.
   * Create an operator which runs the `QueryRunner`.
   * Convert the resulting sequence to an "input operator."
   * Decide which operator(s) we need to add to perform the task at hand.
   * Wrap the result in a sequence compatible with the return value of 
`QueryRunner.run()`.
   
   This PR converts two entire native queries, and the common parts of other 
queries. As a result, for those partially-converted queries, we have a 
combination of sequences and operators. A set of shims allows operators to read 
from sequences, and to masquerade as sequences. This allows operators to 
seamlessly insert themselves into a sequence-based execution pipeline. When two 
operators are adjacent, the intervening sequence is optimized away, leaving 
the two operators to talk directly. As the conversion continues, there will be 
more operators and fewer sequences in the execution pipeline.
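The five-step pattern and the sequence/operator shims can be sketched together. Here `Iterable` stands in for Druid's `Sequence`, and `QueryPlus`, `QueryRunner`, `SequenceOperator`, `OperatorSequence` and `Shims` are hypothetical simplified types; steps 2 and 3 are folded into a single `Shims.toOperator()` call. The unwrapping check in `toOperator()` is one way to let adjacent operators talk directly rather than through an intervening sequence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class EofException extends Exception {}
interface ResultIterator<T> { T next() throws EofException; }
interface Operator<T> { ResultIterator<T> open(); void close(); }

/** Hypothetical stand-ins: QueryPlus carries only a limit; Iterable plays the role of Sequence. */
class QueryPlus {
  final long limit;
  QueryPlus(long limit) { this.limit = limit; }
}
interface QueryRunner<T> { Iterable<T> run(QueryPlus query); }

/** Shim: lets an operator read from a sequence. */
class SequenceOperator<T> implements Operator<T>, ResultIterator<T> {
  private final Iterable<T> seq;
  private Iterator<T> it;
  SequenceOperator(Iterable<T> seq) { this.seq = seq; }
  public ResultIterator<T> open() { it = seq.iterator(); return this; }
  public T next() throws EofException {
    if (!it.hasNext()) throw new EofException();
    return it.next();
  }
  public void close() { }
}

/** Shim: lets an operator masquerade as a sequence; closes the operator at EOF. */
class OperatorSequence<T> implements Iterable<T> {
  final Operator<T> op;
  OperatorSequence(Operator<T> op) { this.op = op; }
  public Iterator<T> iterator() {
    ResultIterator<T> rows = op.open();
    return new Iterator<T>() {
      private T lookahead;
      private boolean ready, done;
      public boolean hasNext() {
        if (!ready && !done) {
          try { lookahead = rows.next(); ready = true; }
          catch (EofException e) { done = true; op.close(); }
        }
        return ready;
      }
      public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        ready = false;
        return lookahead;
      }
    };
  }
}

class Shims {
  /** If the "sequence" is just a wrapped operator, unwrap it instead of stacking shims. */
  @SuppressWarnings("unchecked")
  static <T> Operator<T> toOperator(Iterable<T> seq) {
    if (seq instanceof OperatorSequence) return ((OperatorSequence<T>) seq).op;
    return new SequenceOperator<>(seq);
  }
}

class LimitOperator<T> implements Operator<T>, ResultIterator<T> {
  private final Operator<T> input;
  private final long limit;
  private ResultIterator<T> inputIter;
  private long count;
  LimitOperator(Operator<T> input, long limit) { this.input = input; this.limit = limit; }
  public ResultIterator<T> open() { inputIter = input.open(); count = 0; return this; }
  public T next() throws EofException {
    if (count >= limit) throw new EofException(); // stop without reading more input
    T row = inputIter.next();
    count++;
    return row;
  }
  public void close() { input.close(); }
}

/** A converted runner following the five-step pattern. */
class LimitQueryRunner<T> implements QueryRunner<T> {
  private final QueryRunner<T> upstream;
  LimitQueryRunner(QueryRunner<T> upstream) { this.upstream = upstream; }

  public Iterable<T> run(QueryPlus query) {                       // 1: accept the query
    Operator<T> input = Shims.toOperator(upstream.run(query));   // 2-3: upstream as input operator
    Operator<T> op = query.limit == Long.MAX_VALUE
        ? input                                                   // 4: no limit, no extra operator
        : new LimitOperator<T>(input, query.limit);
    return new OperatorSequence<>(op);                            // 5: look like a sequence again
  }
}
```

Skipping the `LimitOperator` when no limit is set is a small instance of the planner-style optimization described under Query Runners.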
   
   ## Query Lifecycle
   
   The `QueryLifecycle` class runs a native query and is where we make the 
decision to use the "traditional" execution path (based on `Sequence`) or 
the "MSQi" execution path (based on operators). That decision is based on a 
configuration property, as explained below. If we execute the query the 
traditional way, the object handed to the caller is a `Sequence`. If we go the 
new route, the returned object is a `FragmentManager`. As it turns out, 
@imply-cheddar recently added a `QueryResponse` object which is perfect for 
this: that class wraps the two alternatives in a common interface. Our API 
level expects a sequence, so the fragment-based `QueryResponse` simply wraps 
the operator DAG in a sequence (plus a bit of overhead).
   
   ### Bootstrap
   
   The `QueryNGModule` defines a `QueryManagerFactory` which is injected into 
`QueryLifecycleFactory`. That factory then creates a `QueryLifecycle`. The 
query lifecycle checks whether the operator path is enabled (as described 
above) using the config attached to `QueryManagerFactory`. If so, it uses 
`QueryManagerFactory` to create a `FragmentManager`, which is then attached 
to the `QueryPlus` for that query. 
   
   From then on, each `QueryRunner` checks if there is a `FragmentManager` 
instance attached to the `QueryPlus`. If so, the query runner creates an 
operator (if that query runner has been converted to do so), else it executes 
the "classic" code path to create a sequence.
   
   Note that, if the mechanism is not enabled (the default in this PR), there 
will never be a `FragmentManager` attached to the `QueryPlus`, and so execution 
will ignore the operator path.
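The dispatch described above boils down to a null check on the `QueryPlus`. In this hypothetical sketch, `FragmentManager`, `QueryPlus` and `Lifecycle` are stripped-down stand-ins (the real `QueryManagerFactory` and configuration wiring are omitted), and the operator itself is reduced to a loop. Both paths return the same rows; only the operator path records an operator in the fragment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for FragmentManager: records the operators built for a fragment. */
class FragmentManager {
  final List<String> operators = new ArrayList<>();
}

/** Hypothetical stand-in for QueryPlus: carries a FragmentManager only on the operator path. */
class QueryPlus {
  final FragmentManager fragment; // null when the operator path is disabled

  QueryPlus(FragmentManager fragment) {
    this.fragment = fragment;
  }
}

class Lifecycle {
  /** Mirrors the bootstrap step: attach a FragmentManager only when the flag is on. */
  static QueryPlus prepare(boolean operatorPathEnabled) {
    return new QueryPlus(operatorPathEnabled ? new FragmentManager() : null);
  }
}

/** A converted runner: the same rows either way, but only one path builds operators. */
class ConvertedLimitRunner {
  private final List<Integer> upstream;
  private final int limit;

  ConvertedLimitRunner(List<Integer> upstream, int limit) {
    this.upstream = upstream;
    this.limit = limit;
  }

  List<Integer> run(QueryPlus query) {
    if (query.fragment == null) {
      // Classic path: no FragmentManager attached, so build the result the old way.
      return new ArrayList<>(upstream.subList(0, Math.min(limit, upstream.size())));
    }
    // Operator path: register the (simplified) operator with the fragment, then run it.
    query.fragment.operators.add("limit");
    List<Integer> out = new ArrayList<>();
    for (Integer row : upstream) {
      if (out.size() == limit) {
        break;
      }
      out.add(row);
    }
    return out;
  }
}
```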
   
   ### Configuration
   
   At present, the operator path is experimental, and thus disabled by default. 
The new path is enabled by setting the `druid.queryng.enabled` property to 
`true`: `-Ddruid.queryng.enabled=true` on the command line, or 
`druid.queryng.enabled=true` in a properties file. Just to be clear: even if 
this flag is set, Druid will happily use the existing path for any queries not 
yet converted to use operators.
   
   ## Query, Fragment and Operator Profiles
   
   Since this change is purely at the "plumbing" level, it is hard for the 
casual observer (or even the dedicated developer) to see if we execute one path 
or another. The new "native query planner" methods work hard to optimize away 
bits of functionality not needed for a query, but it is hard to see that in 
action. We also want to know, in general, how much data flows through operators 
and how long things take. To address all of these concerns, the code provides a 
"profile" mechanism to gather basic stats per operator, then present them for a 
fragment or overall query. Since we don't yet have a good place to store such 
profiles, they are simply written to the log when enabled. Example:
   
   ```text
   Query ID: 50496d1d-b9fd-4e3e-8dc9-42195a534eeb
   Runtime (ms): 4
   Query Type: TimeseriesQuery
   
   -- Root Slice  --
   
   project-sql-results
   |   row-count: 1
   | 
   timeseries-to-array
   |   row-count: 1
   | 
   cpu-time
   |   cpu-time-ns: 145000
   | 
   finalizer
   |   row-count: 1
   | 
   intermediate-agg
   |   group-count: 1
   |   row-count: 1
   | 
   segment-retry
   |   try-count: 1
   |   missing-segment-count: 0
   | 
   finalizer
   |   row-count: 1
   | 
   intermediate-agg
   |   group-count: 1
   |   row-count: 1
   | 
   finalizer
   |   row-count: 1
   | 
   intermediate-agg
   |   group-count: 1
   |   row-count: 1
   | 
   ordered-scatter-gather
   |   input-count: 1
   |   row-count: 1
   | 
   Slice 2
   
   -- Slice 2  --
   
   Fragment 1
   
   timeseries-engine
     vectorized: true
     row-count: 0
   ```
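How the PR collects these statistics is not shown here. As one possible approach, the sketch below assumes a hypothetical `ProfiledOperator` wrapper that counts the rows flowing through it and renders an entry in the same layout as the log above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class EofException extends Exception {}

interface ResultIterator<T> { T next() throws EofException; }

interface Operator<T> { ResultIterator<T> open(); void close(); }

/** Hypothetical leaf operator over an in-memory list, used as input. */
class ListOperator<T> implements Operator<T>, ResultIterator<T> {
  private final List<T> rows;
  private Iterator<T> iter;
  ListOperator(List<T> rows) { this.rows = rows; }
  public ResultIterator<T> open() { iter = rows.iterator(); return this; }
  public T next() throws EofException {
    if (!iter.hasNext()) throw new EofException();
    return iter.next();
  }
  public void close() { }
}

/** Hypothetical wrapper that counts the rows passing through it. */
class ProfiledOperator<T> implements Operator<T>, ResultIterator<T> {
  private final String name;
  private final Operator<T> input;
  private ResultIterator<T> inputIter;
  private long rowCount;

  ProfiledOperator(String name, Operator<T> input) {
    this.name = name;
    this.input = input;
  }

  public ResultIterator<T> open() {
    inputIter = input.open();
    return this;
  }

  public T next() throws EofException {
    T row = inputIter.next(); // EOF propagates before the count is bumped
    rowCount++;
    return row;
  }

  public void close() {
    input.close();
  }

  /** One profile entry, laid out like the log excerpt above. */
  String profile() {
    return name + "\n|   row-count: " + rowCount;
  }
}

class Drain {
  static <T> List<T> drain(Operator<T> op) {
    List<T> out = new ArrayList<>();
    ResultIterator<T> it = op.open();
    try {
      while (true) out.add(it.next());
    } catch (EofException e) {
      return out; // normal end of results
    } finally {
      op.close();
    }
  }
}
```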
   
   ## Tests
   
   One of the very handy things about operators is that they are highly modular 
and thus extremely easy to unit test. Tests exist for all the basic 
abstractions defined above. Further, all SQL `CalciteQueryTest` queries were 
run with the mechanism enabled.
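As an example of testing an operator in isolation, the sketch below pairs a hypothetical offset operator (in the spirit of `ScanResultOffsetOperator`, but not its code) with two framework-free checks that feed it in-memory inputs. In Druid such checks would normally be JUnit test methods; plain static methods keep the sketch dependency-free.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class EofException extends Exception {}

interface ResultIterator<T> { T next() throws EofException; }

interface Operator<T> { ResultIterator<T> open(); void close(); }

/** Hypothetical leaf operator over an in-memory list, used as input. */
class ListOperator<T> implements Operator<T>, ResultIterator<T> {
  private final List<T> rows;
  private Iterator<T> iter;
  ListOperator(List<T> rows) { this.rows = rows; }
  public ResultIterator<T> open() { iter = rows.iterator(); return this; }
  public T next() throws EofException {
    if (!iter.hasNext()) throw new EofException();
    return iter.next();
  }
  public void close() { }
}

/** Skips the first `offset` rows; skipping is deferred to the first next() call. */
class OffsetOperator<T> implements Operator<T>, ResultIterator<T> {
  private final Operator<T> input;
  private final long offset;
  private ResultIterator<T> inputIter;
  private boolean skipped;

  OffsetOperator(Operator<T> input, long offset) {
    this.input = input;
    this.offset = offset;
  }

  public ResultIterator<T> open() {
    inputIter = input.open();
    skipped = false;
    return this;
  }

  public T next() throws EofException {
    if (!skipped) {
      skipped = true;
      for (long i = 0; i < offset; i++) {
        inputIter.next(); // discard; EOF here means the offset runs past the end
      }
    }
    return inputIter.next();
  }

  public void close() { input.close(); }
}

class Drain {
  static <T> List<T> drain(Operator<T> op) {
    List<T> out = new ArrayList<>();
    ResultIterator<T> it = op.open();
    try {
      while (true) out.add(it.next());
    } catch (EofException e) {
      return out; // normal end of results
    } finally {
      op.close();
    }
  }
}

/** Framework-free checks: each builds a tiny DAG from in-memory inputs. */
class OffsetOperatorTest {
  static void skipsLeadingRows() {
    List<Integer> rows = Drain.drain(new OffsetOperator<Integer>(new ListOperator<>(Arrays.asList(1, 2, 3, 4)), 2));
    if (!rows.equals(Arrays.asList(3, 4))) throw new AssertionError(rows);
  }

  static void offsetPastEndIsEmpty() {
    List<Integer> rows = Drain.drain(new OffsetOperator<Integer>(new ListOperator<>(Arrays.asList(1, 2)), 5));
    if (!rows.isEmpty()) throw new AssertionError(rows);
  }
}
```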
   
   ## Next Steps
   
   The goal of this PR is to introduce the operator framework with two concrete 
implementations, starting with the simpler queries so that attention can focus 
on the framework rather than on the nuances of the more complex native queries. 
This PR is a fully-functional replacement for the two native queries, though it 
is disabled by default. As already noted, future PRs will convert other native 
queries, then begin to convert query runners to an MSQi version of `QueryKit`. 
This then allows us to introduce the MSQb frame concept and to add multi-tier 
queries for expensive merges, for joins and so on.
   
   <hr>
   
   This PR has:
   - [X] been self-reviewed.
   - [X] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [X] added comments explaining the "why" and the intent of the code 
wherever it would not be obvious to an unfamiliar reader.
   - [X] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [ ] been tested in a test Druid cluster. (N/A, since the code is not yet 
integrated into Druid.)
   

