This is an automated email from the ASF dual-hosted git repository.
mmiklavcic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 67fa5a4 METRON-2227 Increase Kafka test harness timeout (tigerquoll
via mmiklavc) closes apache/metron#1493
67fa5a4 is described below
commit 67fa5a403b01d0f7c8607c06e63f9d06f8b8cbc1
Author: tigerquoll <[email protected]>
AuthorDate: Wed Sep 4 11:47:04 2019 -0600
METRON-2227 Increase Kafka test harness timeout (tigerquoll via mmiklavc)
closes apache/metron#1493
---
.../integration/components/KafkaComponent.java | 22 +++++++++++++---------
1 file changed, 13 insertions(+), 9 deletions(-)
diff --git
a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
index 08910be..0fa414b 100644
---
a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
+++
b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
@@ -65,6 +65,10 @@ import org.slf4j.LoggerFactory;
public class KafkaComponent implements InMemoryComponent {
protected static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final long KAFKA_PROPAGATE_TIMEOUT_MS = 10000l;
+ public static final int ZK_SESSION_TIMEOUT_MS = 30000;
+ public static final int ZK_CONNECTION_TIMEOUT_MS = 30000;
+ public static final int KAFKA_ZOOKEEPER_TIMEOUT_MS = 1000000;
public static class Topic {
public int numPartitions;
@@ -159,11 +163,11 @@ public class KafkaComponent implements InMemoryComponent {
// setup Zookeeper
zookeeperConnectString =
topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY);
- zkClient = new ZkClient(zookeeperConnectString, 30000, 30000,
ZKStringSerializer$.MODULE$);
+ zkClient = new ZkClient(zookeeperConnectString, ZK_SESSION_TIMEOUT_MS,
ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
// setup Broker
Properties props = TestUtilsWrapper.createBrokerConfig(0,
zookeeperConnectString, brokerPort);
- props.setProperty("zookeeper.connection.timeout.ms","1000000");
+ props.setProperty("zookeeper.connection.timeout.ms",
Integer.toString(KAFKA_ZOOKEEPER_TIMEOUT_MS));
KafkaConfig config = new KafkaConfig(props);
Time mock = new MockTime();
kafkaServer = TestUtils.createServer(config, mock);
@@ -175,7 +179,7 @@ public class KafkaComponent implements InMemoryComponent {
for(Topic topic : getTopics()) {
try {
- createTopic(topic.name, topic.numPartitions, true);
+ createTopic(topic.name, topic.numPartitions,
KAFKA_PROPAGATE_TIMEOUT_MS);
} catch (InterruptedException e) {
throw new RuntimeException("Unable to create topic", e);
}
@@ -288,26 +292,26 @@ public class KafkaComponent implements InMemoryComponent {
}
public void createTopic(String name) throws InterruptedException {
- createTopic(name, 1, true);
+ createTopic(name, 1, KAFKA_PROPAGATE_TIMEOUT_MS);
}
- public void waitUntilMetadataIsPropagated(String topic, int numPartitions) {
+ public void waitUntilMetadataIsPropagated(String topic, int numPartitions,
long timeOutMS) {
List<KafkaServer> servers = new ArrayList<>();
servers.add(kafkaServer);
for(int part = 0;part < numPartitions;++part) {
-
TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers),
topic, part, 5000);
+
TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers),
topic, part, timeOutMS);
}
}
- public void createTopic(String name, int numPartitions, boolean
waitUntilMetadataIsPropagated) throws InterruptedException {
+ public void createTopic(String name, int numPartitions, long
waitThisLongForMetadataToPropagate) throws InterruptedException {
ZkUtils zkUtils = null;
Level oldLevel = UnitTestHelper.getJavaLoggingLevel();
try {
UnitTestHelper.setJavaLoggingLevel(Level.OFF);
zkUtils = ZkUtils.apply(zookeeperConnectString, 30000, 30000, false);
AdminUtilsWrapper.createTopic(zkUtils, name, numPartitions, 1, new
Properties());
- if (waitUntilMetadataIsPropagated) {
- waitUntilMetadataIsPropagated(name, numPartitions);
+ if (waitThisLongForMetadataToPropagate > 0) {
+ waitUntilMetadataIsPropagated(name, numPartitions,
waitThisLongForMetadataToPropagate);
}
}catch(TopicExistsException tee) {
}finally {