guozhangwang commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r972283623
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -209,7 +212,9 @@ void registerStateStores(final List<StateStore> allStores,
final InternalProcess
processorContext.uninitialize();
for (final StateStore store : allStores) {
if (stores.containsKey(store.name())) {
- maybeRegisterStoreWithChangelogReader(store.name());
+ if (!stateUpdaterEnabled) {
+ maybeRegisterStoreWithChangelogReader(store.name());
Review Comment:
Another case for this is when we handle EOS task corruption due to no
checkpoint file detected: in that case we would remove the corrupted task's
changelogs, and re-initialize them. In this case the stores would be
initialized but changelog readers not registered as well.
In the future we can decouple the registration of state stores and the
`register(final TopicPartition partition, final ProcessorStateManager
stateManager)` as long as in the latter case, we are sure that the
`stateManager`'s stores map are already populated, which should always be true
when the task is already in stateUpdater.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]