kafka-900; ClosedByInterruptException when high-level consumer shutdown 
normally; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/312ed2e6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/312ed2e6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/312ed2e6

Branch: refs/heads/trunk
Commit: 312ed2e67a0bca194ee3012c61239d30d8890566
Parents: 85c7159
Author: Jun Rao <jun...@gmail.com>
Authored: Wed May 29 09:56:27 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed May 29 09:56:27 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/312ed2e6/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 162c749..48100f4 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -152,9 +152,11 @@ abstract class AbstractFetcherThread(name: String, 
clientId: String, sourceBroke
                       partitionsWithError += topicAndPartition
                   }
                 case _ =>
-                  warn("error for partition [%s,%d] to broker 
%d".format(topic, partitionId, sourceBroker.id),
-                    ErrorMapping.exceptionFor(partitionData.error))
-                  partitionsWithError += topicAndPartition
+                  if (isRunning.get) {
+                    warn("error for partition [%s,%d] to broker 
%d".format(topic, partitionId, sourceBroker.id),
+                      ErrorMapping.exceptionFor(partitionData.error))
+                    partitionsWithError += topicAndPartition
+                  }
               }
             }
         }

Reply via email to