OurNewestMember opened a new issue, #13110:
URL: https://github.com/apache/druid/issues/13110

   ### Description
   
   Reduce the delay before compacted/reindexed data can be queried by implementing a "druid" type supervisor.  The supervisor creates realtime tasks which read druid segments (already-published and potentially also realtime) and create an index using exactly-once read semantics.
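
   For concreteness, a spec for such a supervisor might look like the sketch below, expressed as a Python dict. This is a hypothetical illustration only: the "druid" supervisor type and every field name are assumptions made for this issue, not an existing Druid API.

```python
# Hypothetical sketch only: the "druid" supervisor type and all field names
# below are assumptions for illustration, not an existing Druid API.
druid_supervisor_spec = {
    "type": "druid",  # hypothetical new supervisor type
    "spec": {
        "ioConfig": {
            # datasource whose (published and maybe realtime) segments are read
            "sourceDataSource": "upstream_datasource",
            "includeRealtime": True,         # open question in this issue
            "offsetGranularity": "segment",  # "segment" vs "source source event"
        },
        "dataSchema": {
            # unlike automatic compaction, the target may differ from the source
            "dataSource": "downstream_rollup",
        },
    },
}
```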
   
   
   Alternatives
   - using automatic compaction, increase the coordinator poll rate for duties (this should make it possible to schedule and execute automatic compaction jobs more frequently than, eg, every 30 minutes)
     - limitation: the target data source must match the source
     - limitation: automatic compaction supports fewer features than other ingest mechanisms
   - "out of band" scheduled batch indexing - use external orchestration to 
know when new data should be read and then submit batch ingest tasks
     - limitation: the orchestrator needs to implement substantial logic 
regarding segment availability and most likely do so in a way that should 
facilitate exactly-once ingestion
       - ..."ditto" for the "already-loaded" data to know what work has already 
been completed!
       - Probably also needs to know when segments are replaced to make the 
appropriate API calls (eg, submit tasks, coordinator segment API calls, etc) to 
sync the downstream data state
     - This approach also probably includes ingesting from a druid query (eg, 
native scan or multi-stage)
   - pushing the ingested data to an output stream (eg, kafka topic)
     - limitation: would likely still incur a substantial delay (eg, waiting to know that the segments are actually published so they can be "safely" treated as "ready for the next stage" to facilitate exactly-once ingestion)
     - limitation: duplicate data movement
     - limitation: probably also needs to "re-publish" events when segments are 
replaced
   - publishing a stream of segment events (eg, published, maybe dropped, too)
     - this is probably a version of "out of band" orchestration facilitated by 
a "push" mechanism (rather than pull/poll for the segment events) working at 
the segment (not event) level
   - Create a "stream multiplexer" or "stream derivative" supervisor type which 
takes an existing supported stream type and makes it available for multiple 
supervisors
     - this might require the overlord to act as a stream server so that the 
other druid supervisors can get the "offsets", etc
     - limitation: this probably requires yet another supervisor type so that the "derivative" supervisors can specify their connection to the new multiplexer supervisor
   
   Questions/etc:
   - If this were implemented to be able to read source data from realtime processes, would the task need the ability to know which segment the source data came from (in case the segment isn't published because of a task failure, or because of read replication)?
     - Is there already some "characterizing" information about the current 
state of an index (such as index size, number of incoming events 
processed/discarded to get to that index state, min/max offsets read from the 
incoming stream to get to that index state, etc) to know what data is 
represented when the index is queried at a point in time?
       - For runtime performance of the existing realtime ingest processes, should there be a flag to explicitly enable exposing such index introspection values when queried?  (ie, so that the realtime processes don't incur the workload of maintaining and exposing the index state if the user determines it does not need to be queried)
   - Is it feasible for the supervisor to guarantee exactly-once ingestion at 
the level of either the source segment (published) or the "source source event" 
(the input event for the druid source which the supervisor is ingesting 
from...not the output row which can vary both between queries and between 
different executions with the same source data and ingest spec)?
   - Would the supervisor need the ability to create tasks with 
appendToExisting: false when the druid source has replaced a segment?
     - Eg, 2 segments for time chunk A in the upstream source are dropped and 
replaced with a new segment -- should the supervisor be able to create tasks to 
read in all of the replacement data (plus any existing segments from its own 
data source which are within time chunk A but completely excluded by the time 
span of the new segment in the upstream source) and then replace all of its own 
existing segments in time chunk A?
   - Logic that considers replaced segments may want to reduce the total 
workload with application-specific logic
     - Example: if segments are replaced due to compaction, it may help to know 
that the resulting segments came from a compaction task that did not define a 
filter and did not change dimensions, metrics or query granularity -- this 
could indicate that the resulting segments would return the same data at query 
time, and therefore any "downstream" segments produced from the 
"to-be-replaced" segments should also return the same data...without needing to 
replace the downstream segments.
   - Is this a form of an arbitrary supervisor for which the exactly-once 
mechanism might require enhancements to the existing system to expose the 
source "offsets"?
     - Could an alternate instantiation ingest from blob storage or HTTP and 
simply require a different "source offset provider" (the blob storage might use 
a hierarchical object "path" plus object modified time plus splittable source 
row number to identify which data has or has not been "seen" yet)
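
   The "source offset provider" idea in the last question could be sketched as a tiny interface. All names below are hypothetical, and the static listing stands in for a real blob-store list call:

```python
from dataclasses import dataclass
from typing import List, Tuple

# Hypothetical "source offset provider" abstraction: the supervisor only needs
# an ordered, comparable offset per unit of source data to decide what has or
# has not been "seen" yet.
BlobOffset = Tuple[str, float, int]  # (object path, modified time, row number)

@dataclass
class BlobStorageOffsetProvider:
    # A static listing stands in for a real blob-store list call in this sketch.
    listing: List[BlobOffset]

    def unseen(self, committed: BlobOffset) -> List[BlobOffset]:
        # Tuple comparison gives (path, mtime, row) lexicographic ordering for free.
        return [o for o in sorted(self.listing) if o > committed]
```

   A segment- or kafka-backed provider would compare its own offset tuples the same way; the point is only that exactly-once tracking needs a total order per source, not a particular storage type.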
   
   Key areas
   - "granularity" of source offset tracking - at the druid segment level or at 
the "source source event" level
   - when source replaces data - should the new logic support replacing 
existing data?
   - Is it feasible to implement (or update) an index state mechanism which can 
be queried?
     - Example 1: enable flag in ingest spec and then query realtime process to 
receive indexing metadata (eg a list of source offsets that were consumed to 
produce the indexes that were queried) -- an enhancement to the processing 
statistics?
     - Example 2 (moonshot?): enable flag in batch task spec and then query the indexing process to receive indexing metadata indicating the current ingestion position based on the current "split" (eg, for object ABC, the current indexes for querying are:
       index1: {"start":0,"end":12345000,"offsetType":"splittableSourceRows","persistTime":"2026-08-22T00:12:34.000Z","processed":1234400,"thrownAway":1000}
       index2: {"start":12345001,"end":23456000,"offsetType":"splittableSourceRows","persistTime":null,"processed":11110100,"thrownAway":899})
   - More generic supervisor types: eg, a "super supervisor" (eg, to multiplex 
a single incoming stream to multiple druid supervisors), an arbitrary 
supervisor (eg, able to track "offsets" for arbitrary sources like data blobs, 
HTTP requests, etc)
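
   The queryable index-state metadata from Examples 1 and 2 could be modeled roughly as below; the field names mirror the hypothetical JSON in Example 2 and do not correspond to any existing Druid API:

```python
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional

@dataclass
class IndexState:
    # Field names mirror the hypothetical JSON in Example 2 above; none of
    # them correspond to an existing Druid API.
    start: int                  # first source offset covered by this index
    end: int                    # last source offset covered by this index
    offsetType: str             # eg "splittableSourceRows"
    persistTime: Optional[str]  # null (None) while the index is in-memory only
    processed: int              # events indexed to reach this state
    thrownAway: int             # events discarded (filtered, parse errors, ...)

def index_states_response(states: List[IndexState]) -> Dict[str, dict]:
    """Shape a hypothetical "index introspection" query response."""
    return {f"index{i + 1}": asdict(s) for i, s in enumerate(states)}
```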
   
   ### Motivation
   
   - this feature could potentially replace the infrastructure and cost of running duplicate streaming ingests to achieve multiple rollups, etc, in a near-realtime fashion
   
   
   This issue likely needs refinement to identify potential analysis and action 
items.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

