loquisgon opened a new pull request #12137:
URL: https://github.com/apache/druid/pull/12137
# Druid batch ingestion replace high level design
## This PR is a draft: only the implementation is coded, with no unit or
integration tests, nor has it received extensive testing yet. The purpose of
this draft PR is to start socializing the design of the new replace
functionality.
## Problem statement
With regular native batch ingestion specs that set `appendToExisting` to
`false`, the new data referenced in the spec's `inputIntervals` replaces data
in the corresponding previous time chunks, but it leaves previous time chunks
intact when the new input data does not overlap with them. This causes some
confusion for customers who want to replace *all* the data in the input
intervals with just the new data referenced in the ingestion spec. This work
introduces a new "replace" functionality to solve the need to replace all the
existing data with the new data in the input intervals.
A *replace batch ingestion spec* (or simply *replace spec*) is a batch
ingestion spec with the following characteristics:
- Input data (the usual stuff)
- Batch ingestion intervals in the ioConfig section of the spec (the usual
stuff); we call these the "replace intervals"
- A flag in the ioConfig (reusing `replaceExisting`) that indicates that this
is a replace of time chunks (this is new)
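For illustration only, a replace spec's ioConfig might look like the sketch below. Apart from `appendToExisting` and the reused `replaceExisting` flag (and the intervals, per the description above), every field name and value here is just an assumption about the eventual shape, not the final syntax:

```json
{
  "ioConfig": {
    "type": "index_parallel",
    "inputSource": { "type": "local", "baseDir": "/tmp/new-data", "filter": "*.json" },
    "inputFormat": { "type": "json" },
    "inputIntervals": ["2021-01-01/2021-02-01"],
    "appendToExisting": false,
    "replaceExisting": true
  }
}
```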
When the replace spec is executed, the following will happen to the existing
data source's data. Every time chunk that overlaps with the replace intervals
(we call these the "replacement time chunks") will be replaced as follows.
For every one of the replacement time chunks:
- If the input data referenced in the replace spec contains data for that
replacement time chunk, all the previous data in the time chunk will be either
partially overshadowed (when the replace spec's segment granularity is finer)
or completely overshadowed (when the replace spec's segment granularity is the
same or coarser), and the input data will be used as a replacement (the usual
stuff). Note that when the replace spec has finer granularity, if there are
enough finer-granularity replacement time chunks with data to cover all of the
coarser-granularity previous data, then the previous data is also completely
overshadowed.
- If the input data does not contain data for the replacement time chunk,
then a tombstone will be created for the replacement time chunk, and this
tombstone will either completely or partially overshadow the existing segment
according to the input's and the existing segment's granularity (tombstones
are a new concept: a metadata-only "empty" segment; no physical empty segments
will be created)
- To save metadata rows the tombstones will be created only if there is a
previously used & visible segment overlapping with the tombstone's interval
The tombstones will be eliminated (removed) according to the usual rules for
when segments become overshadowed, unused, etc. If new rules need to be added,
they will be added and documented.
## Design idea
The idea is that segment creation for replace ingestion identifies real
segments in the input intervals as well as tombstones. The tombstones are
represented as `DataSegment`s like real segments, but inside their metadata
they are clearly identifiable as tombstones. After this point, the publishing
(by the overlord) and the assignment to historicals (by the coordinator) are
practically the same as today. The only change required in the historical is
that when a tombstone is loaded, a special, lightweight `QueryableIndex` is
created for it that is clearly marked as belonging to a tombstone (a new
default method, `default boolean isFromTombstone()` returning `false`, will be
added to the `QueryableIndex` interface, and the queryable index created for a
tombstone will return `true`). When a query reaches the historical, the
historical's code will create a `NoOpQueryRunner` for each of the tombstones
covered by the query.
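A minimal sketch of the proposed interface change, with the caveat that the real Druid `QueryableIndex` interface has many more methods; everything below is an illustration, not the actual Druid code:

```java
// Simplified stand-in for Druid's QueryableIndex interface.
interface QueryableIndex {
    // New default method: existing implementations keep the default `false`.
    default boolean isFromTombstone() {
        return false;
    }
}

// The queryable index created for a tombstone overrides it to return `true`,
// so the query path can recognize tombstones without any other changes.
class TombstoneQueryableIndex implements QueryableIndex {
    @Override
    public boolean isFromTombstone() {
        return true;
    }
}
```

Using a default method means no existing `QueryableIndex` implementation has to change.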
## Scope
For speed of delivery and to prove the concept as quickly as we can, we will
restrict this new functionality to `SinglePhaseSubTask` and `IndexTask`.
## Create & publish tombstones
- Figure out what time chunks correspond to the input intervals of the
replace spec
- Subtract from those the ones that are not empty: i.e. any time chunk that
overlaps with a segment produced by the replace spec that has data
- Technically, after the step above we have the intervals that can be used
to create the tombstones. In reality we don't need to create tombstones for
all of them; we only need tombstones for those tombstone intervals that
"overlap" with existing, used segments with an immediately older version (i.e.
current segments). So from the intervals above, also remove those intervals
that do not overlap with any current, used, visible segment. We may skip this
optimization in the first version of the code for development speed and do it
later.
- Now we have the "tombstone intervals"
- For each of these tombstone intervals create a `DataSegment`. Call these
`DataSegments` tombstones (the `payload` section's `type` can be used to mark
them so)
- After all the tombstones are created, add these segments to the regular
new segments that were created for the ingestion in progress
- Then publish the whole augmented set of `DataSegment`s as usual
- After publishing, the *Supervisor* guarantees that all those data segments
are inserted into the metadata store (this is the current, normal behavior for
publishing segments)
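The tombstone-interval computation described in the steps above can be sketched as follows, using a simplified millisecond-based `[start, end)` interval type. The real implementation would use Druid's Joda-Time `Interval` and the task's actual segment allocation; all names here are illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the tombstone time chunks are the replace-interval
// time chunks that no newly produced data segment overlaps.
final class TombstoneIntervals {
    record Interval(long start, long end) {
        boolean overlaps(Interval other) {
            return start < other.end && other.start < end;
        }
    }

    static List<Interval> compute(List<Interval> replaceTimeChunks,
                                  List<Interval> dataSegmentIntervals) {
        List<Interval> tombstones = new ArrayList<>();
        for (Interval chunk : replaceTimeChunks) {
            boolean hasData = dataSegmentIntervals.stream().anyMatch(chunk::overlaps);
            if (!hasData) {
                tombstones.add(chunk); // empty time chunk -> needs a tombstone
            }
        }
        return tombstones;
    }
}
```

The optional optimization (dropping tombstone intervals that overlap no current, used, visible segment) would be a second filtering pass over the returned list.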
## Coordinator assignment
- The coordinator, by means of the load rules, detects that new segments
exist for the data source. The new segments may contain tombstones; the
coordinator does not care about this and uses its usual assignment strategy to
assign the data source's segments (some of them tombstones) to historicals.
There may be a need to adjust assignment for tombstones at some point, but
initially we may choose to just let it use the current strategy.
## Historical query path
- The historical detects that a new segment has been assigned to it. If the
segment is a tombstone then the historical creates an empty `QueryableIndex`
marked as a tombstone (using a new `QueryableIndex` method, `boolean
isFromTombstone()`) and announces the tombstone
- When the historical receives a query for the data source, the historical's
`ServerManager` detects the tombstone queryable indices (using the
`isFromTombstone` method) and for each of those tombstones it creates a
`NoOpQueryRunner` (an existing query runner that always returns an empty
result set, which is exactly what is needed for tombstones). Thus the part of
the query hitting a tombstone queryable index will return an empty sequence,
since that is how the `NoOpQueryRunner` works, and that is the expected result
set for a tombstone since tombstones do not have any rows.
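The runner selection on the historical could then look roughly like the sketch below. The interfaces are heavily simplified stand-ins for Druid's real `QueryRunner` and `QueryableIndex`; treat every name and shape here as an assumption:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model: a per-segment "query runner" just produces result rows.
interface SegmentQueryRunner {
    List<String> run();
}

// Stand-in for Druid's existing NoOpQueryRunner: always an empty result set.
final class NoOpRunner implements SegmentQueryRunner {
    public List<String> run() {
        return Collections.emptyList();
    }
}

final class RunnerSelection {
    interface Index {
        default boolean isFromTombstone() { return false; }
    }

    // Tombstone-backed indices get a no-op runner; real segments keep their
    // regular per-segment runner.
    static SegmentQueryRunner makeRunner(Index index, SegmentQueryRunner realRunner) {
        return index.isFromTombstone() ? new NoOpRunner() : realRunner;
    }
}
```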
## Broker
The broker is unchanged; tombstones look like regular segments to it.
## Compaction
- Compacting an existing data source with tombstones proceeds as follows
- The compaction task will traverse the used segments for a data source and
it will simply skip the segments that are tombstones, since they have no data.
The previous tombstones might be overshadowed depending on the new compaction
granularity, or depending on whether data was appended to the tombstone's time
chunk after the initial tombstone creation (in the latter case the appended
data, being in the same time chunk, will completely overshadow the existing
tombstone). Overshadowed tombstones will be processed exactly like normal data
segments after this.
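The tombstone-skipping step could be sketched as a simple filter over the used segments. The `DataSegment` record and its `payloadType` marker below are simplified stand-ins for Druid's real `DataSegment` and the proposed `type` marker in the payload; all names are assumptions:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: compaction reads only non-tombstone segments, since
// tombstones carry no rows.
final class CompactionInput {
    record DataSegment(String id, String payloadType) {
        boolean isTombstone() {
            return "tombstone".equals(payloadType);
        }
    }

    static List<DataSegment> segmentsToRead(List<DataSegment> usedSegments) {
        return usedSegments.stream()
                           .filter(s -> !s.isTombstone())
                           .collect(Collectors.toList());
    }
}
```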
## Design alternatives
### Create and publish tombstones
Using the existing mechanism for representing segments (i.e. `DataSegment`
objects and their representation in the `SEGMENT_METADATA` metadata table) to
represent tombstones, along with the existing publishing process, seems like a
natural fit. Therefore this is the only option we considered for this phase.
### Integrating tombstones with the query path
Alternative: have the coordinator announce tombstones, and the broker take
care of the query path upon hearing the announcements.
When the coordinator reads the segments that are used but have not been
assigned for a data source, it does two things. First, it identifies the
"real" segments, that is, the segments that are not tombstones, and proceeds
to *assign* them to data servers (i.e. historicals) as usual. Second, for the
tombstones among the segments, the coordinator just *announces* them, using
code similar to what the historicals use to announce real segments.
The code changes in the coordinator are then to be able to distinguish
tombstones that have not been announced, and then to incorporate enough of the
*announcement* code to be able to announce the tombstones itself.
The coordinator would not be a data server, since it would not load tombstones
nor respond to queries about them. Also, since the coordinator announces
tombstones and is the only server type that announces them, tombstones do not
have to be balanced.
In addition, the coordinator has a duty that checks the versioned timeline; it
would have to factor the tombstones into the versioned timeline so that the
coordinator can tell when a tombstone gets overshadowed and mark it as unused.
In this case it would also unannounce tombstones that become unused.
The broker listens to tombstone announcements and integrates them into its
data source timeline. When a broker receives a query, it finds the tombstones
in the query intervals and removes them from any queries it sends to the
relevant historicals.
Historicals are not changed at all since they will never receive intervals
for tombstones.
The major disadvantage of this is that it breaks the current design for
`DataSegment` handling (i.e. it does not mimic real segments). Other
disadvantages are ensuring high availability in the face of coordinator
failures, and that the implementation needs to deal properly with unannouncing
tombstones. It is not desirable to unannounce tombstones on graceful shutdown
of a coordinator, since the broker would then remove them from its timeline
and some queries could return incorrect results.
This PR has:
- [ ] been self-reviewed.
- [ ] using the [concurrency
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
(Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
- [ ] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [ ] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [ ] added integration tests.
- [ ] been tested in a test Druid cluster.