This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 596c6c0 KAFKA-7231; Ensure NetworkClient uses overridden request
timeout (#5444)
596c6c0 is described below
commit 596c6c0c0b27c13f2017c770ea37cd39e27e5dcf
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Aug 2 11:02:38 2018 -0700
KAFKA-7231; Ensure NetworkClient uses overridden request timeout (#5444)
Fixed incorrect use of default timeout instead of the argument explicitly
passed to `newClientRequest`.
Reviewers: Ron Dagostino <[email protected]>, Ismael Juma
<[email protected]>
---
.../org/apache/kafka/clients/NetworkClient.java | 2 +-
.../apache/kafka/clients/NetworkClientTest.java | 36 ++++++++++++----------
2 files changed, 20 insertions(+), 18 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index fd16fe6..e4ba197 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1060,7 +1060,7 @@ public class NetworkClient implements KafkaClient {
int requestTimeoutMs,
RequestCompletionHandler callback) {
return new ClientRequest(nodeId, requestBuilder, correlation++,
clientId, createdTimeMs, expectResponse,
- defaultRequestTimeoutMs, callback);
+ requestTimeoutMs, callback);
}
public boolean discoverBrokerVersions() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index e13fcef..2876570 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -52,7 +52,7 @@ import static org.junit.Assert.assertTrue;
public class NetworkClientTest {
- protected final int minRequestTimeoutMs = 1000;
+ protected final int defaultRequestTimeoutMs = 1000;
protected final MockTime time = new MockTime();
protected final MockSelector selector = new MockSelector(time);
protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
@@ -70,19 +70,19 @@ public class NetworkClientTest {
private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 *
1024,
- minRequestTimeoutMs, time, true, new ApiVersions(), new
LogContext());
+ defaultRequestTimeoutMs, time, true, new ApiVersions(), new
LogContext());
}
private NetworkClient createNetworkClientWithStaticNodes() {
return new NetworkClient(selector, new
ManualMetadataUpdater(Arrays.asList(node)),
- "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024,
minRequestTimeoutMs,
+ "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs,
time, true, new ApiVersions(), new LogContext());
}
private NetworkClient createNetworkClientWithNoVersionDiscovery() {
return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
- 64 * 1024, 64 * 1024, minRequestTimeoutMs, time, false, new
ApiVersions(), new LogContext());
+ 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, time, false,
new ApiVersions(), new LogContext());
}
@Before
@@ -144,7 +144,7 @@ public class NetworkClientTest {
Collections.emptyMap());
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = networkClient.newClientRequest(
- node.idString(), builder, time.milliseconds(), true,
minRequestTimeoutMs, handler);
+ node.idString(), builder, time.milliseconds(), true,
defaultRequestTimeoutMs, handler);
networkClient.send(request, time.milliseconds());
networkClient.poll(1, time.milliseconds());
assertEquals(1, networkClient.inFlightRequestCount());
@@ -187,18 +187,20 @@ public class NetworkClientTest {
ProduceRequest.Builder builder =
ProduceRequest.Builder.forCurrentMagic((short) 1,
1000, Collections.emptyMap());
TestCallbackHandler handler = new TestCallbackHandler();
- int requestTimeoutMs = minRequestTimeoutMs + 5000;
+ int requestTimeoutMs = defaultRequestTimeoutMs + 5000;
ClientRequest request = client.newClientRequest(node.idString(),
builder, time.milliseconds(), true,
requestTimeoutMs, handler);
+ assertEquals(requestTimeoutMs, request.requestTimeoutMs());
testRequestTimeout(request);
}
@Test
- public void testMinRequestTimeout() {
+ public void testDefaultRequestTimeout() {
awaitReady(client, node); // has to be before creating any request, as
it may send ApiVersionsRequest and its response is mocked with correlation id 0
ProduceRequest.Builder builder =
ProduceRequest.Builder.forCurrentMagic((short) 1,
1000, Collections.emptyMap());
ClientRequest request = client.newClientRequest(node.idString(),
builder, time.milliseconds(), true);
+ assertEquals(defaultRequestTimeoutMs, request.requestTimeoutMs());
testRequestTimeout(request);
}
@@ -222,7 +224,7 @@ public class NetworkClientTest {
Collections.emptyMap());
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = client.newClientRequest(node.idString(),
builder, time.milliseconds(), true,
- minRequestTimeoutMs, handler);
+ defaultRequestTimeoutMs, handler);
client.send(request, time.milliseconds());
client.poll(1, time.milliseconds());
ResponseHeader respHeader = new
ResponseHeader(request.correlationId());
@@ -281,7 +283,7 @@ public class NetworkClientTest {
Collections.emptyMap());
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = client.newClientRequest(node.idString(),
builder, time.milliseconds(), true,
- minRequestTimeoutMs, handler);
+ defaultRequestTimeoutMs, handler);
client.send(request, time.milliseconds());
client.poll(1, time.milliseconds());
ResponseHeader respHeader = new
ResponseHeader(request.correlationId());
@@ -349,7 +351,7 @@ public class NetworkClientTest {
awaitReady(clientWithNoExponentialBackoff, node);
selector.serverDisconnect(node.idString());
- clientWithNoExponentialBackoff.poll(minRequestTimeoutMs,
time.milliseconds());
+ clientWithNoExponentialBackoff.poll(defaultRequestTimeoutMs,
time.milliseconds());
long delay = clientWithNoExponentialBackoff.connectionDelay(node,
time.milliseconds());
assertEquals(reconnectBackoffMsTest, delay);
@@ -361,7 +363,7 @@ public class NetworkClientTest {
// Start connecting and disconnect before the connection is established
client.ready(node, time.milliseconds());
selector.serverDisconnect(node.idString());
- client.poll(minRequestTimeoutMs, time.milliseconds());
+ client.poll(defaultRequestTimeoutMs, time.milliseconds());
// Second attempt should have the same behaviour as exponential
backoff is disabled
assertEquals(reconnectBackoffMsTest, delay);
@@ -391,7 +393,7 @@ public class NetworkClientTest {
// First disconnection
selector.serverDisconnect(node.idString());
- client.poll(minRequestTimeoutMs, time.milliseconds());
+ client.poll(defaultRequestTimeoutMs, time.milliseconds());
long delay = client.connectionDelay(node, time.milliseconds());
long expectedDelay = reconnectBackoffMsTest;
double jitter = 0.3;
@@ -404,7 +406,7 @@ public class NetworkClientTest {
// Start connecting and disconnect before the connection is established
client.ready(node, time.milliseconds());
selector.serverDisconnect(node.idString());
- client.poll(minRequestTimeoutMs, time.milliseconds());
+ client.poll(defaultRequestTimeoutMs, time.milliseconds());
// Second attempt should take twice as long with twice the jitter
expectedDelay = Math.round(delay * 2);
@@ -423,13 +425,13 @@ public class NetworkClientTest {
long now = time.milliseconds();
ClientRequest request = client.newClientRequest(node.idString(),
builder, now, true);
client.send(request, now);
- client.poll(minRequestTimeoutMs, now);
+ client.poll(defaultRequestTimeoutMs, now);
assertEquals(1, client.inFlightRequestCount(node.idString()));
assertTrue(client.hasInFlightRequests(node.idString()));
assertTrue(client.hasInFlightRequests());
selector.close(node.idString());
- List<ClientResponse> responses = client.poll(minRequestTimeoutMs,
time.milliseconds());
+ List<ClientResponse> responses = client.poll(defaultRequestTimeoutMs,
time.milliseconds());
assertEquals(1, responses.size());
assertTrue(responses.iterator().next().wasDisconnected());
}
@@ -474,11 +476,11 @@ public class NetworkClientTest {
}
};
- ClientRequest request1 = client.newClientRequest(node.idString(),
builder, now, true, minRequestTimeoutMs, callback);
+ ClientRequest request1 = client.newClientRequest(node.idString(),
builder, now, true, defaultRequestTimeoutMs, callback);
client.send(request1, now);
client.poll(0, now);
- ClientRequest request2 = client.newClientRequest(node.idString(),
builder, now, true, minRequestTimeoutMs, callback);
+ ClientRequest request2 = client.newClientRequest(node.idString(),
builder, now, true, defaultRequestTimeoutMs, callback);
client.send(request2, now);
client.poll(0, now);