christian-schlichtherle opened a new issue, #19575:
URL: https://github.com/apache/druid/issues/19575

   ### Affected Version
   
   37.0.0 (the relevant code is unchanged on current master)
   
   ### Description
   
   **Summary.** A Kafka-type lookup (`druid-kafka-extraction-namespace`) is 
reported as started/loaded as soon as its Kafka consumer completes its first 
`poll()`, not when it has caught up to the end of the topic. Because 
`KafkaLookupExtractorFactory` generates a fresh random `group.id` (the 
`factoryId`) on every process start, the consumer finds no committed offsets 
and re-reads the entire (typically compacted) topic from offset 0. During that 
catch-up window the process announces itself and serves queries from a 
partially populated map: `LOOKUP(...)` returns NULL and `SELECT * FROM 
lookup."X"` returns no rows for keys that are durably present in the topic — 
and this affects *all* keys, not just recently added ones.
   
   **Where this bites hard:** Kubernetes deployments with broker autoscaling. 
Every HPA scale-up adds a cold broker that immediately receives query traffic, 
so query results flap between complete and empty depending on which broker 
happens to serve them, then self-heal once the consumer catches up. With an 
aggressive HPA (we observed 2→4→5→4→2 within ~26 minutes, and hundreds of 
rescales over ~19 days) this produces user-visible intermittent data loss in 
dashboards several times per hour.
   
   **Root cause.** In 
[`KafkaLookupExtractorFactory.start()`](https://github.com/apache/druid/blob/master/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java)
 the startup latch is released by the first poll, even if it returns zero 
records:
   
   ```java
   final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
   startingReads.countDown();
   ```
   
   and `start()` only awaits that latch (bounded by `connectTimeout`) — i.e. it 
waits for *connection*, not for *catch-up*. Consequently:
   
   - `LookupReferencesManager` considers the lookup loaded and the lookup 
status APIs report `loaded: true` while the in-memory map is still mostly empty;
   - the node announces itself (we run 
`druid.lookup.enableLookupSyncOnStartup=true`, which does not help because the 
lookup already counts as loaded after the first poll);
   - there is no API or metric exposing the catch-up state, so operators cannot 
gate a Kubernetes readiness probe on it either.
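
The effect can be illustrated without Kafka. The following is a minimal sketch, not Druid code: `FirstPollLatchDemo` and `onPoll` are hypothetical names that only mirror the pattern quoted above, where the latch is counted down after every poll regardless of what the poll returned.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the lookup's poll loop; not the Druid implementation.
final class FirstPollLatchDemo {
    final Map<String, String> map = new ConcurrentHashMap<>();
    final CountDownLatch startingReads = new CountDownLatch(1);

    // Applies one poll() batch, then releases the latch unconditionally,
    // as in the snippet quoted in this issue.
    void onPoll(Map<String, String> batch) {
        map.putAll(batch);
        startingReads.countDown();
    }

    // "Started" here means only that at least one poll has returned.
    boolean started() {
        return startingReads.getCount() == 0;
    }
}
```

After a first call with an empty batch, `started()` is already true while `map` is still empty, which is the state a cold broker is in when it begins serving queries.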
   
   **Cluster / configuration**
   
   - Druid 37.0.0 on Kubernetes (druid-operator), ZooKeeper-less 
(`druid.discovery.type=k8s`)
   - Brokers: 2–5 replicas via HPA on CPU
   - Lookup: type `kafka`, log-compacted topic (`cleanup.policy=compact`), 8 
partitions, ~280 keys
   
   **Steps to reproduce**
   
   1. Create a Kafka lookup over a compacted topic with a non-trivial number of 
keys.
   2. Start an additional broker and immediately run `SELECT COUNT(*) FROM 
lookup."X"` against it (port-forward to the pod to bypass load balancing).
   3. The count starts at or near 0 and grows until the consumer catches up, while 
the lookup is already reported as loaded on that broker. The broker log shows 
`Found no committed offset for partition ...` followed by `Resetting offset for 
partition ... to position FetchPosition{offset=0, ...}` for all partitions, 
confirming the full re-read under a fresh random group id.
   
   **Expected behavior** (either would solve it)
   
   - `start()` waits — optionally behind a config flag, with a configurable 
timeout — until the consumer reaches the end offsets captured at subscription 
time before reporting started, so that "loaded" means "caught up"; or
   - the factory exposes its catch-up state (e.g. via lookup introspection 
and/or a metric) so the broker readiness endpoint `/druid/broker/v1/readiness` 
(or an operator-provided probe) can take it into account before the node 
receives queries.
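
Both options rest on the same check: compare the consumer's position per partition against the end offsets captured at startup. The sketch below shows one possible shape for that check, using plain maps so it runs without Kafka. `CatchUpTracker` and its methods are hypothetical and not part of Druid. In a real implementation the targets would presumably come from `KafkaConsumer.endOffsets(...)` and the positions from `KafkaConsumer.position(...)`, and the sketch assumes Kafka's convention that an end offset is the offset of the last record plus one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Druid: reports whether a consumer has
// reached the end offsets that were captured when it started.
final class CatchUpTracker {
    private final Map<Integer, Long> targetEndOffsets;
    private final Map<Integer, Long> positions = new HashMap<>();

    // endOffsetsAtStart: partition -> end offset (last record offset + 1).
    CatchUpTracker(Map<Integer, Long> endOffsetsAtStart) {
        this.targetEndOffsets = new HashMap<>(endOffsetsAtStart);
    }

    // Records the offset of the next record to be fetched for a partition.
    synchronized void updatePosition(int partition, long nextOffset) {
        positions.merge(partition, nextOffset, Long::max);
    }

    // Total number of offsets still to be consumed across all partitions.
    // Partitions with no recorded position are assumed to start at offset 0.
    synchronized long remaining() {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : targetEndOffsets.entrySet()) {
            long pos = positions.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0L, e.getValue() - pos);
        }
        return lag;
    }

    // True once every partition has reached its captured end offset.
    synchronized boolean isCaughtUp() {
        return remaining() == 0;
    }
}
```

Under these assumptions, `start()` could block on `isCaughtUp()` with a timeout for the first option, while `remaining()` could back a metric or introspection field for the second. Because the targets are fixed at startup, records produced later do not delay readiness.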
   
   **Debugging already done**
   
   - Verified at steady state that all warm brokers hold the full map (~280 
entries each) and repeated queries return correct results — no intermittency 
once brokers are warm.
   - Reproduced the empty results by querying a freshly started broker directly 
during a scale-up while warm brokers answered the same query correctly at the 
same time.
   - Read `KafkaLookupExtractorFactory.start()` on current master and confirmed 
the latch is counted down by the first poll regardless of consumer lag.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

