Hello everyone, I've been looking into the specifics of this KIP's
implementation. At Littlehorse, we are very interested in this KIP, and we
would like to contribute to its implementation and/or code review. This KIP
is a crucial step toward KIP-892 and should improve standby performance, as
it would no longer trigger RocksDB flushes every 10,000 records.
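For anyone less familiar with the 10,000-record behaviour, here is a
simplified sketch of how I understand the checkpoint decision: a checkpoint
(which, for RocksDB-backed stores, requires a flush first) is written once
the changelog offsets have advanced far enough since the last checkpoint.
The class and method names are hypothetical, partitions are plain strings
instead of TopicPartition, and the exact rule in Kafka Streams may differ.

```java
import java.util.Map;

// Hypothetical sketch, not the actual Kafka Streams code: decides whether
// a task should checkpoint based on how far its changelog offsets moved.
final class CheckpointPolicy {
    // Mirrors the 10,000-record figure mentioned above.
    static final long OFFSET_DELTA_THRESHOLD = 10_000L;

    private CheckpointPolicy() {}

    static boolean checkpointNeeded(boolean enforce,
                                    Map<String, Long> lastCheckpointed,
                                    Map<String, Long> current) {
        if (enforce) {
            return true; // e.g. on close or commit with a forced checkpoint
        }
        long totalDelta = 0L;
        for (Map.Entry<String, Long> entry : current.entrySet()) {
            // Partitions never checkpointed before are counted from offset 0.
            long previous = lastCheckpointed.getOrDefault(entry.getKey(), 0L);
            totalDelta += entry.getValue() - previous;
        }
        // Only checkpoint (and therefore flush) once enough records accumulated.
        return totalDelta > OFFSET_DELTA_THRESHOLD;
    }
}
```

With store-managed offsets, the store can persist offsets together with its
data, so this flush-driven checkpoint threshold stops being the bottleneck.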

I've spent some time understanding the code around StateStores and offset
computation, and I've also reviewed both the PR that has already been merged
and the one that is still in progress.

Additionally, I was pointed to a related issue, KAFKA-19434. I believe this
issue is mainly caused by the fact that we create Tasks in the
StateDirectory class (see StateDirectory#initializeStartupTasks), and as a
consequence, these tasks get assigned to the main thread. That's why I
started exploring an alternative that opens StateStores instead of Tasks
during this initialization process, but I soon realized that it is not an
easy change, because the StateStore interface doesn't support opening a
store without assigning it to a stream thread.

According to my understanding, the lifecycle of a StateStore is as follows:
1) At topology build time, store factory classes call the store
   constructors to create instances.
2) During Kafka Streams rebalances, “StateStore#init” opens the stores and
   assigns them to StreamThreads. Metrics must be registered at this point.
3) Kafka Streams calls “StateStore#close” to close the instance.
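To make the three phases concrete, here is a toy model of that lifecycle.
It is not the real StateStore interface: LifecycleStore and
InMemoryLifecycleStore are hypothetical names, and the owning thread is
reduced to a string. The point it illustrates is that opening a store and
binding it to a thread happen in the same init() call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the lifecycle described above.
interface LifecycleStore {
    void init(String owningThread); // phase 2: open and bind to a stream thread
    void close();                   // phase 3: release resources
    String owner();
    boolean isOpen();
}

final class InMemoryLifecycleStore implements LifecycleStore {
    final List<String> events = new ArrayList<>();
    private String owner;
    private boolean open;

    // Phase 1: created at topology build time; nothing is opened yet.
    InMemoryLifecycleStore(String name) {
        events.add("constructed:" + name);
    }

    @Override
    public void init(String owningThread) {
        if (open) {
            throw new IllegalStateException("already initialized");
        }
        owner = owningThread; // binding to the thread happens here...
        open = true;          // ...in the same step as opening the storage
        // A real store would also register its metrics at this point.
        events.add("init:" + owningThread);
    }

    @Override
    public void close() {
        open = false;
        events.add("close");
    }

    @Override public String owner() { return owner; }
    @Override public boolean isOpen() { return open; }
}
```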

However, from what I can see, this process is slightly different for
RocksDBStore. RocksDBStore has an extra method, “RocksDBStore#openDB”, which
only opens the RocksDB instance and makes it writable. Segmented stores use
this method to open multiple segments (each of which is a RocksDBStore
instance) and never assign the segment stores to the stream thread via
“StateStore#init” (see KeyValueSegments#getOrCreateSegment).
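The segmented-store pattern can be sketched roughly like this: only the
parent store goes through init(), and each segment is opened through an
openDB-style call with no thread binding. Segment and SegmentedParentStore
are hypothetical classes that only loosely mirror RocksDBStore and
KeyValueSegments; the real openDB takes the app configs as well as the
state directory.

```java
import java.io.File;
import java.util.TreeMap;

// Hypothetical stand-in for a segment (a RocksDBStore in Kafka Streams).
final class Segment {
    final long id;
    private File dir;
    private boolean open;

    Segment(long id) {
        this.id = id;
    }

    // Analogue of RocksDBStore#openDB: opens storage only; no thread binding
    // and no metrics registration.
    void openDB(File stateDir) {
        dir = new File(stateDir, "segment-" + id);
        open = true;
    }

    boolean isOpen() { return open; }
    File dir() { return dir; }
}

// Hypothetical parent store: the only object that is init()-ed.
final class SegmentedParentStore {
    private final TreeMap<Long, Segment> segments = new TreeMap<>();
    private File stateDir;
    private String owningThread;

    void init(String thread, File stateDir) {
        this.owningThread = thread;
        this.stateDir = stateDir;
    }

    // Analogue of KeyValueSegments#getOrCreateSegment.
    Segment getOrCreateSegment(long segmentId) {
        return segments.computeIfAbsent(segmentId, id -> {
            Segment segment = new Segment(id);
            segment.openDB(stateDir); // opened, but never init()-ed
            return segment;
        });
    }

    String owningThread() { return owningThread; }
    int segmentCount() { return segments.size(); }
}
```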

With that in mind, I think we can take a similar approach when discovering
stores that are already present in the state directory: open the store
instance without calling #init. However, that would require modifying this
KIP to add a new method to the `StateStore` API. I pushed a PR that
demonstrates this alternative:
https://github.com/apache/kafka/pull/20749.
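As a rough illustration of the idea (not the code in the PR), startup
discovery could open each store found on disk through a separate open step,
read its committed offsets, and leave init() for the stream thread that is
eventually assigned the store. OpenableStore, its open(File) method,
InMemoryOpenableStore and StartupDiscovery are all hypothetical; the actual
method name and signature in the PR may differ.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical store API with a thread-agnostic open step.
interface OpenableStore {
    void open(File storeDir);             // new: open storage only, no thread
    void init(String owningThread);       // existing style: bind to a thread
    Map<String, Long> committedOffsets(); // KIP-1035 style: store owns offsets
    boolean isOpen();
    String owner();                       // null until init() is called
}

// In-memory stand-in; "persisted" offsets are passed to the constructor.
final class InMemoryOpenableStore implements OpenableStore {
    private final Map<String, Long> persistedOffsets;
    private boolean open;
    private String owner;

    InMemoryOpenableStore(Map<String, Long> persistedOffsets) {
        this.persistedOffsets = new HashMap<>(persistedOffsets);
    }

    @Override public void open(File storeDir) {
        // A RocksDB-backed store would open its DB under storeDir here.
        open = true;
    }

    @Override public void init(String owningThread) {
        open = true; // no-op if startup discovery already opened the store
        owner = owningThread;
    }

    @Override public Map<String, Long> committedOffsets() {
        if (!open) {
            throw new IllegalStateException("store is not open");
        }
        return Map.copyOf(persistedOffsets);
    }

    @Override public boolean isOpen() { return open; }
    @Override public String owner() { return owner; }
}

final class StartupDiscovery {
    private StartupDiscovery() {}

    // Opens stores found on disk without creating Tasks or calling init(),
    // so nothing gets bound to the main thread.
    static Map<String, OpenableStore> openExistingStores(
            Map<String, File> storeDirsOnDisk,
            Function<String, OpenableStore> factory) {
        Map<String, OpenableStore> opened = new HashMap<>();
        for (Map.Entry<String, File> entry : storeDirsOnDisk.entrySet()) {
            OpenableStore store = factory.apply(entry.getKey());
            store.open(entry.getValue());
            opened.put(entry.getKey(), store);
        }
        return opened;
    }
}
```

The design choice here is simply to split "open" from "bind to a thread",
which is what RocksDBStore#openDB already does for segments.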

Let me know what you think about this alternative.

On Sun, 7 Apr 2024 at 10:36, Nick Telford (<[email protected]>)
wrote:

> Hi everyone,
>
> Based on some offline discussion, I've split out the "Atomic Checkpointing"
> section from KIP-892: Transactional Semantics for StateStores into its own
> KIP:
>
> KIP-1035: StateStore managed changelog offsets
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
>
> While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
> changes were always the most contentious part, and continued to spur
> discussion even after KIP-892 was adopted.
>
> All the changes introduced in KIP-1035 have been removed from KIP-892, and
> a hard dependency on KIP-1035 has been added to KIP-892 in their place.
>
> I'm hopeful that with some more focus on this set of changes, we can
> deliver something that we're all happy with.
>
> Regards,
> Nick
>
