gianm opened a new issue #12262:
URL: https://github.com/apache/druid/issues/12262


   This proposal is about extending Druid to support multi-stage distributed 
queries, something that I think will be really exciting and amp up what Druid 
is capable of.
   
   ## Motivation
   
   Today Druid's distributed query stack is single-stage: the Broker receives a 
query, slices it up into pieces, and sends one piece to each Historical. The 
Historicals produce partial results for their piece of the query, send them 
back to the Broker, and the Broker merges them into the final result set.
   
   This venerable design works well when the bulk of processing can be done at 
the leaf servers that store the actual data. It scales well: Druid can handle 
thousands of queries per second with very high CPU and memory efficiency, so long as there
are enough Historicals running and so long as the amount of data that makes it 
to the Broker is relatively small.
   
   Druid has a lot of functionality that is designed to make this single-stage 
distributed query stack as useful as possible. Whenever possible, we push down 
projections, filters, aggregations, limits, and joins. We also have a suite of 
built-in approximations, like approximate topN and various sketches, that people
can use to minimize the amount of state that must be tracked beyond the leaf 
servers.
   
   But there are some cases where this query stack doesn't work well:
   
   1. Queries with very large result sets. Imagine a GROUP BY query with no 
LIMIT that would return billions of rows, because it groups on something with 
very high cardinality. Today these result sets must flow through the Broker, 
which creates a bottleneck.
   2. Additional SQL support. We'd like to keep expanding Druid's ability to 
support SQL until we can do it all. Some operations that we'd like to support,
like joining together two distributed tables, cannot be done in the 
single-stage design.
   3. Complex query structures. Today, a query with lots of JOINs and 
subqueries will execute certain queries in a fully distributed manner -- 
basically, anything that can be represented as a base distributed table joined 
with broadcasted tables -- and then finish the remainder of query processing on 
the Broker. In some cases this works OK, because most of the work can be done 
as part of the distributed queries. But in other cases it results in a lot of 
data being collected on the Broker, which leads to an error like "Subquery 
generated results beyond maximum".
   4. Ingestion. Some databases use their query stack to handle ingestion. It 
makes sense if you think about an ingestion as a query that writes its results 
to a table. It would be nice if we could do this too, so we wouldn't need to 
maintain an ingestion execution stack that is separate from the query execution 
stack.
   
   ## Design
   
   There are a lot of interesting pieces here, so I just want to touch on each 
one in this main proposal. Each one should then be fleshed out separately.
   
   ### Query representation and SQL planning
   
   **Background**
   
   Druid models native queries as "fat" query types that represent a full 
distributed query. There are four query types used by the SQL engine: scan, 
timeseries, topN, and groupBy. Each query type represents an entire 
single-stage distributed computation: there is a first piece that can run 
distributed, and a second piece that must run on the Broker. They all 
internally handle filtering, projection, sorting, limiting, etc. They all have 
a "dataSource" field that describes where they should get their data from, 
which can be "table", representing an actual Druid datasource; "query",
representing a subquery of any other query type; "join",
representing two other datasources joined together; or a handful of other 
less-common datasource types.
   
   The SQL planner's main job is to morph a tree of relational operators into
either a single native query or a tree of native queries. In the latter case, it uses
"query" or "join" datasources to link the native queries together. It has 
limited flexibility in how it does this, because native queries have a rigid 
computational structure: they always do broadcast join first, then projection, 
then filter, then aggregation, then sort.
   
   **Proposal**
   
   To support ever-more complex query structures, we will need a way of 
representing data flow that is more flexible than native queries. The tried and 
true DAG approach will work well here. So I propose that the multi-stage engine 
should model queries as a DAG of "stages". Each stage:
   
   - Runs distributed across as many servers as makes sense.
   - May consume partitioned data from some set of input stages. There will 
always be some stages with no input stages; these stages would be reading from 
Druid segments, external data, or something like that.
   - Produces data either aligned with its input partitions, or reshuffled in 
some way.
   
   The stages would be finer-grained than native queries. For example, a stage 
might do scan-filter-project, or aggregate, or sort, or limit. This is a common 
approach in relational databases.
   
   As a first step, I suggest we keep native queries in the picture by 
translating SQL -> native query -> DAG-based query. This can be done with 
minimal changes to the SQL planner, and would enable classic native queries to 
exist at the same time as DAG-based queries without much code duplication. But 
at some point, we'll want to go directly from SQL -> DAG-based query, since 
that will give the SQL planner more flexibility to reorder operations.
   
   Here's an example of how a query would look. This SQL query:
   
   ```
   SELECT
     session,
     COUNT(*) AS cnt
   FROM tbl
   WHERE browser = 'Firefox'
   GROUP BY session
   ORDER BY cnt DESC
   LIMIT 10
   ```
   
   Would have five stages:
   
   1. Scan: Read `tbl`, apply the filter `browser = 'Firefox'`, and project out 
`session`. No shuffle.
   2. Aggregate I: Locally group by `session` and compute `count(*)` for each. 
Shuffle by `session`.
   3. Aggregate II: Continue grouping by `session` within each partition, and 
sum the partial counts to get full counts. No shuffle. This produces a fully 
grouped resultset, still partitioned by `session`.
   4. Sort: Locally order by `cnt DESC`. Shuffle everything into a single 
partition.
   5. Limit: Take the first 10 rows from the single partition generated by the 
prior stage.
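
   To make the shuffle points in these five stages concrete, here is a single-process Java simulation. It is an illustrative sketch, not engine code: "workers" and "partitions" are plain lists, partitioning uses `String.hashCode()` modulo the partition count, and ties on `cnt` are broken by `session` so the output is deterministic (the SQL itself specifies no tie-breaker). `FiveStageSketch` and its members are hypothetical names.

   ```java
   import java.util.AbstractMap;
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.PriorityQueue;

   // Single-process simulation of the five stages; nothing here is a Druid API.
   final class FiveStageSketch {
     static final class Row {
       final String session;
       final String browser;

       Row(String session, String browser) {
         this.session = session;
         this.browser = browser;
       }
     }

     // ORDER BY cnt DESC; ties broken by session so the output is deterministic.
     static final Comparator<Map.Entry<String, Long>> BY_CNT_DESC = (a, b) -> {
       int c = Long.compare(b.getValue(), a.getValue());
       return c != 0 ? c : a.getKey().compareTo(b.getKey());
     };

     static List<Map.Entry<String, Long>> run(List<List<Row>> workerInputs, int numPartitions, int limit) {
       // Stages 1-2 (Scan, Aggregate I): pipelined on each worker, then shuffled by session.
       List<List<Map.Entry<String, Long>>> partitions = new ArrayList<>();
       for (int p = 0; p < numPartitions; p++) {
         partitions.add(new ArrayList<>());
       }
       for (List<Row> input : workerInputs) {
         Map<String, Long> partial = new HashMap<>();
         for (Row r : input) {
           if ("Firefox".equals(r.browser)) {         // Scan: filter, project session
             partial.merge(r.session, 1L, Long::sum); // Aggregate I: local COUNT(*)
           }
         }
         for (Map.Entry<String, Long> e : partial.entrySet()) {
           int p = Math.floorMod(e.getKey().hashCode(), numPartitions);
           partitions.get(p).add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
         }
       }

       // Stages 3-4 (Aggregate II, Sort): per partition, no shuffle in between.
       List<List<Map.Entry<String, Long>>> sortedRuns = new ArrayList<>();
       for (List<Map.Entry<String, Long>> partition : partitions) {
         Map<String, Long> full = new HashMap<>();
         for (Map.Entry<String, Long> e : partition) {
           full.merge(e.getKey(), e.getValue(), Long::sum); // sum partial counts
         }
         List<Map.Entry<String, Long>> sorted = new ArrayList<>(full.entrySet());
         sorted.sort(BY_CNT_DESC);
         sortedRuns.add(sorted);
       }

       // Sort's single-partition shuffle merges sorted runs; stage 5 (Limit) stops early.
       PriorityQueue<int[]> heads = new PriorityQueue<int[]>(
           (x, y) -> BY_CNT_DESC.compare(sortedRuns.get(x[0]).get(x[1]), sortedRuns.get(y[0]).get(y[1])));
       for (int i = 0; i < sortedRuns.size(); i++) {
         if (!sortedRuns.get(i).isEmpty()) {
           heads.add(new int[] {i, 0});
         }
       }
       List<Map.Entry<String, Long>> result = new ArrayList<>();
       while (!heads.isEmpty() && result.size() < limit) {
         int[] head = heads.poll();
         List<Map.Entry<String, Long>> sorted = sortedRuns.get(head[0]);
         result.add(sorted.get(head[1]));
         if (head[1] + 1 < sorted.size()) {
           heads.add(new int[] {head[0], head[1] + 1});
         }
       }
       return result;
     }
   }
   ```

   Because each `session` lands in exactly one partition after the Aggregate I shuffle, Aggregate II produces complete counts locally, and the final single-partition step only has to merge already-sorted runs rather than re-sort everything.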
   
   When stages connect without shuffling, we can pipeline execution locally on 
the same servers, so there is no need for buffering or cross-server traffic. 
When stages connect _with_ shuffling, we'll need to exchange data across 
servers. Depending on the needs of the query, the producing stage could stream 
to the consuming stage, or the producing stage could buffer up all results 
before the consuming stage starts.
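
   The two exchange modes can be contrasted with a minimal Java sketch that uses a bounded `java.util.concurrent.ArrayBlockingQueue` as a stand-in for the network channel. `ExchangeSketch` and its methods are hypothetical; the "rows" are integers and the consuming stage simply sums them.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   // Hypothetical sketch of the two exchange modes; not Druid's shuffle API.
   final class ExchangeSketch {
     private static final Object END_OF_STREAM = new Object();

     /** Streaming: the consumer reads rows while the producer is still generating them. */
     static long pipelined(int numRows, int channelCapacity) throws Exception {
       // A bounded channel applies backpressure: the producer blocks when the consumer falls behind.
       BlockingQueue<Object> channel = new ArrayBlockingQueue<>(channelCapacity);
       ExecutorService producerThread = Executors.newSingleThreadExecutor();
       try {
         Future<Object> producer = producerThread.submit(() -> {
           for (int i = 0; i < numRows; i++) {
             channel.put(i);
           }
           channel.put(END_OF_STREAM);
           return null;
         });
         long sum = 0;
         for (Object row = channel.take(); row != END_OF_STREAM; row = channel.take()) {
           sum += (Integer) row;
         }
         producer.get(); // rethrows any producer failure
         return sum;
       } finally {
         producerThread.shutdownNow();
       }
     }

     /** Buffered: the producer finishes writing every row before the consumer starts. */
     static long buffered(int numRows) {
       List<Integer> buffer = new ArrayList<>(); // a real engine would likely spill to disk
       for (int i = 0; i < numRows; i++) {
         buffer.add(i);
       }
       long sum = 0;
       for (int row : buffer) {
         sum += row;
       }
       return sum;
     }
   }
   ```

   Both modes compute the same answer; the difference is when the consumer may start and how much must be held at once. The streaming mode keeps at most `channelCapacity` rows in flight, while the buffered mode suits operations that need all of their input before emitting anything.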
   
   For the query above, if we know (or are willing to bet) that there are not 
too many distinct `session` values then we can run it just as efficiently as 
the current single-stage approach. First, we'll make sure that Scan and 
Aggregate I are scheduled on the same set of workers, so the output of Scan can 
be pipelined into Aggregate I locally in memory. Then, we'll configure the 
shuffle in Aggregate I to shuffle everything down to a single partition. Next, 
we'll make sure that Aggregate II, Sort, and Limit all run on the same server. 
That server would be responsible for gathering all the partitioned Aggregate I 
outputs and preparing the final result. Finally, we'll set things up so all 
stages run concurrently, and so Aggregate II streams from Aggregate I. Put 
together, this is exactly what the Historicals and Brokers do today.
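
   The placement described above can be written down as data. In this hypothetical Java sketch, each stage records the servers it runs on and whether it reshuffles its output. An edge can stay in local memory only if both ends run on the same servers and either there is no shuffle or everything is on a single server. The server names `h1`, `h2` (Historical-like) and `b1` (Broker-like) and the class `PlacementSketch` are made up for illustration.

   ```java
   import java.util.HashMap;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical placement sketch; stage and server names are made up.
   final class PlacementSketch {
     static final class Stage {
       final String name;
       final String input;           // upstream stage name, or null for a leaf stage
       final boolean shufflesOutput; // whether this stage reshuffles what it produces
       final Set<String> servers;    // servers this stage is scheduled on

       Stage(String name, String input, boolean shufflesOutput, Set<String> servers) {
         this.name = name;
         this.input = input;
         this.shufflesOutput = shufflesOutput;
         this.servers = servers;
       }
     }

     /** Data stays local if both ends share servers and no cross-server shuffle is needed. */
     static boolean isLocal(Stage upstream, Stage downstream) {
       boolean sameServers = upstream.servers.equals(downstream.servers);
       return sameServers && (!upstream.shufflesOutput || upstream.servers.size() == 1);
     }

     /** Labels every edge of a linear plan as "local" (pipelined in memory) or "network". */
     static Map<String, String> describeEdges(List<Stage> stages) {
       Map<String, Stage> byName = new HashMap<>();
       for (Stage s : stages) {
         byName.put(s.name, s);
       }
       Map<String, String> edges = new LinkedHashMap<>();
       for (Stage s : stages) {
         if (s.input != null) {
           Stage up = byName.get(s.input);
           edges.put(up.name + "->" + s.name, isLocal(up, s) ? "local" : "network");
         }
       }
       return edges;
     }
   }
   ```

   With Scan and Aggregate I on `{h1, h2}` and the last three stages on `{b1}`, only the Aggregate I to Aggregate II edge crosses the network, which mirrors Historicals sending partial results to a Broker.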
   
   ### Ingestion
   
   In #11929 there is a proposal for adding a SQL INSERT statement. It could be 
implemented on top of our existing batch ingestion tasks: the SQL layer could 
convert relational operators to indexing tasks just as well as it can convert 
them to native queries. But I think it would be nicer to implement it on top of 
a multi-stage query stack.
   
   We'd just need to do two things:
   
   1. Provide a way for the query stack to read external data. There's a 
natural way to do this through an "external" DataSource that maps onto 
RowBasedSegments. It would be similar to how we plug lookups into the query 
engines, except a little more complicated because we'll need to split the 
external datasource before sending it down to various servers.
   2. Provide a way for the query stack to generate segments. Once we have this 
multi-stage structure this is also natural: we can add a final stage to any 
query that shuffles data to match the target segment size and then generates 
and publishes segments.
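
   Both ingestion pieces involve a planning step that can be sketched in a few lines of Java. These are hypothetical illustrations, not Druid's input-splitting or partitioning code: `splitInputs` assigns external files to workers largest-first, each to the currently least-loaded worker (a greedy heuristic for balancing bytes), and `segmentBoundaries` cuts an ordered run of rows into the fewest roughly equal segments that each stay within a target row count.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical ingestion planning helpers; not Druid's actual code.
   final class IngestPlanSketch {
     /** Assigns external files to workers, largest first, each to the least-loaded worker. */
     static List<List<Integer>> splitInputs(long[] fileBytes, int numWorkers) {
       Integer[] order = new Integer[fileBytes.length];
       for (int i = 0; i < order.length; i++) {
         order[i] = i;
       }
       Arrays.sort(order, (a, b) -> Long.compare(fileBytes[b], fileBytes[a])); // descending size
       List<List<Integer>> assignment = new ArrayList<>();
       for (int w = 0; w < numWorkers; w++) {
         assignment.add(new ArrayList<>());
       }
       long[] load = new long[numWorkers];
       for (int file : order) {
         int target = 0;
         for (int w = 1; w < numWorkers; w++) {
           if (load[w] < load[target]) {
             target = w;
           }
         }
         assignment.get(target).add(file);
         load[target] += fileBytes[file];
       }
       return assignment;
     }

     /** Row-index boundaries splitting numRows ordered rows into segments of at most targetRows. */
     static long[] segmentBoundaries(long numRows, long targetRows) {
       long numSegments = Math.max(1, (numRows + targetRows - 1) / targetRows); // ceiling division
       long[] bounds = new long[(int) numSegments + 1];
       for (int i = 0; i <= numSegments; i++) {
         bounds[i] = i * numRows / numSegments; // evenly spaced cut points
       }
       return bounds;
     }
   }
   ```

   For example, 10 rows with a target of 4 rows per segment yields boundaries `{0, 3, 6, 10}`: three segments of 3, 3, and 4 rows, rather than 4, 4, and 2.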
   
   So I propose that we work towards this instead of having the SQL planner 
generate batch indexing tasks. It has a nice side benefit: part (1) alone means 
that we'd be able to query external data in a regular query too. I don't think 
that'll be a core use case for Druid, but it has some usefulness, like 
previewing what an INSERT _might_ do, or doing an ad-hoc join of Druid 
datasources with some external data.
   
   We'll also need to figure out what to do about streaming ingest at some 
point. I'm not sure what to do there but I think there are a few options that 
make sense. Even if the multi-stage query stack doesn't have built-in support
for streaming queries, we can layer streaming on top in the same way that
we do today with indexing tasks: there is a supervisor that manages a series of 
tasks, each of which reads a chunk of data from Kafka and then publishes it.
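
   One way to picture "layering streaming on top" is a supervisor that turns an unbounded stream into a series of bounded jobs over offset ranges. This Java sketch is hypothetical: `StreamingSupervisorSketch`, `Chunk`, and the `readAndPublish` callback are made-up names, and chunking by record count is a simplification for illustration rather than how Druid's existing supervisors are configured.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Predicate;

   // Hypothetical sketch; not Druid's supervisor API.
   final class StreamingSupervisorSketch {
     static final class Chunk {
       final long startOffset; // inclusive
       final long endOffset;   // exclusive

       Chunk(long startOffset, long endOffset) {
         this.startOffset = startOffset;
         this.endOffset = endOffset;
       }
     }

     /** Splits the not-yet-ingested offset range into consecutive bounded chunks. */
     static List<Chunk> planChunks(long committedOffset, long latestOffset, long maxRecordsPerChunk) {
       List<Chunk> chunks = new ArrayList<>();
       for (long start = committedOffset; start < latestOffset; start += maxRecordsPerChunk) {
         chunks.add(new Chunk(start, Math.min(start + maxRecordsPerChunk, latestOffset)));
       }
       return chunks;
     }

     /**
      * Runs chunks in order. The committed offset only moves past a chunk once that chunk
      * has been read and published, so a failure resumes from the last published offset.
      */
     static long runChunks(List<Chunk> chunks, long committedOffset, Predicate<Chunk> readAndPublish) {
       for (Chunk chunk : chunks) {
         if (chunk.startOffset != committedOffset || !readAndPublish.test(chunk)) {
           break; // gap or failure: stop and let the next planning round retry
         }
         committedOffset = chunk.endOffset;
       }
       return committedOffset;
     }
   }
   ```

   Each chunk is an ordinary bounded job (read, then publish), so the multi-stage engine itself would not need to know that the overall ingestion is continuous.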
   
   ### Server setup
   
   There is an obvious question: where will this query engine run? Today we 
have Brokers, Historicals, and indexing stuff (MiddleManagers or Indexers). I
don't want to add a net new service or process type in the long run, because we 
have a lot already. But I think in the short run we should add a new process 
type.
   
   I think at first the Broker should remain in charge of SQL planning, and may 
route certain queries to these new multi-stage query processes if they are 
running in that particular cluster. It could do this based on user request 
(like a context parameter) or based on aspects of the query (like presence of 
INSERT, or external data, or a "complex enough" query).
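
   The Broker's routing decision could look something like the following Java sketch. The context key `useMultiStageEngine`, the `QueryInfo` fields, and the join/subquery count used as a stand-in for "complex enough" are all hypothetical; none of them is an existing Druid parameter.

   ```java
   import java.util.Map;

   // Hypothetical Broker-side routing rule; names and thresholds are illustrative only.
   final class RoutingSketch {
     enum Engine { NATIVE, MULTI_STAGE }

     static final String CONTEXT_KEY = "useMultiStageEngine"; // hypothetical parameter name

     static final class QueryInfo {
       final Map<String, Object> context;
       final boolean isInsert;
       final boolean readsExternalData;
       final int joinAndSubqueryCount;

       QueryInfo(Map<String, Object> context, boolean isInsert, boolean readsExternalData, int joinAndSubqueryCount) {
         this.context = context;
         this.isInsert = isInsert;
         this.readsExternalData = readsExternalData;
         this.joinAndSubqueryCount = joinAndSubqueryCount;
       }
     }

     static Engine route(QueryInfo q, boolean multiStageRunning, int complexityThreshold) {
       // In this sketch, INSERT and external data have no single-stage implementation.
       boolean requiresMultiStage = q.isInsert || q.readsExternalData;
       if (!multiStageRunning) {
         if (requiresMultiStage) {
           throw new IllegalStateException("query needs the multi-stage engine, which is not running");
         }
         return Engine.NATIVE; // fall back even if the user asked for multi-stage
       }
       if (requiresMultiStage) {
         return Engine.MULTI_STAGE;
       }
       Object requested = q.context.get(CONTEXT_KEY);
       if (requested instanceof Boolean) {
         return (Boolean) requested ? Engine.MULTI_STAGE : Engine.NATIVE; // explicit user choice
       }
       return q.joinAndSubqueryCount >= complexityThreshold ? Engine.MULTI_STAGE : Engine.NATIVE;
     }
   }
   ```

   Checking hard requirements first, then the user's explicit choice, then a heuristic keeps simple queries on the existing Historical + Broker path unless something pushes them off it.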
   
   However, like I said, in the long run I don't think it's a good plan to have 
an extra process type on top of all the ones we already have. So I think in the 
long run we should shoot for this process type actually being able to serve as 
a replacement for the Broker, Historicals, and MMs/Indexers. By that I mean 
that in the fullness of time, it should be able to:
   
   - Receive and plan SQL queries, like a Broker.
   - Cache segments locally, like a Historical.
   - Run simple single-stage queries in an extremely efficient way, like the 
Historical + Broker combo.
   - Run ingestions (INSERT statements) in a distributed and scalable manner, 
like MMs or Indexers.
   
   At that point, a small Druid cluster would be pretty simple: just a 
Coordinator and this new process type (or a few of them for scale-out). And a 
large cluster could still specialize. Imagine setting up a tier of these new 
processes that cache segments but don't perform ingestion, and another tier 
that _don't_ cache segments but _do_ perform ingestion. That's similar to 
setting up Historicals and MMs today, but more configuration-driven. It would 
allow simple clusters to be simple and complex clusters to be complex.
   
   ## Code changes
   
   A sketch of the code changes I think we'll need to make the above work out:
   
   1. Low-level mechanism for shuffling data between servers. This should 
include an efficient way to transfer large amounts of data over the network. 
Currently we use Smile-over-HTTP for this, which works well for relatively 
small amounts of data, but I think we can do better. It should also include 
both a pipelined implementation (producing stage streams to the consuming 
stage), and a buffered implementation (producing stage generates all data 
before consuming stage starts) for operations that cannot be pipelined.
   2. Cluster-level execution coordinator for a multi-stage query.
   3. Server-local execution coordinator for a single stage of a multi-stage 
query.
   4. An actual process type for the above three things to run in.
   5. Converter that generates multi-stage (DAG-based) queries from today's 
native query types. (This enables hooking into the existing SQL layer, since it 
generates native queries.)
   6. To make ingestion-through-query work: adapters from external data into 
the query stack, and from the query stack to the segment-generation code.
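
   Items 2 and 3 divide responsibility between a cluster-level controller and per-server workers. The Java sketch below is hypothetical (`ControllerSketch`, `StageLogic`, and `runChain` are made-up names) and simplifies heavily: workers are threads in one JVM, the plan is a linear chain, and every stage uses the buffered mode, so the controller waits for all work orders of one stage before starting the next.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   // Hypothetical controller/worker split; none of these names are Druid APIs.
   final class ControllerSketch {
     /** Server-local side: one worker's share of one stage, given every partition of its input. */
     interface StageLogic {
       List<Long> run(int workerNumber, int numWorkers, List<List<Long>> inputPartitions);
     }

     /** Cluster-level side: issues one work order per worker per stage, with a barrier between stages. */
     static List<List<Long>> runChain(List<StageLogic> stages, int numWorkers) throws Exception {
       ExecutorService workers = Executors.newFixedThreadPool(numWorkers);
       try {
         List<List<Long>> previous = Collections.emptyList(); // the first stage has no inputs
         for (StageLogic stage : stages) {
           List<List<Long>> inputs = previous;
           List<Future<List<Long>>> workOrders = new ArrayList<>();
           for (int w = 0; w < numWorkers; w++) {
             int worker = w;
             Future<List<Long>> workOrder = workers.submit(() -> stage.run(worker, numWorkers, inputs));
             workOrders.add(workOrder);
           }
           List<List<Long>> outputs = new ArrayList<>();
           for (Future<List<Long>> pending : workOrders) {
             outputs.add(pending.get()); // barrier: wait for every worker's output partition
           }
           previous = outputs;
         }
         return previous;
       } finally {
         workers.shutdown();
       }
     }
   }
   ```

   In a real deployment, the controller would also need to track which server holds each output partition and handle worker failures; the sketch leaves both out to keep the control flow visible.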
   
   At Imply we've started to prototype a multi-stage query engine with the 
above design. We've implemented some of the pieces: not all of them, but enough 
to perform some basic queries. At this point I thought it would be a good time 
to open up a discussion with the wider community, since as we continue working 
on this stuff, we'd like to integrate it into Apache Druid.
   
   ## Rationale
   
   A.K.A. the "why not" section.
   
   **Why not integrate with an existing open-source query engine?**
   
   I wouldn't want to stop anyone from integrating with another open-source 
query engine. Some people might prefer to deploy that way.
   
   But I think we will want this kind of functionality as something native in 
Druid, for two reasons. First: I think the user experience will be nicer if we 
don't require a dependency on another system. Second: I expect there will be 
opportunities for optimization that we can use if everything is built into
Druid, and that would be tough to implement with an external query stack.
   
   Besides, the work we'll need to do in order to build a multi-stage query 
engine will also benefit integrations with other engines.
   
   **Why not gradually add multi-stage capabilities to the existing query 
stack?**
   
   I think it's a question of complexity.
   
   The core query stack has three main hook points: individual segment scan, 
result merge on Historicals, and result merge on Broker. The implementations of 
the native query types, especially groupBy, have become quite complex over the
years as new features have been added and needed to be mapped onto these hook
points. For example, subtotals, ordering, limiting, and having are all typically
done as part of "result merge on the Broker". The hook points are doing more
work than originally envisioned, which makes the code more difficult to follow 
and extend.
   
   So I think it's time to try a different approach with the query stack,
rather than continuing to add new capabilities to the existing hook points. As we create a DAG-based query stack, we
can split queries up into stages and move more of the "fancy stuff" to the 
framework, which will simplify the query-specific logic.
   
   We can still share code, though. It won't be 100% new. We can share all of 
the StorageAdapter stuff (cursors, etc.), the ColumnSelectorFactory stuff, and
the single-segment processing code for groupBy and topN. This lower-level code 
is super optimized and works well. It's more the mid/high level stuff that IMO 
would benefit from a different approach.


-- 
This is an automated message from the Apache Git Service.