This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 67b8780 KAFKA-10308: Fix flaky core/round_trip_fault_test.py (#9079)
67b8780 is described below
commit 67b87807b21548c7b5c675284438429d9f486ca0
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Tue Aug 4 21:53:10 2020 +0800
KAFKA-10308: Fix flaky core/round_trip_fault_test.py (#9079)
Creating a topic may fail (due to timeout) in running system tests.
However, `RoundTripWorker` does not ignore `TopicExistsException` which makes
`round_trip_fault_test.py` be a flaky one.
More specifically, a network exception can cause the `CreateTopics` request
to reach Kafka but Trogdor retry it
and hit a `TopicAlreadyExists` exception on the retry, failing the test.
Reviewers: Ismael Juma <[email protected]>
---
tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java | 3 ++-
.../main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java | 2 +-
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index faf2d96..812129e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -259,7 +260,7 @@ public final class WorkerUtils {
// map will always contain the topic since all topics in
'topicsExists' are in given
// 'topics' map
int partitions = topicsInfo.get(desc.name()).numPartitions();
- if (desc.partitions().size() != partitions) {
+ if (partitions != CreateTopicsRequest.NO_NUM_PARTITIONS &&
desc.partitions().size() != partitions) {
String str = "Topic '" + desc.name() + "' exists, but has "
+ desc.partitions().size() + " partitions, while
requested "
+ " number of partitions is " + partitions;
diff --git
a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 643d22c..643555a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -147,7 +147,7 @@ public class RoundTripWorker implements TaskWorker {
}
status.update(new TextNode("Creating " +
newTopics.keySet().size() + " topic(s)"));
WorkerUtils.createTopics(log, spec.bootstrapServers(),
spec.commonClientConf(),
- spec.adminClientConf(), newTopics, true);
+ spec.adminClientConf(), newTopics, false);
status.update(new TextNode("Created " +
newTopics.keySet().size() + " topic(s)"));
toSendTracker = new ToSendTracker(spec.maxMessages());
toReceiveTracker = new ToReceiveTracker();