Repository: kafka Updated Branches: refs/heads/0.10.1 e409d8afd -> d6b54da3e
HOTFIX: move restoreConsumer.assign() to shutdownTasksAndState. restoreConsumer.assign(..) in removeStandbyTasks was logging an (ignorable) exception due to the restoreConsumer being closed. Moved the restoreConsumer.assign(..) to shutdownTasksAndState as this is done prior to the closing of consumers. Author: Damian Guy <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1986 from dguy/hotfix-assign (cherry picked from commit 454f6845b34d0e3faabd9427f3cd8bbe6209c30d) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d6b54da3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d6b54da3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d6b54da3 Branch: refs/heads/0.10.1 Commit: d6b54da3e433da5a650e397d24db59c7bc05d6e4 Parents: e409d8a Author: Damian Guy <[email protected]> Authored: Fri Oct 7 11:30:00 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri Oct 7 11:30:10 2016 -0700 ---------------------------------------------------------------------- .../processor/internals/StreamThread.java | 23 ++++++++++---------- 1 file changed, 12 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d6b54da3/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 0667865..c3c6cc1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -297,6 +297,15 @@ public class StreamThread extends Thread { 
producer.flush(); // Close all task state managers closeAllStateManagers(rethrowExceptions); + try { + // un-assign the change log partitions + restoreConsumer.assign(Collections.<TopicPartition>emptyList()); + } catch (Exception e) { + log.error(String.format("stream-thread [%s] Failed to un-assign change log partitions: ", this.getName()), e); + if (rethrowExceptions) { + throw e; + } + } } interface AbstractTaskAction { @@ -758,17 +767,9 @@ public class StreamThread extends Thread { } private void removeStandbyTasks() { - try { - standbyTasks.clear(); - standbyTasksByPartition.clear(); - standbyRecords.clear(); - - // un-assign the change log partitions - restoreConsumer.assign(Collections.<TopicPartition>emptyList()); - - } catch (Exception e) { - log.error(String.format("stream-thread [%s] Failed to remove standby tasks: ", this.getName()), e); - } + standbyTasks.clear(); + standbyTasksByPartition.clear(); + standbyRecords.clear(); } private class StreamsMetricsImpl implements StreamsMetrics, ThreadCacheMetrics {
