lucasbru commented on code in PR #13179:
URL: https://github.com/apache/kafka/pull/13179#discussion_r1099949313
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -986,8 +986,23 @@ public void unregister(final Collection<TopicPartition>
revokedChangelogs) {
for (final TopicPartition partition : revokedChangelogs) {
final ChangelogMetadata changelogMetadata =
changelogs.remove(partition);
if (changelogMetadata != null) {
+ // if the changelog is still in REGISTERED, it means it has
not initialized and started
+ // restoring yet, and hence we should not try to remove the
changelog partition
if
(!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
revokedInitializedChangelogs.add(partition);
+
+ // if the changelog is not in RESTORING, it means
+ // the corresponding onRestoreStart was not called; in
this case
+ // we should not call onRestoreSuspended either
+ if (changelogMetadata.stateManager.taskType() ==
Task.TaskType.ACTIVE &&
+
changelogMetadata.state().equals(ChangelogState.RESTORING)) {
+ try {
+ final String storeName =
changelogMetadata.storeMetadata.store().name();
+ stateRestoreListener.onRestoreSuspended(partition,
storeName, changelogMetadata.totalRestored);
+ } catch (final Exception e) {
+ throw new StreamsException("State restore listener
failed on restore paused", e);
Review Comment:
ack
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]