Repository: kafka Updated Branches: refs/heads/trunk d3f1407f7 -> ceb10c533
KAFKA-5289: handleStopReplica should not send a second response `shutdownIdleFetcherThreads()` can throw InterruptedException for example. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> Closes #3096 from ijuma/kafka-5289-stop-replica-should-not-send-two-responses Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ceb10c53 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ceb10c53 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ceb10c53 Branch: refs/heads/trunk Commit: ceb10c53302bd4824737e03ab86ca19e2aa64c7a Parents: d3f1407 Author: Ismael Juma <ism...@juma.me.uk> Authored: Mon May 22 13:37:29 2017 +0100 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Mon May 22 13:37:29 2017 +0100 ---------------------------------------------------------------------- core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ceb10c53/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1346fb3..197298c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -35,7 +35,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat import kafka.log.{Log, LogManager, TimestampOffset} import kafka.network.{RequestChannel, RequestOrResponseSend} import kafka.security.auth._ -import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils} +import kafka.utils.{CoreUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} @@ -195,7 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel, // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader, // is not cleared. result.foreach { case (topicPartition, error) => - if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GROUP_METADATA_TOPIC_NAME) { + if (error == Errors.NONE && stopReplicaRequest.deletePartitions && topicPartition.topic == GROUP_METADATA_TOPIC_NAME) { groupCoordinator.handleGroupEmigration(topicPartition.partition) } } @@ -207,7 +207,7 @@ class KafkaApis(val requestChannel: RequestChannel, new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)) } - replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() + CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()) } def handleUpdateMetadataRequest(request: RequestChannel.Request) {