This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 9b63e8a  KAFKA-6501; Dynamic broker config tests updates and metrics 
fix (#4539)
9b63e8a is described below

commit 9b63e8af2f1fe2bca60170fc11e9ae37e8fa595a
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Thu Feb 8 16:22:51 2018 -0800

    KAFKA-6501; Dynamic broker config tests updates and metrics fix (#4539)
    
    1. Handle listener-not-found in MetadataCache since this can occur when 
listeners are being updated. To avoid breaking clients, this is handled in the 
same way as broker-not-available so that clients may retry.
    2. Set retries=1000 for listener reconfiguration tests to avoid transient 
failures when metadata cache has not been updated
    3. Remove IdlePercent metric when Processor is deleted, add test
    4. Reduce log segment size used during reconfiguration to avoid timeout 
while waiting for log rolling
    5.Test markPartitionsForTruncation after fetcher thread resize
    6. Move per-processor ResponseQueueSize metric back to RequestChannel.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson 
<ja...@confluent.io>
---
 .../main/scala/kafka/network/RequestChannel.scala  |  18 +++-
 .../main/scala/kafka/network/SocketServer.scala    |  24 ++---
 .../kafka/server/AbstractFetcherManager.scala      |   3 +-
 .../main/scala/kafka/server/MetadataCache.scala    |  14 ++-
 .../server/DynamicBrokerReconfigurationTest.scala  | 117 ++++++++++++++++-----
 .../unit/kafka/server/MetadataCacheTest.scala      |  13 +--
 6 files changed, 134 insertions(+), 55 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 144632c..8a17528 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,7 +24,6 @@ import java.util.concurrent._
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, 
NoOpAction, CloseConnectionAction}
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.Send
@@ -40,6 +39,10 @@ import scala.reflect.ClassTag
 object RequestChannel extends Logging {
   private val requestLogger = Logger("kafka.request.logger")
 
+  val RequestQueueSizeMetric = "RequestQueueSize"
+  val ResponseQueueSizeMetric = "ResponseQueueSize"
+  val ProcessorMetricTag = "processor"
+
   def isRequestLoggingEnabled: Boolean = 
requestLogger.underlying.isDebugEnabled
 
   sealed trait BaseRequest
@@ -241,15 +244,16 @@ object RequestChannel extends Logging {
 }
 
 class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+  import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
 
-  newGauge("RequestQueueSize", new Gauge[Int] {
+  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
       def value = requestQueue.size
   })
 
-  newGauge("ResponseQueueSize", new Gauge[Int]{
+  newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
     def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
       total + processor.responseQueueSize
     }
@@ -258,10 +262,18 @@ class RequestChannel(val queueSize: Int) extends 
KafkaMetricsGroup {
   def addProcessor(processor: Processor): Unit = {
     if (processors.putIfAbsent(processor.id, processor) != null)
       warn(s"Unexpected processor with processorId ${processor.id}")
+
+    newGauge(ResponseQueueSizeMetric,
+      new Gauge[Int] {
+        def value = processor.responseQueueSize
+      },
+      Map(ProcessorMetricTag -> processor.id.toString)
+    )
   }
 
   def removeProcessor(processorId: Int): Unit = {
     processors.remove(processorId)
+    removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> 
processorId.toString))
   }
 
   /** Send a request to be handled, potentially blocking until there is room 
in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index fef412b..d37b523 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -433,6 +433,12 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
 }
 
+private[kafka] object Processor {
+  val IdlePercentMetricName = "IdlePercent"
+  val NetworkProcessorMetricTag = "networkProcessor"
+  val ListenerMetricTag = "listener"
+}
+
 /**
  * Thread that processes all requests from a single connection. There are N of 
these running in parallel
  * each of which has its own selector
@@ -451,6 +457,7 @@ private[kafka] class Processor(val id: Int,
                                memoryPool: MemoryPool,
                                logContext: LogContext) extends 
AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
+  import Processor._
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
       case Array(local, remote, index) => 
BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
@@ -471,18 +478,11 @@ private[kafka] class Processor(val id: Int,
   private val responseQueue = new 
LinkedBlockingDeque[RequestChannel.Response]()
 
   private[kafka] val metricTags = mutable.LinkedHashMap(
-    "listener" -> listenerName.value,
-    "networkProcessor" -> id.toString
+    ListenerMetricTag -> listenerName.value,
+    NetworkProcessorMetricTag -> id.toString
   ).asJava
 
-  newGauge("ResponseQueueSize",
-    new Gauge[Int] {
-      def value = responseQueue.size()
-    },
-    Map("processor" -> id.toString)
-  )
-
-  newGauge("IdlePercent",
+  newGauge(IdlePercentMetricName,
     new Gauge[Double] {
       def value = {
         Option(metrics.metric(metrics.metricName("io-wait-ratio", 
"socket-server-metrics", metricTags))).fold(0.0)(_.value)
@@ -490,7 +490,7 @@ private[kafka] class Processor(val id: Int,
     },
     // for compatibility, only add a networkProcessor tag to the Yammer 
Metrics alias (the equivalent Selector metric
     // also includes the listener name)
-    Map("networkProcessor" -> id.toString)
+    Map(NetworkProcessorMetricTag -> id.toString)
   )
 
   private val selector = createSelector(
@@ -742,6 +742,7 @@ private[kafka] class Processor(val id: Int,
       close(channel.id)
     }
     selector.close()
+    removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag -> 
id.toString))
   }
 
   // 'protected` to allow override for testing
@@ -792,7 +793,6 @@ private[kafka] class Processor(val id: Int,
 
   override def shutdown(): Unit = {
     super.shutdown()
-    removeMetric("ResponseQueueSize", Map("processor" -> id.toString))
     removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
   }
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 312123c..aa08585 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -91,7 +91,8 @@ abstract class AbstractFetcherManager(protected val name: 
String, clientId: Stri
     }
   }
 
-  private def getFetcherId(topic: String, partitionId: Int) : Int = {
+  // Visibility for testing
+  private[server] def getFetcherId(topic: String, partitionId: Int) : Int = {
     lock synchronized {
       Utils.abs(31 * topic.hashCode() + partitionId) % numFetchersPerBroker
     }
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index b4a015d..eb2d835 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -103,12 +103,11 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): 
Option[Node] =
+  private def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): 
Option[Node] =
     inReadLock(partitionMetadataLock) {
-      aliveNodes.get(brokerId).map { nodeMap =>
-        nodeMap.getOrElse(listenerName,
-          throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` 
does not have listener with name `$listenerName`"))
-      }
+      // Returns None if broker is not alive or if the broker does not have a 
listener named `listenerName`.
+      // Since listeners can be added dynamically, a broker with a missing 
listener could be a transient error.
+      aliveNodes.get(brokerId).flatMap(_.get(listenerName))
     }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
@@ -203,6 +202,11 @@ class MetadataCache(brokerId: Int) extends Logging {
         aliveBrokers(broker.id) = Broker(broker.id, endPoints, 
Option(broker.rack))
         aliveNodes(broker.id) = nodes.asScala
       }
+      aliveNodes.get(brokerId).foreach { listenerMap =>
+        val listeners = listenerMap.keySet
+        if (!aliveNodes.values.forall(_.keySet == listeners))
+          error(s"Listeners are not identical across brokers: $aliveNodes")
+      }
 
       val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
       updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) 
=>
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index cb2ac52..4cdc989 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -26,11 +26,14 @@ import java.util.{Collections, Properties}
 import java.util.concurrent._
 import javax.management.ObjectName
 
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.coordinator.group.OffsetConfig
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
+import kafka.network.{Processor, RequestChannel}
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
@@ -92,6 +95,8 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     startSasl(jaasSections(kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism)))
     super.setUp()
 
+    clearLeftOverProcessorMetrics() // clear metrics left over from other 
tests so that new ones can be tested
+
     (0 until numServers).foreach { brokerId =>
 
       val props = TestUtils.createBrokerConfig(brokerId, zkConnect, 
trustStoreFile = Some(trustStoreFile1))
@@ -102,10 +107,11 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
       props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
       props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
       props.put(KafkaConfig.SaslEnabledMechanismsProp, 
kafkaServerSaslMechanisms.mkString(","))
-      props.put(KafkaConfig.LogSegmentBytesProp, "2000")
-      props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000")
+      props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test 
log rolling on config update
+      props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one 
to test reducing threads
+      props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, 
"10000000") // non-default value to trigger a new metric
       props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
-      props.put(KafkaConfig.PasswordEncoderOldSecretProp, 
"old-dynamic-config-secret")
+      props.put(KafkaConfig.PasswordEncoderOldSecretProp, 
"old-dynamic-config-secret") // for testing secret rotation
 
       props ++= sslProperties1
       addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
@@ -261,9 +267,10 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
     reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogCleanerThreadsProp, "2"))
 
-    // Verify cleaner config was updated
+    // Verify cleaner config was updated. Wait for one of the configs to be 
updated and verify
+    // that all other others were updated at the same time since they are 
reconfigured together
     val newCleanerConfig = servers.head.logManager.cleaner.currentConfig
-    assertEquals(2, newCleanerConfig.numThreads)
+    TestUtils.waitUntilTrue(() => newCleanerConfig.numThreads == 2, "Log 
cleaner not reconfigured")
     assertEquals(20000000, newCleanerConfig.dedupeBufferSize)
     assertEquals(0.8, newCleanerConfig.dedupeBufferLoadFactor, 0.001)
     assertEquals(300000, newCleanerConfig.ioBufferSize)
@@ -291,7 +298,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     val (producerThread, consumerThread) = startProduceConsume(retries = 0)
 
     val props = new Properties
-    props.put(KafkaConfig.LogSegmentBytesProp, "10000")
+    props.put(KafkaConfig.LogSegmentBytesProp, "4000")
     props.put(KafkaConfig.LogRollTimeMillisProp, 
TimeUnit.HOURS.toMillis(2).toString)
     props.put(KafkaConfig.LogRollTimeJitterMillisProp, 
TimeUnit.HOURS.toMillis(1).toString)
     props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "100000")
@@ -312,7 +319,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     props.put(KafkaConfig.LogPreAllocateProp, true.toString)
     props.put(KafkaConfig.LogMessageTimestampTypeProp, 
TimestampType.LOG_APPEND_TIME.toString)
     props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
-    reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogSegmentBytesProp, "10000"))
+    reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogSegmentBytesProp, "4000"))
 
     // Verify that all broker defaults have been updated
     servers.foreach { server =>
@@ -325,7 +332,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     val newLogConfig = 
LogConfig(KafkaServer.copyKafkaConfigToLog(servers.head.config))
     assertEquals(newLogConfig, servers.head.logManager.currentDefaultConfig)
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 
0)).getOrElse(throw new IllegalStateException("Log not found"))
-    TestUtils.waitUntilTrue(() => log.config.segmentSize == 10000, "Existing 
topic config using defaults not updated")
+    TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing 
topic config using defaults not updated")
     props.asScala.foreach { case (k, v) =>
       val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
       val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]" 
else v
@@ -335,7 +342,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     consumerThread.waitForMatchingRecords(record => record.timestampType == 
TimestampType.LOG_APPEND_TIME)
 
     // Verify that the new config is actually used for new segments of 
existing logs
-    TestUtils.waitUntilTrue(() => log.logSegments.exists(_.size > 9000), "Log 
segment size increase not applied")
+    TestUtils.waitUntilTrue(() => log.logSegments.exists(_.size > 3000), "Log 
segment size increase not applied")
 
     // Verify that overridden topic configs are not updated when broker 
default is updated
     val log2 = servers.head.logManager.getLog(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
@@ -383,19 +390,20 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     // For others, thread count should be configuredCount * threadMultiplier * 
numBrokers
     val threadMultiplier = Map(
       requestHandlerPrefix -> 1,
-      networkThreadPrefix ->  2, // 2 endpoints
+      networkThreadPrefix -> 2, // 2 endpoints
       fetcherThreadPrefix -> (servers.size - 1)
     )
 
     // Tolerate threads left over from previous tests
-    def leftOverThreadCount(prefix: String, perBrokerCount: Int) : Int = {
+    def leftOverThreadCount(prefix: String, perBrokerCount: Int): Int = {
       val count = matchingThreads(prefix).size - perBrokerCount * servers.size 
* threadMultiplier(prefix)
       if (count > 0) count else 0
     }
+
     val leftOverThreads = Map(
       requestHandlerPrefix -> leftOverThreadCount(requestHandlerPrefix, 
servers.head.config.numIoThreads),
-      networkThreadPrefix ->  leftOverThreadCount(networkThreadPrefix, 
servers.head.config.numNetworkThreads),
-      fetcherThreadPrefix ->  leftOverThreadCount(fetcherThreadPrefix, 
servers.head.config.numReplicaFetchers)
+      networkThreadPrefix -> leftOverThreadCount(networkThreadPrefix, 
servers.head.config.numNetworkThreads),
+      fetcherThreadPrefix -> leftOverThreadCount(fetcherThreadPrefix, 
servers.head.config.numReplicaFetchers)
     )
 
     def maybeVerifyThreadPoolSize(propName: String, size: Int, threadPrefix: 
String): Unit = {
@@ -404,21 +412,26 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
       if (expectedCountPerBroker > 0)
         verifyThreads(threadPrefix, expectedCountPerBroker, ignoreCount)
     }
+
     def reducePoolSize(propName: String, currentSize: => Int, threadPrefix: 
String): Int = {
       val newSize = if (currentSize / 2 == 0) 1 else currentSize / 2
       resizeThreadPool(propName, newSize, threadPrefix)
       newSize
     }
+
     def increasePoolSize(propName: String, currentSize: => Int, threadPrefix: 
String): Int = {
-      resizeThreadPool(propName, currentSize * 2, threadPrefix)
-      currentSize * 2
+      val newSize = currentSize * 2 - 1
+      resizeThreadPool(propName, newSize, threadPrefix)
+      newSize
     }
+
     def resizeThreadPool(propName: String, newSize: Int, threadPrefix: 
String): Unit = {
       val props = new Properties
       props.put(propName, newSize.toString)
       reconfigureServers(props, perBrokerConfig = false, (propName, 
newSize.toString))
       maybeVerifyThreadPoolSize(propName, newSize, threadPrefix)
     }
+
     def verifyThreadPoolResize(propName: String, currentSize: => Int, 
threadPrefix: String, mayReceiveDuplicates: Boolean): Unit = {
       maybeVerifyThreadPoolSize(propName, currentSize, threadPrefix)
       val numRetries = if (mayReceiveDuplicates) 100 else 0
@@ -444,6 +457,57 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
       "", mayReceiveDuplicates = false)
     verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, 
config.numNetworkThreads,
       networkThreadPrefix, mayReceiveDuplicates = true)
+
+    verifyProcessorMetrics()
+    verifyMarkPartitionsForTruncation()
+  }
+
+  private def isProcessorMetric(metricName: MetricName): Boolean = {
+    val mbeanName = metricName.getMBeanName
+    mbeanName.contains(s"${Processor.NetworkProcessorMetricTag}=") || 
mbeanName.contains(s"${RequestChannel.ProcessorMetricTag}=")
+  }
+
+  private def clearLeftOverProcessorMetrics(): Unit = {
+    val metricsFromOldTests = 
Metrics.defaultRegistry.allMetrics.keySet.asScala.filter(isProcessorMetric)
+    metricsFromOldTests.foreach(Metrics.defaultRegistry.removeMetric)
+  }
+
+  // Verify that metrics from processors that were removed have been deleted.
+  // Since processor ids are not reused, it is sufficient to check metrics 
count
+  // based on the current number of processors
+  private def verifyProcessorMetrics(): Unit = {
+    val numProcessors = servers.head.config.numNetworkThreads * 2 // 2 
listeners
+
+    val kafkaMetrics = servers.head.metrics.metrics().keySet.asScala
+      .filter(_.tags.containsKey(Processor.NetworkProcessorMetricTag))
+      .groupBy(_.tags.get(Processor.NetworkProcessorMetricTag))
+    assertEquals(numProcessors, kafkaMetrics.size)
+
+    Metrics.defaultRegistry.allMetrics.keySet.asScala
+      .filter(isProcessorMetric)
+      .groupBy(_.getName)
+      .foreach { case (name, set) => assertEquals(s"Metrics not deleted 
$name", numProcessors, set.size) }
+  }
+
+  // Verify that replicaFetcherManager.markPartitionsForTruncation uses the 
current fetcher thread size
+  // to obtain partition assignment
+  private def verifyMarkPartitionsForTruncation(): Unit = {
+    val leaderId = 0
+    val partitions = (0 until numPartitions).map(i => new 
TopicPartition(topic, i)).filter { tp =>
+      zkClient.getLeaderForPartition(tp) == Some(leaderId)
+    }
+    assertTrue(s"Partitons not found with leader $leaderId", 
partitions.nonEmpty)
+    partitions.foreach { tp =>
+      (1 to 2).foreach { i =>
+        val replicaFetcherManager = 
servers(i).replicaManager.replicaFetcherManager
+        val truncationOffset = tp.partition
+        replicaFetcherManager.markPartitionsForTruncation(leaderId, tp, 
truncationOffset)
+        val fetcherThreads = 
replicaFetcherManager.fetcherThreadMap.filter(_._2.partitionStates.contains(tp))
+        assertEquals(1, fetcherThreads.size)
+        assertEquals(replicaFetcherManager.getFetcherId(tp.topic, 
tp.partition), fetcherThreads.head._1.fetcherId)
+        assertEquals(truncationOffset, 
fetcherThreads.head._2.partitionStates.stateValue(tp).fetchOffset)
+      }
+    }
   }
 
   @Test
@@ -672,7 +736,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
   private def verifyRemoveListener(listenerName: String, securityProtocol: 
SecurityProtocol,
                                    saslMechanisms: Seq[String]): Unit = {
     val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head
-    val producer1 = createProducer(listenerName, securityProtocol, 
saslMechanism)
+    val producer1 = createProducer(listenerName, securityProtocol, 
saslMechanism, retries = 1000)
     val consumer1 = createConsumer(listenerName, securityProtocol, 
saslMechanism,
       s"remove-listener-group-$securityProtocol")
     verifyProduceConsume(producer1, consumer1, numRecords = 10, topic)
@@ -716,7 +780,8 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 
   private def verifyListener(securityProtocol: SecurityProtocol, 
saslMechanism: Option[String]): Unit = {
     val mechanism = saslMechanism.getOrElse("")
-    val producer = createProducer(securityProtocol.name, securityProtocol, 
mechanism)
+    val retries = 1000 // since it may take time for metadata to be updated on 
all brokers
+    val producer = createProducer(securityProtocol.name, securityProtocol, 
mechanism, retries)
     val consumer = createConsumer(securityProtocol.name, securityProtocol, 
mechanism,
       s"add-listener-group-$securityProtocol-$mechanism")
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
@@ -785,11 +850,13 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     props
   }
 
-  private def createProducer(listenerName: String, securityProtocol: 
SecurityProtocol,
-                             saslMechanism: String): KafkaProducer[String, 
String] = {
+  private def createProducer(listenerName: String,
+                             securityProtocol: SecurityProtocol,
+                             saslMechanism: String,
+                             retries: Int): KafkaProducer[String, String] = {
     val bootstrapServers =  TestUtils.bootstrapServers(servers, new 
ListenerName(listenerName))
     val producer = TestUtils.createNewProducer(bootstrapServers,
-      acks = -1, retries = 0,
+      acks = -1, retries = retries,
       securityProtocol = securityProtocol,
       keySerializer = new StringSerializer,
       valueSerializer = new StringSerializer,
@@ -834,12 +901,12 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
                                    topic: String): Unit = {
     val producerRecords = (1 to numRecords).map(i => new ProducerRecord(topic, 
s"key$i", s"value$i"))
     producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
-
-    val records = new ArrayBuffer[ConsumerRecord[String, String]]
+    var received = 0
     TestUtils.waitUntilTrue(() => {
-      records ++= consumer.poll(50).asScala
-      records.size == numRecords
-    }, s"Consumed ${records.size} records until timeout instead of the 
expected $numRecords records")
+      received += consumer.poll(50).count
+      received >= numRecords
+    }, s"Consumed $received records until timeout instead of the expected 
$numRecords records")
+    assertEquals(numRecords, received)
   }
 
   private def verifyAuthenticationFailure(producer: KafkaProducer[_, _]): Unit 
= {
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 383c1e2..0ee7365 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 import java.util
 import util.Arrays.asList
 
-import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -289,14 +288,10 @@ class MetadataCacheTest {
       brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
-    try {
-      val result = cache.getTopicMetadata(Set(topic), 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
-      fail(s"Exception should be thrown by `getTopicMetadata` with 
non-supported SecurityProtocol, $result was returned instead")
-    }
-    catch {
-      case _: BrokerEndPointNotAvailableException => //expected
-    }
-
+    val topicMetadata = cache.getTopicMetadata(Set(topic), 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+    assertEquals(1, topicMetadata.size)
+    assertEquals(1, topicMetadata.head.partitionMetadata.size)
+    assertEquals(-1, topicMetadata.head.partitionMetadata.get(0).leaderId)
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to