Repository: kafka Updated Branches: refs/heads/0.11.0 ef88ee504 -> e08e76b71
KAFKA-5289: handleStopReplica should not send a second response. `shutdownIdleFetcherThreads()` can throw InterruptedException, for example. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> Closes #3096 from ijuma/kafka-5289-stop-replica-should-not-send-two-responses (cherry picked from commit ceb10c53302bd4824737e03ab86ca19e2aa64c7a) Signed-off-by: Rajini Sivaram <rajinisiva...@googlemail.com> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e08e76b7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e08e76b7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e08e76b7 Branch: refs/heads/0.11.0 Commit: e08e76b7166f5690cafdc8d5e52cd81746408a37 Parents: ef88ee5 Author: Ismael Juma <ism...@juma.me.uk> Authored: Mon May 22 13:37:29 2017 +0100 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Mon May 22 13:39:12 2017 +0100 ---------------------------------------------------------------------- core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e08e76b7/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1346fb3..197298c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -35,7 +35,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat import kafka.log.{Log, LogManager, TimestampOffset} import kafka.network.{RequestChannel, RequestOrResponseSend} import kafka.security.auth._ -import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils} +import kafka.utils.{CoreUtils, Exit, 
Logging, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} @@ -195,7 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel, // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader, // is not cleared. result.foreach { case (topicPartition, error) => - if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GROUP_METADATA_TOPIC_NAME) { + if (error == Errors.NONE && stopReplicaRequest.deletePartitions && topicPartition.topic == GROUP_METADATA_TOPIC_NAME) { groupCoordinator.handleGroupEmigration(topicPartition.partition) } } @@ -207,7 +207,7 @@ class KafkaApis(val requestChannel: RequestChannel, new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)) } - replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() + CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()) } def handleUpdateMetadataRequest(request: RequestChannel.Request) {