Hi Alex,

Thanks a lot for explaining!

Now some aspects are clearer to me.

While I now understand how the state store can roll forward, I have the feeling that rolling forward is specific to the 2-state-store implementation with RocksDB in your PoC. Other state store implementations might use a different strategy to react to crashes. For example, they might apply an atomic write and effectively roll back if they crash before committing the state store transaction. I think the KIP should not contain such implementation details but provide an interface that accommodates both rolling forward and rolling back.

So, I would remove the details about the 2-state-store implementation from the KIP or provide it as an example of a possible implementation at the end of the KIP.

Since a state store implementation can roll forward or roll back, I think it is fine to return the changelog offset from recover(). With the returned changelog offset, Streams knows from where to start state store restoration.

Could you please describe the usage of commit() and recover() in the commit workflow in the KIP as we did in this thread but independently from the state store implementation? That would make things clearer. Additionally, descriptions of failure scenarios would also be helpful.

Best,
Bruno


On 04.08.22 16:39, Alexander Sorokoumov wrote:
Hey Bruno,

Thank you for the suggestions and the clarifying questions. I believe that
they cover the core of this proposal, so it is crucial for us to be on the
same page.

1. Don't you want to deprecate StateStore#flush().


Good call! I updated both the proposal and the prototype.

  2. I would shorten Materialized#withTransactionalityEnabled() to
Materialized#withTransactionsEnabled().


Turns out, these methods are no longer necessary. I removed them from the
proposal and the prototype.


3. Could you also describe a bit more in detail where the offsets passed
into commit() and recover() come from?


The offset passed into StateStore#commit is the last offset committed to
the changelog topic. The offset passed into StateStore#recover is the last
checkpointed offset for the given StateStore. Let's look at steps 3 and 4
in the commit workflow. After the TaskExecutor/TaskManager commits, it calls
StreamTask#postCommit[1], which in turn:
a. updates the changelog offsets via
ProcessorStateManager#updateChangelogOffsets[2]. The offsets here come from
the RecordCollector[3], which tracks the latest offsets the producer sent
without exception[4, 5].
b. flushes/commits the state store in AbstractTask#maybeCheckpoint[6]. This
method essentially calls ProcessorStateManager methods - flush/commit[7]
and checkpoint[8]. ProcessorStateManager#commit goes over all state stores
that belong to that task and commits them with the offset obtained in step
`a`. ProcessorStateManager#checkpoint writes down those offsets for all
state stores, except for non-transactional ones in the case of EOS.
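
To make the flow easier to follow, here is a rough sketch of that commit path.
The names and types are simplified and hypothetical, not the actual Streams
internals; the linked prototype code has the real thing.

import java.util.Map;

// Hypothetical, simplified types; StateStore#commit takes the changelog offset
// that the Kafka transaction just committed.
interface TransactionalStore {
    void commit(long changelogOffset);
}

class CommitPathSketch {
    private final Map<String, TransactionalStore> storesByName;
    private final Map<String, Long> lastSentChangelogOffsets; // from the RecordCollector
    private final Map<String, Long> checkpointFile;           // store name -> offset

    CommitPathSketch(final Map<String, TransactionalStore> storesByName,
                     final Map<String, Long> lastSentChangelogOffsets,
                     final Map<String, Long> checkpointFile) {
        this.storesByName = storesByName;
        this.lastSentChangelogOffsets = lastSentChangelogOffsets;
        this.checkpointFile = checkpointFile;
    }

    // Corresponds to StreamTask#postCommit: runs after the Kafka transaction committed.
    void postCommit() {
        for (final Map.Entry<String, TransactionalStore> e : storesByName.entrySet()) {
            final long committedOffset = lastSentChangelogOffsets.get(e.getKey()); // step a
            e.getValue().commit(committedOffset);            // step b: commit the store
            checkpointFile.put(e.getKey(), committedOffset); // then checkpoint it
        }
    }
}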

During initialization, StreamTask calls
StateManagerUtil#registerStateStores[8] that in turn calls
ProcessorStateManager#initializeStoreOffsetsFromCheckpoint[9]. At the
moment, this method assigns checkpointed offsets to the corresponding state
stores[10]. The prototype also calls StateStore#recover with the
checkpointed offset and assigns the offset returned by recover()[11].
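
Sketched in the same simplified terms (again hypothetical names, not the actual
code), initialization then looks roughly like this:

// Hypothetical sketch: recover() takes the checkpointed offset and returns the
// offset the store is actually at, which becomes the restoration start point.
interface RecoverableStore {
    Long recover(Long checkpointedOffset);
}

class InitSketch {
    static long restoreStartOffset(final RecoverableStore store,
                                   final Long checkpointedOffset) {
        // recover() rolls the store back or forward to a consistent state
        final Long storeOffset = store.recover(checkpointedOffset);
        // restore the changelog from wherever the store ended up (0 if unknown)
        return storeOffset == null ? 0L : storeOffset;
    }
}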

4. I do not quite understand how a state store can roll forward. You
mention in the thread the following:


The 2-state-stores commit looks like this [12]:

    1. Flush the temporary state store.
    2. Create a commit marker with a changelog offset corresponding to the
    state we are committing.
    3. Go over all keys in the temporary store and write them down to the
    main one.
    4. Wipe the temporary store.
    5. Delete the commit marker.
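
For illustration, here is a minimal sketch of those five steps. The KvStore and
CommitMarker types are hypothetical stand-ins for the prototype's RocksDB-backed
classes; the actual code is linked in [12].

import java.util.Map;
import java.util.Optional;

class TwoStoreCommitSketch {
    interface KvStore {
        Map<byte[], byte[]> all();
        void put(byte[] key, byte[] value);
        void flush();
        void wipe();
    }

    interface CommitMarker {
        void write(long changelogOffset); // present only while a commit is in flight
        Optional<Long> read();
        void delete();
    }

    void commit(final KvStore tmpStore, final KvStore mainStore,
                final CommitMarker marker, final long changelogOffset) {
        tmpStore.flush();                  // 1. persist the temporary store
        marker.write(changelogOffset);     // 2. record that a commit is in progress
        for (final Map.Entry<byte[], byte[]> e : tmpStore.all().entrySet()) {
            mainStore.put(e.getKey(), e.getValue()); // 3. copy all keys to the main store
        }
        tmpStore.wipe();                   // 4. drop the temporary store
        marker.delete();                   // 5. the commit is complete
    }
}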


Let's consider crash failure scenarios:

    - Crash failure happens between steps 1 and 2. The main state store is
    in a consistent state that corresponds to the previously checkpointed
    offset. StateStore#recover throws away the temporary store and proceeds
    from the last checkpointed offset.
    - Crash failure happens between steps 2 and 3. We do not know which keys
    from the temporary store were already written to the main store, so we
    can't roll back. There are two options - either wipe the main store or roll
    forward. Since the point of this proposal is to avoid throwing away state,
    and we do not care which consistent state the store ends up in, we roll
    forward by continuing from step 3.
    - Crash failure happens between steps 3 and 4. We can't distinguish
    between this and the previous scenario, so we write all the keys from the
    temporary store. This is okay because the operation is idempotent.
    - Crash failure happens between steps 4 and 5. Again, we can't
    distinguish between this and previous scenarios, but the temporary store is
    already empty. Even though we write all keys from the temporary store, this
    operation is, in fact, a no-op.
    - Crash failure happens between step 5 and the checkpoint. This is the case
    you referred to in question 5. The commit is finished, but it is not
    reflected in the checkpoint. recover() returns the offset of the previous
    commit here, which is stale, but that is okay because we will replay the
    changelog from the previously committed offset. As changelog replay is
    idempotent, the state store recovers into a consistent state.
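
A minimal sketch of recover() for this 2-state-store implementation, building on
the commit sketch above (the same hypothetical KvStore/CommitMarker stand-ins,
not the prototype's actual classes): if the commit marker exists, the crash
happened somewhere in steps 2-5 and we roll forward; otherwise we throw away the
temporary store and restore from the checkpointed offset.

import java.util.Map;
import java.util.Optional;

class TwoStoreRecoverSketch {
    // Returns the offset the store is at after recovery, which Streams then uses
    // as the starting point for changelog restoration.
    Long recover(final TwoStoreCommitSketch.KvStore tmpStore,
                 final TwoStoreCommitSketch.KvStore mainStore,
                 final TwoStoreCommitSketch.CommitMarker marker,
                 final Long checkpointedOffset) {
        final Optional<Long> inFlight = marker.read();
        if (inFlight.isPresent()) {
            // crash during steps 2-5: roll forward by re-applying the idempotent copy
            for (final Map.Entry<byte[], byte[]> e : tmpStore.all().entrySet()) {
                mainStore.put(e.getKey(), e.getValue());
            }
            tmpStore.wipe();
            marker.delete();
            return inFlight.get();     // the store now reflects the in-flight commit
        }
        // crash before step 2 (or a clean shutdown): discard uncommitted writes
        tmpStore.wipe();
        return checkpointedOffset;     // restore from the last checkpointed offset
    }
}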

The last crash failure scenario is a natural transition to

how should Streams know what to write into the checkpoint file
after the crash?


As mentioned above, the Streams app writes the checkpoint file after both the
Kafka transaction commit and the subsequent StateStore commit. Same as without
this proposal, it should write the committed offset, as it is the same for both
the Kafka changelog and the state store.


This issue arises because we store the offset outside of the state
store. Maybe we need an additional method on the state store interface
that returns the offset at which the state store is.


In my opinion, we should include in the interface only the guarantees that
are necessary to preserve EOS without wiping the local state. This way, we
leave more room for possible implementations. Thanks to the idempotency of
changelog replay, it is "good enough" if StateStore#recover returns an offset
that is less than the actual one. The only limitation is that the state store
must never commit writes that are not yet committed in the Kafka changelog.

Please let me know what you think about this. First of all, I am relatively
new to the codebase, so I might be wrong in my understanding of how it works.
Second, while writing this, it occurred to me that the StateStore#recover
interface method is not as straightforward as it could be. Maybe we can
change it like this:

/**
 * Recover a transactional state store
 * <p>
 * If a transactional state store shut down with a crash failure, this method
 * ensures that the state store is in a consistent state that corresponds to
 * {@code changelogOffset} or later.
 *
 * @param changelogOffset the checkpointed changelog offset.
 * @return {@code true} if recovery succeeded, {@code false} otherwise.
 */
boolean recover(final Long changelogOffset);
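
Just to make the contract concrete, here is a hedged sketch of how Streams
could use such a boolean-returning recover() during initialization. The
TxnStore type and the wipeLocal() helper are hypothetical:

class RecoverUsageSketch {
    interface TxnStore {
        boolean recover(Long changelogOffset);
        void wipeLocal(); // assumed helper: deletes the local store files
    }

    static long restoreStartOffset(final TxnStore store, final Long checkpointedOffset) {
        if (checkpointedOffset != null && store.recover(checkpointedOffset)) {
            return checkpointedOffset; // the store is at this offset or later
        }
        store.wipeLocal();             // recovery failed: fall back to wiping local state
        return 0L;                     // and replay the changelog from the beginning
    }
}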

Note: all links below except for [10] lead to the prototype's code.
1.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L468
2.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L580
3.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L868
4.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L94-L96
5.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L213-L216
6.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L94-L97
7.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L469
8.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L226
9.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java#L103
10.
https://github.com/apache/kafka/blob/0c4da23098f8b8ae9542acd7fbaa1e5c16384a39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L251-L252
11.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L250-L265
12.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java#L84-L88

Best,
Alex

On Fri, Jul 29, 2022 at 3:42 PM Bruno Cadonna <cado...@apache.org> wrote:

Hi Alex,

Thanks for the updates!

1. Don't you want to deprecate StateStore#flush(). As far as I
understand, commit() is the new flush(), right? If you do not deprecate
it, you don't get rid of the room for error you describe in your KIP by
having a flush() and a commit().


2. I would shorten Materialized#withTransactionalityEnabled() to
Materialized#withTransactionsEnabled().


3. Could you also describe a bit more in detail where the offsets passed
into commit() and recover() come from?


For my next two points, I need the commit workflow that you were so kind
to post into this thread:

1. write stuff to the state store
2. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
3. flush (<- that would be call to commit(), right?)
4. checkpoint


4. I do not quite understand how a state store can roll forward. You
mention in the thread the following:

"If the crash failure happens during #3, the state store can roll
forward and finish the flush/commit."

How does the state store know where it stopped the flushing when it
crashed?

This seems an optimization to me. I think in general the state store
should rollback to the last successfully committed state and restore
from there until the end of the changelog topic partition. The last
committed state is the offsets in the checkpoint file.


5. In the same e-mail from point 4, you also state:

"If the crash failure happens between #3 and #4, the state store should
do nothing during recovery and just proceed with the checkpoint."

How should Streams know that the failure was between #3 and #4 during
recovery? It just sees a valid state store and a valid checkpoint file.
Streams does not know that the state of the checkpoint file does not
match with the committed state of the state store.
Also, how should Streams know what to write into the checkpoint file
after the crash?
This issue arises because we store the offset outside of the state
store. Maybe we need an additional method on the state store interface
that returns the offset at which the state store is.


Best,
Bruno




On 27.07.22 11:51, Alexander Sorokoumov wrote:
Hey Nick,

Thank you for the kind words and the feedback! I'll definitely add an
option to configure the transactional mechanism in Stores factory method
via an argument as John previously suggested and might add the in-memory
option via RocksDB Indexed Batches if I figure why their creation via
rocksdb jni fails with `UnsatisfiedLinkException`.

Best,
Alex

On Wed, Jul 27, 2022 at 11:46 AM Alexander Sorokoumov <
asorokou...@confluent.io> wrote:

Hey Guozhang,

1) About the param passed into the `recover()` function: it seems to me
that the semantics of "recover(offset)" is: recover this state to a
transaction boundary which is at least the passed-in offset. And the
only
possibility that the returned offset is different than the passed-in
offset
is that if the previous failure happens after we've done all the commit
procedures except writing the new checkpoint, in which case the
returned
offset would be larger than the passed-in offset. Otherwise it should
always be equal to the passed-in offset, is that right?


Right now, the only case when `recover` returns an offset different from
the passed one is when the failure happens *during* commit.


If the failure happens after commit but before the checkpoint, `recover`
might return either a passed or newer committed offset, depending on the
implementation. The `recover` implementation in the prototype returns a
passed offset because it deletes the commit marker that holds that
offset
after the commit is done. In that case, the store will replay the last
commit from the changelog. I think it is fine as the changelog replay is
idempotent.

2) It seems the only use for the "transactional()" function is to
determine
if we can update the checkpoint file while in EOS.


Right now, there are 2 other uses for `transactional()`:
1. To determine what to do during initialization if the checkpoint is
gone
(see [1]). If the state store is transactional, we don't have to wipe
the
existing data. Thinking about it now, we do not really need to check
whether the store is `transactional` because if it is not, we would not have
written the checkpoint in the first place. I am going to remove that
check.
2. To determine if the persistent kv store in KStreamImplJoin should be
transactional (see [2], [3]).

I am not sure if we can get rid of the checks in point 2. If so, I'd be
happy to encapsulate `transactional()` logic in `commit/recover`.

Best,
Alex

1.

https://github.com/apache/kafka/pull/12393/files#diff-971d9ef7ea8aefffff687fc7ee131bd166ced94445f4ab55aa83007541dccfdaL256-R281
2.

https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R266-R278
3.

https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R348-R354

On Tue, Jul 26, 2022 at 6:39 PM Nick Telford <nick.telf...@gmail.com>
wrote:

Hi Alex,

Excellent proposal, I'm very keen to see this land!

Would it be useful to permit configuring the type of store used for
uncommitted offsets on a store-by-store basis? This way, users could
choose
whether to use, e.g. an in-memory store or RocksDB, potentially
reducing
the overheads associated with RocksDb for smaller stores, but without
the
memory pressure issues?

I suspect that in most cases, the number of uncommitted records will be
very small, because the default commit interval is 100ms.

Regards,

Nick

On Tue, 26 Jul 2022 at 01:36, Guozhang Wang <wangg...@gmail.com>
wrote:

Hello Alex,

Thanks for the updated KIP, I looked over it and browsed the WIP and
just
have a couple meta thoughts:

1) About the param passed into the `recover()` function: it seems to
me
that the semantics of "recover(offset)" is: recover this state to a
transaction boundary which is at least the passed-in offset. And the
only
possibility that the returned offset is different than the passed-in
offset
is that if the previous failure happens after we've done all the
commit
procedures except writing the new checkpoint, in which case the
returned
offset would be larger than the passed-in offset. Otherwise it should
always be equal to the passed-in offset, is that right?

2) It seems the only use for the "transactional()" function is to
determine
if we can update the checkpoint file while in EOS. But the purpose of
the
checkpoint file's offsets is just to tell "the local state's current
snapshot's progress is at least the indicated offsets" anyways, and
with
this KIP maybe we would just do:

a) when in ALOS, upon failover: we set the starting offset as
checkpointed-offset, then restore() from changelog till the
end-offset.
This way we may restore some records twice.
b) when in EOS, upon failover: we first call
recover(checkpointed-offset),
then set the starting offset as the returned offset (which may be
larger
than checkpointed-offset), then restore until the end-offset.

So why not also:
c) we let the `commit()` function to also return an offset, which
indicates
"checkpointable offsets".
d) for existing non-transactional stores, we just have a default
implementation of "commit()" which is simply a flush, and returns a
sentinel value like -1. Then later if we get checkpointable offsets
-1,
we
do not write the checkpoint. Upon clean shutting down we can just
checkpoint regardless of the returned value from "commit".
e) for existing non-transactional stores, we just have a default
implementation of "recover()" which is to wipe out the local store and
return offset 0 if the passed in offset is -1, otherwise if not -1
then
it
indicates a clean shutdown in the last run, can this function is just
a
no-op.

In that case, we would not need the "transactional()" function
anymore,
since for non-transactional stores their behaviors are still wrapped
in
the
`commit / recover` function pairs.
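
For concreteness, a minimal sketch of what those default implementations in
points c)-e) could look like; the sentinel constant and names here are
illustrative only, not a proposed API:

// Hypothetical defaults for existing non-transactional stores.
interface NonTxnStoreDefaultsSketch {
    long NO_CHECKPOINTABLE_OFFSET = -1L; // sentinel meaning "do not write a checkpoint"

    void flush();
    void wipe(); // assumed helper that deletes the local state

    // d) default commit(): just flush and report that there is nothing to checkpoint
    default long commit(final long changelogOffset) {
        flush();
        return NO_CHECKPOINTABLE_OFFSET;
    }

    // e) default recover(): wipe after an unclean shutdown, no-op after a clean one
    default long recover(final long checkpointedOffset) {
        if (checkpointedOffset == NO_CHECKPOINTABLE_OFFSET) {
            wipe();
            return 0L; // restore the full changelog
        }
        return checkpointedOffset; // last run shut down cleanly, nothing to do
    }
}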

I have not completed the thorough pass on your WIP PR, so maybe I
could
come up with some more feedback later, but just let me know if my
understanding above is correct or not?


Guozhang




On Thu, Jul 14, 2022 at 7:01 AM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

Hi,

I updated the KIP with the following changes:
* Replaced in-memory batches with the secondary-store approach as the
default implementation to address the feedback about memory pressure
as
suggested by Sagar and Bruno.
* Introduced StateStore#commit and StateStore#recover methods as an
extension of the rollback idea. @Guozhang, please see the comment
below
on
why I took a slightly different approach than you suggested.
* Removed mentions of changes to IQv1 and IQv2. Transactional state
stores
enable reading committed in IQ, but it is really an independent
feature
that deserves its own KIP. Conflating them unnecessarily increases
the
scope for discussion, implementation, and testing in a single unit of
work.

I also published a prototype -
https://github.com/apache/kafka/pull/12393
that implements changes described in the proposal.

Regarding explicit rollback, I think it is a powerful idea that
allows
other StateStore implementations to take a different path to the
transactional behavior rather than keep 2 state stores. Instead of
introducing a new commit token, I suggest using a changelog offset
that
already 1:1 corresponds to the materialized state. This works nicely
because Kafka Streams first commits a Kafka transaction and only then
checkpoints the state store, so we can use the changelog offset to
commit
the state store transaction.

I called the method StateStore#recover rather than
StateStore#rollback
because a state store might either roll back or forward depending on
the
specific point of the crash failure. Consider the write algorithm in Kafka
Streams:
1. write stuff to the state store
2. producer.sendOffsetsToTransaction(token);
producer.commitTransaction();
3. flush
4. checkpoint

Let's consider 3 cases:
1. If the crash failure happens between #2 and #3, the state store
rolls
back and replays the uncommitted transaction from the changelog.
2. If the crash failure happens during #3, the state store can roll
forward
and finish the flush/commit.
3. If the crash failure happens between #3 and #4, the state store
should
do nothing during recovery and just proceed with the checkpoint.

Looking forward to your feedback,
Alexander

On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov <
asorokou...@confluent.io> wrote:

Hi,

As a status update, I did the following changes to the KIP:
* replaced configuration via the top-level config with configuration
via
Stores factory and StoreSuppliers,
* added IQv2 and elaborated how readCommitted will work when the
store
is
not transactional,
* removed claims about ALOS.

I am going to be OOO in the next couple of weeks and will resume
working
on the proposal and responding to the discussion in this thread
starting
June 27. My next top priorities are:
1. Prototype the rollback approach as suggested by Guozhang.
2. Replace in-memory batches with the secondary-store approach as
the
default implementation to address the feedback about memory
pressure as
suggested by Sagar and Bruno.
3. Adjust Stores methods to make transactional implementations
pluggable.
4. Publish the POC for the first review.

Best regards,
Alex

On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <wangg...@gmail.com>
wrote:

Alex,

Thanks for your replies! That is very helpful.

Just to broaden our discussions a bit here, I think there are some
other
approaches in parallel to the idea of "enforce to only persist upon
explicit flush" and I'd like to throw one here -- not really
advocating
it,
but just for us to compare the pros and cons:

1) We let the StateStore's `flush` function to return a token
instead
of
returning `void`.
2) We add another `rollback(token)` interface of StateStore which
would
effectively rollback the state as indicated by the token to the
snapshot
when the corresponding `flush` is called.
3) We encode the token and commit as part of
`producer#sendOffsetsToTransaction`.

Users could optionally implement the new functions, or they can
just
not
return the token at all and not implement the second function.
Again,
the
APIs are just for the sake of illustration, not feeling they are
the
most
natural :)

Then the procedure would be:

1. the previous checkpointed offset is 100
...
3. flush store, make sure all writes are persisted; get the
returned
token
that indicates the snapshot of 200.
4. producer.sendOffsetsToTransaction(token);
producer.commitTransaction();
5. Update the checkpoint file (say, the new value is 200).

Then if there's a failure, say between 3/4, we would get the token
from
the
last committed txn, and first we would do the restoration (which
may
get
the state to somewhere between 100 and 200), then call
`store.rollback(token)` to rollback to the snapshot of offset 100.
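
A minimal sketch of that alternative API shape, with hypothetical types (the
token is opaque here; it could simply encode the changelog offset of the
flushed snapshot):

// Hypothetical flush-token / rollback(token) store interface.
interface TokenStoreSketch {
    byte[] flush();              // persists all writes, returns a token for the snapshot
    void rollback(byte[] token); // rolls the store back to the snapshot the token names
}

// Commit:   token = store.flush(); encode the token alongside the offsets passed to
//           producer.sendOffsetsToTransaction(...), then commitTransaction().
// Recovery: read the token from the last committed transaction, restore from the
//           changelog, then store.rollback(token) to return to the committed snapshot.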

The pros is that we would then not need to enforce the state
stores to
not
persist any data during the txn: for stores that may not be able to
implement the `rollback` function, they can still reduce its impl
to
"not
persisting any data" via this API, but for stores that can indeed
support
the rollback, their implementation may be more efficient. The cons
though,
on top of my head are 1) more complicated logic differentiating
between
EOS
with and without store rollback support, and ALOS, 2) encoding the
token
as
part of the commit offset is not ideal if it is big, 3) the
recovery
logic
including the state store is also a bit more complicated.


Guozhang





On Wed, Jun 1, 2022 at 1:29 PM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

Hi Guozhang,

But I'm still trying to clarify how it guarantees EOS, and it
seems
that we
would achieve it by enforcing to not persist any data written
within
this
transaction until step 4. Is that correct?


This is correct. Both alternatives - in-memory
WriteBatchWithIndex
and
transactionality via the secondary store guarantee EOS by not
persisting
data in the "main" state store until it is committed in the
changelog
topic.

Oh what I meant is not what KStream code does, but that
StateStore
impl
classes themselves could potentially flush data to become
persisted
asynchronously


Thank you for elaborating! You are correct, the underlying state
store
should not persist data until the streams app calls
StateStore#flush.
There
are 2 options how a State Store implementation can guarantee
that -
either
keep uncommitted writes in memory or be able to roll back the
changes
that
were not committed during recovery. RocksDB's
WriteBatchWithIndex is
an
implementation of the first option. A considered alternative,
Transactions
via Secondary State Store for Uncommitted Changes, is the way to
implement
the second option.

As everyone correctly pointed out, keeping uncommitted data in
memory
introduces a very real risk of OOM that we will need to handle.
The
more I
think about it, the more I lean towards going with the
Transactions
via
Secondary Store as the way to implement transactionality as it
does
not
have that issue.

Best,
Alex


On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang <
wangg...@gmail.com>
wrote:

Hello Alex,

we flush the cache, but not the underlying state store.

You're right. The ordering I mentioned above is actually:

...
3. producer.sendOffsetsToTransaction();
producer.commitTransaction();
4. flush store, make sure all writes are persisted.
5. Update the checkpoint file to 200.

But I'm still trying to clarify how it guarantees EOS, and it
seems
that
we
would achieve it by enforcing to not persist any data written
within
this
transaction until step 4. Is that correct?

Can you please point me to the place in the codebase where we
trigger
async flush before the commit?

Oh what I meant is not what KStream code does, but that
StateStore
impl
classes themselves could potentially flush data to become
persisted
asynchronously, e.g. RocksDB does that naturally out of the
control
of
KStream code. I think it is related to my previous question:
if we
think
by
guaranteeing EOS at the state store level, we would effectively
ask
the
impl classes that "you should not persist any data until
`flush`
is
called
explicitly", is the StateStore interface the right level to
enforce
such
mechanisms, or should we just do that on top of the
StateStores,
e.g.
during the transaction we just keep all the writes in the cache
(of
course
we need to consider how to work around memory pressure as
previously
mentioned), and then upon committing, we just write the cached
records
as a
whole into the store and then call flush.


Guozhang







On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

Hey,

Thank you for the wealth of great suggestions and questions!
I
am
going
to
address the feedback in batches and update the proposal
async,
as
it is
probably going to be easier for everyone. I will also write a
separate
message after making updates to the KIP.

@John,

Did you consider instead just adding the option to the
RocksDB*StoreSupplier classes and the factories in Stores ?

Thank you for suggesting that. I think that this idea is
better
than
what I
came up with and will update the KIP with configuring
transactionality
via
the suppliers and Stores.

what is the advantage over just doing the same thing with the
RecordCache
and not introducing the WriteBatch at all?

Can you point me to RecordCache? I can't find it in the
project.
The
advantage would be that WriteBatch guarantees write
atomicity.
As
far
as
I
understood the way RecordCache works, it might leave the
system
in
an
inconsistent state during crash failure on write.

You mentioned that a transactional store can help reduce
duplication in
the
case of ALOS

I will remove claims about ALOS from the proposal. Thank you
for
elaborating!

As a reminder, we have a new IQv2 mechanism now. Should we
propose
any
changes to IQv1 to support this transactional mechanism,
versus
just
proposing it for IQv2? Certainly, it seems strange only to
propose a
change
for IQv1 and not v2.


   I will update the proposal with complementary API changes
for
IQv2

What should IQ do if I request to readCommitted on a
non-transactional
store?

We can assume that non-transactional stores commit on write,
so
IQ
works
in
the same way with non-transactional stores regardless of the
value
of
readCommitted.


   @Guozhang,

* If we crash between line 3 and 4, then at that time the
local
persistent
store image is representing as of offset 200, but upon
recovery
all
changelog records from 100 to log-end-offset would be
considered
as
aborted
and not be replayed and we would restart processing from
position
100.
Restart processing will violate EOS.I'm not sure how e.g.
RocksDB's
WriteBatchWithIndex would make sure that the step 4 and
step 5
could
be
done atomically here.


Could you please point me to the place in the codebase where
a
task
flushes
the store before committing the transaction?
Looking at TaskExecutor (








https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167
),
StreamTask#prepareCommit (








https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398
),
and CachedStateStore (








https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34
)
we flush the cache, but not the underlying state store.
Explicit
StateStore#flush happens in
AbstractTask#maybeWriteCheckpoint (








https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99
).
Is there something I am missing here?

Today all cached data that have not been flushed are not
committed
for
sure, but even flushed data to the persistent underlying
store
may
also
be
uncommitted since flushing can be triggered asynchronously
before
the
commit.

Can you please point me to the place in the codebase where we
trigger
async
flush before the commit? This would certainly be a reason to
introduce
a
dedicated StateStore#commit method.

Thanks again for the feedback. I am going to update the KIP
and
then
respond to the next batch of questions and suggestions.

Best,
Alex

On Mon, May 30, 2022 at 5:13 PM Suhas Satish
<ssat...@confluent.io.invalid

wrote:

Thanks for the KIP proposal Alex.
1. Configuration default

You mention applications using streams DSL with built-in
rocksDB
state
store will get transactional state stores by default when
EOS
is
enabled,
but the default implementation for apps using PAPI will
fallback
to
non-transactional behavior.
Shouldn't we have the same default behavior for both types
of
apps -
DSL
and PAPI?

On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <
cado...@apache.org

wrote:

Thanks for the PR, Alex!

I am also glad to see this coming.


1. Configuration

I would also prefer to restrict the configuration of
transactional
on
the state sore. Ideally, calling method transactional()
on
the
state
store would be enough. An option on the store builder
would
make it
possible to turn transactionality on and off (as John
proposed).


2. Memory usage in RocksDB

This seems to be a major issue. We do not have any
guarantee
that
uncommitted writes fit into memory and I guess we will
never
have.
What
happens when the uncommitted writes do not fit into
memory?
Does
RocksDB
throw an exception? Can we handle such an exception
without
crashing?

Does the RocksDB behavior even need to be included in
this
KIP?
In
the
end it is an implementation detail.

What we should consider - though - is a memory limit in
some
form.
And
what we do when the memory limit is exceeded.


3. PoC

I agree with Guozhang that a PoC is a good idea to better
understand
the
devils in the details.


Best,
Bruno

On 25.05.22 01:52, Guozhang Wang wrote:
Hello Alex,

Thanks for writing the proposal! Glad to see it
coming. I
think
this
is
the
kind of a KIP that since too many devils would be
buried
in
the
details
and
it's better to start working on a POC, either in
parallel,
or
before
we
resume our discussion, rather than blocking any
implementation
until
we
are
satisfied with the proposal.

Just as a concrete example, I personally am still not
100%
clear
how
the
proposal would work to achieve EOS with the state
stores.
For
example,
the
commit procedure today looks like this:

0: there's an existing checkpoint file indicating the
changelog
offset
of
the local state store image is 100. Now a commit is
triggered:
1. flush cache (since it contains partially processed
records),
make
sure
all records are written to the producer.
2. flush producer, making sure all changelog records
have
now
acked.
//
here we would get the new changelog position, say 200
3. flush store, make sure all writes are persisted.
4. producer.sendOffsetsToTransaction();
producer.commitTransaction();
//
we
would make the writes in changelog up to offset 200
committed
5. Update the checkpoint file to 200.

The question about atomicity between those lines, for
example:

* If we crash between line 4 and line 5, the local
checkpoint
file
would
stay as 100, and upon recovery we would replay the
changelog
from
100
to
200. This is not ideal but does not violate EOS, since
the
changelogs
are
all overwrites anyways.
* If we crash between line 3 and 4, then at that time
the
local
persistent
store image is representing as of offset 200, but upon
recovery
all
changelog records from 100 to log-end-offset would be
considered
as
aborted
and not be replayed and we would restart processing
from
position
100.
Restart processing will violate EOS.I'm not sure how
e.g.
RocksDB's
WriteBatchWithIndex would make sure that the step 4 and
step 5
could
be
done atomically here.

Originally what I was thinking when creating the JIRA
ticket
is
that
we
need to let the state store to provide a transactional
API
like
"token
commit()" used in step 4) above which returns a token,
that
e.g.
in
our
example above indicates offset 200, and that token
would
be
written
as
part
of the records in Kafka transaction in step 5). And
upon
recovery
the
state
store would have another API like "rollback(token)"
where
the
token
is
read
from the latest committed txn, and be used to rollback
the
store
to
that
committed image. I think your proposal is different,
and
it
seems
like
you're proposing we swap step 3) and 4) above, but the
atomicity
issue
still remains since now you may have the store image at
100
but
the
changelog is committed at 200. I'd like to learn more
about
the
details
on how it resolves such issues.

Anyways, that's just an example to make the point that
there
are
lots
of
implementational details which would drive the public
API
design,
and
we
should probably first do a POC, and come back to
discuss
the
KIP.
Let
me
know what you think?


Guozhang









On Tue, May 24, 2022 at 10:35 AM Sagar <
sagarmeansoc...@gmail.com>
wrote:

Hi Alexander,

Thanks for the KIP! This seems like a great proposal.
I
have
the
same
opinion as John on the Configuration part though. I
think
the 2
level
config and its behaviour based on the
setting/unsetting
of
the
flag
seems
confusing to me as well. Since the KIP seems
specifically
centred
around
RocksDB it might be better to add it at the Supplier
level
as
John
suggested.

On similar lines, this config name =>
*statestore.transactional.mechanism
*may
also need rethinking as the value assigned to
it(rocksdb_indexbatch)
implicitly seems to assume that rocksdb is the only
statestore
that
Kafka
Stream supports while that's not the case.

Also, regarding the potential memory pressure that
can be
introduced
by
WriteBatchIndex, do you think it might make more
sense to
include
some
numbers/benchmarks on how much the memory consumption
might
increase?

Lastly, the read_uncommitted flag's behaviour on IQ
may
need
more
elaboration.

These points aside, as I said, this is a great
proposal!

Thanks!
Sagar.

On Tue, May 24, 2022 at 10:35 PM John Roesler <
vvcep...@apache.org>
wrote:

Thanks for the KIP, Alex!

I'm really happy to see your proposal. This
improvement
fills a
long-standing gap.

I have a few questions:

1. Configuration
The KIP only mentions RocksDB, but of course, Streams
also
ships
with
an
InMemory store, and users also plug in their own
custom
state
stores.
It
is
also common to use multiple types of state stores in
the
same
application
for different purposes.

Against this backdrop, the choice to configure
transactionality
as
a
top-level config, as well as to configure the store
transaction
mechanism
as a top-level config, seems a bit off.

Did you consider instead just adding the option to
the
RocksDB*StoreSupplier classes and the factories in
Stores
?
It
seems
like
the desire to enable the feature by default, but
with a
feature-flag
to
disable it was a factor here. However, as you pointed
out,
there
are
some
major considerations that users should be aware of,
so
opt-in
doesn't
seem
like a bad choice, either. You could add an Enum
argument
to
those
factories like `RocksDBTransactionalMechanism.{NONE,

Some points in favor of this approach:
* Avoid "stores that don't support transactions
ignore
the
config"
complexity
* Users can choose how to spend their memory budget,
making
some
stores
transactional and others not
* When we add transactional support to in-memory
stores,
we
don't
have
to
figure out what to do with the mechanism config
(i.e.,
what
do
you
set
the
mechanism to when there are multiple kinds of
transactional
stores
in
the
topology?)

2. caching/flushing/transactions
The coupling between memory usage and flushing that
you
mentioned
is
a
bit
troubling. It also occurs to me that there seems to
be
some
relationship
with the existing record cache, which is also an
in-memory
holding
area
for
records that are not yet written to the cache and/or
store
(albeit
with
no
particular semantics). Have you considered how all
these
components
should
relate? For example, should a "full" WriteBatch
actually
trigger
a
flush
so
that we don't get OOMEs? If the proposed
transactional
mechanism
forces
all
uncommitted writes to be buffered in memory, until a
commit,
then
what
is
the advantage over just doing the same thing with the
RecordCache
and
not
introducing the WriteBatch at all?

3. ALOS
You mentioned that a transactional store can help
reduce
duplication
in
the case of ALOS. We might want to be careful about
claims
like
that.
Duplication isn't the way that repeated processing
manifests in
state
stores. Rather, it is in the form of dirty reads
during
reprocessing.
This
feature may reduce the incidence of dirty reads
during
reprocessing,
but
not in a predictable way. During regular processing
today,
we
will
send
some records through to the changelog in between
commit
intervals.
Under
ALOS, if any of those dirty writes gets committed to
the
changelog
topic,
then upon failure, we have to roll the store forward
to
them
anyway,
regardless of this new transactional mechanism.
That's a
fixable
problem,
by the way, but this KIP doesn't seem to fix it. I
wonder
if we
should
make
any claims about the relationship of this feature to
ALOS
if
the
real-world
behavior is so complex.

4. IQ
As a reminder, we have a new IQv2 mechanism now.
Should
we
propose
any
changes to IQv1 to support this transactional
mechanism,
versus
just
proposing it for IQv2? Certainly, it seems strange
only
to
propose
a
change
for IQv1 and not v2.

Regarding your proposal for IQv1, I'm unsure what the
behavior
should
be
for readCommitted, since the current behavior also
reads
out of
the
RecordCache. I guess if readCommitted==false, then we
will
continue
to
read
from the cache first, then the Batch, then the store;
and
if
readCommitted==true, we would skip the cache and the
Batch
and
only
read
from the persistent RocksDB store?

What should IQ do if I request to readCommitted on a
non-transactional
store?

Thanks again for proposing the KIP, and my apologies
for
the
long
reply;
I'm hoping to air all my concerns in one "batch" to
save
time
for
you.

Thanks,
-John

On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov
wrote:
Hi all,

I've written a KIP for making Kafka Streams state
stores
transactional
and
would like to start a discussion:













https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores

Best,
Alex











--
-- Guozhang




--
-- Guozhang





--
-- Guozhang





