guozhangwang commented on code in PR #13179:
URL: https://github.com/apache/kafka/pull/13179#discussion_r1097749095
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -986,8 +986,23 @@ public void unregister(final Collection<TopicPartition> revokedChangelogs) {
         for (final TopicPartition partition : revokedChangelogs) {
             final ChangelogMetadata changelogMetadata = changelogs.remove(partition);
             if (changelogMetadata != null) {
+                // if the changelog is still in REGISTERED, it means it has not initialized and started
+                // restoring yet, and hence we should not try to remove the changelog partition
                 if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
                     revokedInitializedChangelogs.add(partition);
+
+                    // if the changelog is not in RESTORING, it means
+                    // the corresponding onRestoreStart was not called; in this case
+                    // we should not call onRestoreSuspended either
+                    if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE &&
+                        changelogMetadata.state().equals(ChangelogState.RESTORING)) {
+                        try {
+                            final String storeName = changelogMetadata.storeMetadata.store().name();
+                            stateRestoreListener.onRestoreSuspended(partition, storeName, changelogMetadata.totalRestored);
+                        } catch (final Exception e) {
+                            throw new StreamsException("State restore listener failed on restore paused", e);
Review Comment:
Exceptions thrown from user-instantiated functions are arbitrary, while the
exception channel we use to send exceptions from the restore thread to the main
thread currently expects only internal ones. So I think we either need to allow
arbitrary exceptions to be sent to the main thread via the queue as well, or
wrap them in an internal exception as done here. Personally I'm in favor of
this approach, since it avoids a `catch all` clause that may hide lurking
exception-handling bugs.
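
To make the trade-off concrete, here is a minimal sketch of the wrapping option, assuming it means converting whatever the user callback throws into a single internal exception type before it reaches the queue. Everything in it is hypothetical and stands in for the real classes: `InternalRestoreException` plays the role of `StreamsException`, `SuspendListener` the role of the user's restore listener, and `exceptionQueue` the restore-thread-to-main-thread channel. It is not the actual Kafka Streams code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RestoreExceptionSketch {

    /** Hypothetical stand-in for an internal exception type such as StreamsException. */
    static final class InternalRestoreException extends RuntimeException {
        InternalRestoreException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical stand-in for a user-supplied restore listener. */
    interface SuspendListener {
        void onRestoreSuspended(String storeName, long totalRestored);
    }

    // The channel is typed to the internal exception only, so the consumer
    // on the main thread does not need a catch-all clause of its own.
    static final BlockingQueue<InternalRestoreException> exceptionQueue =
        new LinkedBlockingQueue<>();

    /** Calls the user listener and wraps anything it throws before enqueuing it. */
    static void notifySuspended(final SuspendListener listener,
                                final String storeName,
                                final long totalRestored) {
        try {
            listener.onRestoreSuspended(storeName, totalRestored);
        } catch (final Exception e) {
            // User code may throw anything; keep the original as the cause.
            exceptionQueue.add(new InternalRestoreException(
                "State restore listener failed on restore suspended for store " + storeName, e));
        }
    }

    public static void main(final String[] args) {
        // A listener that fails with an arbitrary, non-internal exception.
        notifySuspended((store, restored) -> {
            throw new IllegalStateException("user bug");
        }, "my-store", 42L);

        // The main thread only ever sees the internal type.
        final InternalRestoreException failure = exceptionQueue.poll();
        if (failure != null) {
            System.out.println(failure.getMessage() + " (cause: " + failure.getCause() + ")");
        }
    }
}
```

The single `catch (final Exception e)` sits right next to the user callback, where arbitrary exceptions are expected, and the original exception is preserved as the cause, so nothing is swallowed on the way to the main thread.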