loquisgon opened a new pull request #12137:
URL: https://github.com/apache/druid/pull/12137


   # Druid batch ingestion replace high level design
   
   ## This PR is a draft: only the implementation is coded; it has no unit or integration tests yet, nor has it received extensive testing. The purpose of this draft PR is to start socializing the design of the new replace functionality.
   
   ## Problem statement
   
   With regular native batch ingestion specs using `appendToExisting` set to `false`, the new data referenced in the spec's `inputIntervals` replaces data in the corresponding previous time chunks, but it leaves previous time chunks intact when the new input data does not overlap with them. This causes some confusion for customers who want to replace *all* the data in the input intervals with just the new data referenced in the ingestion spec. This work introduces a new "replace" functionality to solve the need to replace all the existing data in the input intervals with the new data.
   
   A *replace batch ingestion spec* (or simply *replace spec*) is a batch 
ingestion spec with the following characteristics:
   
   - Input data (the usual stuff)
   
   - Batch ingestion intervals in the ioConfig section of the spec (the usual 
stuff) (we call this the "replace intervals")
   
   - A flag in the ioConfig (reuse `replaceExisting`) that indicates that this is a replace of time chunks (this is new)
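
   Putting the characteristics above together, a replace spec's ioConfig might look like the following. This is a hypothetical sketch: the reuse of `replaceExisting` is what the design proposes, but the surrounding field values and the exact placement of the intervals are illustrative, not final syntax.

   ```json
   {
     "ioConfig": {
       "type": "index_parallel",
       "inputSource": { "type": "local", "baseDir": "/data/new", "filter": "*.json" },
       "inputIntervals": ["2020-01-01/2020-02-01"],
       "appendToExisting": false,
       "replaceExisting": true
     }
   }
   ```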
   
   When the replace spec is executed, the following will happen to the existing data source's data. Every time chunk that overlaps with the replace intervals (we call these the "replacement time chunks") will be replaced as follows. For every one of the replacement time chunks:
   
   - If the input data referenced in the replace spec contains data for that replacement time chunk, all the previous data in the time chunk will be either partially overshadowed (when the replace spec's segment granularity is finer) or completely overshadowed (when the replace spec's segment granularity is the same or coarser) and the input data will be used as a replacement (usual stuff). Note that when the replace spec has finer granularity, if there are enough finer-granularity replacement time chunks with data to cover all of the coarser-granularity previous data, then the previous data is also completely overshadowed.
   
   - If the input data does not contain data for the replacement time chunk, then a tombstone will be created for the replacement time chunk, and this tombstone will either completely or partially overshadow the existing segment, according to the input's and the existing segment's granularity (tombstones are a new concept: a metadata-only "empty" segment – no physical empty segments will be created)
   
   - To save metadata rows, tombstones will be created only if there is a previously used & visible segment overlapping with the tombstone's interval
   
   The tombstones will be eliminated (removed) according to the usual rules for when segments get overshadowed, become unused, etc. If new rules need to be added, they will be added and documented.
   
   
   ## Design idea
   The idea is that segment creation for replace ingestion identifies real segments in the input intervals as well as tombstones. Tombstones are represented as `DataSegment` instances just like real segments, but inside their metadata they are clearly identifiable as a "tombstone". After this point, the publishing (by the overlord) and the assignment to historicals (by the coordinator) are practically the same as today. The only change required in the historical is that when a tombstone is loaded, a special, lightweight `QueryableIndex` is created for it which is clearly marked as being for a tombstone: a new default method `default boolean isFromTombstone()`, returning `false` by default, will be added to the `QueryableIndex` interface, and the queryable index created for a tombstone will return `true`. When a query comes to the historical, the historical's code will create a `NoOpQueryRunner` for each one of the tombstones in the query.
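   
   The default-method idea above can be sketched as follows. This is a hypothetical, simplified sketch and not Druid's actual `QueryableIndex` interface; only the `isFromTombstone()` method mirrors the proposal, the rest is illustrative.
   
   ```java
   // Sketch: how a default method lets tombstone-backed indexes identify
   // themselves without touching any existing QueryableIndex implementation.
   interface QueryableIndex {
       int getNumRows();
   
       // New default method: all existing implementations keep returning false.
       default boolean isFromTombstone() {
           return false;
       }
   }
   
   // Light, empty index created by the historical when it loads a tombstone.
   class TombstoneQueryableIndex implements QueryableIndex {
       @Override
       public int getNumRows() {
           return 0; // tombstones never carry rows
       }
   
       @Override
       public boolean isFromTombstone() {
           return true;
       }
   }
   ```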
   
   ## Scope
   For speed of delivery & proving the concept as quickly as we can we will 
restrict this new functionality to `SinglePhaseSubTask` and `IndexTask`.
   
   ## Create & publish tombstones
   - Figure out what time chunks correspond to the input intervals of the 
replace spec
   - Subtract from those the ones that are not empty: i.e. any time chunk that 
overlaps with a segment produced by the replace spec that has data
   - Technically, after the step above we have the intervals that could be used to create the tombstones. In reality we don't need to create tombstones for all of them: we only need tombstones for those intervals that overlap with existing, used segments of an immediately older version (i.e. current segments). So from the intervals above, also remove those that do not overlap with any current, used, visible segment. For development speed we may not do this optimization in the first version of the code, but we can do it later.
   - Now we have the "tombstone intervals"
   - For each of these tombstone intervals create a `DataSegment`. Call these 
`DataSegments` tombstones (the `payload` section's `type` can be used to mark 
them so)
   - After all the tombstones are created, add these segments to the regular 
new segments that were created for the ingestion in progress
   - Then publish the augmented set of `DataSegment`s as usual
   - After publishing, the *Supervisor* guarantees that all those data segments are inserted in the metadata (this is the current normal behavior for publishing segments)
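   
   The tombstone-interval computation in the steps above can be sketched as follows. All class and method names are illustrative, not actual Druid APIs, and intervals are simplified to `[start, end)` pairs in epoch millis.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Sketch of the tombstone-interval selection: replace-interval time chunks,
   // minus chunks where the replace spec produced data, minus chunks that
   // overlap no currently used segment (the metadata-saving optimization).
   class TombstonePlanner {
       static List<long[]> tombstoneIntervals(
               List<long[]> replaceChunks,
               List<long[]> chunksWithNewData,
               List<long[]> usedSegmentIntervals) {
           List<long[]> result = new ArrayList<>();
           for (long[] chunk : replaceChunks) {
               if (overlapsAny(chunk, chunksWithNewData)) {
                   continue; // real data replaces this chunk; no tombstone needed
               }
               if (!overlapsAny(chunk, usedSegmentIntervals)) {
                   continue; // nothing to overshadow; skip to save metadata rows
               }
               result.add(chunk);
           }
           return result;
       }
   
       static boolean overlapsAny(long[] a, List<long[]> others) {
           for (long[] b : others) {
               if (a[0] < b[1] && b[0] < a[1]) {
                   return true;
               }
           }
           return false;
       }
   }
   ```
   
   For example, with three daily replacement chunks, new data only in the middle one, and existing used segments covering the whole range, the sketch yields tombstones for the first and last chunks only.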
   
   ## Coordinator assignment
    - The coordinator, by means of the load rules, detects that new segments exist for the data source. The new segments may contain tombstones; the coordinator does not care about this and uses its usual assignment strategy to assign the data source's segments (some of them tombstones) to historicals. There may be a need to adjust assignment for tombstones at some point, but initially we may choose to just let it use the current strategy.
   
   
   ## Historical query path
   - The historical detects that a new segment has been assigned to it. If the segment is a tombstone, then the historical creates an empty `QueryableIndex` marked as "tombstone" (using a new `QueryableIndex` method: `boolean isFromTombstone()`) and announces the tombstone
   - When the historical receives a query for the data source, the historical's `ServerManager` detects the tombstone queryable indexes (using the `isFromTombstone` method) and for each of those tombstones it creates a `NoOpQueryRunner` (an existing query runner that always returns an empty result set, exactly what is needed for tombstones). Thus the part of the query hitting a tombstone queryable index will return an empty sequence, which is the expected result from a tombstone since tombstones do not have any rows.
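   
   A minimal sketch of the per-segment runner selection described above; `NoOpQueryRunner` mirrors the design's name, but the signatures are simplified stand-ins, not Druid's actual query-runner API.
   
   ```java
   import java.util.Collections;
   import java.util.List;
   
   // Stand-in for Druid's Sequence-returning QueryRunner.run().
   interface QueryRunner {
       List<String> run();
   }
   
   // Always returns an empty result set: exactly what a tombstone needs.
   class NoOpQueryRunner implements QueryRunner {
       @Override
       public List<String> run() {
           return Collections.emptyList();
       }
   }
   
   class ServerManagerSketch {
       // Per-segment runner selection: tombstone-backed indexes get a no-op
       // runner; everything else keeps its real runner.
       static QueryRunner runnerFor(boolean isFromTombstone, QueryRunner realRunner) {
           return isFromTombstone ? new NoOpQueryRunner() : realRunner;
       }
   }
   ```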
   
   ## Broker
   The broker is unchanged; tombstones look like regular segments to it.
   
   ## Compaction:
   - Compacting an existing data source with tombstones proceeds as follows
   - The compaction task will traverse the used segments for a data source and simply skip the segments that are tombstones, since they have no data. The previous tombstones might be overshadowed depending on the new compaction granularity, or depending on whether data was appended to the tombstone's time chunk after the initial tombstone creation (in the latter case the appended data, being in the same time chunk, will completely overshadow the existing tombstone). After this, overshadowed tombstones will be processed exactly as normal data segments.
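   
   The skip logic can be sketched as follows; `SegmentRef` and `isTombstone` are hypothetical stand-ins for a `DataSegment` and for checking its payload's tombstone marker.
   
   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   
   // Stand-in for a used DataSegment with a tombstone marker in its payload.
   record SegmentRef(String id, boolean isTombstone) {}
   
   class CompactionInputSketch {
       // Compaction input selection: tombstones carry no rows, so they are
       // excluded from the set of segments to read and recombine.
       static List<SegmentRef> dataSegmentsOnly(List<SegmentRef> used) {
           return used.stream()
                   .filter(s -> !s.isTombstone())
                   .collect(Collectors.toList());
       }
   }
   ```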
   
   ## Design alternatives
   
   ### Create and publish tombstones
   Using the existing segment mechanism (i.e. `DataSegment` objects and their representation in the `SEGMENT_METADATA` metadata table) along with the existing publishing process to represent tombstones seems like a natural fit. Therefore this is the only option we considered for this phase.
   
   ### Integrating tombstones with the query path
   Have the coordinator announce tombstones and have the broker take care of the query path upon hearing the announcements.
   When the coordinator reads the segments that are used but have not been assigned for a data source, it does two things. First, it identifies the "real" segments, that is, those segments that are not tombstones, and proceeds to *assign* them to data servers (i.e. historicals) as usual. Second, for the tombstones among those segments, the coordinator just *announces* them, using code similar to what the historicals use to announce real segments.
   
   The code changes in the coordinator are then to be able to distinguish 
tombstones that have not been announced and then incorporate enough of the 
*announcement* code to be able to announce the tombstones itself.
   
   The coordinator will not become a data server, since it will not load tombstones nor respond to queries about them. Also, since the coordinator is the only server type that announces tombstones, tombstones do not have to be balanced.
   
   In addition, the coordinator gains a duty that checks the versioned timeline: it factors the tombstones into the versioned timeline so the coordinator can tell when a tombstone gets overshadowed, in which case it marks the tombstone as unused. It also unannounces tombstones that are unused.
   
   The broker listens to tombstone announcements and integrates them into its datasource timeline.
   
   When a broker receives a query, it finds the tombstones in the query intervals and removes them from any queries it sends to the relevant historicals.
   
   Historicals are not changed at all since they will never receive intervals 
for tombstones.
   
   The major disadvantage of this alternative is that it breaks the current design for `DataSegment` handling (i.e. it does not mimic real segments). Other disadvantages are ensuring high availability in the face of coordinator failures, as well as the need for the implementation to deal properly with unannouncing tombstones. It is not desirable to unannounce tombstones on graceful shutdown of a coordinator, since then the broker would remove them from its timeline and some queries would potentially return incorrect results.
   
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency 
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
 (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in 
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


