hongkunxu opened a new issue, #16584:
URL: https://github.com/apache/pinot/issues/16584

   # RFC: Broker and Server Segment Query Cache for Apache Pinot
   
   Status: Draft 0.1
   Authors: Xiang Fu, Hongkun Xu 
   Created: 2025-08-13
   Target Release: TBD
   Discussion Thread: (link after posting to [email protected])
   
   
   ## 0. Executive Summary
   
   This RFC proposes a two-layer query caching feature for Apache Pinot:
   1. **Broker Result Cache** – caches fully merged query results at the broker layer.
   2. **Server Segment Result Cache** – caches per-segment partial results (aggregations, group-by tables, and selection blocks) at the server layer.
   
   The design emphasizes correctness on immutable/sealed segments, pluggability 
of cache backends, precise invalidation, and rich observability. It draws on 
patterns from Apache Druid (broker + historical caches), StarRocks (backend 
caches), and ClickHouse (final result cache + block caches) while fitting 
Pinot’s execution model.
   
   
   ## 1. Background & Motivation
   
   Pinot executes every query end-to-end even when the inputs and the plan are 
unchanged. In dashboard scenarios, identical queries recur at 5–60s intervals 
and create avoidable CPU and IO load. Immutable segments (offline and sealed 
realtime) are natural candidates for result reuse.
   
   Goals of this RFC:
   
   - Reduce p95/p99 latency and cluster cost for repetitive workloads.
   - Provide predictable correctness via strong versioning and conservative 
defaults.  
   - Offer a pluggable SPI and configuration knobs to match diverse deployments.
   
   > Non-goals for Phase 1: ORDER BY/top-k caching, semantic/approximate 
matching, mutable/upsert caching.
   
   
   ## 2. Terminology
   
   - Segment: Pinot data shard (offline or realtime). Realtime may be consuming 
or sealed.
   - CRC/Version: Segment-level versioning/epoch used to detect content changes.
   - Broker Response: Final BrokerResponseNative after merge/trim.
   - Partial Result: Per-segment contribution (agg array, group-by map, or 
selection rows) before broker merge.
   
   
   ## 3. Requirements
   
   ### 3.1 Functional
   
   - Exact-match caching of final results (broker) and partial results (server) 
for supported operators.
   - Deterministic, canonical keys covering query + data versions + relevant 
options. 
   - Automatic invalidation on segment/schema/table-config changes and 
server/broker lifecycle events.
   - Manual invalidation APIs for operators and SREs.
   - Per-query escape hatches (disable flags) for debugging. 
   
   ### 3.2 Correctness & Safety
   
   - Enabled by default only for: offline and sealed realtime segments.
   - Disabled by default for: consuming realtime segments and upsert tables.
   - Staleness bounded by TTL but correctness enforced by versioned keys.
   
   ### 3.3 Performance
   
   - Weighted LRU with size in bytes, not entry count.
   - Optional compression for value payloads.
   - Single-flight de-duplication to mitigate thundering herds. 
   
   ### 3.4 Operability
   
   - Detailed metrics (hits/misses/bytes/latency).
   - Tracing spans and cache decision annotations.
   - Configurable per-table overrides.
   
   
   ## 4. High-Level Architecture
   
   Two orthogonal caches:
   
   ### 4.1 Broker Result Cache
   
   - Placement: Broker (query entry/exit).
   - Key: Canonical SQL + normalized options + routing table version + 
participating segment epochs + schema epoch.
   - Value: Serialized BrokerResponseNative (optionally compressed).
   - Lookup Path: Check before dispatch → on hit, return; on miss, execute and 
store.
   - Invalidation: Broker listens to ExternalView/segment metadata and 
schema/config change events to purge affected keys. 
   
   ### 4.2 Server Segment Result Cache
   
   - Placement: Server, per-segment, around the operator execution.
   - Key Composition (a hashing sketch follows this block):
   
   Key = HASH(
     tableNameWithType,
     segmentName,
     segmentCrcOrEpoch,
     planSignature,            // canonical operator tree
     projectionSchemaSig,      // columns+types used
     queryOptionsSig,          // null handling, response format, group trim thresholds, etc.
     starTreeSig,              // star-tree id/version used by planner
     timeRangeConstraintSig,   // broker pruning constraints intersected with segment
     limitSig                  // affects partial value shapes
   )
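   A minimal, illustrative sketch of folding these components into one stable key is shown below. The class name `SegmentCacheKeys`, the SHA-256 digest, and the string-joining of components are assumptions for illustration, not a committed key format; the real implementation would more likely hash structured signature objects.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;
   import java.util.HexFormat;
   
   // Illustrative helper: folds the signature components listed above into one
   // stable hex key. Delimiting with NUL avoids ambiguous concatenations such as
   // ("ab", "c") vs ("a", "bc").
   public final class SegmentCacheKeys {
     private SegmentCacheKeys() {
     }
   
     public static String compose(String tableNameWithType, String segmentName, long segmentCrcOrEpoch,
         String planSignature, String projectionSchemaSig, String queryOptionsSig, String starTreeSig,
         String timeRangeConstraintSig, String limitSig) {
       try {
         MessageDigest digest = MessageDigest.getInstance("SHA-256");
         String joined = String.join("\u0000", tableNameWithType, segmentName, Long.toString(segmentCrcOrEpoch),
             planSignature, projectionSchemaSig, queryOptionsSig, starTreeSig, timeRangeConstraintSig, limitSig);
         return HexFormat.of().formatHex(digest.digest(joined.getBytes(StandardCharsets.UTF_8)));
       } catch (NoSuchAlgorithmException e) {
         throw new IllegalStateException("SHA-256 not available", e);
       }
     }
   }
   ```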
   
   Value Forms:
   
   - Aggregation (no group-by): double/long/decimal arrays per function.
   - GroupBy: compact groupKey → agg[] map (already trimmed to server threshold).
   - Selection (no ORDER BY): encoded row block up to per-segment limit.
   
   Invalidation: On segment add/remove/reload; on schema/config change; manual admin calls. For consuming segments (if enabled), the key additionally includes (partitionId, endOffset).
   
   
   ## 5. Detailed Design
   
   ### 5.1 Canonicalization & Signatures
   
   **Filter AST Canonicalization:**
   
   - Normalize commutative operators (AND/OR) by sorting children.
   - Normalize predicate forms (e.g., a IN (3,1,2) → IN (1,2,3)).
   - Normalize literals to internal storage types (timezone, number scale).
   - Push-downs folded; redundant predicates eliminated where safe (a canonicalization sketch follows this list).
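   A minimal sketch of the first two rules, using a hypothetical `FilterNode` record rather than Pinot's actual filter/expression classes; commutative children and IN-list literals are sorted so that logically identical filters produce identical signatures.
   
   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   
   // Hypothetical filter node used only to illustrate canonicalization; the real
   // implementation would walk Pinot's filter/expression tree.
   record FilterNode(String op, String column, List<String> literals, List<FilterNode> children) {
   
     static FilterNode canonicalize(FilterNode node) {
       // Canonicalize children first so parent-level sorting sees stable forms.
       List<FilterNode> canonicalChildren = new ArrayList<>();
       for (FilterNode child : node.children()) {
         canonicalChildren.add(canonicalize(child));
       }
       // Commutative operators: child order must not affect the signature.
       if (node.op().equals("AND") || node.op().equals("OR")) {
         canonicalChildren.sort(Comparator.comparing(FilterNode::toString));
       }
       // Predicate forms: a IN (3,1,2) -> a IN (1,2,3).
       List<String> literals = new ArrayList<>(node.literals());
       if (node.op().equals("IN")) {
         literals.sort(Comparator.naturalOrder());
       }
       return new FilterNode(node.op(), node.column(), literals, canonicalChildren);
     }
   }
   ```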
   
   **Projection/Transform Signature:**
   
   - Stable ordering by expression string; include resolved types and 
dictionary/transform usage.
   
   **Aggregation Signature:**
   
   - Function name + normalized args + params (e.g., approx_percentile(0.95) vs 
quantile(0.95)).
   - Group-by keys listed in stable order.
   
   **Routing/Time Constraint Signature (broker):**
   
   - Include selected segments and their epochs; include min/max time ranges 
used for pruning.
   
   ### 5.2 SPI Interfaces (Java Sketch)
   ```java
   public interface QueryResultCache<K, V> {
     @Nullable V get(K key);
     void put(K key, V value, long weightBytes);
     void invalidate(Predicate<K> predicate);
     void clear();
   }
   
   public interface SegmentResultCache extends QueryResultCache<SegmentCacheKey, SegmentCacheValue> {
   }
   
   // Immutable key holder (e.g., Lombok @Value) with fields matching the Key sketch above;
   // equals/hashCode over all fields.
   @Value class SegmentCacheKey { /* fields matching Key above; equals/hashCode */ }
   
   // Sealed hierarchy (Java 17+) so encoders/decoders can switch exhaustively over value types.
   sealed interface SegmentCacheValue permits AggPart, GroupByPart, SelectionPart {
     int serializedSizeBytes();
   }
   
   final class AggPart implements SegmentCacheValue { double[] doubles; long[] longs; /* ... */ }
   final class GroupByPart implements SegmentCacheValue { /* groupKey dict + value arrays + trim meta */ }
   final class SelectionPart implements SegmentCacheValue { /* encoded rows */ }
   ```
   ### 5.3 Value Encoding & Compression
   
   - Versioned headers: magic(2B) | version(1B) | type(1B) | flags(1B) | size(4B) | payload.
   - Payload encodings (a header-encoding sketch follows this list):
     - Agg arrays: little-endian primitives; optional RLE for zero-dense arrays.
     - GroupBy: two-level structure (key dictionary + columnar agg arrays); varint lengths.
     - Selection: reuse DataTable encoding or introduce a leaner row block for cache (TBD).
   - Compression: configurable none|LZ4|ZSTD; default LZ4.
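   A minimal sketch of the header framing above, following the layout magic(2B) | version(1B) | type(1B) | flags(1B) | size(4B) | payload; the magic constant, type/flag values, and class name are placeholders, not a committed wire format.
   
   ```java
   import java.nio.ByteBuffer;
   
   // Encodes/decodes the versioned value header described above.
   public final class CacheValueCodec {
     private static final short MAGIC = (short) 0xCA7E;     // placeholder magic
     private static final byte VERSION = 1;
     private static final int HEADER_BYTES = 2 + 1 + 1 + 1 + 4;
   
     private CacheValueCodec() {
     }
   
     public static byte[] encode(byte type, byte flags, byte[] payload) {
       ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + payload.length);
       buf.putShort(MAGIC).put(VERSION).put(type).put(flags).putInt(payload.length).put(payload);
       return buf.array();
     }
   
     public static byte[] decode(byte[] bytes, byte expectedType) {
       ByteBuffer buf = ByteBuffer.wrap(bytes);
       if (buf.getShort() != MAGIC || buf.get() != VERSION || buf.get() != expectedType) {
         throw new IllegalArgumentException("Unrecognized cache value header");
       }
       buf.get();                         // flags (e.g., compression codec) ignored in this sketch
       byte[] payload = new byte[buf.getInt()];
       buf.get(payload);
       return payload;
     }
   }
   ```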
   
   ### 5.4 Memory Accounting & Eviction
   
   - Weight = serializedSizeBytes (post-compression).
   - A Caffeine `Weigher` returns the exact byte size to bound memory (see the sketch below).
   - Separate pools per value type are optional (phase 2).
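   A sketch of the Caffeine wiring under these assumptions (the factory class name is illustrative; `SegmentCacheKey`/`SegmentCacheValue` are the SPI types from §5.2):
   
   ```java
   import com.github.benmanes.caffeine.cache.Cache;
   import com.github.benmanes.caffeine.cache.Caffeine;
   import java.time.Duration;
   
   // Byte-weighted cache: bounded by bytes, not entry count, with TTL after write.
   public final class CaffeineSegmentResultCacheFactory {
     private CaffeineSegmentResultCacheFactory() {
     }
   
     public static Cache<SegmentCacheKey, SegmentCacheValue> create(long maxBytes, long ttlMs) {
       return Caffeine.newBuilder()
           .maximumWeight(maxBytes)
           .weigher((SegmentCacheKey k, SegmentCacheValue v) -> v.serializedSizeBytes())
           .expireAfterWrite(Duration.ofMillis(ttlMs))
           .recordStats()                 // feeds hit/miss/eviction metrics (see §8)
           .build();
     }
   }
   ```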
   
   ### 5.5 Concurrency Control
   
   - Per-key single-flight registry: only one thread populates a missing key; others await a CompletableFuture (sketched below).
   - Optional stale-while-revalidate (SWR): serve entries within swr.ms while 
refresh is in-flight (disabled by default).
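   A minimal single-flight sketch; the class and method names are illustrative and not part of the proposed SPI.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.function.Supplier;
   
   // The first thread to miss computes the value; concurrent callers for the same
   // key await the same future instead of re-running the segment plan.
   public final class SingleFlight<K, V> {
     private final ConcurrentMap<K, CompletableFuture<V>> _inFlight = new ConcurrentHashMap<>();
   
     public V getOrCompute(K key, Supplier<V> loader) {
       CompletableFuture<V> future = new CompletableFuture<>();
       CompletableFuture<V> existing = _inFlight.putIfAbsent(key, future);
       if (existing != null) {
         return existing.join();          // another thread is already loading this key
       }
       try {
         V value = loader.get();
         future.complete(value);
         return value;
       } catch (Throwable t) {
         future.completeExceptionally(t); // propagate the failure to waiters too
         throw t;
       } finally {
         _inFlight.remove(key, future);   // allow future reloads after completion
       }
     }
   }
   ```
   
   On a miss, the cache lookup would call `getOrCompute(key, () -> runOperators(segment, plan))` so that concurrent misses for the same key execute the segment plan only once.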
   
   ### 5.6 Integration Points
   
   Broker:
   
   - Entry: BrokerRequestHandler.handleRequest() → pre-check.
   - Exit: after reduceAndSetExecutionStats() → store on success.
   - Skip store for partial/errored responses or when 
queryOptions.resultCache=false.
   
   Server:
   
   - Around segment operator execution (e.g., PlanNodeRunner or OpChain root).
   - Before building operators, compute planSignature.
   - On hit: short-circuit and return cached SegmentCacheValue.
   - On miss: execute operators → convert to value → put().
   
   ### 5.7 Invalidation Sources & Propagation
   
   - Segment lifecycle: addSegment/removeSegment/reloadSegment from 
InstanceDataManager hooks call invalidate(k -> k.segmentName.equals(...)).
   - Schema/Table config: Listeners produce an epoch bump kept in memory; keys 
include schema/config epoch, forcing misses for old entries. Optionally 
bulk-invalidate by table.
   - Broker routing change: Broker cache key embeds segment epochs; any change 
yields a different key (implicit invalidation).
   - Admin APIs: Server and broker expose endpoints (see §7) to clear by table/segment or all. A minimal invalidation-wiring sketch follows.
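   A minimal wiring sketch under the SPI from §5.2; the hook and key-accessor method names (`onSegmentChanged`, `getTableNameWithType()`, etc.) are assumptions for illustration.
   
   ```java
   import java.util.concurrent.atomic.AtomicLong;
   
   // Bridges lifecycle events (segment add/remove/reload, schema/config change)
   // to the predicate-based invalidate() of the SegmentResultCache SPI.
   public class SegmentCacheInvalidator {
     private final SegmentResultCache _cache;
     private final AtomicLong _schemaEpoch = new AtomicLong();
   
     public SegmentCacheInvalidator(SegmentResultCache cache) {
       _cache = cache;
     }
   
     // Called from InstanceDataManager addSegment/removeSegment/reloadSegment hooks.
     public void onSegmentChanged(String tableNameWithType, String segmentName) {
       _cache.invalidate(key -> key.getTableNameWithType().equals(tableNameWithType)
           && key.getSegmentName().equals(segmentName));
     }
   
     // Schema/table-config change: bump the epoch embedded in new keys so stale entries
     // stop matching, and optionally bulk-invalidate the table's existing entries.
     public void onSchemaOrConfigChanged(String tableNameWithType) {
       _schemaEpoch.incrementAndGet();
       _cache.invalidate(key -> key.getTableNameWithType().equals(tableNameWithType));
     }
   
     // Current epoch to embed in newly composed keys.
     public long currentSchemaEpoch() {
       return _schemaEpoch.get();
     }
   }
   ```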
   
   ### 5.8 Failure Modes
   
   - Cache service down (remote backend): Bypass the cache and execute the query normally; count as a miss; use a circuit breaker to avoid hot-looping on the backend.
   - Deserialization error (version mismatch): Drop the entry; increment cache_value_deser_errors; execute normally.
   - Memory pressure: Caffeine evicts by size; emit warnings when the hit rate falls below a threshold alongside near-OOM events.
   
   ### 5.9 Security & Multi-Tenancy
   
   - Namespace keys by tableNameWithType.
   - Per-table configuration overrides, including disablement.
   - Admin APIs gated behind existing auth/role checks.
   
   
   ## 6. Configuration
   ### 6.1 Broker
   ```
   pinot.broker.query.cache.enabled = false
   pinot.broker.query.cache.backend = in-memory | redis
   pinot.broker.query.cache.max.bytes = 512MB
   pinot.broker.query.cache.ttl.ms = 300000
   pinot.broker.query.cache.compress = lz4
   pinot.broker.query.cache.singleflight.enabled = true
   # Per-table override: tableConfig.ingestionConfig.queryCache.broker.*
   ```
   ### 6.2 Server (Segment)
   ```
   pinot.server.segment.query.cache.enabled = false
   pinot.server.segment.query.cache.backend = in-memory | redis | rocksdb
   pinot.server.segment.query.cache.max.bytes = 256MB
   pinot.server.segment.query.cache.ttl.ms = 120000
   pinot.server.segment.query.cache.compress = lz4
   pinot.server.segment.query.cache.enable.offline = true
   pinot.server.segment.query.cache.enable.realtime.sealed = true
   pinot.server.segment.query.cache.enable.realtime.consuming = false
   pinot.server.segment.query.cache.disable.for.upsert = true
   pinot.server.segment.query.cache.singleflight.enabled = true
   pinot.server.segment.query.cache.stale_while_revalidate.ms = 0
   ```
   
   ## 7. Admin & Debug APIs
   
   Broker:
   
   - POST /queryCache/invalidate?table=<tbl> → returns {removedKeys, bytes}
   - POST /queryCache/clearAll → clears all
   - GET  /queryCache/stats → hit/miss/bytes per table
   
   Server:
   
   - POST /segmentCache/invalidate?table=<tbl>
   - POST /segmentCache/invalidate?table=<tbl>&segment=<seg>
   - POST /segmentCache/clearAll
   - GET  /segmentCache/stats
   
   Per-query options:
   
   - SET option 'resultCache' = false (broker)
   - SET option 'segmentCache' = false (server)
   
   
   ## 8. Observability
   
   - Metrics (Broker/Server):
     - cache.hit, cache.miss, cache.put, cache.evict, cache.invalidate
     - cache.size.bytes, cache.value.bytes
     - cache.load.latency.ms, cache.hit.savings.ms (derived)
     - cache.singleflight.waiters
   - Tracing: Add spans cache.lookup, cache.miss_compute, cache.put, with tags: key.type, table, segment, reason (hit/miss/cannot-cache).
   - Logs: Structured log when bypassing the cache due to policy/size/option.
   
   
   
   ## 9. Backends
   
   ### 9.1 In-memory (Caffeine)
   
   - Weighted LRU, TTL after write, optional SWR.
   - Zero external dependencies; default choice.
   
   ### 9.2 Redis (Optional)
   
   - Pros: cross-broker sharing for result cache; warm restarts.
   - Cons: network latency; need timeouts and circuit breaker.
   
   ### 9.3 RocksDB (Optional, server only)
   
   - Pros: large local capacity; survives restart.
   - Cons: file IO; compaction overhead.
   
   
   ## 10. Testing Strategy
   
   **Unit:**
   
   - Key canonicalization invariants (commutativity, literal normalization).
   - TTL/eviction and weight accounting.
   - Invalidation on segment reload and schema change.
   
   **Integration:**
   
   - Mini-cluster with offline + sealed realtime segments.
   - Workloads: (1) agg only, (2) agg+group-by, (3) selection no ORDER BY.
   - Verify correctness vs cache disabled; measure latency improvements.
   
   **Chaos/Failure:**
   
   - Kill/restart brokers/servers → ensure no corruption and warm/cold behavior 
acceptable.
   - Redis/RocksDB outages → graceful degrade to misses.
   
   
   ## 11. Rollout Plan
   
   1. Phase 1 (MVP): broker final result cache; server cache for agg, group-by, 
selection (no ORDER BY) on offline + sealed realtime. Upsert & consuming 
disabled.
   2. Phase 2: DocIdSet (filter) micro-cache; ORDER BY top-k per-segment cache; 
adaptive TTL; per-type pools.
   3. Phase 3: Remote backends (Redis/RocksDB) hardened; SWR; auto warm-up 
hooks.
   
   
   ## 12. Performance & Sizing Guidance
   
   - Start with broker.max.bytes = 512MB, server.max.bytes = 256MB per node.
   - Expect 20–70% hit rates on dashboard workloads with 15–60s refresh.
   - Benefits scale with segment immutability and repeated plan shapes.
   - Monitor cache.hit.savings.ms to convert wins into CPU-hours saved.
   
   
   ## 13. Alternatives Considered
   
   - Only broker cache: simpler but misses per-segment compute savings and 
limits reuse across similar (not identical) queries.
   - Only server cache: helps compute but not network/merge cost; less 
impactful for identical dashboards.
   - No cache: relies purely on block/index caches; inadequate for repeated 
analytics.
   
   
   ## 14. Risks & Mitigations
   
   - Stale results: Strong versioning in keys; conservative defaults; short 
TTLs.
   - Memory blowups: Byte-accurate weigher; per-table limits; robust metrics.
   - Complexity: Clear disable flags; thorough observability; staged rollout.
   
   
   ## 15. Implementation Tasks (GitHub Issue Breakdown)
   
   **Epic: Broker Result Cache**
   1. Broker cache config & SPI skeleton
   2. Caffeine backend + metrics
   3. Canonical key generator (SQL → AST → canonical string)
   4. Broker integration + store path
   5. Invalidation hooks (segment/schema/config)
   6. Admin REST + RBAC
   7. Unit & integration tests; docs
   
   **Epic: Server Segment Cache**
   1. SegmentResultCache SPI + configs
   2. Caffeine backend (weighted) + compression
   3. Plan signature builder (filter/projection/agg)
   4. Execution hook integration (pre/post operators)
   5. Lifecycle invalidation wiring (InstanceDataManager)
   6. Realtime consuming & upsert safety gates
   7. Metrics & tracing; debug flags
   8. Admin REST + RBAC
   9. Unit/IT, chaos tests; docs
   
   **Epic: Optional Backends**
   1. Redis backend (broker + server), with timeouts and circuit breaker
   2. RocksDB backend (server)
   
   
   ## 16. Appendix A: Pseudocode
   ```
   // Broker
   Optional<BrokerResponseNative> cached = brokerCache.get(key);
   if (cached.isPresent()) return cached.get();
   BrokerResponseNative resp = executeDownstream(query);  
   if (isCacheable(resp, query)) brokerCache.put(key, resp, 
serializedSize(resp));
   return resp;
   
   // Server (per-segment)
   SegmentCacheKey key = composeKey(segment, planSig, opts, ...);
   SegmentCacheValue val = segmentCache.get(key);
   if (val != null) return val; // short-circuit
   SegmentCacheValue computed = runOperators(segment, plan);
   if (isCacheable(computed)) segmentCache.put(key, computed, size(computed));
   return computed;
   ```
   
   
   ## 17. Appendix B: Industry Comparison (Condensed)
   
   - Druid: Broker result cache + historical segment cache; keys include query 
+ segment version; invalidation on version bump.
   - ClickHouse: Final result cache keyed by AST + part versions; heavy 
reliance on block/index caches; no per-part result cache.
   - StarRocks: BE per-tablet plan-signature cache; strong versioning; partial 
reuse supported.
   
   ## 18. Assets
   
   - This RFC Doc: 
https://docs.google.com/document/d/1qUjLEJnhODD3oJrOP_Q15JP-Tb4RkLLNyDHYaWcztMg/edit?usp=sharing
   - Design Doc: Need to add later.
   
      

