Repository: kafka
Updated Branches:
  refs/heads/trunk 615cd4fd3 -> 454f6845b


HOTFIX: move restoreConsumer.assign() to shutdownTasksAndState

Calling restoreConsumer.assign(..) in removeStandbyTasks was logging an
(ignorable) exception because the restoreConsumer had already been closed.
Moved the restoreConsumer.assign(..) call to shutdownTasksAndState, which
runs before the consumers are closed.

Author: Damian Guy <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #1986 from dguy/hotfix-assign


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/454f6845
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/454f6845
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/454f6845

Branch: refs/heads/trunk
Commit: 454f6845b34d0e3faabd9427f3cd8bbe6209c30d
Parents: 615cd4f
Author: Damian Guy <[email protected]>
Authored: Fri Oct 7 11:30:00 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Oct 7 11:30:00 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/454f6845/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0667865..c3c6cc1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -297,6 +297,15 @@ public class StreamThread extends Thread {
         producer.flush();
         // Close all task state managers
         closeAllStateManagers(rethrowExceptions);
+        try {
+            // un-assign the change log partitions
+            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+        } catch (Exception e) {
+            log.error(String.format("stream-thread [%s] Failed to un-assign change log partitions: ", this.getName()), e);
+            if (rethrowExceptions) {
+                throw e;
+            }
+        }
     }
 
     interface AbstractTaskAction {
@@ -758,17 +767,9 @@ public class StreamThread extends Thread {
     }
 
     private void removeStandbyTasks() {
-        try {
-            standbyTasks.clear();
-            standbyTasksByPartition.clear();
-            standbyRecords.clear();
-
-            // un-assign the change log partitions
-            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
-
-        } catch (Exception e) {
-            log.error(String.format("stream-thread [%s] Failed to remove standby tasks: ", this.getName()), e);
-        }
+        standbyTasks.clear();
+        standbyTasksByPartition.clear();
+        standbyRecords.clear();
     }
 
     private class StreamsMetricsImpl implements StreamsMetrics, ThreadCacheMetrics {

Reply via email to