OurNewestMember opened a new issue, #13110:
URL: https://github.com/apache/druid/issues/13110
### Description
Reduce the delay before compacted/reindexed data can be queried by
implementing a "druid" type supervisor. The supervisor would create realtime
tasks that read druid segments (already-published and potentially also
realtime) and build an index using exactly-once read semantics.
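The Java sketch below shows what "exactly-once read semantics" could mean at the segment level. It assumes the supervisor's committed "offsets" are simply the set of upstream segment IDs that have already been ingested. `SourceSegmentId` and `SegmentOffsetTracker` are hypothetical names, not existing Druid classes. In a real implementation, the commit check would have to run atomically with publishing the task's output segments in the metadata store. This would be similar in spirit to how the existing Kafka supervisor commits stream offsets together with published segments.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical identifier for an upstream segment; real Druid segment IDs likewise
// combine dataSource, interval, version, and partition number.
record SourceSegmentId(String dataSource, String interval, String version, int partitionNum) {}

// Hypothetical offset store for a "druid" type supervisor: its committed "offsets"
// are the upstream segment IDs that have already been ingested downstream.
final class SegmentOffsetTracker {
    private final Set<SourceSegmentId> committed = new LinkedHashSet<>();

    // Published upstream segments that have not been ingested yet, in the order given.
    synchronized List<SourceSegmentId> pending(List<SourceSegmentId> publishedUpstream) {
        List<SourceSegmentId> result = new ArrayList<>();
        for (SourceSegmentId id : publishedUpstream) {
            if (!committed.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }

    // Compare-and-set style commit: succeeds only if none of the segments a task read
    // were already committed (eg, by a retried or replica task). In Druid this check
    // would need to share a metadata-store transaction with segment publishing.
    synchronized boolean commit(List<SourceSegmentId> readByTask) {
        for (SourceSegmentId id : readByTask) {
            if (committed.contains(id)) {
                return false; // already ingested: reject to avoid double counting
            }
        }
        committed.addAll(readByTask);
        return true;
    }

    synchronized int committedCount() {
        return committed.size();
    }
}
```

Because the key is the full segment ID, including its version, a replaced upstream segment shows up as new pending work. Whether that work is actually needed is the replacement question raised in the list of questions.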
Alternatives
- using automatic compaction, increase the coordinator's polling rate for duties
(this should schedule and execute automatic compaction jobs more frequently
than, eg, every 30 minutes)
  - limitation: the target data source must match the source
  - limitation: automatic compaction supports fewer features than other
ingestion mechanisms
- "out of band" scheduled batch indexing: use external orchestration to know
when new data should be read, then submit batch ingestion tasks
  - limitation: the orchestrator needs to implement substantial logic regarding
segment availability, most likely in a way that facilitates exactly-once
ingestion
    - ..."ditto" for the "already-loaded" data, to know what work has already
been completed!
    - Probably also needs to know when segments are replaced so it can make the
appropriate API calls (eg, submit tasks, coordinator segment API calls, etc) to
sync the downstream data state
  - This approach probably also includes ingesting from a druid query (eg,
native scan or multi-stage)
- pushing the ingested data to an output stream (eg, a kafka topic)
  - limitation: would likely still incur a substantial delay (eg, to know that
the segments are actually published so they can "safely" be treated as "ready
for the next stage" to facilitate exactly-once ingestion)
  - limitation: duplicate data movement
  - limitation: probably also needs to "re-publish" events when segments are
replaced
- publishing a stream of segment events (eg, published, and maybe dropped, too)
  - this is probably a version of "out of band" orchestration facilitated by a
"push" mechanism (rather than pulling/polling for segment events), working at
the segment (not event) level
- Create a "stream multiplexer" or "stream derivative" supervisor type that
takes an existing supported stream type and makes it available to multiple
supervisors
  - this might require the overlord to act as a stream server so that the other
druid supervisors can get the "offsets", etc
  - limitation: this probably requires yet another supervisor type for the
"derivative" supervisors to specify their connection to the new multiplexer
supervisor
Questions/etc:
- If this were implemented to read source data from realtime processes, would
the task need to know which segment the source data came from (in case that
segment isn't published, eg because of a task failure or read replication)?
- Is there already some "characterizing" information about the current state
of an index (such as index size, the number of incoming events
processed/discarded to reach that index state, the min/max offsets read from
the incoming stream to reach that index state, etc) to know what data is
represented when the index is queried at a point in time?
  - For runtime performance of the existing realtime ingestion processes, should
there be a flag to explicitly enable exposing such index introspection values
when queried? (ie, so that realtime processes don't incur the workload of
maintaining and exposing the index state if the user determines it never needs
to be queried)
- Is it feasible for the supervisor to guarantee exactly-once ingestion at the
level of either the (published) source segment or the "source source event"?
(The latter is the input event ingested by the upstream druid source, not the
output row, which can vary both between queries and between executions with the
same source data and ingestion spec.)
- Would the supervisor need the ability to create tasks with
`appendToExisting: false` when the druid source has replaced a segment?
  - Eg, if 2 segments for time chunk A in the upstream source are dropped and
replaced with a new segment, should the supervisor be able to create tasks that
read all of the replacement data (plus any of its own existing segments in time
chunk A that fall entirely outside the time span of the new upstream segment)
and then replace all of its own existing segments in time chunk A?
  - Logic that handles replaced segments may want to reduce the total workload
with application-specific knowledge
    - Example: if segments are replaced due to compaction, it may help to know
that the resulting segments came from a compaction task that did not define a
filter and did not change dimensions, metrics, or query granularity. This could
indicate that the resulting segments return the same data at query time, so any
"downstream" segments produced from the "to-be-replaced" segments should also
return the same data, without needing to replace the downstream segments.
- Is this a form of arbitrary supervisor, for which the exactly-once mechanism
might require enhancements to the existing system to expose the source
"offsets"?
  - Could an alternate variant ingest from blob storage or HTTP and simply
require a different "source offset provider"? (Blob storage might use a
hierarchical object "path" plus the object's modified time plus a splittable
source row number to identify which data has or has not been "seen" yet.)
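The replacement questions above can be framed as a per-time-chunk decision. One option is to re-ingest the chunk with `appendToExisting: false`. Another is to skip the work when a compaction is known not to change query results.

The Java sketch below models that decision. `CompactionSummary`, `ReplacementAction`, and `ReplacementPlanner` are hypothetical names. The sketch makes three simplifying assumptions:

- Upstream segment versions are compared as strings. Druid versions are typically timestamps, and a later version overshadows an earlier one.
- The upstream compaction kept the same time-chunk boundaries.
- Compaction details are known when available.

```java
// Hypothetical summary of the upstream compaction that produced a replacement segment.
record CompactionSummary(boolean hadFilter, boolean changedDimensions,
                         boolean changedMetrics, boolean changedQueryGranularity) {
    // True when the compacted segments should return the same data at query time.
    boolean isQueryEquivalent() {
        return !hadFilter && !changedDimensions && !changedMetrics && !changedQueryGranularity;
    }
}

enum ReplacementAction {
    NONE,                // upstream chunk unchanged (or older): nothing to do
    RECORD_VERSION_ONLY, // equivalent compaction: keep downstream data, remember new version
    REINGEST_TIME_CHUNK  // re-read the chunk and replace downstream data (appendToExisting: false)
}

final class ReplacementPlanner {
    private ReplacementPlanner() {}

    // ingestedVersion: upstream version the downstream chunk was built from.
    // currentVersion:  upstream version now visible for the same chunk.
    // compaction:      details of the compaction that created currentVersion, or null if unknown.
    static ReplacementAction plan(String ingestedVersion, String currentVersion,
                                  CompactionSummary compaction) {
        // Assumption: versions compare as strings and a later version overshadows an earlier one.
        if (currentVersion.compareTo(ingestedVersion) <= 0) {
            return ReplacementAction.NONE;
        }
        if (compaction != null && compaction.isQueryEquivalent()) {
            return ReplacementAction.RECORD_VERSION_ONLY;
        }
        return ReplacementAction.REINGEST_TIME_CHUNK;
    }
}
```

Treating unknown compaction details (`null`) as "re-ingest" errs on the side of correctness over saved work. The sketch also ignores the harder case from the example above, where the new upstream segment covers only part of the downstream chunk.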
Key areas
- "granularity" of source offset tracking: at the druid segment level or at the
"source source event" level
- when the source replaces data, should the new logic support replacing
existing data?
- Is it feasible to implement (or update) an index state mechanism that can be
queried?
  - Example 1: enable a flag in the ingestion spec, then query the realtime
process to receive indexing metadata (eg, a list of the source offsets that were
consumed to produce the indexes that were queried). Could this be an
enhancement to the processing statistics?
  - Example 2 (moonshot?): enable a flag in the batch task spec, then query the
indexing process to receive indexing metadata indicating the current ingestion
position based on the current "split" (eg, for object ABC, the current indexes
available for querying are the following):
    - `index1:{"start":0,"end":12345000,"offsetType":"splittableSourceRows","persistTime":"2026-08-22T00:12:34.000Z","processed":1234400,"thrownAway":1000}`
    - `index2:{"start":12345001,"end":23456000,"offsetType":"splittableSourceRows","persistTime":null,"processed":11110100,"thrownAway":899}`
- More generic supervisor types: eg, a "super supervisor" (eg, to multiplex
a single incoming stream to multiple druid supervisors), an arbitrary
supervisor (eg, able to track "offsets" for arbitrary sources like data blobs,
HTTP requests, etc)
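For the arbitrary supervisor idea, consider the blob-storage "source offset" described in the questions: object path plus modified time plus splittable row number. It could be tracked per object version, because row numbers are only ordered within a single object.

The Java sketch below makes three assumptions:

- `BlobOffsetTracker` is a hypothetical name.
- An overwritten object (new modified time) is treated as entirely new data.
- Rows of an object are ingested in order starting from row 0.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker for blob-storage "source offsets":
// object path + modified time + row number within a splittable object.
final class BlobOffsetTracker {
    // Row numbers are only ordered within one version of one object, so the key
    // includes the modified time: an overwritten object is treated as new data.
    private record ObjectVersion(String path, Instant modifiedTime) {}

    // Highest row number (inclusive) already ingested, per object version.
    private final Map<ObjectVersion, Long> lastRowSeen = new HashMap<>();

    boolean isSeen(String path, Instant modifiedTime, long row) {
        Long last = lastRowSeen.get(new ObjectVersion(path, modifiedTime));
        return last != null && row <= last;
    }

    // Marks rows [0, row] of this object version as ingested (assumes in-order
    // ingestion from row 0); the recorded position never moves backwards.
    void markSeenThrough(String path, Instant modifiedTime, long row) {
        lastRowSeen.merge(new ObjectVersion(path, modifiedTime), row, Long::max);
    }
}
```

Under these assumptions, the per-index ranges in Example 2 could advance the tracker only once an index has a non-null `persistTime`. That way, rows that are queryable but not yet persisted would not yet count as "seen".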
### Motivation
- this feature could potentially eliminate the infrastructure and cost of
running duplicate streaming ingestions to achieve multiple rollups, etc, in near
real time
This issue likely needs refinement to identify potential analysis and action
items.