[ 
https://issues.apache.org/jira/browse/KAFKA-8066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8066.
------------------------------------
       Resolution: Fixed
    Fix Version/s: 2.2.1
                   2.1.2
                   2.0.2

> ReplicaFetcherThread fails to startup because of failing to register the 
> metric.
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-8066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8066
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 1.1.1, 1.1.2, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 
> 2.0.2, 2.1.2
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Major
>             Fix For: 2.0.2, 2.1.2, 2.2.1
>
>
> After KAFKA-6051, we close the leaderEndpoint in the replica fetcher thread's 
> initiateShutdown() to preempt any in-progress fetch request and accelerate 
> replica fetcher thread shutdown. However, the selector may fail to close a 
> channel and throw an exception when the replica fetcher thread is still 
> actively fetching. In this case, the sensors will not be cleaned up: if 
> `close(id)` throws an exception in `Selector.close()`, then `sensors.close()` 
> is never called and the sensors are never unregistered (see the code below).
> {code:java}
>     public void close() {
>         List<String> connections = new ArrayList<>(channels.keySet());
>         for (String id : connections)
>             close(id);
>         try {
>             this.nioSelector.close();
>         } catch (IOException | SecurityException e) {
>             log.error("Exception closing nioSelector:", e);
>         }
>         sensors.close();
>         channelBuilder.close();
>     }
> {code}
> If this happens, then when the broker later tries to start a 
> ReplicaFetcherThread with the same fetcher id to the same destination broker 
> (e.g. due to leadership changes or newly created partitions), the 
> ReplicaFetcherThread will fail to start because the selector throws an 
> IllegalArgumentException if a metric with the same name already exists:
> {noformat}
> 2019/02/27 10:24:26.938 ERROR [KafkaApis] [kafka-request-handler-6] 
> [kafka-server] [] [KafkaApi-38031] Error when handling request {}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-count, group=replica-fetcher-metrics, description=The 
> current number of active connections., tags={broker-id=29712, fetcher-id=3}]' 
> already exists, can't register another one.
>         at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:559) 
> ~[kafka-clients-2.0.0.66.jar:?]
>         at 
> org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:502) 
> ~[kafka-clients-2.0.0.66.jar:?]
>         at 
> org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:485) 
> ~[kafka-clients-2.0.0.66.jar:?]
>         at 
> org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:470) 
> ~[kafka-clients-2.0.0.66.jar:?]
>         at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:963)
>  ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:170) 
> ~[kafka-clients-2.0.0.66.jar:?]
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:188) 
> ~[kafka-clients-2.0.0.66.jar:?]
>         at 
> kafka.server.ReplicaFetcherBlockingSend.<init>(ReplicaFetcherBlockingSend.scala:61)
>  ~[kafka_2.11-2.0.0.66.jar:?]
>         at 
> kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68)
>  ~[kafka_2.11-2.0.0.66.jar:?]
>         at 
> kafka.server.ReplicaFetcherThread$$anonfun$1.apply(ReplicaFetcherThread.scala:68)
>  ~[kafka_2.11-2.0.0.66.jar:?]
>         at scala.Option.getOrElse(Option.scala:121) 
> ~[scala-library-2.11.12.jar:?]
>         at 
> kafka.server.ReplicaFetcherThread.<init>(ReplicaFetcherThread.scala:67) 
> ~[kafka_2.11-2.0.0.66.jar:?]
>         at 
> kafka.server.ReplicaFetcherManager.createFetcherThread(ReplicaFetcherManager.scala:32)
>  ~[kafka_2.11-2.0.0.66.jar:?]
>         at 
> kafka.server.AbstractFetcherManager.kafka$server$AbstractFetcherManager$$addAndStartFetcherThread$1(AbstractFetcherManager.scala:132)
>  ~[kafka_2.11-2.0.0.66.jar:?]
>         at 
> kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:146)
>  ~[kafka_2.11-2.0.0.66.jar:?]
>         at 
> kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:137)
>  ~[kafka_2.11-2.0.0.66.jar:?]
>         at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  ~[scala-library-2.11.12.jar:?]
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) 
> ~[scala-library-2.11.12.jar:?]
>         at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  ~[scala-library-2.11.12.jar:?]
>         at 
> kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:137)
>  ~[kafka_2.11-2.0.0.66.jar:?]
>         at 
> kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:1333) 
> ~[kafka_2.11-2.0.0.66.jar:?]
>         at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1107) 
> ~[kafka_2.11-2.0.0.66.jar:?]
>         at 
> kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:194) 
> ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:110) 
> ~[kafka_2.11-2.0.0.66.jar:?]
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) 
> ~[kafka_2.11-2.0.0.66.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> {noformat}
> The fix should be to wrap the channel-closing loop in Selector.close() in a 
> try-finally block, so that sensors.close() is called even if an exception is 
> thrown.
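> A minimal, self-contained sketch of that try-finally pattern (hypothetical 
> class and method names for illustration only, not Kafka's actual classes; 
> the real patch lives in Selector.close()):
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> 
> public class SelectorCloseSketch {
>     static final List<String> events = new ArrayList<>();
> 
>     // Simulates a channel that fails to close while a fetch is in flight.
>     static void close(String id) {
>         throw new RuntimeException("failed to close channel " + id);
>     }
> 
>     static void closeSensors() { events.add("sensors-closed"); }
>     static void closeChannelBuilder() { events.add("channelBuilder-closed"); }
> 
>     // Patched close(): per-channel failures are logged and swallowed, and
>     // sensor cleanup runs in a finally block so the metrics are always
>     // unregistered even when a channel close throws.
>     public static void closeAll(List<String> channels) {
>         try {
>             for (String id : channels) {
>                 try {
>                     close(id);
>                 } catch (RuntimeException e) {
>                     events.add("close-failed:" + id); // log and keep going
>                 }
>             }
>         } finally {
>             closeSensors();
>             closeChannelBuilder();
>         }
>     }
> 
>     public static void main(String[] args) {
>         closeAll(List.of("fetcher-3"));
>         System.out.println(events);
>     }
> }
> {code}
> With this structure, a failing channel close no longer leaks the 
> replica-fetcher-metrics sensors, so a new thread with the same fetcher id can 
> register its metrics again.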



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
