This is an automated email from the ASF dual-hosted git repository.

sanechka pushed a commit to branch server-timeline
in repository https://gitbox.apache.org/repos/asf/druid.git

commit 5beeac98d4748076c3e4cce2c371e80a9c0a94f5
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Mar 25 10:31:18 2026 +0200

    WIP
---
 plan.md | 209 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 task.md |  18 ++++++
 2 files changed, 227 insertions(+)

diff --git a/plan.md b/plan.md
new file mode 100644
index 00000000000..dd1857a888f
--- /dev/null
+++ b/plan.md
@@ -0,0 +1,209 @@
+# BrokerServerViewWithUnavailableSegments Implementation Plan
+
+## Context
+
+Currently, the Broker's `BrokerServerView` only tracks segments loaded on historicals/indexers. Worker pools (Dart) cannot query segments that exist in metadata but aren't loaded on any server. The goal is to create a unified `TimelineServerView` that includes ALL used segments, available or not, so that consumers like Dart can plan queries against unloaded segments too.
+
+## Step 1: Add callback mechanism to MetadataSegmentView
+
+**Modify:** `sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java`
+
+**New callback interface** (nested or separate file):
+```java
+public interface MetadataSegmentViewCallback {
+    void segmentsAdded(Collection<DataSegment> segments);
+    void segmentsRemoved(Collection<DataSegment> segments);
+    void initialized();
+}
+```
+
+**Changes to MetadataSegmentView:**
+- Add field: `ConcurrentMap<MetadataSegmentViewCallback, Executor> callbacks`
+- Add field: `volatile Map<SegmentId, DataSegment> currentSegmentMap = Collections.emptyMap()` (for delta computation)
+- Add method: `registerCallback(Executor exec, MetadataSegmentViewCallback callback)`
+- Add method: `boolean isInitialized()`, which returns `cachePopulated.getCount() == 0`
+- Add method: `Set<SegmentId> getPublishedSegmentIds()`, which returns the current key set (for point lookups by the new class)
+- Modify `poll()` (lines 149-179):
+  - After building the `ImmutableSortedSet`, also build a `Map<SegmentId, DataSegment>` from the new segments
+  - Compute delta: `added = newMap.keys - oldMap.keys`, `removed = oldMap.keys - newMap.keys`
+  - Swap both `publishedSegments` and `currentSegmentMap`
+  - Invoke callbacks with delta collections
+  - On first poll (old map is empty): fire `initialized()` after `segmentsAdded()`
+
+Delta computation is O(N) using HashMap key-set differences. Since `poll()` runs on a single scheduled thread, no extra synchronization is needed for the diff itself.
+
+## Step 2: Create BrokerServerViewWithUnavailableSegments
+
+**New file:** `sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerServerViewWithUnavailableSegments.java`
+
+Lives in the `sql` module (same as MetadataSegmentView) to avoid a circular dependency.
+
+### Class structure
+
+```java
+@ManageLifecycle
+public class BrokerServerViewWithUnavailableSegments implements TimelineServerView
+```
+
+**Constructor injection:**
+- `BrokerServerView brokerServerView`
+- `MetadataSegmentView metadataSegmentView`
+- `BrokerSegmentMetadataCacheConfig cacheConfig`
+- `TierSelectorStrategy historicalTierSelectorStrategy`
+- `@Named(REALTIME_SELECTOR) TierSelectorStrategy realtimeTierSelectorStrategy`
+- `BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig`
+
+**Guard:** Constructor throws `ISE` if `!cacheConfig.isMetadataSegmentCacheEnable()`.
+
+### Internal state (guarded by `synchronized(lock)`)
+
+- `Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines`: unified timeline per datasource
+- `Map<SegmentId, ServerSelector> selectors`: all segments in the timeline
+- `Set<SegmentId> metadataSegmentIds`: segments known from MetadataSegmentView
+- `Set<SegmentId> availableSegmentIds`: segments with at least one server (from BrokerServerView)
+- `ConcurrentMap<TimelineCallback, Executor> timelineCallbacks`: downstream callbacks
+
+### Callback registration: in the CONSTRUCTOR, not @LifecycleStart
+
+Following the established pattern from `CoordinatorSegmentMetadataCache` (line 153), register callbacks on both `brokerServerView` and `metadataSegmentView` **in the constructor**. This guarantees callbacks are in place before either source fires events, since `@LifecycleStart` methods run only after all constructors have completed.
+
+- BrokerServerView: `brokerServerView.registerTimelineCallback(Execs.directExecutor(), ...)`
+- MetadataSegmentView: `metadataSegmentView.registerCallback(Execs.directExecutor(), ...)`
+
+Using `directExecutor()` is safe: Java `synchronized` monitors are reentrant on the same thread, and our lock is a separate object from BrokerServerView's.
+
+### Callback handlers
+
+**From BrokerServerView:**
+
+`segmentAdded(server, segment)`:
+```
+synchronized(lock):
+  availableSegmentIds.add(id)
+  if selectors contains id:
+    // Already in timeline from metadata. Replace empty selector with real one from BrokerServerView.
+    brokerSelector = brokerServerView.getTimeline(ds).findChunk(interval, version, partitionNum).getObject()
+    if brokerSelector != current selector:
+      timeline.remove(old chunk), timeline.add(chunk with brokerSelector)
+      selectors.put(id, brokerSelector)
+  else:
+    // New segment. Reuse BrokerServerView's real selector.
+    brokerSelector = brokerServerView.getTimeline(ds).findChunk(interval, version, partitionNum).getObject()
+    timeline.add(chunk with brokerSelector)
+    selectors.put(id, brokerSelector)
+    timeline.add(chunk with brokerSelector)
+    selectors.put(id, brokerSelector)
+  fire segmentAdded callback
+```
+
+`segmentRemoved(segment)` (segment gone from ALL servers):
+```
+synchronized(lock):
+  availableSegmentIds.remove(id)
+  if metadataSegmentIds contains id:
+    // Still "used" per coordinator: keep in timeline but swap to empty selector
+    replace selector in timeline with a new empty ServerSelector
+    // Do NOT fire segmentRemoved — segment is still logically present
+  else:
+    // Gone from both sources — remove from timeline entirely
+    remove from timeline, selectors
+    fire segmentRemoved callback
+```
+
+`serverSegmentRemoved(server, segment)`: pass through to downstream callbacks.
+
+**From MetadataSegmentView:**
+
+`segmentsAdded(segments)`:
+```
+synchronized(lock):
+  for each segment:
+    metadataSegmentIds.add(id)
+    if not in selectors:
+      // Not on any server. Add with empty ServerSelector.
+      create empty ServerSelector, add to timeline, selectors
+```
+
+`segmentsRemoved(segments)`:
+```
+synchronized(lock):
+  for each segment:
+    metadataSegmentIds.remove(id)
+    if not in availableSegmentIds:
+      // Gone from metadata AND not on any server — remove from timeline
+      remove from timeline, selectors
+      fire segmentRemoved callback
+    // else: still on a server, keep in timeline (BrokerServerView handles its lifecycle)
+```
+
+### Helper methods
+
+```java
+// Creates a ServerSelector with no servers — pick() will return null
+private ServerSelector createEmptySelector(DataSegment segment)
+
+// Gets or creates timeline for a datasource (skipTombstones=true, same as BrokerServerView)
+private VersionedIntervalTimeline<String, ServerSelector> getOrCreateTimeline(String dataSource)
+
+// Standard callback dispatch (same pattern as BrokerServerView lines 414-425)
+private void runTimelineCallbacks(Function<TimelineCallback, CallbackAction> function)
+```
+
+### TimelineServerView delegation
+
+- `getTimeline()`: returns from our own `timelines` map (synchronized)
+- `getDruidServers()`: delegates to `brokerServerView`
+- `getQueryRunner()`: delegates to `brokerServerView`
+- `registerTimelineCallback()`: registers on our own `timelineCallbacks`
+- `registerServerCallback()` / `registerSegmentCallback()`: delegates to `brokerServerView`
+
+### Initialization
+
+Use two `CountDownLatch` instances (`brokerViewInitLatch`, `metadataViewInitLatch`). Fire `timelineInitialized()` to downstream callbacks only after both sources have initialized. Use a helper `maybeFireInitialized()` called from both init handlers.
+
+## Step 3: Guice wiring
+
+**Modify:** `services/src/main/java/org/apache/druid/cli/CliBroker.java`
+
+After the existing lifecycle registrations (line ~155), add:
+```java
+LifecycleModule.register(binder, BrokerServerViewWithUnavailableSegments.class);
+```
+
+The existing `TimelineServerView -> BrokerServerView` binding is **unchanged**. The new class is a separate injectable singleton; consumers that need the full view inject `BrokerServerViewWithUnavailableSegments` directly.
+
+## Step 4: Tests
+
+**New file:** `sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerServerViewWithUnavailableSegmentsTest.java`
+
+Key scenarios:
+1. Metadata-only segment: appears in timeline with `pick() == null`
+2. Available segment: appears in timeline with working ServerSelector
+3. Segment in both sources: selector is the real one from BrokerServerView
+4. Segment becomes unavailable (removed from BrokerServerView, still in metadata): stays in timeline with empty selector
+5. Segment becomes unused (removed from metadata, still on server): stays in timeline with real selector
+6. Full removal (gone from both): removed from timeline
+7. `metadataSegmentCacheEnable=false`: throws ISE
+8. Initialization gating: `timelineInitialized` fires only after both sources init
+
+Also add tests for MetadataSegmentView delta computation (callback invocation with correct added/removed sets).
+
+## Known limitations
+
+**Version overshadowing**: If metadata reports a newer segment version that isn't loaded on any historical, `VersionedIntervalTimeline.lookup()` will return the newer (unavailable) version, overshadowing the older (loaded) version. The task.md spec explicitly accepts this behavior ("lookup will always return the latest complete partition set"). Consumers must handle `pick() == null` gracefully.
+
+## Files to create/modify
+
+| File | Action |
+|------|--------|
+| `sql/.../schema/MetadataSegmentView.java` | Add callback registration, delta computation in `poll()`, `isInitialized()`, `getPublishedSegmentIds()` |
+| `sql/.../schema/MetadataSegmentViewCallback.java` | New callback interface |
+| `sql/.../schema/BrokerServerViewWithUnavailableSegments.java` | New class implementing TimelineServerView |
+| `services/.../cli/CliBroker.java` | Register new class in lifecycle |
+| `sql/.../schema/BrokerServerViewWithUnavailableSegmentsTest.java` | New test class |
+| `sql/.../schema/MetadataSegmentViewTest.java` | Add/extend tests for callback + delta |
+
+## Verification
+
+1. Unit tests for delta computation in MetadataSegmentView
+2. Unit tests for all corner cases in BrokerServerViewWithUnavailableSegments
+3. Integration: inject the new view, verify that `getTimeline()` returns segments not on any historical
+4. Build: `mvn -pl sql -am compile` to verify no circular dependencies
diff --git a/task.md b/task.md
new file mode 100644
index 00000000000..ad4495b46a6
--- /dev/null
+++ b/task.md
@@ -0,0 +1,18 @@
+## Changes in Apache
+
+- Allow registration of a callback (similar to TimelineCallback) in MetadataSegmentView.
+- When MetadataSegmentView finishes a poll from the Coordinator, it should determine the delta of segments added/removed/updated so that it can invoke the callback accordingly.
+- Bind a new class BrokerServerViewWithUnavailableSegments on the Broker.
+  This class implements TimelineServerView and maintains a full timeline of all segments (available on historicals, available on indexers/peons, not available).
+- The new class registers a TimelineCallback on both BrokerServerView and MetadataSegmentView.
+- We keep a single timeline for each datasource. When a callback is received for a segment (with or without server) added or removed,
+  we simply add it to the respective datasource timeline.
+- The behaviour of the VersionedIntervalTimeline class ensures that timeline.lookup() will always return
+  the latest complete partition set in the timeline object irrespective of whether those segments are available or not.
+- When metadataSegmentCacheEnable is false, the new server view class will throw an exception when any method is invoked since we do not have info of unavailable segments.
+- An alternate name for the new class could be BrokerServerViewOfLatestUsedSegments to denote that
+  this is a server view of latest version used segments. I think that might convey the intent better.
+
+### Some corner cases worth noting:
+- Callback from BrokerServerView says segment has become unavailable - remove segment from timeline only if MetadataSegmentView thinks that segment is now unused, otherwise keep it in the timeline with an empty server list.
+- Callback from MetadataSegmentView says segment has been removed (i.e. segment is now unused) - segment should be removed from timeline.
\ No newline at end of file

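A note on the delta computation that plan.md describes for `poll()`: the key-set difference can be sketched in a few lines of Java. This is a minimal standalone illustration under stated assumptions, not the Druid implementation. `SegmentDeltaSketch`, `added`, and `removed` are hypothetical names, and plain `String` keys and values stand in for `SegmentId` and `DataSegment`.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: String keys stand in for Druid's SegmentId.
final class SegmentDeltaSketch
{
  /** Returns the keys present in newMap but absent from oldMap. */
  static <K, V> Set<K> added(Map<K, V> oldMap, Map<K, V> newMap)
  {
    Set<K> result = new HashSet<>(newMap.keySet());
    result.removeAll(oldMap.keySet());
    return result;
  }

  /** Returns the keys present in oldMap but absent from newMap. */
  static <K, V> Set<K> removed(Map<K, V> oldMap, Map<K, V> newMap)
  {
    // Removal is the same difference with the arguments swapped.
    return added(newMap, oldMap);
  }

  public static void main(String[] args)
  {
    Map<String, String> previousPoll = Map.of("seg_a", "v1", "seg_b", "v1");
    Map<String, String> currentPoll = Map.of("seg_b", "v1", "seg_c", "v1");

    System.out.println("added=" + added(previousPoll, currentPoll));     // added=[seg_c]
    System.out.println("removed=" + removed(previousPoll, currentPoll)); // removed=[seg_a]
  }
}
```

Each call is linear in the size of the two maps as long as the maps offer constant-time key lookups, which matches the O(N) estimate in plan.md.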

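The callback handlers in plan.md reduce to a small state machine over two sets. The sketch below models only that bookkeeping, under loud assumptions: `UnifiedViewSketch` and its method names are hypothetical, `String` ids replace `SegmentId`, and a plain set replaces the per-datasource `VersionedIntervalTimeline` and its `ServerSelector` objects. It follows the plan.md handlers, which keep a segment that is still served after metadata drops it.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the merged view's bookkeeping.
final class UnifiedViewSketch
{
  private final Object lock = new Object();
  private final Set<String> metadataSegmentIds = new HashSet<>();
  private final Set<String> availableSegmentIds = new HashSet<>();
  private final Set<String> timeline = new HashSet<>();

  /** BrokerServerView reports the segment on at least one server. */
  void onServerSegmentAdded(String id)
  {
    synchronized (lock) {
      availableSegmentIds.add(id);
      timeline.add(id);
    }
  }

  /** BrokerServerView reports the segment gone from all servers. Returns true if it left the timeline. */
  boolean onServerSegmentRemoved(String id)
  {
    synchronized (lock) {
      availableSegmentIds.remove(id);
      if (metadataSegmentIds.contains(id)) {
        return false; // still used: stays in the timeline without servers
      }
      return timeline.remove(id);
    }
  }

  /** MetadataSegmentView reports the segment as used. */
  void onMetadataSegmentAdded(String id)
  {
    synchronized (lock) {
      metadataSegmentIds.add(id);
      timeline.add(id);
    }
  }

  /** MetadataSegmentView reports the segment as no longer used. Returns true if it left the timeline. */
  boolean onMetadataSegmentRemoved(String id)
  {
    synchronized (lock) {
      metadataSegmentIds.remove(id);
      if (availableSegmentIds.contains(id)) {
        return false; // still served: BrokerServerView events decide its fate
      }
      return timeline.remove(id);
    }
  }

  boolean inTimeline(String id)
  {
    synchronized (lock) {
      return timeline.contains(id);
    }
  }

  boolean hasServers(String id)
  {
    synchronized (lock) {
      return availableSegmentIds.contains(id);
    }
  }
}
```

The boolean return values mark the points where the real class would fire `segmentRemoved` to downstream callbacks: only when a segment has disappeared from both sources.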