This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 8698b7a KAFKA-10158: Fix flaky
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (#9022)
8698b7a is described below
commit 8698b7afcb432e7049403c2a556d1867cea1fe26
Author: Brian Byrne <[email protected]>
AuthorDate: Sun Jul 26 08:28:47 2020 -0700
KAFKA-10158: Fix flaky
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (#9022)
Set `replica.fetch.max.bytes` to `1` and produce multiple record batches to
allow
for throttling to take place. This helps avoid a race condition where the
reassignment would complete more quickly than expected causing an
assertion to fail some times.
Reviewers: Lucas Bradstreet <[email protected]>, Jason Gustafson
<[email protected]>, Chia-Ping Tsai <[email protected]>, Ismael Juma
<[email protected]>
---
.../kafka/admin/TopicCommandWithAdminClientTest.scala | 19 +++++++++++++++++--
.../org/apache/kafka/jmh/server/CheckpointBench.java | 3 ++-
2 files changed, 19 insertions(+), 3 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index a1e49e4..28323f5 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -50,14 +50,20 @@ class TopicCommandWithAdminClientTest extends
KafkaServerTestHarness with Loggin
/**
* Implementations must override this method to return a set of
KafkaConfigs. This method will be invoked for every
* test and should not reuse previous configurations unless they select
their ports randomly when servers are started.
+ *
+ * Note the replica fetch max bytes is set to `1` in order to throttle the
rate of replication for test
+ * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
*/
override def generateConfigs: Seq[KafkaConfig] =
TestUtils.createBrokerConfigs(
numConfigs = 6,
zkConnect = zkConnect,
rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4
-> "rack3", 5 -> "rack3"),
numPartitions = numPartitions,
- defaultReplicationFactor = defaultReplicationFactor
- ).map(KafkaConfig.fromProps)
+ defaultReplicationFactor = defaultReplicationFactor,
+ ).map { props =>
+ props.put(KafkaConfig.ReplicaFetchMaxBytesProp, "1")
+ KafkaConfig.fromProps(props)
+ }
private val numPartitions = 1
private val defaultReplicationFactor = 1.toShort
@@ -672,8 +678,13 @@ class TopicCommandWithAdminClientTest extends
KafkaServerTestHarness with Loggin
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, partitions,
replicationFactor).configs(configMap))).all().get()
waitForTopicCreated(testTopicName)
+
+ // Produce multiple batches.
+ TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages =
10, acks = -1)
TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages =
10, acks = -1)
+ // Enable throttling. Note the broker config sets the replica max fetch
bytes to `1` upon to minimize replication
+ // throughput so the reassignment doesn't complete quickly.
val brokerIds = servers.map(_.config.brokerId)
TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds,
Set(tp), throttleBytes = 1)
@@ -703,6 +714,10 @@ class TopicCommandWithAdminClientTest extends
KafkaServerTestHarness with Loggin
topicService.describeTopic(new
TopicCommandOptions(Array("--under-replicated-partitions"))))
assertEquals(s"--under-replicated-partitions shouldn't return anything:
'$underReplicatedOutput'", "", underReplicatedOutput)
+ // Verify reassignment is still ongoing.
+ val reassignments =
adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments.get().get(tp)
+ assertFalse(Option(reassignments).forall(_.addingReplicas.isEmpty))
+
TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds,
Set(tp))
TestUtils.waitForAllReassignmentsToComplete(adminClient)
}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index e103f13..7e24857 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -97,7 +97,8 @@ public class CheckpointBench {
this.scheduler = new KafkaScheduler(1, "scheduler-thread", true);
this.brokerProperties =
KafkaConfig.fromProps(TestUtils.createBrokerConfig(
0, TestUtils.MockZkConnect(), true, true, 9092,
Option.empty(), Option.empty(),
- Option.empty(), true, false, 0, false, 0, false, 0,
Option.empty(), 1, true, 1, (short) 1));
+ Option.empty(), true, false, 0, false, 0, false, 0,
Option.empty(), 1, true, 1,
+ (short) 1));
this.metrics = new Metrics();
this.time = new MockTime();
this.failureChannel = new
LogDirFailureChannel(brokerProperties.logDirs().size());