Repository: kafka
Updated Branches:
  refs/heads/trunk 49ddc897b -> ff300c9d4


KAFKA-3645; Fix ConsumerGroupCommand and ConsumerOffsetChecker to correctly 
read endpoint info from ZK

The host and port entries under /brokers/ids/<bid> get filled only for the 
PLAINTEXT security protocol. For other protocols the host is null and the 
actual endpoint is under "endpoints". This causes an NPE when running the 
consumer group and offset checker scripts in a kerberized environment. By 
always reading the host and port values from "endpoints", a more meaningful 
exception is thrown rather than an NPE.

Author: Arun Mahadevan <[email protected]>

Reviewers: Sriharsha Chintalapani <[email protected]>, Ismael Juma 
<[email protected]>

Closes #1301 from arunmahadevan/cg_kerb_fix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ff300c9d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ff300c9d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ff300c9d

Branch: refs/heads/trunk
Commit: ff300c9d4f45e4a355db11258965c3a3a6f6bbf7
Parents: 49ddc89
Author: Arun Mahadevan <[email protected]>
Authored: Sat Jun 4 09:24:45 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Sat Jun 4 09:24:45 2016 +0100

----------------------------------------------------------------------
 .../kafka/admin/ConsumerGroupCommand.scala      | 20 +++++------------
 .../kafka/tools/ConsumerOffsetChecker.scala     | 23 ++++++--------------
 2 files changed, 12 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ff300c9d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index b086d8f..f0c817f 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.BrokerNotAvailableException
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
@@ -277,20 +277,10 @@ object ConsumerGroupCommand {
 
     private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
       try {
-        zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 
match {
-          case Some(brokerInfoString) =>
-            Json.parseFull(brokerInfoString) match {
-              case Some(m) =>
-                val brokerInfo = m.asInstanceOf[Map[String, Any]]
-                val host = brokerInfo.get("host").get.asInstanceOf[String]
-                val port = brokerInfo.get("port").get.asInstanceOf[Int]
-                Some(new SimpleConsumer(host, port, 10000, 100000, 
"ConsumerGroupCommand"))
-              case None =>
-                throw new BrokerNotAvailableException("Broker id %d does not 
exist".format(brokerId))
-            }
-          case None =>
-            throw new BrokerNotAvailableException("Broker id %d does not 
exist".format(brokerId))
-        }
+        zkUtils.getBrokerInfo(brokerId)
+          .map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+          .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 
10000, 100000, "ConsumerGroupCommand"))
+          .orElse(throw new BrokerNotAvailableException("Broker id %d does not 
exist".format(brokerId)))
       } catch {
         case t: Throwable =>
           println("Could not parse broker info due to " + t.getMessage)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ff300c9d/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 5c01f34..8f86f66 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -21,11 +21,12 @@ package kafka.tools
 import joptsimple._
 import kafka.utils._
 import kafka.consumer.SimpleConsumer
-import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest}
+import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest}
 import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
 import org.apache.kafka.common.errors.BrokerNotAvailableException
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.security.JaasUtils
+
 import scala.collection._
 import kafka.client.ClientUtils
 import kafka.network.BlockingChannel
@@ -40,20 +41,10 @@ object ConsumerOffsetChecker extends Logging {
 
   private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] 
= {
     try {
-      zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
-        case Some(brokerInfoString) =>
-          Json.parseFull(brokerInfoString) match {
-            case Some(m) =>
-              val brokerInfo = m.asInstanceOf[Map[String, Any]]
-              val host = brokerInfo.get("host").get.asInstanceOf[String]
-              val port = brokerInfo.get("port").get.asInstanceOf[Int]
-              Some(new SimpleConsumer(host, port, 10000, 100000, 
"ConsumerOffsetChecker"))
-            case None =>
-              throw new BrokerNotAvailableException("Broker id %d does not 
exist".format(bid))
-          }
-        case None =>
-          throw new BrokerNotAvailableException("Broker id %d does not 
exist".format(bid))
-      }
+      zkUtils.getBrokerInfo(bid)
+        .map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+        .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 
10000, 100000, "ConsumerOffsetChecker"))
+        .orElse(throw new BrokerNotAvailableException("Broker id %d does not 
exist".format(bid)))
     } catch {
       case t: Throwable =>
         println("Could not parse broker info due to " + t.getCause)

Reply via email to