gianm opened a new issue #12262:
URL: https://github.com/apache/druid/issues/12262
This proposal is about extending Druid to support multi-stage distributed
queries, something that I think will be really exciting and amp up what Druid
is capable of.
## Motivation
Today Druid's distributed query stack is single-stage: the Broker receives a
query, slices it up into pieces, and sends one piece to each Historical. The
Historicals produce partial results for their piece of the query, send them
back to the Broker, and the Broker merges them into the final result set.
This venerable design works well when the bulk of processing can be done at
the leaf servers that store the actual data. It scales well: Druid can handle
thousands of QPS of queries, very CPU and memory efficiently, so long as there
are enough Historicals running and so long as the amount of data that makes it
to the Broker is relatively small.
Druid has a lot of functionality that is designed to make this single-stage
distributed query stack as useful as possible. Whenever possible, we push down
projections, filters, aggregations, limits, and joins. We also have a suite of
built-in approximations, like approximate topN and various sketches, that people
can use to minimize the amount of state that must be tracked beyond the leaf
servers.
But there are some cases where this query stack doesn't work well:
1. Queries with very large result sets. Imagine a GROUP BY query with no
LIMIT that would return billions of rows, because it groups on something with
very high cardinality. Today these result sets must flow through the Broker,
which creates a bottleneck.
2. Additional SQL support. We'd like to keep expanding Druid's ability to
support SQL until we can do it all. Some operations that we'd like to support,
like joining together two distributed tables, cannot be done in the
single-stage design.
3. Complex query structures. Today, when a query has lots of JOINs and
subqueries, Druid executes certain parts of it in a fully distributed manner
(basically, anything that can be represented as a base distributed table joined
with broadcast tables) and then finishes the remainder of query processing on
the Broker. In some cases this works OK, because most of the work can be done
as part of the distributed queries. But in other cases it results in a lot of
data being collected on the Broker, which leads to an error like "Subquery
generated results beyond maximum".
4. Ingestion. Some databases use their query stack to handle ingestion. It
makes sense if you think about an ingestion as a query that writes its results
to a table. It would be nice if we could do this too, so we wouldn't need to
maintain an ingestion execution stack that is separate from the query execution
stack.
## Design
There are a lot of interesting pieces here, so I just want to touch on each
one in this main proposal. Each one should then be fleshed out separately.
### Query representation and SQL planning
**Background**
Druid models native queries as "fat" query types that represent a full
distributed query. There are four query types used by the SQL engine: scan,
timeseries, topN, and groupBy. Each query type represents an entire
single-stage distributed computation: there is a first piece that can run
distributed, and a second piece that must run on the Broker. They all
internally handle filtering, projection, sorting, limiting, etc. They all have
a "dataSource" field that describes where they should get their data from,
which can be "table", representing an actual Druid datasource; "query",
representing a subquery, which can itself be any other query type; "join",
representing two other datasources joined together; or a handful of other
less-common datasource types.
The SQL planner's main job is to morph a tree of relational operators into
either a single native query or a tree of native queries. In the latter case, it uses
"query" or "join" datasources to link the native queries together. It has
limited flexibility in how it does this, because native queries have a rigid
computational structure: they always do broadcast join first, then projection,
then filter, then aggregation, then sort.
**Proposal**
To support ever-more complex query structures, we will need a way of
representing data flow that is more flexible than native queries. The
tried-and-true DAG approach will work well here. So I propose that the multi-stage engine
should model queries as a DAG of "stages". Each stage:
- Runs distributed across as many servers as makes sense.
- May consume partitioned data from some set of input stages. There will
always be some stages with no input stages; these stages would be reading from
Druid segments, external data, or something like that.
- Produces data either aligned with its input partitions, or reshuffled in
some way.
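To make the stage model concrete, here's a minimal Python sketch of a stage definition plus a way to compute an order in which stages can start, so that each stage comes after all of its inputs. The names (`StageDef`, `execution_order`, the `shuffle` strings) are hypothetical and are not Druid classes; the ordering uses Kahn's topological sort, which is just one reasonable choice.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class StageDef:
    """Hypothetical stage definition; field names are illustrative, not Druid's."""
    stage_id: int
    processor: str                                   # e.g. "scan", "aggregate", "sort", "limit"
    inputs: List[int] = field(default_factory=list)  # upstream stage ids; empty = reads segments/external data
    shuffle: Optional[str] = None                    # None = output stays aligned with input partitions


def execution_order(stages: List[StageDef]) -> List[int]:
    """Order stage ids so every stage comes after all of its inputs (Kahn's algorithm)."""
    pending: Dict[int, int] = {s.stage_id: len(s.inputs) for s in stages}
    consumers: Dict[int, List[int]] = {s.stage_id: [] for s in stages}
    for s in stages:
        for upstream in s.inputs:
            consumers[upstream].append(s.stage_id)

    ready = sorted(sid for sid, n in pending.items() if n == 0)  # leaf stages
    order: List[int] = []
    while ready:
        sid = ready.pop(0)
        order.append(sid)
        for downstream in consumers[sid]:
            pending[downstream] -= 1
            if pending[downstream] == 0:
                ready.append(downstream)

    if len(order) != len(stages):
        raise ValueError("stage graph contains a cycle")
    return order


# A join of two distributed tables: both scans repartition by the join key.
dag = [
    StageDef(3, "join", inputs=[1, 2]),
    StageDef(1, "scan", shuffle="hash(user_id)"),
    StageDef(2, "scan", shuffle="hash(user_id)"),
    StageDef(4, "aggregate", inputs=[3]),
]
print(execution_order(dag))  # [1, 2, 3, 4]
```

A two-table join like this is exactly the kind of shape that doesn't fit the single-stage design, but it is an ordinary DAG here.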
The stages would be finer-grained than native queries. For example, a stage
might do scan-filter-project, or aggregate, or sort, or limit. This is a common
approach in relational databases.
As a first step, I suggest we keep native queries in the picture by
translating SQL -> native query -> DAG-based query. This can be done with
minimal changes to the SQL planner, and would enable classic native queries to
exist at the same time as DAG-based queries without much code duplication. But
at some point, we'll want to go directly from SQL -> DAG-based query, since
that will give the SQL planner more flexibility to reorder operations.
Here's an example of how a query would look. This SQL query:
```sql
SELECT
  session,
  COUNT(*) AS cnt
FROM tbl
WHERE browser = 'Firefox'
GROUP BY session
ORDER BY cnt DESC
LIMIT 10
```
Would have five stages:
1. Scan: Read `tbl`, apply the filter `browser = 'Firefox'`, and project out
`session`. No shuffle.
2. Aggregate I: Locally group by `session` and compute `count(*)` for each.
Shuffle by `session`.
3. Aggregate II: Continue grouping by `session` within each partition, and
sum the partial counts to get full counts. No shuffle. This produces a fully
grouped resultset, still partitioned by `session`.
4. Sort: Locally order by `cnt DESC`. Shuffle everything into a single
partition.
5. Limit: Take the first 10 rows from the single partition generated by the
prior stage.
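These five stages can be simulated in-process to check that the plan produces the same answer as the SQL. This is a toy under stated assumptions: rows are Python dicts, each input list stands in for one worker's slice of `tbl`, the shuffle hashes `session` with CRC32 (chosen only because Python's built-in string hash is randomized per process), and a tie-break on `session` is added for deterministic output, which the SQL itself doesn't ask for. `run_five_stages` is a hypothetical name.

```python
import heapq
import zlib
from collections import Counter
from itertools import islice


def _sort_key(row):
    session, cnt = row
    return (-cnt, session)  # cnt DESC; session breaks ties (added for determinism)


def run_five_stages(input_partitions, num_shuffle_partitions=4, limit=10):
    # 1. Scan: apply browser = 'Firefox' and project session. No shuffle.
    scanned = [[row["session"] for row in part if row["browser"] == "Firefox"]
               for part in input_partitions]

    # 2. Aggregate I: partial counts per session within each partition, then shuffle by session.
    shuffled = [[] for _ in range(num_shuffle_partitions)]
    for part in scanned:
        for session, partial in Counter(part).items():
            target = zlib.crc32(session.encode("utf-8")) % num_shuffle_partitions
            shuffled[target].append((session, partial))

    # 3. Aggregate II: sum partial counts within each partition. No shuffle.
    aggregated = []
    for part in shuffled:
        totals = Counter()
        for session, partial in part:
            totals[session] += partial
        aggregated.append(list(totals.items()))

    # 4. Sort: order each partition locally, then merge the sorted partitions
    #    into a single partition that preserves the ordering.
    local_sorted = [sorted(part, key=_sort_key) for part in aggregated]
    single_partition = heapq.merge(*local_sorted, key=_sort_key)

    # 5. Limit: take the first `limit` rows of the single partition.
    return list(islice(single_partition, limit))


workers = [
    [{"session": "a", "browser": "Firefox"}, {"session": "b", "browser": "Chrome"},
     {"session": "a", "browser": "Firefox"}],
    [{"session": "b", "browser": "Firefox"}, {"session": "a", "browser": "Firefox"},
     {"session": "c", "browser": "Firefox"}],
]
print(run_five_stages(workers, limit=2))  # [('a', 3), ('b', 1)]
```

Note that the shuffle into a single partition in stage 4 has to be an ordered merge, not a plain concatenation; otherwise the Limit stage would take rows that are only sorted per partition.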
When stages connect without shuffling, we can pipeline execution locally on
the same servers, so there is no need for buffering or cross-server traffic.
When stages connect _with_ shuffling, we'll need to exchange data across
servers. Depending on the needs of the query, the producing stage could stream
to the consuming stage, or the producing stage could buffer up all results
before the consuming stage starts.
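One simple way to decide which stages can be pipelined is to treat every non-shuffling connection as "run on the same servers" and group stages with union-find. This sketch rests on my own assumptions (the edge tuples and the `pipeline_groups` name are hypothetical); a real scheduler would also weigh memory, parallelism, and whether a consumer needs all of its input before it can start.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Edge = Tuple[int, int, bool]  # (producer stage, consumer stage, is the producer's output shuffled?)


def pipeline_groups(stage_ids: Iterable[int], edges: Iterable[Edge]) -> List[List[int]]:
    """Group stages connected without a shuffle; each group can run pipelined on the same servers."""
    parent: Dict[int, int] = {s: s for s in stage_ids}

    def find(x: int) -> int:
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps lookups short
            x = parent[x]
        return x

    for producer, consumer, shuffled in edges:
        if not shuffled:
            parent[find(consumer)] = find(producer)

    groups: Dict[int, List[int]] = defaultdict(list)
    for s in parent:
        groups[find(s)].append(s)
    return sorted(sorted(g) for g in groups.values())


# The five-stage GROUP BY plan: Aggregate I (2) and Sort (4) are the shuffling stages.
edges = [(1, 2, False), (2, 3, True), (3, 4, False), (4, 5, True)]
print(pipeline_groups(range(1, 6), edges))  # [[1, 2], [3, 4], [5]]
```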
For the query above, if we know (or are willing to bet) that there are not
too many distinct `session` values, then we can run it just as efficiently as
the current single-stage approach. First, we'll make sure that Scan and
Aggregate I are scheduled on the same set of workers, so the output of Scan can
be pipelined into Aggregate I locally in memory. Then, we'll configure the
shuffle in Aggregate I to shuffle everything down to a single partition. Next,
we'll make sure that Aggregate II, Sort, and Limit all run on the same server.
That server would be responsible for gathering all the partitioned Aggregate I
outputs and preparing the final result. Finally, we'll set things up so all
stages run concurrently, and so Aggregate II streams from Aggregate I. Put
together, this is exactly what the Historicals and Brokers do today.
### Ingestion
In #11929 there is a proposal for adding a SQL INSERT statement. It could be
implemented on top of our existing batch ingestion tasks: the SQL layer could
convert relational operators to indexing tasks just as well as it can convert
them to native queries. But I think it would be nicer to implement it on top of
a multi-stage query stack.
We'd just need to do two things:
1. Provide a way for the query stack to read external data. There's a
natural way to do this through an "external" DataSource that maps onto
RowBasedSegments. It would be similar to how we plug lookups into the query
engines, except a little more complicated because we'll need to split the
external datasource before sending it down to various servers.
2. Provide a way for the query stack to generate segments. Once we have this
multi-stage structure this is also natural: we can add a final stage to any
query that shuffles data to match the target segment size and then generates
and publishes segments.
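Here's a rough sketch of what the final "shuffle to match the target segment size" stage might compute. Druid partitions segments by time chunk first, so the sketch groups rows by UTC day and then cuts each day into pieces of at most `target_rows` rows. It assumes rows carry Druid's `__time` column in epoch milliseconds and DAY segment granularity; `plan_segments` is a hypothetical name, and the actual indexing and publishing of each segment is left out.

```python
from collections import defaultdict
from datetime import datetime, timezone


def plan_segments(rows, target_rows):
    """Return (day, partition_num, rows) tuples, one per segment to generate."""
    by_day = defaultdict(list)
    for row in rows:
        day = datetime.fromtimestamp(row["__time"] / 1000, tz=timezone.utc).date()
        by_day[day].append(row)

    segments = []
    for day in sorted(by_day):
        day_rows = sorted(by_day[day], key=lambda r: r["__time"])  # deterministic chunking
        for partition_num, start in enumerate(range(0, len(day_rows), target_rows)):
            segments.append((day.isoformat(), partition_num, day_rows[start:start + target_rows]))
    return segments


DAY_MS = 86_400_000
JAN_1_2024 = 1_704_067_200_000  # 2024-01-01T00:00:00Z in epoch millis
rows = [{"__time": JAN_1_2024 + i * 1000} for i in range(5)] + [{"__time": JAN_1_2024 + DAY_MS}]
for day, num, seg_rows in plan_segments(rows, target_rows=2):
    print(day, num, len(seg_rows))
# 2024-01-01 0 2
# 2024-01-01 1 2
# 2024-01-01 2 1
# 2024-01-02 0 1
```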
So I propose that we work towards this instead of having the SQL planner
generate batch indexing tasks. It has a nice side benefit: part (1) alone means
that we'd be able to query external data in a regular query too. I don't think
that'll be a core use case for Druid, but it has some usefulness, like
previewing what an INSERT _might_ do, or doing an ad-hoc join of Druid
datasources with some external data.
We'll also need to figure out what to do about streaming ingest at some
point. I'm not sure what to do there but I think there are a few options that
make sense. Even if the multi-stage query stack doesn't have built-in support
for streaming queries, we can layer streamingness on top in the same way that
we do today with indexing tasks: there is a supervisor that manages a series of
tasks, each of which reads a chunk of data from Kafka and then publishes it.
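Layered on top of the multi-stage stack, the supervisor's core loop could look roughly like this: repeatedly hand each task a bounded offset range per Kafka partition, then advance. This is a hypothetical, bounded catch-up sketch (`supervise`, `run_task`, and the dict-of-offsets shape are illustrative only); a real supervisor runs continuously, handles task failures, and only advances offsets once a task's segments are published.

```python
from typing import Callable, Dict, Tuple

Ranges = Dict[int, Tuple[int, int]]  # Kafka partition -> [start, end) offsets


def supervise(start: Dict[int, int], latest: Dict[int, int], max_records_per_task: int,
              run_task: Callable[[Ranges], None]) -> Tuple[Dict[int, int], int]:
    """Launch tasks over successive offset ranges until every partition reaches `latest`."""
    offsets = dict(start)
    tasks_run = 0
    while any(offsets[p] < latest[p] for p in offsets):
        # Each partition reads at most max_records_per_task; caught-up partitions get an empty range.
        ranges = {p: (o, max(o, min(latest[p], o + max_records_per_task)))
                  for p, o in offsets.items()}
        run_task(ranges)  # hypothetical: read these ranges, then generate and publish segments
        offsets = {p: end for p, (_, end) in ranges.items()}  # advance only after the task returns
        tasks_run += 1
    return offsets, tasks_run


launched = []
final, count = supervise({0: 0, 1: 100}, {0: 250, 1: 150}, 100, launched.append)
print(final, count)  # {0: 250, 1: 150} 3
```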
### Server setup
There is an obvious question: where will this query engine run? Today we
have Brokers, Historicals, and indexing stuff (MiddleManager or Indexers). I
don't want to add a net new service or process type in the long run, because we
have a lot already. But I think in the short run we should add a new process
type.
I think at first the Broker should remain in charge of SQL planning, and may
route certain queries to these new multi-stage query processes if they are
running in that particular cluster. It could do this based on user request
(like a context parameter) or based on aspects of the query (like presence of
INSERT, or external data, or a "complex enough" query).
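The Broker-side routing decision could start out as simple as the function below. Everything here is a hypothetical illustration: the `useMultiStageEngine` context key, the `PlannedQuery` fields, and the "two or more joins counts as complex enough" threshold are placeholders, not existing Druid settings.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class PlannedQuery:
    """Facts the SQL planner already knows about a query (hypothetical shape)."""
    is_insert: bool
    reads_external_data: bool
    num_joins: int


def choose_engine(query: PlannedQuery, context: Dict[str, Any],
                  multi_stage_available: bool, join_threshold: int = 2) -> str:
    if not multi_stage_available:
        # No multi-stage processes in this cluster: use today's path,
        # which would have to reject queries like INSERT.
        return "single-stage"
    requested = context.get("useMultiStageEngine")  # hypothetical context parameter
    if requested is not None:
        return "multi-stage" if requested else "single-stage"  # explicit user request wins
    if query.is_insert or query.reads_external_data or query.num_joins >= join_threshold:
        return "multi-stage"  # routed based on aspects of the query
    return "single-stage"


print(choose_engine(PlannedQuery(True, False, 0), {}, True))   # multi-stage
print(choose_engine(PlannedQuery(False, False, 1), {}, True))  # single-stage
```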
However, like I said, in the long run I don't think it's a good plan to have
an extra process type on top of all the ones we already have. So I think in the
long run we should shoot for this process type actually being able to serve as
a replacement for the Broker, Historicals, and MMs/Indexers. By that I mean
that in the fullness of time, it should be able to:
- Receive and plan SQL queries, like a Broker.
- Cache segments locally, like a Historical.
- Run simple single-stage queries in an extremely efficient way, like the
Historical + Broker combo.
- Run ingestions (INSERT statements) in a distributed and scalable manner,
like MMs or Indexers.
At that point, a small Druid cluster would be pretty simple: just a
Coordinator and this new process type (or a few of them for scale-out). And a
large cluster could still specialize. Imagine setting up a tier of these new
processes that cache segments but don't perform ingestion, and another tier
that _don't_ cache segments but _do_ perform ingestion. That's similar to
setting up Historicals and MMs today, but more configuration-driven. It would
allow simple clusters to be simple and complex clusters to be complex.
## Code changes
A sketch of the code changes I think we'll need to make the above work out:
1. Low-level mechanism for shuffling data between servers. This should
include an efficient way to transfer large amounts of data over the network.
Currently we use Smile-over-HTTP for this, which works well for relatively
small amounts of data, but I think we can do better. It should also include
both a pipelined implementation (producing stage streams to the consuming
stage), and a buffered implementation (producing stage generates all data
before consuming stage starts) for operations that cannot be pipelined.
2. Cluster-level execution coordinator for a multi-stage query.
3. Server-local execution coordinator for a single stage of a multi-stage
query.
4. An actual process type for the above three things to run in.
5. Converter that generates multi-stage (DAG-based) queries from today's
native query types. (This enables hooking into the existing SQL layer, since it
generates native queries.)
6. To make ingestion-through-query work: adapters from external data into
the query stack, and from the query stack to the segment-generation code.
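To illustrate the two shuffle modes from item 1, here's an in-process toy: a bounded queue stands in for a streaming connection between a producing and a consuming stage, and a list stands in for fully buffered output. The class names are hypothetical; a real implementation moves data between servers, and the wire format (Smile-over-HTTP or something better) is exactly what's still open.

```python
import queue
import threading
from typing import Iterator, List

_END = object()  # sentinel marking the end of a pipelined stream


class PipelinedChannel:
    """Producer streams frames to a concurrently running consumer."""

    def __init__(self, capacity: int = 4) -> None:
        self._queue = queue.Queue(maxsize=capacity)

    def write(self, frame) -> None:
        self._queue.put(frame)  # blocks while the queue is full: backpressure on the producer

    def close(self) -> None:
        self._queue.put(_END)

    def read(self) -> Iterator:
        while True:
            frame = self._queue.get()
            if frame is _END:
                return
            yield frame


class BufferedChannel:
    """Producer writes all frames first; the consumer can only start after close()."""

    def __init__(self) -> None:
        self._frames: List = []
        self._closed = False

    def write(self, frame) -> None:
        if self._closed:
            raise RuntimeError("write after close")
        self._frames.append(frame)

    def close(self) -> None:
        self._closed = True

    def read(self) -> Iterator:
        if not self._closed:
            raise RuntimeError("consumer started before producer finished")
        return iter(self._frames)


def produce(channel, n: int) -> None:
    for i in range(n):
        channel.write(i)
    channel.close()


pipe = PipelinedChannel(capacity=2)
producer = threading.Thread(target=produce, args=(pipe, 10))
producer.start()
streamed = list(pipe.read())  # consumes while the producer is still writing
producer.join()

buffered = BufferedChannel()
produce(buffered, 10)  # producer runs to completion first
print(streamed == list(buffered.read()) == list(range(10)))  # True
```

The pipelined channel needs both stages running at once but never holds more than `capacity` frames; the buffered one lets the stages run at different times at the cost of holding everything, which is what operations that cannot be pipelined need.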
At Imply we've started to prototype a multi-stage query engine with the
above design. We've implemented some of the pieces: not all of them, but enough
to perform some basic queries. At this point I thought it would be a good time
to open up a discussion with the wider community, since as we continue working
on this stuff, we'd like to integrate it into Apache Druid.
## Rationale
A.K.A. the "why not" section.
**Why not integrate with an existing open-source query engine?**
I wouldn't want to stop anyone from integrating with another open-source
query engine. Some people might prefer to deploy that way.
But I think we will want this kind of functionality as something native in
Druid, for two reasons. First: I think the user experience will be nicer if we
don't require a dependency on another system. Second: I expect there will be
opportunities for optimization that we can use if everything is built into
Druid, and that would be tough to implement with an external query stack.
Besides, the work we'll need to do in order to build a multi-stage query
engine will also benefit integrations with other engines.
**Why not gradually add multi-stage capabilities to the existing query
stack?**
I think it's a question of complexity.
The core query stack has three main hook points: individual segment scan,
result merge on Historicals, and result merge on Broker. The implementations of
the native query types, especially groupBy, have gotten quite complex over the
years as new features have been added and needed to be mapped onto these hook
points. For example, subtotals, ordering, limiting, and having are all typically
done as part of "result merge on the Broker". The hook points are doing more
work than originally envisioned, which makes the code more difficult to follow
and extend.
So I think it's time to try a different approach with the query stack,
rather than continuing to add capabilities to the existing hook points. As we create a DAG-based query stack, we
can split queries up into stages and move more of the "fancy stuff" to the
framework, which will simplify the query-specific logic.
We can still share code, though. It won't be 100% new. We can share all of
the StorageAdapter stuff (cursors, etc.), the ColumnSelectorFactory stuff, and
the single-segment processing code for groupBy and topN. This lower-level code
is super optimized and works well. It's more the mid/high level stuff that IMO
would benefit from a different approach.