cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971662847
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -444,6 +451,8 @@ public void restore(final Map<TaskId, Task> tasks) {
final Set<TaskId> corruptedTasks = new HashSet<>();
e.partitions().forEach(partition ->
corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
throw new TaskCorruptedException(corruptedTasks, e);
+ } catch (final InterruptException interruptException) {
+ throw interruptException;
Review Comment:
Yes, it will. However, `InterruptException` is a `KafkaException`, so it
would be caught by the `catch`-clause below and rethrown as a `StreamsException`.
I thought it would be easier to catch an `InterruptException` directly in
`DefaultStateUpdater` than to catch a `StreamsException`, unwrap it,
check whether the cause is an `InterruptException`, and rethrow it if not.
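
To make the catch-order point concrete, here is a minimal standalone sketch. It does not use the real Kafka classes: the three exception types are local stand-ins that only mirror the inheritance relation described above, and `poll()`, `restoreWithDedicatedCatch()`, `restoreWithoutDedicatedCatch()`, and `thrownBy()` are hypothetical names chosen for illustration.

```java
public class CatchOrderSketch {

    // Local stand-ins for the Kafka exception types (assumption: only the
    // relation "InterruptException is a KafkaException" matters here).
    static class KafkaException extends RuntimeException {
        KafkaException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static class InterruptException extends KafkaException {
        InterruptException(final String message) {
            super(message, null);
        }
    }

    static class StreamsException extends KafkaException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for the consumer poll inside restore().
    static void poll() {
        throw new InterruptException("interrupted while polling");
    }

    // With the dedicated clause: the more specific catch comes first,
    // so the InterruptException leaves the method unwrapped.
    static void restoreWithDedicatedCatch() {
        try {
            poll();
        } catch (final InterruptException interruptException) {
            throw interruptException;
        } catch (final KafkaException e) {
            throw new StreamsException("restore failed", e);
        }
    }

    // Without it: the generic clause matches and wraps the exception.
    static void restoreWithoutDedicatedCatch() {
        try {
            poll();
        } catch (final KafkaException e) {
            throw new StreamsException("restore failed", e);
        }
    }

    // Reports the simple class name of whatever the action throws.
    static String thrownBy(final Runnable action) {
        try {
            action.run();
            return "nothing";
        } catch (final RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(final String[] args) {
        System.out.println(thrownBy(CatchOrderSketch::restoreWithDedicatedCatch));
        System.out.println(thrownBy(CatchOrderSketch::restoreWithoutDedicatedCatch));
    }
}
```

In this sketch the first call reports `InterruptException` and the second reports `StreamsException`, so a caller of the second variant would have to inspect `getCause()` to find out that an interrupt happened.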