Repository: kafka Updated Branches: refs/heads/trunk ef92bb4e0 -> 1b902b4ed
KAFKA-4811; ReplicaFetchThread may fail to create due to existing metric. Key fetcherThreadMap off brokerId + fetcherId instead of broker + fetcherId, which did not handle the case where the broker's host or port is changed; an existing fetcher whose source broker host or port differs is now shut down and replaced by a new one. Author: huxi <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #2606 from amethystic/kafka4811_ReplicaFetchThread_fail_create Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1b902b4e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1b902b4e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1b902b4e Branch: refs/heads/trunk Commit: 1b902b4ed39e78066fab163d1b6d54dd435b1d7b Parents: ef92bb4 Author: huxi <[email protected]> Authored: Thu Mar 2 09:55:01 2017 -0800 Committer: Jun Rao <[email protected]> Committed: Thu Mar 2 09:55:01 2017 -0800 ---------------------------------------------------------------------- .../kafka/server/AbstractFetcherManager.scala | 29 ++++++++++++++------ .../kafka/server/AbstractFetcherThread.scala | 2 +- 2 files changed, 21 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1b902b4e/core/src/main/scala/kafka/server/AbstractFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 0a17f8e..2b2aa7b 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1) extends Logging with KafkaMetricsGroup { // map of (source broker_id, fetcher_id per source broker) => fetcher - private val fetcherThreadMap = new 
mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread] + private val fetcherThreadMap = new mutable.HashMap[BrokerIdAndFetcherId, AbstractFetcherThread] private val mapLock = new Object this.logIdent = "[" + name + "] " @@ -75,17 +75,26 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri mapLock synchronized { val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialOffset) => BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))} + + def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId) { + val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker) + fetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread) + fetcherThread.start + } + for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) { - var fetcherThread: AbstractFetcherThread = null - fetcherThreadMap.get(brokerAndFetcherId) match { - case Some(f) => fetcherThread = f + val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId) + fetcherThreadMap.get(brokerIdAndFetcherId) match { + case Some(f) if f.sourceBroker.host == brokerAndFetcherId.broker.host && f.sourceBroker.port == brokerAndFetcherId.broker.port => + // reuse the fetcher thread + case Some(f) => + f.shutdown() + addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId) case None => - fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker) - fetcherThreadMap.put(brokerAndFetcherId, fetcherThread) - fetcherThread.start + addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId) } - fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) => + fetcherThreadMap(brokerIdAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) => tp 
-> brokerAndInitOffset.initOffset }) } @@ -105,7 +114,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri def shutdownIdleFetcherThreads() { mapLock synchronized { - val keysToBeRemoved = new mutable.HashSet[BrokerAndFetcherId] + val keysToBeRemoved = new mutable.HashSet[BrokerIdAndFetcherId] for ((key, fetcher) <- fetcherThreadMap) { if (fetcher.partitionCount <= 0) { fetcher.shutdown() @@ -133,3 +142,5 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int) case class BrokerAndInitialOffset(broker: BrokerEndPoint, initOffset: Long) + +case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int) http://git-wip-us.apache.org/repos/asf/kafka/blob/1b902b4e/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 6462968..0eb3ad8 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -44,7 +44,7 @@ import org.apache.kafka.common.record.MemoryRecords */ abstract class AbstractFetcherThread(name: String, clientId: String, - sourceBroker: BrokerEndPoint, + val sourceBroker: BrokerEndPoint, fetchBackOffMs: Int = 0, isInterruptible: Boolean = true) extends ShutdownableThread(name, isInterruptible) {
