clintropolis opened a new issue, #17481:
URL: https://github.com/apache/druid/issues/17481

   ### Motivation
   
   Today Druid rollup tables can be utilized to greatly improve performance by 
significantly reducing the overall number of rows to be scanned by 
pre-aggregating the data at ingest time. However, this does come at the cost of 
forcing operators to make difficult decisions about which columns to include 
and at what granularity to perform rollup.
   
   If the detailed table is still needed, then the only current way to possibly 
handle this is by having totally separate datasources with different ingestion 
configurations, which creates additional complexity for users having to know 
which datasources contain which columns at what level of rollup. This can quite 
quickly get complicated if multiple views are needed, and is a large burden to 
place on users to 'get it right' at query time, not to mention the operational 
overhead, and any potential differences in how 'up to date' each individual 
view is.
   
   
   ### Proposed changes
   
   This proposal intends to introduce the concept of projections to Druid, 
which for the initial implementation are materialized transformations to 
pre-aggregate data contained within Druid segments as part of the same 
datasource. By building projections for commonly used queries, especially those 
having columns with lower value cardinality, we can significantly reduce the 
number of rows to be processed by those queries, dramatically lowering both 
compute and io cost. This gives us the ability to take advantage of all of the 
features of rollup tables, but still retain a fine level of detail when 
required, all within the same table in exchange for some extra storage space.
   
   #### dataSchema JSON spec
   The intial implementation is focused on the low level mechanics of building 
projections for realtime segments and persisting them to immutable segments 
with query time support for both states. The implication here is that at first, 
projections only can be defined by classic JSON based ingestion (but they can 
still be used by queries using MSQ or the new Dart engine). Future development 
will allow projections to be created as part of MSQ based ingestion as well.
   
   This approach was chosen to prove the viability of the implementation before 
we design a more user friendly (and modern) way to define and manage 
projections, which will be detailed in a follow-up proposal once many of the 
lower level pieces are in place. Ultimately, MSQ based ingestion goes through 
the same code-paths to actually generate segments, and so the work done so far 
will be re-usable for SQL inserts and replaces once we finish designing and 
implementing the higher level interfaces for defining projections for a table.
   
   To define a projection, a new `projections` section to the `dataSchema` of 
JSON based ingestion specs has been added. Since the initial implementation is 
focusing on grouping and pre-aggregating, the spec looks kind of similar to 
what you might find in the JSON spec for a group by query.
   
   | field | description | required |
   | --- | --- | --- |
   | `type` | must be `'aggregate'` | yes |
   | `name` | The internal name of the projection. This value is not typically 
needed to be known by users, but can optionally be specified with the 
`useProjection` context flag | yes |
   | `virtualColumns` | Virtual columns used to compute the projection. The 
inputs to these virtual columns must exist on the base table. Virtual columns 
on time can be used to define a query granularity for the projection. During 
ingestion the processing logic finds the ‘finest’ granularity virtual column 
that is a `timestamp_floor` expression and uses it as the `__time` column for 
the projection if specified in the grouping columns. Projections do not need to 
have a time column defined, in which case they can still match queries that are 
not grouping on time. At query time, the virtual columns can be matched to the 
virtual columns of a query, regardless of their internal name in the 
projection. | no |
   | `groupingColumns` | The grouping columns of the projection. These columns 
must exist on the base table, or must be defined in the virtual columns. If 
grouping on a virtual time column, the 'finest' granularity time column is used 
as the __time column for the projection. If no time column is defined, all rows 
of the projection have the same __time value, controlled by the start timestamp 
of the segment granularity. The order of the grouping columns define the order 
in which data is sorted in the projection, always ascending. | yes |
   | `aggregators` | Pre-aggregated columns. The inputs to these must exist on 
the base table, or be defined in the `virtualColumns` of the projection. At 
query time the aggregators of the projection can be matched to equivalent 
aggregators defined in a query, regardless of their name, or any options that 
do not impact the 'intermediary' aggregate processing, such as finalization 
options.| no |
   
   For example, using the standard wikipedia quickstart data, we could define a 
projection that contains the channel and page columns, an HLL sketch of the 
users column for a 'distinct count' approximation of which users modified 
things for a given channel and page, and the sum of lines added and deleted for 
those channels and pages. The projection spec would look like this:
   
   ```   
       "dataSchema": {
         "granularitySpec": {
           ...
         },
         "dataSource": ...,
         "timestampSpec": {
           ...
         },
         "dimensionsSpec": {
           ...
         },
         "projections": [
           {
             "type": "aggregate",
             "name": "channel_page_hourly_distinct_user_added_deleted",
             "groupingColumns": [
               {
                 "type": "long",
                 "name": "__gran"
               },
               {
                 "type": "string",
                 "name": "channel"
               },
               {
                 "type": "string",
                 "name": "page"
               }
             ],
             "virtualColumns": [
               {
                 "type": "expression",
                 "expression": "timestamp_floor(__time, 'PT1H')",
                 "name": "__gran",
                 "outputType": "LONG"
               }
             ],
             "aggregators": [
               {
                 "type": "HLLSketchBuild",
                 "name": "distinct_users",
                 "fieldName": "user"
               },
               {
                 "type": "longSum",
                 "name": "sum_added",
                 "fieldName": "added"
               },
               {
                 "type": "longSum",
                 "name": "sum_deleted",
                 "fieldName": "deleted"
               }
             ]
           },
           ...
         ]
       },
       ...
   ```
   
   #### Context flags
   There are a few new query context flags which have been added to aid in 
experimentation with projections. 
   * `useProjection` accepts a specific projection name and instructs the query 
engine that it must use that projection, and will fail the query if the 
projection does not match the query
   * `forceProjections` accepts `true` or `false` and instructs the query 
engine that it must use a projection, and will fail the query if it cannot find 
a matching projection
   * `noProjections` accpets `true` or `false` and instructs the query engines 
to not use any projections
   
   These were primarily added to aid in testing, but could potentially be used 
by advanced users.
   
   #### Query metrics
   To help aid in tracking how effective projections are, a `projection` 
dimension has been added which contains the name of the projection has been 
added. Additionally, methods such as `reportSegmentRows` and 
`reportPreFilteredRows` will reflect the values of using the projection rather 
than the base table since the projection is substituted at query processing 
time.
   
   
   #### How it works
   Projections required a number of internal changes to set the stage for 
implementing the feature. #16533 rather heavily refactored how `Cursor` and 
`VectorCursor` were created at query time by pushing in a new `CursorBuildSpec` 
object which contains a much more rich set of data describing the 'shape' of 
what data is required to be read by the cursor, which was key to implemeting 
projections at query time. #17058 added some hooks into the `CursorHolder` 
interface so it could report if data is available in a pre-aggregated state for 
a given `CursorBuildSpec`, allowing query engines to switch to using 
'combining' aggregators if such pre-aggregated data exists. #17064 prepared 
`IncrementalIndex` by introducing an abstraction allowing for different views 
of the same data to be the backing store for cursors and column selector 
factories, and #17084 added the means of finding 'equivalent' virtual columns, 
so that a projection could be allowed to precompute some virtual column and 
still ma
 tch at query time.
   
   Projections are built on the fly by the `IncrementalIndex`. As a row is 
processed for the base schema, it is also processed by each defined projection. 
All grouping columns defined on the projection either have a hook into the 
'parent' column indexer if it exists as a physical column on the base table, 
else a projection only indexer is used if the column is a virtual column that 
only exists on the projection. Each projection has its own 'facts table', and 
all will add to the estimated memory usage so that the extra overhead of 
projections is accounted for.
   
   When persisted, projections are stored as additional internal columns in a 
segment, together which effectively form an embedded table for the projection. 
Grouping columns which have a 'parent' column on the base table will share some 
parts, such as the value dictionary, but have their own dictionary id column 
file and their own bitmap indexes.
   
   Querying currently works for both realtime and historical segments works 
pretty similarly; projections are ordered by 'best', where 'best' is defined 
has having the lowest number of rows to scan (or estimated lowest number for 
realtime, based on fewest number of grouping columns). `CursorFactory` 
implementations iterate over this sorted list to check if the projection 
matches a given `CursorBuildSpec` where matches is defined as having all 
required grouping columns, virtual columns, and aggregators, and that the query 
granularity is fine enough. If a match is found, then that projection will be 
substituted as the `CursorHolder`, and the query engine can rewrite the query 
to be prepared to process pre-aggregated data.
   
   ### Rationale
   
   The more traditional way to achieve this is with materialized views, however 
we decided to implement projections instead for a number of reasons. 
Materialized views are typically done as separate physical tables, and in a 
database like Druid, would bring with them a lot of very complicated 
synchronization problems to solve. Projections on the other hand, live inside 
of a regular Druid segment, and as such are constrained to be within the same 
interval as the segment.
   
   Users will not typically query a projection directly, or even need to know 
they exist, rather they will be used automatically at query time if the 
projection 'fits' the query being issued. If the projection does not exist in 
some of the segments, such as if they are added later or only in some of the 
segments, then the query can always fall back to the base table to compute 
everything that would have been in the projection if it existed. This model is 
also what makes them dramatically easier to implement than traditional 
materialized views, and frankly, a lot easier to take advantage of for users, 
because the burden of knowing which view to query simply goes away.
   
   
   ### Operational impact
   
   This feature was designed to be as isolated as possible - projections should 
not impact ingestion unless they are specified, projections do not affect query 
performance unless they exist within segments, projections do no affect segment 
load unless they are defined.
   
   The ingest time impact, if projections are defined, is likely more frequent 
persists when using byte based incremental persist limits since projections 
will add to the total tracked byte limit. Using row count based incremental 
persist limits will be quite tough to use effectively with projections, as 
those limits will only consider the base table, so there is likely an increased 
likelyhood of running out of heap memory while building incremental segments.
   
   On the historical side, projections being defined in segments will cause a 
slight increase in heap pressure from the additional column suppliers 
associated with the projections, though approximately the same as the overhead 
of additional columns on the base table. The more projections (e.g. the more 
columns) which are defined will increase how pronounced this issue is, so to be 
conservative, operators should monitor the heap overhead of segments which have 
been loaded 'at rest'.
   
   Projections also obviously will require additional disk space, though it 
should be smaller than if the equivalent data was defined within a totally 
separate segment, as value dictionaries are shared between the base table and 
the projection tables within the same segment.
   
   ### Test plan
   I have access to some clusters which I plan to begin doing some larger scale 
tests to determine the effectiveness of projections on cluster performance, 
other than that the feature is pretty self contained, so is low risk to 
existing deployments.
   
   
   ### Future work
   
   #### Projection introspection
   The initial prototype of projections has very low visibility for users. 
Basically, there is no way to determine if projections are defined at all, nor 
a way to determine which segments have the projections and which do not. I 
would like to add a friendly way to expose this, such as via 
`INFORMATION_SCHEMA` or `sys`. Internally this will likely be done by expanding 
the native `segmentMetadata` query to include information about projections. I 
am unsure if there is work to due to make this integrated with the 
'centralized' schema work that has been happening, I need to catch up on the 
state of that project to determine what (if anything) needs done to work ok 
with that stuff.
   
   #### MSQ support for creating projections
   This part is still being designed and will be the subject of a subsequent 
proposal. We have a couple of options here. My preference would be to build out 
a catalog and make the management of projection schemas part of this. I think 
ideally we would probably inject some catalog supplier into all kinds of 
ingestion tasks so that the set of projections could be fetched and applied 
when building segments. Eventually this feels like it should 
   
   The other option is to design some SQL syntax for adding projections as part 
of INSERT/REPLACE statements. This would work, but it feels a bit clunky to me 
as well when compared to the catalog based solution.
   
   #### Auto-compaction (or some other mechanism?) to apply projection changes 
to existing segments
   In the prototype, auto-compaction is not yet aware of projections, so the 
first step will be updating auto-compaction to preserve existing projections. 
Beyond this, I am imagining either auto-compaction, or something very similar 
will be used to apply projections to existing.
   
   
   #### filters
   This is likely the near the top of the list of the follow-up work, adding 
support for filtering projections to reduce row counts even further. The 
primary reason that they are not yet included in the design is because I am 
still deciding how exactly I'd like them to work.
   
   We could just make it work the same way as this initial implementation, but 
the downside of this is that if the filtering dramatically reduces the value 
dictionary sizes of the projection columns, we still must do stuff like binary 
search against the parent dictionary in order to locate values.
   
   Another option at the cost of a bit of extra complexity, is to do something 
like we do with JSON columns, where projection columns would have a mapping 
dictionary of integer to integer, so that a smaller dictionary could be used 
for operations and a second lookup operation performed to substitute the 
projection dictionary id with the parent dictionary id to lookup the actual 
value. This would speed up the binary search used to find values for filtering, 
and I think is worth considering, and could re-use a similar strategy that is 
used by JSON columns.
   
   #### More flexible projection row ordering
   The initial design and prototype behaves a lot like top level druid rollup 
tables for ordering, that is the ordering is defined by the order the grouping 
columns appear in the spec, always in ascending order. However, I believe it 
should be possible to open this up a bit to allow defining different orderings, 
without a lot of extra work. However, this was not yet done for the initial 
design because there is a larger external piece required to actually take 
advantage of these more flexible orderings, that is making the query engines 
handling of segment ordering much more sophisticated beyond the current uses 
(timeseries checking that the segment is time ordered). In the future, query 
engines should be able to specify a preferred ordering based on the query being 
issued by the user, and if the segment ordering matches, the query engine 
should be able to take advantage of this fact to reduce the amount of work 
required for processing at the segment level. This work isn't specific to 
 projections, and does lead nicely to the next 'future work' i'd like to talk 
about:
   
   #### better projection matching
   The comparator does ok and has worked well enough to get this feature off 
the ground, but is obviously not the best solution long term for matching 
projections. I do not yet know what is the best solution however, so this 
remains an area ripe for exploration so we can drop the current big for loop 
over the ordered set that is used to find a matching projection at query time.
   
   #### larger scale refactors to continue reducing the prominence of 
granularity and the time column
   There are a number of what I would consider "ugly" things which were done in 
the initial prototype of projections in order to work well with the current 
query engines, a big one of those revolves around the special Druid time 
column. We have been doing some recent work to downplay the prominence of the 
special time column, for example we now allow segments to be ordered no longer 
by time first. Projections carry this idea a bit further by making a time 
column completely optional on projections, though this is done by making a 
constant time column, since in many places we still expect a __time column to 
exist.
   
   #### new segment format
   This PR is very conservative in terms of segment format changes, going to 
pretty great lengths to work within the constraints of what is currently 
possible with the Druid V9 segment format, however a number of pretty ugly 
things have to be done that I think could be done a lot better. I will have a 
proposal for what I have in mind for this soon.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to