This is an automated email from the ASF dual-hosted git repository.

sanechka pushed a commit to branch server-timeline
in repository https://gitbox.apache.org/repos/asf/druid.git

commit 5beeac98d4748076c3e4cce2c371e80a9c0a94f5
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Mar 25 10:31:18 2026 +0200

    WIP
---
 plan.md | 209 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 task.md |  18 ++++++
 2 files changed, 227 insertions(+)

diff --git a/plan.md b/plan.md
new file mode 100644
index 00000000000..dd1857a888f
--- /dev/null
+++ b/plan.md
@@ -0,0 +1,209 @@
# BrokerServerViewWithUnavailableSegments Implementation Plan

## Context

Currently, the Broker's `BrokerServerView` only tracks segments loaded on historicals/indexers. Worker pools (Dart) cannot query segments that exist in metadata but aren't loaded on any server. The goal is to create a unified `TimelineServerView` that includes ALL used segments (available or not), so that consumers like Dart can also plan queries against unloaded segments.

## Step 1: Add callback mechanism to MetadataSegmentView

**Modify:** `sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java`

**New callback interface** (nested or in a separate file):
```java
import java.util.Collection;
import org.apache.druid.timeline.DataSegment;

public interface MetadataSegmentViewCallback {
  // Segments newly reported as used since the previous poll
  void segmentsAdded(Collection<DataSegment> segments);
  // Segments no longer reported as used since the previous poll
  void segmentsRemoved(Collection<DataSegment> segments);
  // Fired once, after the first poll's segmentsAdded()
  void initialized();
}
```

**Changes to MetadataSegmentView:**
- Add field: `ConcurrentMap<MetadataSegmentViewCallback, Executor> callbacks`
- Add field: `volatile Map<SegmentId, DataSegment> currentSegmentMap = Collections.emptyMap()` (for delta computation)
- Add method: `registerCallback(Executor exec, MetadataSegmentViewCallback callback)`
- Add method: `boolean isInitialized()`, which returns `cachePopulated.getCount() == 0`
- Add method: `Set<SegmentId> getPublishedSegmentIds()`, which returns the current key set (for point lookups by the new class)
- Modify `poll()` (lines 149-179):
  - After building the `ImmutableSortedSet`, also build a `Map<SegmentId, DataSegment>` from the new segments
  - Compute the delta: `added = newMap.keys - oldMap.keys`, `removed = oldMap.keys - newMap.keys`
  - Swap both `publishedSegments` and `currentSegmentMap`
  - Invoke the callbacks with the delta collections
  - On the first poll (old map is empty): fire `initialized()` after `segmentsAdded()`

Delta computation is O(N) using HashMap key-set differences. Since `poll()` runs on a single scheduled thread, the diff itself needs no extra synchronization.

## Step 2: Create BrokerServerViewWithUnavailableSegments

**New file:** `sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerServerViewWithUnavailableSegments.java`

The class lives in the `sql` module (like MetadataSegmentView) to avoid a circular dependency.

### Class structure

```java
@ManageLifecycle
public class BrokerServerViewWithUnavailableSegments implements TimelineServerView {
  // Fields, constructor, and TimelineServerView methods as described below
}
```

**Constructor injection:**
- `BrokerServerView brokerServerView`
- `MetadataSegmentView metadataSegmentView`
- `BrokerSegmentMetadataCacheConfig cacheConfig`
- `TierSelectorStrategy historicalTierSelectorStrategy`
- `@Named(REALTIME_SELECTOR) TierSelectorStrategy realtimeTierSelectorStrategy`
- `BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig`

**Guard:** The constructor throws `ISE` if `!cacheConfig.isMetadataSegmentCacheEnable()`.
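
The sketch below is a rough illustration of the Step 1 delta computation. It diffs the key sets of two consecutive polls. It is a self-contained stand-in, not MetadataSegmentView code. The class name `SegmentDelta` is hypothetical, and plain `String` keys and values stand in for `SegmentId` and `DataSegment`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the Step 1 delta logic: keys play the role of SegmentId and
// values the role of DataSegment. An illustration only, not MetadataSegmentView's code.
final class SegmentDelta<K> {
  final Set<K> added;   // IDs present in the new poll but not in the previous one
  final Set<K> removed; // IDs present in the previous poll but not in the new one

  private SegmentDelta(Set<K> added, Set<K> removed) {
    this.added = added;
    this.removed = removed;
  }

  // Two key-set differences; expected O(N) when the maps are hash-based, as in the plan.
  static <T> SegmentDelta<T> compute(Map<T, ?> oldMap, Map<T, ?> newMap) {
    Set<T> added = new HashSet<>(newMap.keySet());
    added.removeAll(oldMap.keySet());
    Set<T> removed = new HashSet<>(oldMap.keySet());
    removed.removeAll(newMap.keySet());
    return new SegmentDelta<>(added, removed);
  }

  boolean isEmpty() {
    return added.isEmpty() && removed.isEmpty();
  }

  public static void main(String[] args) {
    Map<String, String> previousPoll = new HashMap<>();
    previousPoll.put("wiki_v1_0", "segment-a");
    previousPoll.put("wiki_v1_1", "segment-b");

    Map<String, String> currentPoll = new HashMap<>();
    currentPoll.put("wiki_v1_1", "segment-b");
    currentPoll.put("wiki_v2_0", "segment-c");

    SegmentDelta<String> delta = SegmentDelta.compute(previousPoll, currentPoll);
    // A real poll() would now call segmentsAdded() for wiki_v2_0 and segmentsRemoved() for wiki_v1_0.
    System.out.println("added=" + delta.added + " removed=" + delta.removed);
  }
}
```

This compares key sets only. A segment whose ID is unchanged but whose payload differs therefore produces no delta. The "updated" case mentioned in task.md would need an additional value comparison.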

### Internal state (guarded by `synchronized(lock)`)

- `Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines`: unified timeline per datasource
- `Map<SegmentId, ServerSelector> selectors`: all segments in the timeline
- `Set<SegmentId> metadataSegmentIds`: segments known from MetadataSegmentView
- `Set<SegmentId> availableSegmentIds`: segments with at least one server (from BrokerServerView)
- `ConcurrentMap<TimelineCallback, Executor> timelineCallbacks`: downstream callbacks

### Callback registration: in the CONSTRUCTOR, not @LifecycleStart

Following the established pattern in `CoordinatorSegmentMetadataCache` (line 153), register callbacks on both `brokerServerView` and `metadataSegmentView` **in the constructor**. This guarantees that the callbacks are in place before either source fires events, because lifecycle `@LifecycleStart` methods run only after all constructors.

- BrokerServerView: `brokerServerView.registerTimelineCallback(Execs.directExecutor(), ...)`
- MetadataSegmentView: `metadataSegmentView.registerCallback(Execs.directExecutor(), ...)`

Using `directExecutor()` is safe. Java `synchronized` monitors are reentrant on the same thread, and our lock is a separate object from BrokerServerView's.

### Callback handlers

**From BrokerServerView:**

`segmentAdded(server, segment)`:
```
synchronized(lock):
  availableSegmentIds.add(id)
  if selectors contains id:
    // Already in the timeline from metadata. Replace the empty selector with the real one from BrokerServerView.
    brokerSelector = brokerServerView.getTimeline(ds).findChunk(interval, version, partitionNum).getObject()
    if brokerSelector != current selector:
      timeline.remove(old chunk), timeline.add(chunk with brokerSelector)
      selectors.put(id, brokerSelector)
  else:
    // New segment. Reuse BrokerServerView's real selector.
    brokerSelector = brokerServerView.getTimeline(ds).findChunk(interval, version, partitionNum).getObject()
    timeline.add(chunk with brokerSelector)
    selectors.put(id, brokerSelector)
  fire segmentAdded callback
```

`segmentRemoved(segment)` (segment gone from ALL servers):
```
synchronized(lock):
  availableSegmentIds.remove(id)
  if metadataSegmentIds contains id:
    // Still "used" per the Coordinator: keep it in the timeline but swap to an empty selector
    replace selector in timeline with a new empty ServerSelector
    // Do NOT fire segmentRemoved: the segment is still logically present
  else:
    // Gone from both sources: remove it from the timeline entirely
    remove from timeline, selectors
    fire segmentRemoved callback
```

`serverSegmentRemoved(server, segment)`: pass through to downstream callbacks.

**From MetadataSegmentView:**

`segmentsAdded(segments)`:
```
synchronized(lock):
  for each segment:
    metadataSegmentIds.add(id)
    if not in selectors:
      // Not on any server. Add it with an empty ServerSelector.
      create empty ServerSelector, add to timeline, selectors
```

`segmentsRemoved(segments)`:
```
synchronized(lock):
  for each segment:
    metadataSegmentIds.remove(id)
    if not in availableSegmentIds:
      // Gone from metadata AND not on any server: remove it from the timeline
      remove from timeline, selectors
      fire segmentRemoved callback
    // else: still on a server, so keep it in the timeline (BrokerServerView handles its lifecycle)
```

### Helper methods

```java
// Creates a ServerSelector with no servers; pick() will return null
private ServerSelector createEmptySelector(DataSegment segment)

// Gets or creates the timeline for a datasource (skipTombstones=true, same as BrokerServerView)
private VersionedIntervalTimeline<String, ServerSelector> getOrCreateTimeline(String dataSource)

// Standard callback dispatch (same pattern as BrokerServerView lines 414-425)
private void runTimelineCallbacks(Function<TimelineCallback, CallbackAction> function)
```

### TimelineServerView delegation

- `getTimeline()`: returns from our own `timelines` map (synchronized)
- `getDruidServers()`: delegates to `brokerServerView`
- `getQueryRunner()`: delegates to `brokerServerView`
- `registerTimelineCallback()`: registers on our own `timelineCallbacks`
- `registerServerCallback()` / `registerSegmentCallback()`: delegate to `brokerServerView`

### Initialization

Use two `CountDownLatch` instances (`brokerViewInitLatch`, `metadataViewInitLatch`), one per source. Fire `timelineInitialized()` to downstream callbacks only after both sources have initialized. A helper, `maybeFireInitialized()`, is called from both init handlers and performs this check.

## Step 3: Guice wiring

**Modify:** `services/src/main/java/org/apache/druid/cli/CliBroker.java`

After the existing lifecycle registrations (around line 155), add:
```java
LifecycleModule.register(binder, BrokerServerViewWithUnavailableSegments.class);
```

The existing `TimelineServerView -> BrokerServerView` binding is **unchanged**.
The new class is a separate injectable singleton. Consumers that need the full view inject `BrokerServerViewWithUnavailableSegments` directly.

## Step 4: Tests

**New file:** `sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerServerViewWithUnavailableSegmentsTest.java`

Key scenarios:
1. Metadata-only segment: appears in the timeline with `pick() == null`
2. Available segment: appears in the timeline with a working ServerSelector
3. Segment in both sources: the selector is the real one from BrokerServerView
4. Segment becomes unavailable (removed from BrokerServerView, still in metadata): stays in the timeline with an empty selector
5. Segment becomes unused (removed from metadata, still on a server): stays in the timeline with the real selector
6. Full removal (gone from both sources): removed from the timeline
7. `metadataSegmentCacheEnable=false`: throws ISE
8. Initialization gating: `timelineInitialized` fires only after both sources have initialized

Also add tests for MetadataSegmentView delta computation (callbacks invoked with the correct added/removed sets).

## Known limitations

**Version overshadowing**: If metadata reports a newer segment version that isn't loaded on any historical, `VersionedIntervalTimeline.lookup()` will return the newer (unavailable) version, overshadowing the older (loaded) one. The task.md spec explicitly accepts this behavior ("lookup will always return the latest complete partition set"). Consumers must therefore handle `pick() == null` gracefully.
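
The minimal toy model below makes the overshadowing limitation concrete. It is not Druid's `VersionedIntervalTimeline`. The `ToyTimeline` and `Chunk` names are hypothetical, and the model makes simplifying assumptions:
- it tracks a single interval;
- it treats every partition set as complete;
- it orders versions as strings;
- it uses a `null` server for a selector whose `pick()` returns null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of the overshadowing limitation, NOT Druid's VersionedIntervalTimeline.
// Assumptions: one interval, every partition set complete, versions ordered as strings
// (ISO-8601 timestamps sort correctly that way), server == null meaning pick() returns null.
final class ToyTimeline {
  static final class Chunk {
    final String segmentId;
    final String server; // null means "used, but not loaded on any server"

    Chunk(String segmentId, String server) {
      this.segmentId = segmentId;
      this.server = server;
    }
  }

  private final TreeMap<String, List<Chunk>> chunksByVersion = new TreeMap<>();

  void add(String version, Chunk chunk) {
    chunksByVersion.computeIfAbsent(version, v -> new ArrayList<>()).add(chunk);
  }

  // Returns only the newest version, whether or not its segments are available.
  List<Chunk> lookup() {
    Map.Entry<String, List<Chunk>> newest = chunksByVersion.lastEntry();
    if (newest == null) {
      return List.of();
    }
    return newest.getValue();
  }

  // Consumer-side check corresponding to handling pick() == null.
  static List<String> unavailableSegments(List<Chunk> chunks) {
    List<String> missing = new ArrayList<>();
    for (Chunk chunk : chunks) {
      if (chunk.server == null) {
        missing.add(chunk.segmentId);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    ToyTimeline timeline = new ToyTimeline();
    timeline.add("2026-01-01T00:00:00.000Z", new Chunk("wiki_v1_0", "historical-1"));
    timeline.add("2026-03-01T00:00:00.000Z", new Chunk("wiki_v2_0", null));
    // v1 is loaded but overshadowed; the caller only sees the unavailable v2 segment.
    System.out.println("unavailable: " + unavailableSegments(timeline.lookup()));
  }
}
```

Here `lookup()` hides the loaded v1 segment entirely. Consumers therefore need an explicit check such as `unavailableSegments` and should not assume that every returned segment can be queried.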

## Files to create/modify

| File | Action |
|------|--------|
| `sql/.../schema/MetadataSegmentView.java` | Add callback registration, delta computation in `poll()`, `isInitialized()`, `getPublishedSegmentIds()` |
| `sql/.../schema/MetadataSegmentViewCallback.java` | New callback interface |
| `sql/.../schema/BrokerServerViewWithUnavailableSegments.java` | New class implementing TimelineServerView |
| `services/.../cli/CliBroker.java` | Register new class in lifecycle |
| `sql/.../schema/BrokerServerViewWithUnavailableSegmentsTest.java` | New test class |
| `sql/.../schema/MetadataSegmentViewTest.java` | Add/extend tests for callback + delta |

## Verification

1. Unit tests for delta computation in MetadataSegmentView
2. Unit tests for all corner cases in BrokerServerViewWithUnavailableSegments
3. Integration: inject the new view and verify that `getTimeline()` returns segments not loaded on any historical
4. Build: `mvn -pl sql -am compile` to verify that there are no circular dependencies

diff --git a/task.md b/task.md
new file mode 100644
index 00000000000..ad4495b46a6
--- /dev/null
+++ b/task.md
@@ -0,0 +1,18 @@
## Changes in Apache

- Allow registration of a callback (similar to TimelineCallback) in MetadataSegmentView.
- When MetadataSegmentView finishes a poll from the Coordinator, it should determine the delta of segments added/removed/updated so that it can invoke the callback accordingly.
- Bind a new class BrokerServerViewWithUnavailableSegments on the Broker.
  This class implements TimelineServerView and maintains a full timeline of all segments (available on historicals, available on indexers/peons, not available).
- The new class registers a TimelineCallback on both BrokerServerView and MetadataSegmentView.
- We keep a single timeline for each datasource. When a callback reports that a segment (with or without a server) was added or removed,
  we simply add it to, or remove it from, the respective datasource timeline.
- The behaviour of the VersionedIntervalTimeline class ensures that timeline.lookup() will always return
  the latest complete partition set in the timeline object, irrespective of whether those segments are available or not.
- When metadataSegmentCacheEnable is false, the new server view class will throw an exception when any method is invoked, since we have no information about unavailable segments.
- An alternate name for the new class could be BrokerServerViewOfLatestUsedSegments, to denote that
  this is a server view of the latest-version used segments. I think that might convey the intent better.

### Some corner cases worth noting:
- Callback from BrokerServerView says a segment has become unavailable: remove the segment from the timeline only if MetadataSegmentView thinks that segment is now unused; otherwise keep it in the timeline with an empty server list.
- Callback from MetadataSegmentView says a segment has been removed (i.e. the segment is now unused): the segment should be removed from the timeline.
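
The corner cases above, combined with the callback handlers in plan.md's Step 2, reduce to a small state machine over two ID sets. The following model is hypothetical and self-contained. `UnifiedViewModel` and its method names are illustrative, not part of Druid. The model omits downstream callbacks and records only whether each timeline entry has a real or an empty selector. It follows the plan, so a segment that metadata drops while a server still serves it stays in the timeline (test scenario 5). Applying the stricter task.md rule would instead mean removing it unconditionally in `onMetadataSegmentsRemoved`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the plan.md Step 2 reconciliation rules; not the real
// BrokerServerViewWithUnavailableSegments. Segment IDs are Strings, the timeline maps an ID
// to "has a real selector?", and downstream callback firing is omitted.
final class UnifiedViewModel {
  private final Object lock = new Object();
  private final Set<String> metadataSegmentIds = new HashSet<>();
  private final Set<String> availableSegmentIds = new HashSet<>();
  private final Map<String, Boolean> timeline = new HashMap<>(); // true = real, false = empty selector

  // BrokerServerView reports the segment on a server: add it, or swap in the real selector.
  void onBrokerSegmentAdded(String id) {
    synchronized (lock) {
      availableSegmentIds.add(id);
      timeline.put(id, true);
    }
  }

  // BrokerServerView reports the segment gone from ALL servers.
  void onBrokerSegmentRemoved(String id) {
    synchronized (lock) {
      availableSegmentIds.remove(id);
      if (metadataSegmentIds.contains(id)) {
        timeline.put(id, false); // still used: keep it, backed by an empty selector
      } else {
        timeline.remove(id); // gone from both sources
      }
    }
  }

  // MetadataSegmentView delta: newly used segments.
  void onMetadataSegmentsAdded(Set<String> ids) {
    synchronized (lock) {
      for (String id : ids) {
        metadataSegmentIds.add(id);
        timeline.putIfAbsent(id, false); // not on any server yet: empty selector
      }
    }
  }

  // MetadataSegmentView delta: segments that are no longer used.
  void onMetadataSegmentsRemoved(Set<String> ids) {
    synchronized (lock) {
      for (String id : ids) {
        metadataSegmentIds.remove(id);
        if (!availableSegmentIds.contains(id)) {
          timeline.remove(id); // unused and not served anywhere
        }
        // else: still on a server, so the BrokerServerView callbacks decide its fate
      }
    }
  }

  // null = not in the timeline, TRUE = real selector, FALSE = empty selector (pick() == null).
  Boolean state(String id) {
    synchronized (lock) {
      return timeline.get(id);
    }
  }
}
```

Consider this sequence:
1. `onMetadataSegmentsAdded(Set.of("s1"))` leaves `state("s1")` as `false`.
2. A later `onBrokerSegmentAdded("s1")` flips it to `true`.
3. `onBrokerSegmentRemoved("s1")` drops it back to `false`, because metadata still lists the segment as used.

A single lock guards both ID sets and the timeline. Each handler can therefore consult the other source's view atomically before deciding to remove anything.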
