This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 65d262c KAFKA-6528: Fix transient test failure in testThreadPoolResize (#4526)
65d262c is described below
commit 65d262cc30315ed47bd17e94c0db672de93d908a
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue Feb 6 01:50:51 2018 -0800
KAFKA-6528: Fix transient test failure in testThreadPoolResize (#4526)
Add locking to access AbstractFetcherThread#partitionStates during dynamic
thread update. Also make testing of thread updates that trigger retries more
resilient.
Reviewers: Jason Gustafson <[email protected]>
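For reference, a minimal sketch (not the Kafka source) of the lock-guarded snapshot pattern the patch applies to partitionStates; the LockedSnapshotSketch object, its fetchOffsets map, and the local inLock helper below are illustrative stand-ins, assuming a plain ReentrantLock:

    import java.util.concurrent.locks.ReentrantLock
    import scala.collection.mutable

    object LockedSnapshotSketch {
      private val lock = new ReentrantLock()
      // Hypothetical stand-in for AbstractFetcherThread#partitionStates.
      private val fetchOffsets = mutable.Map[String, Long]()

      // Run `body` while holding the lock (comparable to CoreUtils.inLock).
      private def inLock[T](l: ReentrantLock)(body: => T): T = {
        l.lock()
        try body finally l.unlock()
      }

      // Analogue of the new partitionsAndOffsets accessor: take an immutable
      // snapshot under the lock so a concurrent thread-pool resize sees a
      // consistent view instead of a map that mutates mid-iteration.
      def partitionsAndOffsets: Map[String, Long] = inLock(lock) {
        fetchOffsets.toMap
      }

      def updateOffset(partition: String, offset: Long): Unit = inLock(lock) {
        fetchOffsets(partition) = offset
      }
    }

A caller resizing the pool can then iterate the returned snapshot safely while the owning thread keeps updating offsets under the same lock.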
---
.../kafka/server/AbstractFetcherManager.scala | 4 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 6 ++
.../server/DynamicBrokerReconfigurationTest.scala | 92 ++++++++++++++--------
3 files changed, 66 insertions(+), 36 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 6d88d8d..312123c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -70,9 +70,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
def resizeThreadPool(newSize: Int): Unit = {
def migratePartitions(newSize: Int): Unit = {
fetcherThreadMap.foreach { case (id, thread) =>
- val removedPartitions = thread.partitionStates.partitionStates.asScala.map { case state =>
- state.topicPartition -> new BrokerAndInitialOffset(thread.sourceBroker, state.value.fetchOffset)
- }.toMap
+ val removedPartitions = thread.partitionsAndOffsets
removeFetcherForPartitions(removedPartitions.keySet)
if (id.fetcherId >= newSize)
thread.shutdown()
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 925c330..39a7032 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -312,6 +312,12 @@ abstract class AbstractFetcherThread(name: String,
finally partitionMapLock.unlock()
}
+ private[server] def partitionsAndOffsets: Map[TopicPartition, BrokerAndInitialOffset] = inLock(partitionMapLock) {
+ partitionStates.partitionStates.asScala.map { case state =>
+ state.topicPartition -> new BrokerAndInitialOffset(sourceBroker, state.value.fetchOffset)
+ }.toMap
+ }
+
}
object AbstractFetcherThread {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index b7f0ae8..cb2ac52 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -69,6 +69,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private val servers = new ArrayBuffer[KafkaServer]
private val numServers = 3
+ private val numPartitions = 10
private val producers = new ArrayBuffer[KafkaProducer[String, String]]
private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
private val adminClients = new ArrayBuffer[AdminClient]()
@@ -122,7 +123,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
- TestUtils.createTopic(zkClient, topic, numPartitions = 10, replicationFactor = numServers, servers)
+ TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
createAdminClient(SecurityProtocol.SSL, SecureInternal)
TestMetricsReporter.testReporters.clear()
@@ -203,7 +204,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
@Test
def testKeyStoreAlter(): Unit = {
val topic2 = "testtopic2"
- TestUtils.createTopic(zkClient, topic2, numPartitions = 10, replicationFactor = numServers, servers)
+ TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers)
// Start a producer and consumer that work with the current truststore.
// This should continue working while changes are made
@@ -241,7 +242,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
verifyProduceConsume(producer, consumer, 10, topic2)
// Verify that all messages sent with retries=0 while keystores were being altered were consumed
- stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+ stopAndVerifyProduceConsume(producerThread, consumerThread)
}
@Test
@@ -282,7 +283,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
// Verify that produce/consume worked throughout this test without any retries in producer
- stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+ stopAndVerifyProduceConsume(producerThread, consumerThread)
}
@Test
@@ -370,7 +371,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
servers.tail.foreach { server =>
assertEquals(Defaults.LogIndexSizeMaxBytes, server.config.values.get(KafkaConfig.LogIndexSizeMaxBytesProp)) }
// Verify that produce/consume worked throughout this test without any retries in producer
- stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+ stopAndVerifyProduceConsume(producerThread, consumerThread)
}
@Test
@@ -418,9 +419,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString))
maybeVerifyThreadPoolSize(propName, newSize, threadPrefix)
}
- def verifyThreadPoolResize(propName: String, currentSize: => Int, threadPrefix: String, mayFailRequests: Boolean): Unit = {
+ def verifyThreadPoolResize(propName: String, currentSize: => Int, threadPrefix: String, mayReceiveDuplicates: Boolean): Unit = {
maybeVerifyThreadPoolSize(propName, currentSize, threadPrefix)
- val numRetries = if (mayFailRequests) 100 else 0
+ val numRetries = if (mayReceiveDuplicates) 100 else 0
val (producerThread, consumerThread) = startProduceConsume(numRetries)
var threadPoolSize = currentSize
(1 to 2).foreach { _ =>
@@ -429,20 +430,20 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
threadPoolSize = increasePoolSize(propName, threadPoolSize, threadPrefix)
Thread.sleep(100)
}
- stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests)
+ stopAndVerifyProduceConsume(producerThread, consumerThread, mayReceiveDuplicates)
}
val config = servers.head.config
verifyThreadPoolResize(KafkaConfig.NumIoThreadsProp, config.numIoThreads,
- requestHandlerPrefix, mayFailRequests = false)
- verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
- networkThreadPrefix, mayFailRequests = true)
+ requestHandlerPrefix, mayReceiveDuplicates = false)
verifyThreadPoolResize(KafkaConfig.NumReplicaFetchersProp, config.numReplicaFetchers,
- fetcherThreadPrefix, mayFailRequests = false)
+ fetcherThreadPrefix, mayReceiveDuplicates = false)
verifyThreadPoolResize(KafkaConfig.BackgroundThreadsProp, config.backgroundThreads,
- "kafka-scheduler-", mayFailRequests = false)
+ "kafka-scheduler-", mayReceiveDuplicates = false)
verifyThreadPoolResize(KafkaConfig.NumRecoveryThreadsPerDataDirProp, config.numRecoveryThreadsPerDataDir,
- "", mayFailRequests = false)
+ "", mayReceiveDuplicates = false)
+ verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
+ networkThreadPrefix, mayReceiveDuplicates = true)
}
@Test
@@ -1055,17 +1056,16 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
private def stopAndVerifyProduceConsume(producerThread: ProducerThread, consumerThread: ConsumerThread,
- mayFailRequests: Boolean = false): Unit = {
+ mayReceiveDuplicates: Boolean = false): Unit = {
TestUtils.waitUntilTrue(() => producerThread.sent >= 10, "Messages not sent")
producerThread.shutdown()
consumerThread.initiateShutdown()
consumerThread.awaitShutdown()
- if (!mayFailRequests)
- assertEquals(producerThread.sent, consumerThread.received)
- else {
- assertTrue(s"Some messages not received, sent=${producerThread.sent}
received=${consumerThread.received}",
- consumerThread.received >= producerThread.sent)
- }
+ assertEquals(producerThread.lastSent, consumerThread.lastReceived)
+ assertEquals(0, consumerThread.missingRecords.size)
+ if (!mayReceiveDuplicates)
+ assertFalse("Duplicates not expected", consumerThread.duplicates)
+ assertFalse("Some messages received out of order",
consumerThread.outOfOrder)
}
private def verifyConnectionFailure(producer: KafkaProducer[String, String]): Future[_] = {
@@ -1128,32 +1128,58 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private class ProducerThread(clientId: String, retries: Int) extends ShutdownableThread(clientId, isInterruptible = false) {
private val producer = createProducer(trustStoreFile1, retries, clientId)
+ val lastSent = new ConcurrentHashMap[Int, Int]()
@volatile var sent = 0
override def doWork(): Unit = {
- try {
- while (isRunning) {
- sent += 1
- val record = new ProducerRecord(topic, s"key$sent", s"value$sent")
- producer.send(record).get(10, TimeUnit.SECONDS)
- }
- } finally {
- producer.close()
- }
+ try {
+ while (isRunning) {
+ val key = sent.toString
+ val partition = sent % numPartitions
+ val record = new ProducerRecord(topic, partition, key, s"value$sent")
+ producer.send(record).get(10, TimeUnit.SECONDS)
+ lastSent.put(partition, sent)
+ sent += 1
+ }
+ } finally {
+ producer.close()
}
+ }
}
private class ConsumerThread(producerThread: ProducerThread) extends ShutdownableThread("test-consumer", isInterruptible = false) {
private val consumer = createConsumer("group1", trustStoreFile1)
+ val lastReceived = new ConcurrentHashMap[Int, Int]()
+ val missingRecords = new ConcurrentLinkedQueue[Int]()
+ @volatile var outOfOrder = false
+ @volatile var duplicates = false
@volatile var lastBatch: ConsumerRecords[String, String] = _
@volatile private var endTimeMs = Long.MaxValue
- var received = 0
+ @volatile var received = 0
override def doWork(): Unit = {
try {
- while (isRunning || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
+ while (isRunning || (lastReceived != producerThread.lastSent && System.currentTimeMillis < endTimeMs)) {
val records = consumer.poll(50)
received += records.count
- if (!records.isEmpty)
+ if (!records.isEmpty) {
lastBatch = records
+ records.partitions.asScala.foreach { tp =>
+ val partition = tp.partition
+ records.records(tp).asScala.map(_.key.toInt).foreach { key =>
+ val prevKey = lastReceived.asScala.get(partition).getOrElse(partition - numPartitions)
+ val expectedKey = prevKey + numPartitions
+ if (key < prevKey)
+ outOfOrder = true
+ else if (key == prevKey)
+ duplicates = true
+ else {
+ for (i <- expectedKey until key by numPartitions)
+ missingRecords.add(expectedKey)
+ }
+ lastReceived.put(partition, key)
+ missingRecords.remove(key)
+ }
+ }
+ }
}
} finally {
consumer.close()
--
To stop receiving notification emails like this one, please contact [email protected].