christian-schlichtherle opened a new issue, #19575: URL: https://github.com/apache/druid/issues/19575
### Affected Version

37.0.0 (the relevant code is unchanged on current master)

### Description

**Summary.** A Kafka-type lookup (`druid-kafka-extraction-namespace`) is reported as started/loaded as soon as its Kafka consumer completes its first `poll()`, not when it has caught up to the end of the topic. Because `KafkaLookupExtractorFactory` generates a fresh random `group.id` (the `factoryId`) on every process start, the consumer finds no committed offsets and re-reads the entire (typically compacted) topic from offset 0. During that catch-up window the process announces itself and serves queries from a partially populated map: `LOOKUP(...)` returns NULL and `SELECT * FROM lookup."X"` returns no rows for keys that are durably present in the topic. This affects *all* keys, not just recently added ones.

**Where this bites hard:** Kubernetes deployments with broker autoscaling. Every HPA scale-up adds a cold broker that immediately receives query traffic, so query results flap between complete and empty depending on which broker happens to serve them, then self-heal once the consumer catches up. With an aggressive HPA (we observed 2→4→5→4→2 within ~26 minutes, and hundreds of rescales over ~19 days) this produces user-visible intermittent data loss in dashboards several times per hour.

**Root cause.** In [`KafkaLookupExtractorFactory.start()`](https://github.com/apache/druid/blob/master/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java) the startup latch is released by the first poll, even if it returns zero records:

```java
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
startingReads.countDown();
```

and `start()` only awaits that latch (bounded by `connectTimeout`), i.e. it waits for *connection*, not for *catch-up*.
Consequently:

- `LookupReferencesManager` considers the lookup loaded and the lookup status APIs report `loaded: true` while the in-memory map is still mostly empty;
- the node announces itself (we run `druid.lookup.enableLookupSyncOnStartup=true`, which doesn't help because of the above);
- there is no API or metric exposing the catch-up state, so operators cannot gate a Kubernetes readiness probe on it either.

**Cluster / configuration**

- Druid 37.0.0 on Kubernetes (druid-operator), ZooKeeper-less (`druid.discovery.type=k8s`)
- Brokers: 2–5 replicas via HPA on CPU
- Lookup: type `kafka`, log-compacted topic (`cleanup.policy=compact`), 8 partitions, ~280 keys

**Steps to reproduce**

1. Create a Kafka lookup over a compacted topic with a non-trivial number of keys.
2. Start an additional broker and immediately run `SELECT COUNT(*) FROM lookup."X"` against it (port-forward to the pod to bypass load balancing).
3. The count starts at/near 0 and grows until the consumer catches up, while the lookup is already reported as loaded on that broker. The broker log shows `Found no committed offset for partition ...` followed by `Resetting offset for partition ... to position FetchPosition{offset=0, ...}` for all partitions, confirming the full re-read under a fresh random group id.

**Expected behavior** (either would solve it)

- `start()` waits (optionally behind a config flag, with a configurable timeout) until the consumer reaches the end offsets captured at subscription time before reporting started, so that "loaded" means "caught up"; or
- the factory exposes its catch-up state (e.g. via lookup introspection and/or a metric) so the broker readiness endpoint `/druid/broker/v1/readiness` (or an operator-provided probe) can take it into account before the node receives queries.
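
To make the first option concrete, here is a rough, dependency-free sketch of the gating logic. It is not Druid code and not a proposed patch: `CatchUpGate`, `onPoll`, and `isReady` are hypothetical names, and partitions are keyed by plain partition number to keep the example self-contained. In a real implementation I would expect the target offsets to come from `KafkaConsumer.endOffsets(...)` captured once after assignment, and the per-partition positions from `KafkaConsumer.position(...)` after each poll. The startup latch is released only once every partition's position has reached its captured end offset, rather than after the first poll.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not part of Druid): releases a startup latch only once
 * every partition has been read up to the end offset captured at subscription time.
 */
final class CatchUpGate {
    // Assumption: partition number -> end offset, snapshotted once at startup.
    private final Map<Integer, Long> targetEndOffsets;
    private final CountDownLatch caughtUp = new CountDownLatch(1);

    CatchUpGate(Map<Integer, Long> endOffsetsAtSubscription) {
        this.targetEndOffsets = new HashMap<>(endOffsetsAtSubscription);
    }

    /**
     * A partition is caught up when the consumer position (offset of the next
     * record to fetch) has reached the captured end offset. Partitions with no
     * reported position are treated as still at offset 0.
     */
    static boolean isCaughtUp(Map<Integer, Long> targets, Map<Integer, Long> positions) {
        for (Map.Entry<Integer, Long> target : targets.entrySet()) {
            long position = positions.getOrDefault(target.getKey(), 0L);
            if (position < target.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** Call after each poll with the consumer's current per-partition positions. */
    void onPoll(Map<Integer, Long> positions) {
        if (isCaughtUp(targetEndOffsets, positions)) {
            caughtUp.countDown();
        }
    }

    /** Non-blocking view of the state, e.g. for introspection or a readiness probe. */
    boolean isReady() {
        return caughtUp.getCount() == 0;
    }

    /** What start() could await instead of the first-poll latch; false on timeout. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return caughtUp.await(timeout, unit);
    }
}
```

Snapshotting the end offsets once, instead of chasing the live end of the topic, keeps the wait bounded even while producers keep writing. The same `isReady()` state could also back the second option (exposing catch-up state) without changing what `start()` waits for.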
**Debugging already done**

- Verified at steady state that all warm brokers hold the full map (~280 entries each) and repeated queries return correct results, with no intermittency once brokers are warm.
- Reproduced the empty results by querying a freshly started broker directly during a scale-up while warm brokers answered the same query correctly at the same time.
- Read `KafkaLookupExtractorFactory.start()` on current master and confirmed the latch is counted down by the first poll regardless of consumer lag.
