sanghyeok An created KAFKA-19935:
------------------------------------
Summary: Introduce TopologyValidator Utils for improving topology
evolution compatibility.
Key: KAFKA-19935
URL: https://issues.apache.org/jira/browse/KAFKA-19935
Project: Kafka
Issue Type: Improvement
Components: streams, streams-test-utils
Reporter: sanghyeok An
Assignee: sanghyeok An
If a developer does not explicitly specify a name for a {{StateStore}}, its
name is generated using an incremental number. Consequently, the corresponding
Changelog topic is created using this same generated name.
Let's assume a scenario where the application evolves and a new Source Node is
added. Since the new Source Node is typically built at the beginning of the
topology, the incremental numbers for all subsequent nodes shift by 1. As a
result, the names of the {{StateStore}} instances change as well.
For example, the change might look like this:
* *Previous:* {{MY-APP-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog}}
* *New:* {{MY-APP-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog}}
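To make the shift concrete, here is a minimal sketch that mimics the numbering with a single shared counter. It is a toy model, not Kafka Streams code: the class {{GeneratedNameShift}} and its methods are hypothetical, and it only assumes that every generated name takes the next index, zero-padded to 10 digits, and that the changelog topic is named {{<application.id>-<store name>-changelog}}.

```java
public class GeneratedNameShift {
    // Toy stand-in for the shared index used for generated names (assumption:
    // one counter, incremented each time a generated name is handed out).
    private int index = 0;

    // Returns the prefix followed by the next index, zero-padded to 10 digits.
    String next(String prefix) {
        return String.format("%s%010d", prefix, index++);
    }

    // Changelog topics follow <application.id>-<store name>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Builds a toy topology with `sourceCount` sources created first, followed
    // by one unnamed aggregate store, and returns that store's changelog topic.
    static String aggregateChangelog(int sourceCount) {
        GeneratedNameShift names = new GeneratedNameShift();
        for (int i = 0; i < sourceCount; i++) {
            names.next("KSTREAM-SOURCE-");
        }
        String store = names.next("KSTREAM-AGGREGATE-STATE-STORE-");
        return changelogTopic("MY-APP", store);
    }

    public static void main(String[] args) {
        System.out.println(aggregateChangelog(1)); // one source before the store
        System.out.println(aggregateChangelog(2)); // a second source was added
    }
}
```

With one source, the store takes index 1; with a second source added ahead of it, the same store takes index 2, so its changelog topic name changes even though the aggregation itself did not.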
This shift can lead to significant operational issues. Simply replaying the
Source Topic is often insufficient to make the new {{StateStore}} identical to
the previous one.
There are several scenarios where this is impossible. For instance, if the
application only processes messages after the last committed offset (the most
common case) and the data includes keys that appear infrequently, the new
state will never match the old one.
(Consider a restaurant with very few orders compared to one with many; the
"sparse" orders might have existed in the previous {{StateStore}} but will likely
be missing in the new one after the shift.)
While this might be dismissed as a minor issue by some, it can be a critical
problem for organizations where data consistency is paramount.
A clear solution to this problem is using *Named StateStores*. However,
some users may not be aware of this feature or may not feel the need for it.
Furthermore, they might not realize that the IDs of the StateStore and
Changelog topic have incremented, leading to the unintentional creation of a
new store and potential data loss.
To address this, I propose introducing a {{TopologyValidator}} as a utility
class. Ideally, the usage would look something like this:
{code:java}
TopologyValidator.of(prevTopology, newTopology).diff();
{code}
* When {{.diff()}} is called, the {{TopologyValidator}} would identify
changes in the topology and issue a warning if {{StateStore}} IDs have shifted.
* The {{TopologyValidator}} could be included in the Kafka Streams application
code or integrated into CI pipelines. Since it essentially requires only the
code to build the topology, it does not need a connection to an actual broker.
Therefore, we can verify the diff using just the previous and new topology
definitions.
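As a rough illustration of what the check behind {{.diff()}} might do, the sketch below compares two sets of store names and flags pairs that look like the same generated name with a different index. Everything here is an assumption for discussion, not the proposed API: {{StoreNameDiff}} and {{shiftWarnings}} are hypothetical names, the "ends in a dash and 10 digits" test is only a heuristic for generated names, and the input sets stand in for the store names that a real implementation could collect from {{Topology#describe()}}.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StoreNameDiff {
    // Heuristic: a generated name is "<prefix>-" followed by a 10-digit index.
    private static final Pattern GENERATED = Pattern.compile("^(.*-)(\\d{10})$");

    // Returns one warning per (removed, added) pair of generated store names
    // that share a prefix but differ in index.
    static List<String> shiftWarnings(Set<String> prevStores, Set<String> newStores) {
        // Names present only in the previous topology (sorted for stable output).
        Set<String> removed = new TreeSet<>(prevStores);
        removed.removeAll(newStores);
        // Names present only in the new topology.
        Set<String> added = new TreeSet<>(newStores);
        added.removeAll(prevStores);

        List<String> warnings = new ArrayList<>();
        for (String oldName : removed) {
            Matcher oldMatch = GENERATED.matcher(oldName);
            if (!oldMatch.matches()) {
                continue; // explicitly named store: not an index shift
            }
            for (String newName : added) {
                Matcher newMatch = GENERATED.matcher(newName);
                if (newMatch.matches() && newMatch.group(1).equals(oldMatch.group(1))) {
                    warnings.add("possible index shift: " + oldName + " -> " + newName);
                }
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        Set<String> prev = Set.of("KSTREAM-AGGREGATE-STATE-STORE-0000000001", "order-counts");
        Set<String> next = Set.of("KSTREAM-AGGREGATE-STATE-STORE-0000000002", "order-counts");
        shiftWarnings(prev, next).forEach(System.out::println);
    }
}
```

Because the comparison works on names alone, it needs no broker connection. An explicitly named store such as the hypothetical {{order-counts}} appears unchanged in both sets and produces no warning.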
By introducing {{TopologyValidator}} under the {{streams/test-utils}} module
and encouraging users to utilize it, I believe we can promote greater stability
from the perspective of *Topology Evolution Compatibility*.
Please share your thoughts on this proposal.