Repository: kafka
Updated Branches:
  refs/heads/trunk d3f1407f7 -> ceb10c533


KAFKA-5289: handleStopReplica should not send a second response

`shutdownIdleFetcherThreads()` can throw InterruptedException
for example.

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>

Closes #3096 from ijuma/kafka-5289-stop-replica-should-not-send-two-responses


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ceb10c53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ceb10c53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ceb10c53

Branch: refs/heads/trunk
Commit: ceb10c53302bd4824737e03ab86ca19e2aa64c7a
Parents: d3f1407
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Mon May 22 13:37:29 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Mon May 22 13:37:29 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ceb10c53/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1346fb3..197298c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -35,7 +35,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinat
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.{RequestChannel, RequestOrResponseSend}
 import kafka.security.auth._
-import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{CoreUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
TRANSACTION_STATE_TOPIC_NAME, isInternal}
@@ -195,7 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // request to become a follower due to which cache for groups that 
belong to an offsets topic partition for which broker 1 was the leader,
       // is not cleared.
       result.foreach { case (topicPartition, error) =>
-        if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && 
topicPartition.topic == GROUP_METADATA_TOPIC_NAME) {
+        if (error == Errors.NONE && stopReplicaRequest.deletePartitions && 
topicPartition.topic == GROUP_METADATA_TOPIC_NAME) {
           groupCoordinator.handleGroupEmigration(topicPartition.partition)
         }
       }
@@ -207,7 +207,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, 
result.asJava))
     }
 
-    replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
+    
CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads())
   }
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {

Reply via email to