This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new d3bf76b KAFKA-8379; Fix
KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)
d3bf76b is described below
commit d3bf76b54c5a7ed6ca9f5b027e6defbaf37d4b53
Author: Rajini Sivaram <[email protected]>
AuthorDate: Fri May 17 14:20:04 2019 +0100
KAFKA-8379; Fix KafkaAdminClientTest.testUnreachableBootstrapServer (#6753)
Initiate the `unreachable server` scenario before starting the admin client, to
avoid timing issues if the node is disconnected from the test thread while the
admin client's network thread is processing a metadata request.
Reviewers: Ismael Juma <[email protected]>
---
.../kafka/clients/admin/AdminClientUnitTestEnv.java | 18 ++++++++++++------
.../kafka/clients/admin/KafkaAdminClientTest.java | 5 +++--
2 files changed, 15 insertions(+), 8 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index 6023c63..42166b4 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -53,14 +54,18 @@ public class AdminClientUnitTestEnv implements
AutoCloseable {
}
public AdminClientUnitTestEnv(Time time, Cluster cluster, String... vals) {
- this(time, cluster, newStrMap(vals));
+ this(time, cluster, clientConfigs(vals));
}
public AdminClientUnitTestEnv(Time time, Cluster cluster) {
- this(time, cluster, newStrMap());
+ this(time, cluster, clientConfigs());
}
public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String,
Object> config) {
+ this(time, cluster, config, Collections.emptyMap());
+ }
+
+ public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String,
Object> config, Map<Node, Long> unreachableNodes) {
this.time = time;
this.cluster = cluster;
AdminClientConfig adminClientConfig = new AdminClientConfig(config);
@@ -86,6 +91,7 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
});
metadataManager.update(cluster, time.milliseconds());
+ unreachableNodes.forEach(mockClient::setUnreachable);
this.adminClient = KafkaAdminClient.createInternal(adminClientConfig,
metadataManager, mockClient, time);
}
@@ -110,15 +116,15 @@ public class AdminClientUnitTestEnv implements
AutoCloseable {
this.adminClient.close();
}
- private static Map<String, Object> newStrMap(String... vals) {
+ static Map<String, Object> clientConfigs(String... overrides) {
Map<String, Object> map = new HashMap<>();
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
- if (vals.length % 2 != 0) {
+ if (overrides.length % 2 != 0) {
throw new IllegalStateException();
}
- for (int i = 0; i < vals.length; i += 2) {
- map.put(vals[i], vals[i + 1]);
+ for (int i = 0; i < overrides.length; i += 2) {
+ map.put(overrides[i], overrides[i + 1]);
}
return map;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 7e9f4c4..388aa88 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -259,10 +259,11 @@ public class KafkaAdminClientTest {
// which prevents AdminClient from being able to send the initial
metadata request
Cluster cluster = Cluster.bootstrap(Collections.singletonList(new
InetSocketAddress("localhost", 8121)));
- try (final AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
+ Map<Node, Long> unreachableNodes =
Collections.singletonMap(cluster.nodes().get(0), 200L);
+ try (final AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(Time.SYSTEM, cluster,
+ AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) {
Cluster discoveredCluster = mockCluster(0);
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setUnreachable(cluster.nodes().get(0), 200);
env.kafkaClient().prepareResponse(body -> body instanceof
MetadataRequest,
new MetadataResponse(discoveredCluster.nodes(),
discoveredCluster.clusterResource().clusterId(),
1, Collections.emptyList()));