Hello everyone,

I've been looking into the specifics of this KIP's implementation. At Littlehorse, we are very interested in this KIP, and we would like to contribute to its implementation and/or code review. This KIP is a crucial step toward KIP-892 and should improve standby performance, since it will no longer cause RocksDB flushes every 10,000 records.

I put some effort into understanding the code around StateStores and offset computation, and I also looked over the PR that was already merged and the other one that is still in progress. Additionally, I was pointed to KAFKA-19434, an issue that is related to this KIP. I believe this issue is mainly caused by the fact that we create Tasks in the StateDirectory class (see StateDirectory#initializeStartupTasks), and as a consequence these tasks get assigned to the main thread.

That’s why I started to explore an alternative that opens StateStores instead of Tasks during this initialization process. However, I realized that it is not an easy change, because the StateStore interface doesn’t support opening a store without assigning it to a stream thread. According to my understanding, the StateStore lifecycle is as follows:

1) During topology build time, state factory classes call store constructors to create instances.
2) During Kafka Streams rebalances, the “StateStore#init” method opens the stores and assigns them to StreamThreads. Metrics must be registered at this point.
3) Kafka Streams calls “StateStore#close” to close the instance.

However, from what I can see, this process is slightly different for RocksDBStore. RocksDBStore has an extra method, “RocksDBStore#openDB”, which only opens the RocksDB instance and makes it writable. Segmented stores use this method to open multiple segments, which are RocksDBStore instances, and never assign the segment stores to the stream thread via “StateStore#init” (see KeyValueSegments#getOrCreateSegment).

With that in mind, I think we can use a similar approach when discovering stores already present in the state directory: open the store instance without calling the #init method. That would require us to modify this KIP in order to add a new method to the `StateStore` API. I pushed a PR to demonstrate this alternative in code: https://github.com/apache/kafka/pull/20749
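
To illustrate the idea of separating "open" from "init", here is a minimal, self-contained Java sketch. It is not the Kafka Streams API and not the code from the PR: `SketchStore`, its `open()` method, `InMemorySketchStore`, and `discover()` are all hypothetical names made up for this example. The only thing it aims to show is a store that can be opened during startup discovery without being assigned to any thread, and that is bound to a thread later through `init`.

```java
import java.util.ArrayList;
import java.util.List;

final class StartupDiscoverySketch {

    // Hypothetical, simplified stand-in for a state store. This is NOT the
    // real org.apache.kafka.streams.processor.StateStore interface.
    interface SketchStore {
        void open();                  // hypothetical: open the underlying storage only
        void init(String threadName); // open if needed, then assign the store to a thread
        void close();
        boolean isOpen();
        String owner();               // null while no thread owns the store
    }

    // Toy implementation that only tracks state; it touches no files.
    static final class InMemorySketchStore implements SketchStore {
        private final String name;
        private boolean open;
        private String owner;

        InMemorySketchStore(String name) {
            this.name = name;
        }

        @Override
        public void open() {
            open = true; // a real store would open its on-disk data here
        }

        @Override
        public void init(String threadName) {
            if (!open) {
                open();
            }
            owner = threadName; // a real store would also register metrics here
        }

        @Override
        public void close() {
            open = false;
            owner = null;
        }

        @Override
        public boolean isOpen() {
            return open;
        }

        @Override
        public String owner() {
            return owner;
        }

        @Override
        public String toString() {
            return name + "[open=" + open + ", owner=" + owner + "]";
        }
    }

    // Opens every store found "on disk" without assigning it to any thread.
    static List<SketchStore> discover(List<String> storeNamesOnDisk) {
        List<SketchStore> stores = new ArrayList<>();
        for (String name : storeNamesOnDisk) {
            SketchStore store = new InMemorySketchStore(name);
            store.open();
            stores.add(store);
        }
        return stores;
    }

    public static void main(String[] args) {
        List<SketchStore> stores = discover(List.of("counts-store"));
        SketchStore store = stores.get(0);
        System.out.println(store);    // open, but not yet owned by a thread
        store.init("StreamThread-1"); // later, e.g. after a rebalance
        System.out.println(store);    // open and owned by StreamThread-1
        store.close();
    }
}
```

In this sketch, `discover` plays the role of the startup scan of the state directory: the stores it returns are open but have no owner, so nothing ends up tied to the main thread. Ownership is only established when `init` is called with a thread name.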

Let me know what you think about this alternative.

On Sun, 7 Apr 2024 at 10:36, Nick Telford (<[email protected]>) wrote:

> Hi everyone,
>
> Based on some offline discussion, I've split out the "Atomic Checkpointing"
> section from KIP-892: Transactional Semantics for StateStores, into its own
> KIP
>
> KIP-1035: StateStore managed changelog offsets
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
>
> While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
> changes were always the most contentious part, and continued to spur
> discussion even after KIP-892 was adopted.
>
> All the changes introduced in KIP-1035 have been removed from KIP-892, and
> a hard dependency on KIP-1035 has been added to KIP-892 in their place.
>
> I'm hopeful that with some more focus on this set of changes, we can
> deliver something that we're all happy with.
>
> Regards,
> Nick
