102: Thanks for clarifying. Looked into the code now. Makes sense. Might be worth calling out explicitly in the KIP writeup. -- Now that I realize that the position is tracked inside the store (not outside, as the changelog offsets are), it makes much more sense to pull the position into RocksDB itself. In the end, it's actually a "store implementation" detail how it tracks the position (and it's kind of a leaky abstraction currently that we re-use the checkpoint file mechanism to track it and flush it to disk).
200: I was thinking about this a little bit more, and maybe it's not too bad? When KS starts up, we could pro-actively open all stores we find on local disk, and keep them all open until the first rebalance finishes: for tasks we get assigned, we hand in the already-opened store (this would amortize the cost of opening the store before the rebalance), and for non-assigned tasks, we know the offset information won't change, so we could just cache it in-memory for later reuse (ie, the next rebalance) and close the store to free up resources. -- Assuming that we would get a large percentage of the opened stores assigned as tasks anyway, this could work?
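The startup-scan idea above could be sketched roughly like this (a minimal sketch with hypothetical stand-in types -- `OffsetAwareStore`, `onRebalanceComplete`, and the string task keys are invented for illustration and are not the actual Streams internals):

```java
import java.util.*;

// Sketch: open every store found on local disk at startup, keep them open
// until the first rebalance finishes, then hand the open stores to assigned
// tasks and cache-plus-close the rest.
class StartupOffsetScan {

    // Stand-in for a state store that manages its own changelog offsets.
    interface OffsetAwareStore {
        Map<String, Long> changelogOffsets(); // changelog partition -> offset
        void close();
    }

    // taskId -> cached changelog offsets of unassigned-but-local tasks;
    // valid until the task is assigned, since nothing else writes the store.
    static final Map<String, Map<String, Long>> offsetCache = new HashMap<>();
    // taskId -> store kept open because the task was assigned to us.
    static final Map<String, OffsetAwareStore> assignedStores = new HashMap<>();

    static void onRebalanceComplete(Map<String, OffsetAwareStore> storesOnDisk,
                                    Set<String> assignedTasks) {
        for (Map.Entry<String, OffsetAwareStore> entry : storesOnDisk.entrySet()) {
            if (assignedTasks.contains(entry.getKey())) {
                // The open cost was already paid before the rebalance.
                assignedStores.put(entry.getKey(), entry.getValue());
            } else {
                // Offsets of unassigned tasks cannot change locally, so cache
                // them for the next lag computation and free the resources.
                offsetCache.put(entry.getKey(), entry.getValue().changelogOffsets());
                entry.getValue().close();
            }
        }
    }
}
```

Whether this pays off depends, as noted above, on how many of the locally found stores end up assigned to the instance anyway.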
-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:
Hi Matthias,

101: Let's assume a RocksDB store, but I think the following might also be true for other store implementations. With this KIP, if Kafka Streams commits the offsets, the committed offsets will be stored in an in-memory data structure (i.e., the memtable) and stay there until RocksDB decides that it is time to persist its in-memory data structure. If Kafka Streams writes its position to the .position file during a commit and a crash happens before RocksDB persists the memtable, then the position in the .position file is ahead of the persisted offset. If IQ is served between the crash and the point when the state store has fully restored from the changelog, the position might tell IQ that the state store is more up-to-date than it actually is. In contrast, if Kafka Streams persists positions the same way it persists offsets, the position should always be consistent with the offset, because they are persisted together.

102: I am confused about your confusion, which tells me that we are talking about two different things. You asked:

"Do you intend to add this information [i.e. position] to the map passed via commit(final Map<TopicPartition, Long> changelogOffsets)?"

With what I wrote, I meant that we do not need to pass the position into the implementation of the StateStore interface, since the position is updated within the implementation of the StateStore interface (e.g., RocksDBStore [1]). My statement describes the behavior now, not the change proposed in this KIP, so it does not contradict what is stated in the KIP.

200: This is about Matthias' main concern regarding rebalance metadata. As far as I understand the KIP, Kafka Streams will only use the .checkpoint files to compute the task lag for unassigned tasks whose state is locally available.
For assigned tasks, it will use the offsets managed by the open state store.

Best,
Bruno

[1] https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397

On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.

101: I think I understand this better now, but I just want to make sure I do. What do you mean by "they can diverge" and "Recovering after a failure might load inconsistent offsets and positions"? The checkpoint is the offset from the changelog, while the position is the offset from the upstream source topic, right? -- In the end, the position is about IQ, and if we fail to update it, it only means that there is some gap during which we might not be able to query a standby task, because we think it's not up-to-date enough even if it is, which would resolve itself soon? Ie, the position might "lag", but it's not "inconsistent". Do we believe that this lag would be highly problematic?

102: I am confused. You said:

"The position is maintained inside the state store, but is persisted in the .position file when the state store closes."

This contradicts the KIP, which states:

"these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file"

My main concern is currently about rebalance metadata: opening RocksDB stores seems to be very expensive. But if we follow the KIP:

"We will do this under EOS by updating the .checkpoint file whenever a store is close()d."

it seems that having the offset inside RocksDB does not help us at all? In the end, when we crash, we don't want to lose the state, but if we update the .checkpoint only on a clean close, the .checkpoint might be stale (ie, it might still contain the offsets from when we opened the store after we got the task assigned).

-Matthias

On 4/30/24 2:40 AM, Bruno Cadonna wrote:

Hi all,

100: I think we already have such a wrapper. It is called AbstractReadWriteDecorator.

101: Currently, the position is checkpointed when an offset checkpoint is written. If we let the state store manage the committed offsets, we also need to let the state store manage the position, otherwise they might diverge. State-store-managed offsets can get flushed (i.e., checkpointed) to disk whenever the state store decides to flush its in-memory data structures, but the position is only checkpointed at commit time. Recovering after a failure might then load inconsistent offsets and positions.

102: The position is maintained inside the state store, but is persisted in the .position file when the state store closes. The only public interface that uses the position is IQv2, in a read-only mode. So the position is only updated within the state store and read from IQv2. No need to add anything to the public StateStore interface.

103: Deprecating managesOffsets() right away might be a good idea.

104: I agree that we should try to support downgrades without wipes. At least Nick should state in the KIP why we do not support it.

Best,
Bruno

On 4/23/24 8:13 AM, Matthias J. Sax wrote:

Thanks for splitting out this KIP. The discussion shows that it is a complex beast by itself, so it's worth discussing on its own.

A couple of questions / comments:

100: `StateStore#commit()`: The JavaDoc says "must not be called by users". I would propose to put a guard in place for this, by either throwing an exception (preferable) or adding a no-op implementation (at least for our own stores, by wrapping them -- we cannot enforce it for custom stores, I assume), and to document this contract explicitly.

101: Adding `.position` to the store: Why do we actually need this? The KIP says "To ensure consistency with the committed data and changelog offsets", but I am not sure I can follow. Can you elaborate why leaving the `.position` file as-is won't work?

"If it's possible at all, it will need to be done by creating temporary StateManagers and StateStores during rebalance. I think it is possible, and probably not too expensive, but the devil will be in the detail."

This sounds like a significant overhead to me. We know that opening a single RocksDB store takes about 500ms, and thus opening RocksDB stores just to get this information might slow down rebalances significantly.

102: It's unclear to me how the `.position` information is added. The KIP only says: "position offsets will be stored in RocksDB, in the same column family as the changelog offsets". Do you intend to add this information to the map passed via `commit(final Map<TopicPartition, Long> changelogOffsets)`? The KIP should describe this in more detail. Also, if my assumption is correct, we might want to rename the parameter and also improve the JavaDoc description.

103: Should we make it mandatory (long-term) that all stores (including custom stores) manage their offsets internally? Maintaining both options and thus both code paths puts a burden on everyone and makes the code messy. I would strongly prefer a mid-term path to get rid of supporting both. -- For this case, we should deprecate the newly added `managesOffsets()` method right away, to point out that we intend to remove it. If it's mandatory for stores to maintain offsets, we won't need this method any longer. In-memory stores can just return null from #committedOffset().

104: "Downgrading": I think it might be worth adding support for downgrading without the need to wipe stores. Leveraging the `upgrade.from` parameter, we could build a two-rolling-bounce downgrade: (1) the new code is started with `upgrade.from` set to a lower version, telling the runtime to do the cleanup on `close()` (ie, ensure that all data is written into the `.checkpoint` and `.position` files, and the newly added CL is deleted). (2) In a second rolling bounce, the old code would be able to open RocksDB. -- I understand that this implies much more work, but downgrades seem to be common enough that it might be worth it? Even if we did not always support this in the past, we have to face the fact that KS is getting more and more adopted, and as a more mature product it should support this.

-Matthias

On 4/21/24 11:58 PM, Bruno Cadonna wrote:

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally existing task state

While I like option 2, I think option 1 is less risky and will give us the benefits of transactional state stores sooner. We should consider the interface approach afterwards, though.

Best,
Bruno

On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can read the offsets of non-assigned tasks for lag computation during rebalancing. The state store also needs the state directory so that it knows where to find the information that it needs to return from changelogOffsets().

In general, I think we should proceed with the plain .checkpoint file for now and iterate back to the state store solution later, since it seems it is not that straightforward. Alternatively, Nick could timebox an effort to better understand what would be needed for the state store solution. Nick, let us know your decision.

Regarding your question about the state store instance: I am not too familiar with that part of the code, but I think the state store is built when the processor topology is built, and the processor topology is built per stream task. So there is one instance of processor topology and state store per stream task. Try to follow the call in [1].

Best,
Bruno

[1] https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153

On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense.
The one thing I can't figure out is how per-Task StateStore instances are constructed. It looks like we construct one StateStore instance for the whole Topology (in InternalTopologyBuilder), and pass that into ProcessorStateManager (via StateManagerUtil) for each Task, which then initializes it. This can't be the case though, otherwise multiple partitions of the same sub-topology (aka Tasks) would share the same StateStore instance, which they don't. What am I missing?

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman <sop...@responsive.dev> wrote:

I don't think we need to *require* a constructor to accept the TaskId, but we would definitely make sure that the RocksDB state store changes its constructor to one that accepts the TaskId (which we can do without deprecation since it's an internal API), and custom state stores can just decide for themselves whether they want to opt in and use the TaskId param or not. I mean, custom state stores would have to opt in anyway by implementing the new StoreSupplier#get(TaskId) API, and the only reason to do that would be to have created a constructor that accepts a TaskId.

Just to be super clear about the proposal, this is what I had in mind. It's actually fairly simple and wouldn't add much to the scope of the KIP (I think -- if it turns out to be more complicated than I'm assuming, we should definitely do whatever has the smallest LOE to get this done).

Anyways, the (only) public API change would be to add this new method to the StoreSupplier API:

    default T get(final TaskId taskId) {
        return get();
    }

We can decide whether or not to deprecate the old #get, but it's not really necessary and might cause a lot of turmoil, so I'd personally say we just leave both APIs in place.

And that's it for public API changes! Internally, we would just adapt each of the RocksDB StoreSupplier classes to implement this new API. So for example with the RocksDBKeyValueBytesStoreSupplier, we just add:

    @Override
    public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
        return returnTimestampedStore ?
            new RocksDBTimestampedStore(name, metricsScope(), taskId) :
            new RocksDBStore(name, metricsScope(), taskId);
    }

And of course add the TaskId parameter to each of the actual state store constructors returned here.

Does that make sense? It's entirely possible I'm missing something important here, but I think this would be a pretty small addition that would solve the problem you mentioned earlier while also being useful to anyone who uses custom state stores.

On Mon, Apr 15, 2024 at 10:21 AM Nick Telford <nick.telf...@gmail.com> wrote:

Hi Sophie,

Interesting idea! Although what would that mean for the StateStore interface? Obviously we can't require that the constructor take the TaskId. Is it enough to add the parameter to the StoreSupplier? Would doing this be in-scope for this KIP, or are we over-complicating it?

Nick

On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman <sop...@responsive.dev> wrote:

Somewhat minor point overall, but it actually drives me crazy that you can't get access to the taskId of a StateStore until #init is called. This has caused me a huge headache personally (since the same is true for processors, and I was trying to do something that's probably too hacky to actually complain about here lol).

Can we just change the StateStoreSupplier to receive and pass along the taskId when creating a new store? Presumably by adding a new version of the #get method that takes in a taskId parameter? We can have it default to invoking the old one for compatibility reasons, and it should be completely safe to tack on.
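Assembled into a compilable sketch, the default-method pattern described above looks like this (simplified stand-ins -- the `TaskId` record and string-valued "stores" are illustrative, not the real Kafka Streams types):

```java
// The point of the pattern: adding get(TaskId) as a *default* method means
// existing StoreSupplier implementations keep compiling and behaving as
// before, while suppliers that want the TaskId simply override it.
class TaskIdAwareSuppliers {

    record TaskId(int subtopology, int partition) {}

    interface StoreSupplier<T> {
        T get();

        // New, backwards-compatible API: defaults to the old #get.
        default T get(final TaskId taskId) {
            return get();
        }
    }

    // A supplier that opts in, analogous to the
    // RocksDBKeyValueBytesStoreSupplier example from the thread:
    // the TaskId is forwarded into the created store.
    static class OptInSupplier implements StoreSupplier<String> {
        @Override
        public String get() {
            return "store-without-task-id";
        }

        @Override
        public String get(final TaskId taskId) {
            return "store-for-" + taskId.subtopology() + "_" + taskId.partition();
        }
    }
}
```

A legacy supplier that only implements `get()` transparently falls back to it when called with a `TaskId`, which is why the old API can stay in place without deprecation.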
Would also prefer the same for a ProcessorSupplier, but that's definitely outside the scope of this KIP.

On Fri, Apr 12, 2024 at 3:31 AM Nick Telford <nick.telf...@gmail.com> wrote:

On further thought, it's clear that this can't work, for one simple reason: StateStores don't know their associated TaskId (and hence, their StateDirectory) until the init() call. Therefore, committedOffset() can't be called before init(), unless we also added a StateStoreContext argument to committedOffset(), which I think might be trying to shoehorn too much into committedOffset().

I still don't like the idea of the Streams engine maintaining the cache of changelog offsets independently of stores, mostly because of the maintenance burden of the code duplication, but it looks like we'll have to live with it. Unless you have any better ideas?

Regards,
Nick

On Wed, 10 Apr 2024 at 14:12, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Bruno,

Immediately after I sent my response, I looked at the codebase and came to the same conclusion. If it's possible at all, it will need to be done by creating temporary StateManagers and StateStores during rebalance. I think it is possible, and probably not too expensive, but the devil will be in the detail.

I'll try to find some time to explore the idea to see if it's possible, and report back, because we'll need to determine this before we can vote on the KIP.

Regards,
Nick

On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

Thanks for reacting to my comments so quickly!

2. Some thoughts on your proposal. State managers (and state stores) are parts of tasks. If a task is not assigned locally, we do not create those tasks. To get the offsets with your approach, we would need to either create a kind of inactive task besides active and standby tasks, or store and manage the state managers of non-assigned tasks differently than the state managers of assigned tasks.
Additionally, the cleanup thread that removes unassigned task directories would need to concurrently delete those inactive tasks or task-less state managers of unassigned tasks. This all seems quite messy to me.

Could we create those state managers (or state stores) for locally existing but unassigned tasks on demand when TaskManager#getTaskOffsetSums() is executed? Or have a different encapsulation for the unused task directories?

Best,
Bruno

On 4/10/24 11:31 AM, Nick Telford wrote:

Hi Bruno,

Thanks for the review!

1, 4, 5. Done.

3. You're right. I've removed the offending paragraph. I had originally adapted this from the guarantees outlined in KIP-892, but it's difficult to provide these guarantees without the KIP-892 transaction buffers. Instead, we'll add the guarantees back into the JavaDoc when KIP-892 lands.

2. Good point! This is the only part of the KIP that was (significantly) changed when I extracted it from KIP-892. My prototype currently maintains this "cache" of changelog offsets in .checkpoint, but doing so becomes very messy. My intent with this change was to try to better encapsulate this offset "caching", especially for StateStores that can cheaply provide the offsets stored directly in them without needing to duplicate them in this cache.

It's clear some more work is needed here to better encapsulate this. My immediate thought is: what if we construct *but don't initialize* the StateManager and StateStores for every Task directory on-disk? That should still be quite cheap to do, and would enable us to query the offsets for all on-disk stores, even if they're not open. If the StateManager (aka ProcessorStateManager/GlobalStateManager) proves too expensive to hold open for closed stores, we could always have a "StubStateManager" in its place that enables the querying of offsets, but nothing else?

IDK, what do you think?

Regards,
Nick

On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

Thanks for breaking out the KIP from KIP-892!
Here are a couple of comments/questions:

1. In Kafka Streams, we have a design guideline which says not to use the "get" prefix for getters on the public API. Could you please change getCommittedOffsets() to committedOffsets()?

2. It is not clear to me how TaskManager#getTaskOffsetSums() should read the offsets of tasks the stream thread does not own but that have a state directory on the Streams client by calling StateStore#getCommittedOffsets(). If the thread does not own a task, it also does not create any state stores for the task, which means there is no state store on which to call getCommittedOffsets(). I would have rather expected that a checkpoint file is written for all state stores on close -- not only for the RocksDBStore -- and that this checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks that have a state directory on the client but are not currently assigned to any stream thread of the Streams client.

3. In the javadocs for commit() you write:

"... all writes since the last commit(Map), or since init(StateStore) *MUST* be available to readers, even after a restart."

This is only true for a clean close before the restart, isn't it? If the task fails with a dirty close, Kafka Streams cannot guarantee that the in-memory structures of the state store (e.g., the memtable in the case of RocksDB) are flushed so that the records and the committed offsets are persisted.

4. The wrapper that provides the legacy checkpointing behavior is actually an implementation detail. I would remove it from the KIP, but still state that the legacy checkpointing behavior will be supported when the state store does not manage the checkpoints.

5. Regarding the metrics, could you please add the tags and the recording level (DEBUG or INFO), as done in KIP-607 or KIP-444?
Best,
Bruno

On 4/7/24 5:35 PM, Nick Telford wrote:

Hi everyone,

Based on some offline discussion, I've split out the "Atomic Checkpointing" section from KIP-892: Transactional Semantics for StateStores into its own KIP:

KIP-1035: StateStore managed changelog offsets
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets

While KIP-892 was adopted *with* the changes outlined in KIP-1035, these changes were always the most contentious part, and they continued to spur discussion even after KIP-892 was adopted. All the changes introduced in KIP-1035 have been removed from KIP-892, and a hard dependency on KIP-1035 has been added to KIP-892 in their place.

I'm hopeful that with some more focus on this set of changes, we can deliver something that we're all happy with.

Regards,
Nick
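For reference, the store-managed-offsets surface debated throughout this thread can be condensed into a rough sketch. The method names are taken from the discussion (commit, committedOffset, managesOffsets), but the types below are simplified stand-ins, not the actual KIP-1035 interface:

```java
import java.util.Map;

// Rough sketch of the discussed StateStore additions. The real API lives on
// org.apache.kafka.streams.processor.StateStore and uses the real
// TopicPartition; this stand-in only illustrates the shape of the contract.
class Kip1035Sketch {

    record TopicPartition(String topic, int partition) {}

    interface StateStore {
        // Atomically persists state together with the given changelog
        // offsets; per the thread, must not be called by users, only by
        // Kafka Streams itself.
        void commit(Map<TopicPartition, Long> changelogOffsets);

        // Last committed offset for a changelog partition, or null when
        // unknown (the thread suggests in-memory stores may return null).
        Long committedOffset(TopicPartition partition);

        // Transitional flag for stores that do not manage offsets yet; the
        // thread proposes deprecating it right away.
        default boolean managesOffsets() {
            return false;
        }
    }
}
```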