Repository: kafka
Updated Branches:
  refs/heads/0.10.1 fc5f48aad -> d48415f18


KAFKA-4184; Intermittent failures in 
ReplicationQuotasTest.shouldBootstrapTwoBrokersWithFollowerThrottle

The build is unstable, so it is hard to validate this change. Across the builds
run up until 11am BST, the test ran twice and passed both times.

Author: Ben Stopford <benstopf...@gmail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #1873 from benstopford/KAFKA-4184

(cherry picked from commit 3663275cf066b7715cc11b26fd9c144bbff1c373)
Signed-off-by: Ismael Juma <ism...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d48415f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d48415f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d48415f1

Branch: refs/heads/0.10.1
Commit: d48415f185d1882f0a3b89a3ce03ea84893393ba
Parents: fc5f48a
Author: Ben Stopford <benstopf...@gmail.com>
Authored: Tue Sep 20 14:53:48 2016 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Tue Sep 20 15:39:17 2016 +0100

----------------------------------------------------------------------
 .../server/ReplicationQuotaManagerTest.scala    |  3 +-
 .../kafka/server/ReplicationQuotasTest.scala    | 93 ++++++++++----------
 2 files changed, 49 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d48415f1/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index 5c41372..3616b7b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -14,13 +14,12 @@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
-package unit.kafka.server
+package kafka.server
 
 import java.util.Collections
 
 import kafka.common.TopicAndPartition
 import kafka.server.QuotaType._
-import kafka.server.{QuotaType, ReplicationQuotaManager, 
ReplicationQuotaManagerConfig}
 import org.apache.kafka.common.metrics.{Quota, MetricConfig, Metrics}
 import org.apache.kafka.common.utils.MockTime
 import org.junit.Assert.{assertFalse, assertTrue, assertEquals}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d48415f1/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index af7c4c8..88b9b89 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -15,7 +15,7 @@
   * limitations under the License.
   */
 
-package unit.kafka.server
+package kafka.server
 
 import java.util.Properties
 
@@ -25,7 +25,6 @@ import kafka.common._
 import kafka.log.LogConfig._
 import kafka.server.KafkaConfig.fromProps
 import kafka.server.QuotaType._
-import kafka.server._
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
@@ -85,8 +84,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     brokers = (100 to 105).map { id => 
TestUtils.createServer(fromProps(createBrokerConfig(id, zkConnect))) }
 
-    //Given six partitions, lead on nodes 0,1,2,3,4,5 but will followers on 
node 6,7 (not started yet)
-    //And two extra partitions 6,7, which we don't intend on throttling
+    //Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on 
node 6,7 (not started yet)
+    //And two extra partitions 6,7, which we don't intend on throttling.
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, 
Map(
       0 -> Seq(100, 106), //Throttled
       1 -> Seq(101, 106), //Throttled
@@ -99,7 +98,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     ))
 
     val msg = msg100KB
-    val msgCount: Int = 1000
+    val msgCount = 100
     val expectedDuration = 10 //Keep the test to N seconds
     var throttle: Long = msgCount * msg.length / expectedDuration
     if (!leaderThrottle) throttle = throttle * 3 //Follower throttle needs to 
replicate 3x as fast to get the same duration as there are three replicas to 
replicate for each of the two follower brokers
@@ -108,16 +107,16 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     (100 to 107).foreach { brokerId =>
       changeBrokerConfig(zkUtils, Seq(brokerId), 
property(KafkaConfig.ThrottledReplicationRateLimitProp, throttle.toString))
     }
-    if (leaderThrottle)
-      changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, 
"0:100,1:101,2:102,3:103,4:104,5:105")) //partition-broker:... throttle the 6 
leaders
-    else
-      changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, 
"0:106,1:106,2:106,3:107,4:107,5:107")) //partition-broker:... throttle the two 
followers
+
+    //Either throttle the six leaders or the two followers
+    val throttledReplicas = if (leaderThrottle) 
"0:100,1:101,2:102,3:103,4:104,5:105" else "0:106,1:106,2:106,3:107,4:107,5:107"
+    changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, 
throttledReplicas))
 
     //Add data equally to each partition
     producer = 
TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers), 
retries = 5, acks = 0)
     (0 until msgCount).foreach { x =>
       (0 to 7).foreach { partition =>
-        producer.send(new ProducerRecord(topic, partition, null, msg)).get
+        producer.send(new ProducerRecord(topic, partition, null, msg))
       }
     }
 
@@ -130,20 +129,15 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val start = System.currentTimeMillis()
 
     //When we create the 2 new, empty brokers
-    brokers = brokers :+ 
TestUtils.createServer(fromProps(createBrokerConfig(106, zkConnect)))
-    brokers = brokers :+ 
TestUtils.createServer(fromProps(createBrokerConfig(107, zkConnect)))
+    createBrokers(106 to 107)
 
     //Check that throttled config correctly migrated to the new brokers
     (106 to 107).foreach { brokerId =>
       assertEquals(throttle, 
brokerFor(brokerId).quotaManagers.follower.upperBound())
     }
     if (!leaderThrottle) {
-      (0 to 2).foreach { partition =>
-        assertTrue(brokerFor(106).quotaManagers.follower.isThrottled(new 
TopicAndPartition(topic, partition)))
-      }
-      (3 to 5).foreach { partition =>
-        assertTrue(brokerFor(107).quotaManagers.follower.isThrottled(new 
TopicAndPartition(topic, partition)))
-      }
+      (0 to 2).foreach { partition => 
assertTrue(brokerFor(106).quotaManagers.follower.isThrottled(tp(partition))) }
+      (3 to 5).foreach { partition => 
assertTrue(brokerFor(107).quotaManagers.follower.isThrottled(tp(partition))) }
     }
 
     //Wait for non-throttled partitions to replicate first
@@ -156,33 +150,24 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     val throttledTook = System.currentTimeMillis() - start
 
-    //Check the recorded throttled rate is what we expect
-    if (leaderThrottle) {
-      (100 to 105).map(brokerFor(_)).foreach { broker =>
-        val metricName = broker.metrics.metricName("byte-rate", 
LeaderReplication.toString, "Tracking byte-rate for" + LeaderReplication)
-        val measuredRate = broker.metrics.metrics.asScala(metricName).value()
-        info(s"Broker:${broker.config.brokerId} Expected:$throttle, Recorded 
Rate was:$measuredRate")
-        assertEquals(throttle, measuredRate, percentError(25, throttle))
-      }
-    } else {
-      (106 to 107).map(brokerFor(_)).foreach { broker =>
-        val metricName = broker.metrics.metricName("byte-rate", 
FollowerReplication.toString, "Tracking byte-rate for" + FollowerReplication)
-        val measuredRate = broker.metrics.metrics.asScala(metricName).value()
-        info(s"Broker:${broker.config.brokerId} Expected:$throttle, Recorded 
Rate was:$measuredRate")
-        assertEquals(throttle, measuredRate, percentError(25, throttle))
-      }
-    }
-
     //Check the times for throttled/unthrottled are each side of what we expect
-    info(s"Unthrottled took: $unthrottledTook, Throttled took: $throttledTook, 
for expeted $expectedDuration secs")
-    assertTrue(s"Unthrottled replication of ${unthrottledTook}ms should be < 
${expectedDuration * 1000}ms",
-      unthrottledTook < expectedDuration * 1000)
-    assertTrue((s"Throttled replication of ${throttledTook}ms should be > 
${expectedDuration * 1000}ms"),
-      throttledTook > expectedDuration * 1000)
-    assertTrue((s"Throttled replication of ${throttledTook}ms should be < 
${expectedDuration * 1500}ms"),
-      throttledTook < expectedDuration * 1000 * 1.5)
+    val throttledLowerBound = expectedDuration * 1000 * 0.9
+    val throttledUpperBound = expectedDuration * 1000 * 3
+    assertTrue(s"Expected $unthrottledTook < $throttledLowerBound", 
unthrottledTook < throttledLowerBound)
+    assertTrue(s"Expected $throttledTook > $throttledLowerBound", 
throttledTook > throttledLowerBound)
+    assertTrue(s"Expected $throttledTook < $throttledUpperBound", 
throttledTook < throttledUpperBound)
+
+    // Check the rate metric matches what we expect.
+    // In a short test the brokers can be read unfairly, so assert against the 
average
+    val rateUpperBound = throttle * 1.1
+    val rateLowerBound = throttle * 0.5
+    val rate = if (leaderThrottle) avRate(LeaderReplication, 100 to 105) else 
avRate(FollowerReplication, 106 to 107)
+    assertTrue(s"Expected ${rate} < $rateUpperBound", rate < rateUpperBound)
+    assertTrue(s"Expected ${rate} > $rateLowerBound", rate > rateLowerBound)
   }
 
+  def tp(partition: Int): TopicAndPartition = new TopicAndPartition(topic, 
partition)
+
   @Test
   def shouldThrottleOldSegments(): Unit = {
     /**
@@ -230,13 +215,31 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
   private def waitForOffsetsToMatch(offset: Int, partitionId: Int, brokerId: 
Int): Boolean = {
     waitUntilTrue(() => {
-      offset == 
brokerFor(brokerId).getLogManager.getLog(TopicAndPartition(topic, 
partitionId)).map(_.logEndOffset).getOrElse(0)
+      offset == 
brokerFor(brokerId).getLogManager.getLog(TopicAndPartition(topic, partitionId))
+        .map(_.logEndOffset).getOrElse(0)
     }, s"Offsets did not match for partition $partitionId on broker 
$brokerId", 60000)
   }
 
   private def property(key: String, value: String) = {
-    new Properties() { put(key, value) }
+    val props = new Properties()
+    props.put(key, value)
+    props
+  }
+
+  private def brokerFor(id: Int): KafkaServer = 
brokers.filter(_.config.brokerId == id).head
+
+  def createBrokers(brokerIds: Seq[Int]): Unit = {
+    brokerIds.foreach { id =>
+      brokers = brokers :+ 
TestUtils.createServer(fromProps(createBrokerConfig(id, zkConnect)))
+    }
+  }
+
+  private def avRate(replicationType: QuotaType, brokers: Seq[Int]): Double = {
+    brokers.map(brokerFor).map(measuredRate(_, replicationType)).sum / 
brokers.length
   }
 
-  private def brokerFor(id: Int): KafkaServer = 
brokers.filter(_.config.brokerId == id)(0)
+  private def measuredRate(broker: KafkaServer, repType: QuotaType): Double = {
+    val metricName = broker.metrics.metricName("byte-rate", repType.toString)
+    broker.metrics.metrics.asScala(metricName).value
+  }
 }

Reply via email to