Guozhang Wang created KAFKA-12549:
-------------------------------------
Summary: Allow state stores to opt-in transactional support
Key: KAFKA-12549
URL: https://issues.apache.org/jira/browse/KAFKA-12549
Project: Kafka
Issue Type: New Feature
Components: streams
Reporter: Guozhang Wang
Right now, Kafka Streams' EOS implementation does not make any assumptions about
the state store's transactional support. Allowing state stores to optionally
provide transactional support can have multiple benefits. For example, if we add
APIs such as {{beginTxn}}, {{commitTxn}}, and {{abortTxn}} to the {{StateStore}}
interface, these APIs can be used under both ALOS and EOS as follows:
* store.beginTxn
* store.put // during processing
* streams commit // either through the EOS protocol or not
* store.commitTxn
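A minimal Java sketch of the sequence above. It assumes a hypothetical {{TransactionalStore}} interface that carries only the three proposed methods plus {{put}}; it is not the actual Kafka Streams {{StateStore}} API. The {{RecordingStore}} class and the {{processAndCommit}} helper are illustrative names, and the {{streamsCommit}} callback is a placeholder for the streams commit (through the EOS protocol or not).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical opt-in interface: the method names come from the proposal,
// the generics and the put signature are assumptions.
interface TransactionalStore<K, V> {
    void beginTxn();
    void put(K key, V value);
    void commitTxn();
    void abortTxn();
}

// Toy store that only records the order of calls made on it.
class RecordingStore implements TransactionalStore<String, Long> {
    final List<String> calls = new ArrayList<>();
    @Override public void beginTxn() { calls.add("beginTxn"); }
    @Override public void put(String key, Long value) { calls.add("put(" + key + ")"); }
    @Override public void commitTxn() { calls.add("commitTxn"); }
    @Override public void abortTxn() { calls.add("abortTxn"); }
}

class TxnDriver {
    // One processing cycle: begin, put during processing, streams commit, store commit.
    static <K, V> void processAndCommit(TransactionalStore<K, V> store,
                                        Map<K, V> updates,
                                        Runnable streamsCommit) {
        store.beginTxn();
        try {
            for (Map.Entry<K, V> e : updates.entrySet()) {
                store.put(e.getKey(), e.getValue()); // during processing
            }
            streamsCommit.run();                     // streams commit happens first
        } catch (RuntimeException ex) {
            store.abortTxn();                        // drop uncommitted store writes
            throw ex;
        }
        store.commitTxn();                           // only after the streams commit returned
    }

    public static void main(String[] args) {
        RecordingStore store = new RecordingStore();
        Map<String, Long> updates = new LinkedHashMap<>();
        updates.put("a", 1L);
        updates.put("b", 2L);
        processAndCommit(store, updates, () -> { /* streams commit succeeds */ });
        System.out.println(store.calls); // [beginTxn, put(a), put(b), commitTxn]
    }
}
```

Ordering {{commitTxn}} after the streams commit means that a crash between the two leaves the store behind the committed offset, which is the case the EOS roll-forward benefit addresses.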
This would bring the following benefits:
* Fewer duplicated records after crashes under ALOS (note this is still not EOS,
but a middle ground in which uncommitted data within a state store would not be
retained if store.commitTxn failed).
* No need to wipe the state store and re-bootstrap it from scratch after a crash
under EOS. For example, if a crash happens after the streams commit completes
but before store.commitTxn, we can instead roll the transaction forward by
replaying the changelog from the second most recent streams committed offset up
to the most recent committed offset.
* Remote stores that support transactions then do not need to support wiping
(https://issues.apache.org/jira/browse/KAFKA-12475).
* We can fix the known issues with emit-on-change
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
* We can support "query committed data only" for interactive queries (see below
for details).
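The roll-forward recovery from the EOS benefit can be sketched as follows. This assumes the changelog is available as an in-memory list of hypothetical {{ChangelogRecord}} entries, that a {{null}} value is a tombstone, and that committed offsets follow Kafka's convention of pointing at the next record to read, so the replayed range is [second most recent committed offset, most recent committed offset). A real implementation would read the changelog topic with a consumer instead.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical changelog entry; a null value marks a deletion (tombstone).
record ChangelogRecord(long offset, String key, String value) {}

class RollForward {
    // Re-applies changelog records with fromOffset <= offset < toOffset to the store.
    // fromOffset: second most recent committed offset (last state the store committed)
    // toOffset:   most recent committed offset (streams commit completed before the crash)
    static void replay(Map<String, String> store, List<ChangelogRecord> changelog,
                       long fromOffset, long toOffset) {
        for (ChangelogRecord r : changelog) {
            if (r.offset() < fromOffset || r.offset() >= toOffset) {
                continue; // outside the window that still needs to be rolled forward
            }
            if (r.value() == null) {
                store.remove(r.key());
            } else {
                store.put(r.key(), r.value());
            }
        }
    }

    public static void main(String[] args) {
        // Store contents as of committed offset 2 (records 0 and 1 applied).
        Map<String, String> store = new HashMap<>(Map.of("a", "1", "b", "2"));
        List<ChangelogRecord> changelog = List.of(
            new ChangelogRecord(0, "a", "1"),
            new ChangelogRecord(1, "b", "2"),
            new ChangelogRecord(2, "a", "3"),   // written before the crash, store txn not committed
            new ChangelogRecord(3, "b", null)); // tombstone
        replay(store, changelog, 2, 4);
        System.out.println(store); // {a=3}
    }
}
```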
As for the implementation of these APIs, there are several options:
* The state store itself has native transaction support (e.g., RocksDB).
* Use an in-memory buffer for all puts within a transaction: upon
{{commitTxn}}, write the whole buffer as a batch to the underlying state store,
or just drop the whole buffer upon {{abortTxn}}. For interactive queries, one can
then optionally query only the underlying store to get committed data.
* Use a separate store as a transient persistent buffer: upon {{beginTxn}},
create a new empty transient store, and upon {{commitTxn}}, merge it into the
underlying store. The same applies to interactive queries on committed-only data.
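A minimal sketch of the in-memory buffer option, assuming a plain {{HashMap}} stands in for the underlying store and only puts are supported (deletes are omitted). The read-your-own-writes behavior of {{get}} during processing is an assumption not stated in the proposal; {{getCommitted}} is the committed-only read path for interactive queries. A real store would apply the buffer as one atomic batch (for RocksDB, e.g., a {{WriteBatch}}) rather than with {{putAll}}.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the buffering option: puts go to an in-memory buffer and reach
// the underlying store only on commitTxn.
class BufferedTxnStore {
    private final Map<String, String> underlying = new HashMap<>();   // stands in for the real store
    private final Map<String, String> buffer = new LinkedHashMap<>(); // uncommitted puts
    private boolean inTxn = false;

    void beginTxn() {
        if (inTxn) throw new IllegalStateException("transaction already open");
        inTxn = true;
    }

    void put(String key, String value) {
        if (!inTxn) throw new IllegalStateException("no open transaction");
        buffer.put(key, value);
    }

    void commitTxn() {
        if (!inTxn) throw new IllegalStateException("no open transaction");
        underlying.putAll(buffer); // not atomic here; a real store would use an atomic batch
        buffer.clear();
        inTxn = false;
    }

    void abortTxn() {
        buffer.clear(); // just drop the uncommitted writes
        inTxn = false;
    }

    // Assumed processing read path: this transaction's own writes are visible.
    String get(String key) {
        return buffer.containsKey(key) ? buffer.get(key) : underlying.get(key);
    }

    // Interactive-query read path: committed data only.
    String getCommitted(String key) {
        return underlying.get(key);
    }

    public static void main(String[] args) {
        BufferedTxnStore store = new BufferedTxnStore();
        store.beginTxn();
        store.put("k", "v1");
        System.out.println(store.get("k") + " " + store.getCommitted("k")); // v1 null
        store.commitTxn();
        store.beginTxn();
        store.put("k", "v2");
        store.abortTxn();
        System.out.println(store.get("k") + " " + store.getCommitted("k")); // v1 v1
    }
}
```

The separate-store option follows the same shape, with the {{buffer}} map replaced by a transient persistent store that is merged on {{commitTxn}}.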
--
This message was sent by Atlassian Jira
(v8.3.4#803005)