clintropolis opened a new issue, #17481:
URL: https://github.com/apache/druid/issues/17481
### Motivation
Today Druid rollup tables can be utilized to greatly improve performance by
significantly reducing the overall number of rows to be scanned by
pre-aggregating the data at ingest time. However, this does come at the cost of
forcing operators to make difficult decisions about which columns to include
and at what granularity to perform rollup.
If the detailed table is still needed, then currently the only way to handle
this is by having totally separate datasources with different ingestion
configurations, which creates additional complexity for users having to know
which datasources contain which columns at what level of rollup. This can quite
quickly get complicated if multiple views are needed, and is a large burden to
place on users to 'get it right' at query time, not to mention the operational
overhead, and any potential differences in how 'up to date' each individual
view is.
### Proposed changes
This proposal intends to introduce the concept of projections to Druid,
which for the initial implementation are materialized transformations to
pre-aggregate data contained within Druid segments as part of the same
datasource. By building projections for commonly used queries, especially those
having columns with lower value cardinality, we can significantly reduce the
number of rows to be processed by those queries, dramatically lowering both
compute and io cost. This gives us the ability to take advantage of all of the
features of rollup tables, but still retain a fine level of detail when
required, all within the same table in exchange for some extra storage space.
#### dataSchema JSON spec
The initial implementation is focused on the low level mechanics of building
projections for realtime segments and persisting them to immutable segments
with query time support for both states. The implication here is that at first,
projections can only be defined by classic JSON based ingestion (but they can
still be used by queries using MSQ or the new Dart engine). Future development
will allow projections to be created as part of MSQ based ingestion as well.
This approach was chosen to prove the viability of the implementation before
we design a more user friendly (and modern) way to define and manage
projections, which will be detailed in a follow-up proposal once many of the
lower level pieces are in place. Ultimately, MSQ based ingestion goes through
the same code-paths to actually generate segments, and so the work done so far
will be re-usable for SQL inserts and replaces once we finish designing and
implementing the higher level interfaces for defining projections for a table.
To define a projection, a new `projections` section has been added to the
`dataSchema` of JSON based ingestion specs. Since the initial implementation is
focusing on grouping and pre-aggregating, the spec looks kind of similar to
what you might find in the JSON spec for a group by query.
| field | description | required |
| --- | --- | --- |
| `type` | must be `'aggregate'` | yes |
| `name` | The internal name of the projection. Users do not typically need
to know this value, but it can optionally be specified with the
`useProjection` context flag | yes |
| `virtualColumns` | Virtual columns used to compute the projection. The
inputs to these virtual columns must exist on the base table. Virtual columns
on time can be used to define a query granularity for the projection. During
ingestion the processing logic finds the ‘finest’ granularity virtual column
that is a `timestamp_floor` expression and uses it as the `__time` column for
the projection if specified in the grouping columns. Projections do not need to
have a time column defined, in which case they can still match queries that are
not grouping on time. At query time, the virtual columns can be matched to the
virtual columns of a query, regardless of their internal name in the
projection. | no |
| `groupingColumns` | The grouping columns of the projection. These columns
must exist on the base table, or must be defined in the virtual columns. If
grouping on a virtual time column, the 'finest' granularity time column is used
as the __time column for the projection. If no time column is defined, all rows
of the projection have the same __time value, controlled by the start timestamp
of the segment granularity. The order of the grouping columns defines the order
in which data is sorted in the projection, always ascending. | yes |
| `aggregators` | Pre-aggregated columns. The inputs to these must exist on
the base table, or be defined in the `virtualColumns` of the projection. At
query time the aggregators of the projection can be matched to equivalent
aggregators defined in a query, regardless of their name, or any options that
do not impact the 'intermediary' aggregate processing, such as finalization
options. | no |
For example, using the standard wikipedia quickstart data, we could define a
projection that contains the channel and page columns, an HLL sketch of the
users column for a 'distinct count' approximation of which users modified
things for a given channel and page, and the sum of lines added and deleted for
those channels and pages. The projection spec would look like this:
```json
"dataSchema": {
"granularitySpec": {
...
},
"dataSource": ...,
"timestampSpec": {
...
},
"dimensionsSpec": {
...
},
"projections": [
{
"type": "aggregate",
"name": "channel_page_hourly_distinct_user_added_deleted",
"groupingColumns": [
{
"type": "long",
"name": "__gran"
},
{
"type": "string",
"name": "channel"
},
{
"type": "string",
"name": "page"
}
],
"virtualColumns": [
{
"type": "expression",
"expression": "timestamp_floor(__time, 'PT1H')",
"name": "__gran",
"outputType": "LONG"
}
],
"aggregators": [
{
"type": "HLLSketchBuild",
"name": "distinct_users",
"fieldName": "user"
},
{
"type": "longSum",
"name": "sum_added",
"fieldName": "added"
},
{
"type": "longSum",
"name": "sum_deleted",
"fieldName": "deleted"
}
]
},
...
]
},
...
```
#### Context flags
There are a few new query context flags which have been added to aid in
experimentation with projections.
* `useProjection` accepts a specific projection name and instructs the query
engine that it must use that projection, and will fail the query if the
projection does not match the query
* `forceProjections` accepts `true` or `false` and instructs the query
engine that it must use a projection, and will fail the query if it cannot find
a matching projection
* `noProjections` accepts `true` or `false` and instructs the query engines
to not use any projections
These were primarily added to aid in testing, but could potentially be used
by advanced users.
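As an illustration, pinning a query to the example projection defined earlier might look like the following. The query itself is a generic sketch; only the context flag and projection name come from this proposal:
```json
{
  "queryType": "timeseries",
  "dataSource": "wikipedia",
  "intervals": ["2016-06-27/2016-06-28"],
  "granularity": "hour",
  "aggregations": [
    { "type": "longSum", "name": "sum_added", "fieldName": "added" }
  ],
  "context": {
    "useProjection": "channel_page_hourly_distinct_user_added_deleted"
  }
}
```
If the named projection cannot serve the query, the query fails rather than silently falling back to the base table.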
#### Query metrics
To help track how effective projections are, a `projection` dimension has
been added which contains the name of the projection used. Additionally,
methods such as `reportSegmentRows` and
`reportPreFilteredRows` will reflect the values of using the projection rather
than the base table since the projection is substituted at query processing
time.
#### How it works
Projections required a number of internal changes to set the stage for
implementing the feature. #16533 rather heavily refactored how `Cursor` and
`VectorCursor` were created at query time by pushing in a new `CursorBuildSpec`
object which contains a much richer set of data describing the 'shape' of
the data required to be read by the cursor, which was key to implementing
projections at query time. #17058 added some hooks into the `CursorHolder`
interface so it could report if data is available in a pre-aggregated state for
a given `CursorBuildSpec`, allowing query engines to switch to using
'combining' aggregators if such pre-aggregated data exists. #17064 prepared
`IncrementalIndex` by introducing an abstraction allowing for different views
of the same data to be the backing store for cursors and column selector
factories, and #17084 added the means of finding 'equivalent' virtual columns,
so that a projection can precompute a virtual column and still match at query
time.
Projections are built on the fly by the `IncrementalIndex`. As a row is
processed for the base schema, it is also processed by each defined projection.
Each grouping column defined on the projection either hooks into the
'parent' column indexer if it exists as a physical column on the base table,
or uses a projection-only indexer if the column is a virtual column that
only exists on the projection. Each projection has its own 'facts table', and
all will add to the estimated memory usage so that the extra overhead of
projections is accounted for.
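The per-row flow described above can be sketched roughly as follows. This is a simplified illustration, not Druid's actual Java internals: all class and function names here are hypothetical, and real aggregators maintain richer intermediate state.

```python
# Illustrative sketch of per-row projection maintenance at ingest time.
# All names and structures are hypothetical, not Druid's actual internals.
from collections import defaultdict

class AggregateProjection:
    def __init__(self, name, grouping_columns, virtual_columns, aggregators):
        self.name = name
        self.grouping_columns = grouping_columns  # ordered column names
        self.virtual_columns = virtual_columns    # name -> fn(row) -> value
        self.aggregators = aggregators            # name -> (input field, fold fn)
        self.facts = defaultdict(dict)            # grouping key -> aggregate state

    def process(self, row):
        # compute virtual columns (e.g. a timestamp_floor on __time) for this row
        derived = dict(row)
        for name, fn in self.virtual_columns.items():
            derived[name] = fn(row)
        # the grouping key: one entry per grouping column, in spec order
        key = tuple(derived.get(col) for col in self.grouping_columns)
        # fold the row into this projection's own facts table
        state = self.facts[key]
        for agg_name, (field, fold) in self.aggregators.items():
            state[agg_name] = fold(state.get(agg_name), derived.get(field))

def long_sum(acc, value):
    return (acc or 0) + (value or 0)

# hourly rollup of 'added' per channel; both rows fall in the same hour bucket
proj = AggregateProjection(
    name="channel_hourly_added",
    grouping_columns=["__gran", "channel"],
    virtual_columns={"__gran": lambda r: r["__time"] - r["__time"] % 3_600_000},
    aggregators={"sum_added": ("added", long_sum)},
)
for row in [
    {"__time": 1_000_000, "channel": "#en", "added": 3},
    {"__time": 2_000_000, "channel": "#en", "added": 5},
]:
    proj.process(row)
```

As in the real implementation, each projection keeps its own facts table, so the two input rows collapse into a single pre-aggregated row here.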
When persisted, projections are stored as additional internal columns in a
segment, which together effectively form an embedded table for the projection.
Grouping columns which have a 'parent' column on the base table will share some
parts, such as the value dictionary, but have their own dictionary id column
file and their own bitmap indexes.
Querying currently works pretty similarly for both realtime and historical
segments; projections are ordered by 'best', where 'best' is defined as
having the lowest number of rows to scan (or the estimated lowest for
realtime, based on the fewest grouping columns). `CursorFactory`
implementations iterate over this sorted list to check if the projection
matches a given `CursorBuildSpec`, where a match is defined as having all
required grouping columns, virtual columns, and aggregators, and that the query
granularity is fine enough. If a match is found, then that projection will be
substituted as the `CursorHolder`, and the query engine can rewrite the query
to be prepared to process pre-aggregated data.
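A rough sketch of that matching loop, with hypothetical names and deliberately simplified checks (the real logic in `CursorFactory` implementations matches virtual columns and aggregator equivalence in far more detail):

```python
# Hypothetical sketch of the query-time projection matching loop;
# real Druid does this in Java against a CursorBuildSpec, with richer checks.
def matches(projection, build_spec):
    # every required grouping column must be provided by the projection
    if not build_spec["grouping_columns"] <= projection["grouping_columns"]:
        return False
    # every required aggregator must have an equivalent pre-aggregate
    if not build_spec["aggregators"] <= projection["aggregators"]:
        return False
    # projection granularity must be at least as fine as the query's
    return projection["granularity_millis"] <= build_spec["granularity_millis"]

def find_projection(projections, build_spec):
    # order by 'best' (fewest rows to scan) and take the first match
    for projection in sorted(projections, key=lambda p: p["num_rows"]):
        if matches(projection, build_spec):
            return projection
    return None  # fall back to the base table

projections = [
    {"name": "hourly", "grouping_columns": {"channel"},
     "aggregators": {"sum_added"}, "granularity_millis": 3_600_000,
     "num_rows": 1_000},
    {"name": "daily", "grouping_columns": {"channel"},
     "aggregators": {"sum_added"}, "granularity_millis": 86_400_000,
     "num_rows": 100},
]
spec = {"grouping_columns": {"channel"}, "aggregators": {"sum_added"},
        "granularity_millis": 86_400_000}
best = find_projection(projections, spec)
```

For a daily query both projections match, so the smaller daily projection wins; an hourly query would instead fall through to the hourly projection.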
### Rationale
The more traditional way to achieve this is with materialized views; however,
we decided to implement projections instead for a number of reasons.
Materialized views are typically done as separate physical tables, and in a
database like Druid, would bring with them a lot of very complicated
synchronization problems to solve. Projections on the other hand, live inside
of a regular Druid segment, and as such are constrained to be within the same
interval as the segment.
Users will not typically query a projection directly, or even need to know
they exist, rather they will be used automatically at query time if the
projection 'fits' the query being issued. If the projection does not exist in
some of the segments, such as when it was added later, the query can always
fall back to the base table to compute everything that would have been in the
projection. This model is
also what makes them dramatically easier to implement than traditional
materialized views, and frankly, a lot easier to take advantage of for users,
because the burden of knowing which view to query simply goes away.
### Operational impact
This feature was designed to be as isolated as possible - projections should
not impact ingestion unless they are specified, projections do not affect query
performance unless they exist within segments, and projections do not affect segment
load unless they are defined.
The ingest time impact, if projections are defined, is likely more frequent
persists when using byte based incremental persist limits since projections
will add to the total tracked byte count. Row count based incremental
persist limits will be quite tough to use effectively with projections, as
those limits only consider the base table, so there is an increased
likelihood of running out of heap memory while building incremental segments.
On the historical side, projections being defined in segments will cause a
slight increase in heap pressure from the additional column suppliers
associated with the projections, though approximately the same as the overhead
of additional columns on the base table. The more projections (and thus
columns) that are defined, the more pronounced this issue will be, so to be
conservative, operators should monitor the heap overhead of segments that
have been loaded 'at rest'.
Projections will obviously require additional disk space, though it
should be less than if the equivalent data were stored in a totally
separate segment, as value dictionaries are shared between the base table and
the projection tables within the same segment.
### Test plan
I have access to some clusters on which I plan to begin doing larger scale
tests to determine the effectiveness of projections on cluster performance.
Other than that, the feature is pretty self contained, so it is low risk to
existing deployments.
### Future work
#### Projection introspection
The initial prototype of projections has very low visibility for users.
Basically, there is no way to determine if projections are defined at all, nor
a way to determine which segments have the projections and which do not. I
would like to add a friendly way to expose this, such as via
`INFORMATION_SCHEMA` or `sys`. Internally this will likely be done by expanding
the native `segmentMetadata` query to include information about projections. I
am unsure if there is work to do to integrate this with the 'centralized'
schema work that has been happening; I need to catch up on the state of that
project to determine what (if anything) needs to be done to work well with
it.
#### MSQ support for creating projections
This part is still being designed and will be the subject of a subsequent
proposal. We have a couple of options here. My preference would be to build out
a catalog and make the management of projection schemas part of this. I think
ideally we would probably inject some catalog supplier into all kinds of
ingestion tasks so that the set of projections could be fetched and applied
when building segments. Eventually this feels like it should become the
preferred way to define and manage projections.
The other option is to design some SQL syntax for adding projections as part
of INSERT/REPLACE statements. This would work, but it feels a bit clunky to me
as well when compared to the catalog based solution.
#### Auto-compaction (or some other mechanism?) to apply projection changes to existing segments
In the prototype, auto-compaction is not yet aware of projections, so the
first step will be updating auto-compaction to preserve existing projections.
Beyond this, I am imagining that either auto-compaction or something very
similar will be used to apply projections to existing segments.
#### filters
This is likely near the top of the list of follow-up work: adding
support for filtering projections to reduce row counts even further. The
primary reason that they are not yet included in the design is because I am
still deciding how exactly I'd like them to work.
We could just make it work the same way as this initial implementation, but
the downside of this is that if the filtering dramatically reduces the value
dictionary sizes of the projection columns, we still must do stuff like binary
search against the parent dictionary in order to locate values.
Another option, at the cost of a bit of extra complexity, is to do something
like we do with JSON columns, where projection columns would have a mapping
dictionary of integer to integer, so that a smaller dictionary could be used
for operations and a second lookup operation performed to substitute the
projection dictionary id with the parent dictionary id to lookup the actual
value. This would speed up the binary search used to find values for
filtering, and I think it is worth considering since it could re-use a
strategy similar to the one used by JSON columns.
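The int-to-int mapping idea can be sketched as follows; everything here is a hypothetical illustration loosely modeled on the description above, not Druid's actual column code:

```python
# Illustrative sketch of a projection-local dictionary that maps its own
# (small) dictionary ids to ids in the parent (base table) dictionary.
import bisect

parent_dictionary = ["apple", "banana", "cherry", "durian", "elderberry"]
# a filtered projection only retains a subset of values; store parent ids
projection_to_parent = [1, 3]  # projection id 0 -> "banana", 1 -> "durian"

def projection_values():
    # the projection-local dictionary stays sorted because the parent is sorted
    return [parent_dictionary[pid] for pid in projection_to_parent]

def lookup_projection_id(value):
    # binary search over the *small* projection-local dictionary...
    values = projection_values()
    i = bisect.bisect_left(values, value)
    if i < len(values) and values[i] == value:
        return i  # projection-local dictionary id
    return None   # value not present in the (filtered) projection

def to_value(projection_id):
    # ...then one extra indirection recovers the actual value from the parent
    return parent_dictionary[projection_to_parent[projection_id]]
```

The binary search only touches the small projection-local dictionary; the cost of the second lookup is a single array indirection.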
#### More flexible projection row ordering
The initial design and prototype behave a lot like top level Druid rollup
tables for ordering; that is, the ordering is defined by the order in which
the grouping columns appear in the spec, always ascending. However, I believe it
should be possible to open this up a bit to allow defining different orderings,
without a lot of extra work. However, this was not done for the initial
design because there is a larger external piece required to actually take
advantage of these more flexible orderings: making the query engines'
handling of segment ordering much more sophisticated than the current uses
(e.g. timeseries checking that the segment is time ordered). In the future, query
engines should be able to specify a preferred ordering based on the query being
issued by the user, and if the segment ordering matches, the query engine
should be able to take advantage of this fact to reduce the amount of work
required for processing at the segment level. This work isn't specific to
projections, and leads nicely to the next 'future work' item I'd like to talk
about:
#### better projection matching
The comparator does ok and has worked well enough to get this feature off
the ground, but is obviously not the best solution long term for matching
projections. I do not yet know what the best solution is, however, so this
remains an area ripe for exploration so we can drop the current big for loop
over the ordered set that is used to find a matching projection at query time.
#### larger scale refactors to continue reducing the prominence of granularity and the time column
There are a number of what I would consider "ugly" things which were done in
the initial prototype of projections in order to work well with the current
query engines; a big one of those revolves around the special Druid time
column. We have been doing some recent work to downplay its prominence; for
example, we now allow segments to no longer be ordered by time first.
Projections carry this idea a bit further by making a time
column completely optional on projections, though this is done by making a
constant time column, since in many places we still expect a __time column to
exist.
#### new segment format
This PR is very conservative in terms of segment format changes, going to
great lengths to work within the constraints of what is currently possible
with the Druid V9 segment format. However, a number of pretty ugly things had
to be done that I think could be handled a lot better. I will have a proposal
for what I have in mind for this soon.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]