guozhangwang commented on code in PR #13179:
URL: https://github.com/apache/kafka/pull/13179#discussion_r1097749095


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -986,8 +986,23 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
         for (final TopicPartition partition : revokedChangelogs) {
             final ChangelogMetadata changelogMetadata = changelogs.remove(partition);
             if (changelogMetadata != null) {
+                // if the changelog is still in REGISTERED, it means it has not initialized and started
+                // restoring yet, and hence we should not try to remove the changelog partition
                 if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
                     revokedInitializedChangelogs.add(partition);
+
+                    // if the changelog is not in RESTORING, it means
+                    // the corresponding onRestoreStart was not called; in this case
+                    // we should not call onRestoreSuspended either
+                    if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE &&
+                        changelogMetadata.state().equals(ChangelogState.RESTORING)) {
+                        try {
+                            final String storeName = changelogMetadata.storeMetadata.store().name();
+                            stateRestoreListener.onRestoreSuspended(partition, storeName, changelogMetadata.totalRestored);
+                        } catch (final Exception e) {
+                            throw new StreamsException("State restore listener failed on restore paused", e);

Review Comment:
   Exceptions thrown by user-supplied callbacks can be of any type, while the 
exception channel from the restore thread to the main thread currently expects 
only internal ones. So I think we either need to allow arbitrary exceptions to 
be sent to the main thread via the queue as well, or wrap them in an internal 
exception as is done here. Personally I'm in favor of this approach, since it 
avoids a `catch all` clause that may hide lurking exception handling bugs.


