[
https://issues.apache.org/jira/browse/KAFKA-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508828#comment-16508828
]
ASF GitHub Bot commented on KAFKA-5570:
---------------------------------------
ijuma closed pull request #3503: KAFKA-5570: Join request's timeout should be
slightly higher than the rebalance timeout
URL: https://github.com/apache/kafka/pull/3503
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 11119646f4a..3d6c2819ae9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -30,6 +30,7 @@
private final int correlationId;
private final String clientId;
private final long createdTimeMs;
+ private final int timeoutMs;
private final boolean expectResponse;
private final RequestCompletionHandler callback;
@@ -39,6 +40,7 @@
* @param correlationId The correlation id for this client request
* @param clientId The client ID to use for the header
* @param createdTimeMs The unix timestamp in milliseconds for the time at
which this request was created.
+* @param timeoutMs The request timeout in milliseconds.
* @param expectResponse Should we expect a response message or is this
request complete once it is sent?
* @param callback A callback to execute when the response has been
received (or null if no callback is necessary)
*/
@@ -47,6 +49,7 @@ public ClientRequest(String destination,
int correlationId,
String clientId,
long createdTimeMs,
+ int timeoutMs,
boolean expectResponse,
RequestCompletionHandler callback) {
this.destination = destination;
@@ -54,6 +57,7 @@ public ClientRequest(String destination,
this.correlationId = correlationId;
this.clientId = clientId;
this.createdTimeMs = createdTimeMs;
+ this.timeoutMs = timeoutMs;
this.expectResponse = expectResponse;
this.callback = callback;
}
@@ -66,6 +70,7 @@ public String toString() {
", correlationId=" + correlationId +
", clientId=" + clientId +
", createdTimeMs=" + createdTimeMs +
+ ", timeoutMs=" + timeoutMs +
", requestBuilder=" + requestBuilder +
")";
}
@@ -98,6 +103,10 @@ public long createdTimeMs() {
return createdTimeMs;
}
+ public int timeoutMs() {
+ return timeoutMs;
+ }
+
public int correlationId() {
return correlationId;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index f9773297dbb..d58827fcd0f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -17,10 +17,10 @@
package org.apache.kafka.clients;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -31,6 +31,7 @@
private final int maxInFlightRequestsPerConnection;
private final Map<String, Deque<NetworkClient.InFlightRequest>> requests =
new HashMap<>();
+ private Integer minTimeoutMs;
public InFlightRequests(int maxInFlightRequestsPerConnection) {
this.maxInFlightRequestsPerConnection =
maxInFlightRequestsPerConnection;
@@ -46,6 +47,8 @@ public void add(NetworkClient.InFlightRequest request) {
reqs = new ArrayDeque<>();
this.requests.put(destination, reqs);
}
+ if (minTimeoutMs != null)
+ minTimeoutMs = Math.min(request.timeoutMs, minTimeoutMs);
reqs.addFirst(request);
}
@@ -60,10 +63,12 @@ public void add(NetworkClient.InFlightRequest request) {
}
/**
- * Get the oldest request (the one that that will be completed next) for
the given node
+ * Complete the oldest request (the one that that will be completed next)
for the given node
*/
public NetworkClient.InFlightRequest completeNext(String node) {
- return requestQueue(node).pollLast();
+ NetworkClient.InFlightRequest request = requestQueue(node).pollLast();
+ minTimeoutMs = null;
+ return request;
}
/**
@@ -80,7 +85,9 @@ public void add(NetworkClient.InFlightRequest request) {
* @return The request
*/
public NetworkClient.InFlightRequest completeLastSent(String node) {
- return requestQueue(node).pollFirst();
+ NetworkClient.InFlightRequest request = requestQueue(node).pollFirst();
+ minTimeoutMs = null;
+ return request;
}
/**
@@ -142,6 +149,7 @@ public boolean isEmpty() {
*/
public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {
Deque<NetworkClient.InFlightRequest> reqs = requests.remove(node);
+ minTimeoutMs = null;
return (reqs == null) ?
Collections.<NetworkClient.InFlightRequest>emptyList() : reqs;
}
@@ -149,23 +157,36 @@ public boolean isEmpty() {
* Returns a list of nodes with pending in-flight request, that need to be
timed out
*
* @param now current time in milliseconds
- * @param requestTimeoutMs max time to wait for the request to be completed
* @return list of nodes
*/
- public List<String> getNodesWithTimedOutRequests(long now, int
requestTimeoutMs) {
- List<String> nodeIds = new LinkedList<>();
- for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>>
requestEntry : requests.entrySet()) {
- String nodeId = requestEntry.getKey();
- Deque<NetworkClient.InFlightRequest> deque =
requestEntry.getValue();
-
- if (!deque.isEmpty()) {
- NetworkClient.InFlightRequest request = deque.peekLast();
- long timeSinceSend = now - request.sendTimeMs;
- if (timeSinceSend > requestTimeoutMs)
+ public List<String> getNodesWithTimedOutRequests(long now) {
+ List<String> nodeIds = new ArrayList<>();
+ for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>>
nodeRequests : requests.entrySet()) {
+ String nodeId = nodeRequests.getKey();
+ Deque<NetworkClient.InFlightRequest> deque =
nodeRequests.getValue();
+
+ for (NetworkClient.InFlightRequest request : deque) {
+ if (request.hasExpired(now)) {
nodeIds.add(nodeId);
+ break;
+ }
}
}
return nodeIds;
}
-
+
+ /**
+ * Return the minimum request timeout of all in flight requests or null if
there are none.
+ */
+ public Integer minTimeoutMs() {
+ if (minTimeoutMs == null && !requests.isEmpty()) {
+ int timeoutMs = Integer.MAX_VALUE;
+ for (Deque<NetworkClient.InFlightRequest> nodeRequests :
requests.values()) {
+ for (NetworkClient.InFlightRequest request : nodeRequests)
+ timeoutMs = Math.min(request.timeoutMs, timeoutMs);
+ }
+ minTimeoutMs = timeoutMs;
+ }
+ return minTimeoutMs;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 5bca261436c..0f0da00b1ea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -167,4 +167,17 @@ ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> request
ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?>
requestBuilder, long createdTimeMs,
boolean expectResponse,
RequestCompletionHandler callback);
+ /**
+ * Create a new ClientRequest.
+ *
+ * @param nodeId the node to send to
+ * @param requestBuilder the request builder to use
+ * @param createdTimeMs the time in milliseconds to use as the creation
time of the request
+ * @param expectResponse true iff we expect a response
+ * @param callback the callback to invoke when we get a response
+ * @param timeoutMs the request timeout in milliseconds
+ */
+ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?>
requestBuilder, long createdTimeMs,
+ boolean expectResponse,
RequestCompletionHandler callback, int timeoutMs);
+
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 60b15980406..9a5079e00f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -87,7 +87,7 @@
private int correlation;
/* max time in ms for the producer to wait for acknowledgement from
server*/
- private final int requestTimeoutMs;
+ private final int defaultRequestTimeoutMs;
/* time in ms to wait before retrying to create connection to a server */
private final long reconnectBackoffMs;
@@ -172,7 +172,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
- int requestTimeoutMs,
+ int defaultRequestTimeoutMs,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
@@ -196,7 +196,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
this.randOffset = new Random();
- this.requestTimeoutMs = requestTimeoutMs;
+ this.defaultRequestTimeoutMs = defaultRequestTimeoutMs;
this.reconnectBackoffMs = reconnectBackoffMs;
this.time = time;
this.discoverBrokerVersions = discoverBrokerVersions;
@@ -238,7 +238,7 @@ public void disconnect(String nodeId) {
List<ApiKeys> requestTypes = new ArrayList<>();
long now = time.milliseconds();
for (InFlightRequest request : inFlightRequests.clearAll(nodeId)) {
- if (request.isInternalRequest) {
+ if (request.isInternal) {
if (request.header.apiKey() == ApiKeys.METADATA.id) {
metadataUpdater.handleDisconnection(request.destination);
}
@@ -267,7 +267,7 @@ public void disconnect(String nodeId) {
public void close(String nodeId) {
selector.close(nodeId);
for (InFlightRequest request : inFlightRequests.clearAll(nodeId))
- if (request.isInternalRequest && request.header.apiKey() ==
ApiKeys.METADATA.id)
+ if (request.isInternal && request.header.apiKey() ==
ApiKeys.METADATA.id)
metadataUpdater.handleDisconnection(request.destination);
connectionStates.remove(nodeId);
}
@@ -332,10 +332,11 @@ public void send(ClientRequest request, long now) {
doSend(request, false, now);
}
- private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
+ private ClientRequest sendInternalMetadataRequest(MetadataRequest.Builder
builder,
String nodeConnectionId, long
now) {
ClientRequest clientRequest = newClientRequest(nodeConnectionId,
builder, now, true);
doSend(clientRequest, true, now);
+ return clientRequest;
}
private void doSend(ClientRequest clientRequest, boolean
isInternalRequest, long now) {
@@ -403,7 +404,8 @@ private void doSend(ClientRequest clientRequest, boolean
isInternalRequest, long
isInternalRequest,
request,
send,
- now);
+ now,
+ clientRequest.timeoutMs());
this.inFlightRequests.add(inFlightRequest);
selector.send(inFlightRequest.send);
}
@@ -411,14 +413,14 @@ private void doSend(ClientRequest clientRequest, boolean
isInternalRequest, long
/**
* Do actual reads and writes to sockets.
*
- * @param timeout The maximum amount of time to wait (in ms) for responses
if there are none immediately,
- * must be non-negative. The actual timeout will be the
minimum of timeout, request timeout and
- * metadata timeout
+ * @param timeoutMs The maximum amount of time to wait (in ms) for
responses if there are none immediately,
+ * must be non-negative. The actual timeout will be the
minimum of this parameter,
+ * the default request timeout, in-flight requests
timeouts and metadata timeout
* @param now The current time in milliseconds
* @return The list of responses received
*/
@Override
- public List<ClientResponse> poll(long timeout, long now) {
+ public List<ClientResponse> poll(long timeoutMs, long now) {
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version
exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
@@ -428,9 +430,9 @@ private void doSend(ClientRequest clientRequest, boolean
isInternalRequest, long
return responses;
}
- long metadataTimeout = metadataUpdater.maybeUpdate(now);
+ long metadataTimeoutMs = metadataUpdater.maybeUpdate(now);
try {
- this.selector.poll(Utils.min(timeout, metadataTimeout,
requestTimeoutMs));
+ this.selector.poll(selectorPollTimeoutMs(timeoutMs,
metadataTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
@@ -449,6 +451,14 @@ private void doSend(ClientRequest clientRequest, boolean
isInternalRequest, long
return responses;
}
+ private long selectorPollTimeoutMs(long pollTimeoutMs, long
metadataTimeoutMs) {
+ long result = Utils.min(pollTimeoutMs, metadataTimeoutMs,
defaultRequestTimeoutMs);
+ Integer minRequestsTimeoutMs = inFlightRequests.minTimeoutMs();
+ if (minRequestsTimeoutMs != null)
+ result = Math.min(minRequestsTimeoutMs, pollTimeoutMs);
+ return result;
+ }
+
private void completeResponses(List<ClientResponse> responses) {
for (ClientResponse response : responses) {
try {
@@ -594,7 +604,7 @@ private void processDisconnection(List<ClientResponse>
responses, String nodeId,
for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId))
{
log.trace("Cancelled request {} with correlation id {} due to node
{} being disconnected", request.request,
request.header.correlationId(), nodeId);
- if (request.isInternalRequest && request.header.apiKey() ==
ApiKeys.METADATA.id)
+ if (request.isInternal && request.header.apiKey() ==
ApiKeys.METADATA.id)
metadataUpdater.handleDisconnection(request.destination);
else
responses.add(request.disconnected(now));
@@ -609,7 +619,7 @@ private void processDisconnection(List<ClientResponse>
responses, String nodeId,
* @param now The current time
*/
private void handleTimedOutRequests(List<ClientResponse> responses, long
now) {
- List<String> nodeIds =
this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
+ List<String> nodeIds =
this.inFlightRequests.getNodesWithTimedOutRequests(now);
for (String nodeId : nodeIds) {
// close connection to the node
this.selector.close(nodeId);
@@ -661,9 +671,9 @@ private void handleCompletedReceives(List<ClientResponse>
responses, long now) {
ApiKeys.forId(req.header.apiKey()),
req.header.correlationId(), responseStruct);
}
AbstractResponse body = createResponse(responseStruct, req.header);
- if (req.isInternalRequest && body instanceof MetadataResponse)
+ if (req.isInternal && body instanceof MetadataResponse)
metadataUpdater.handleCompletedMetadataResponse(req.header,
now, (MetadataResponse) body);
- else if (req.isInternalRequest && body instanceof
ApiVersionsResponse)
+ else if (req.isInternal && body instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now,
(ApiVersionsResponse) body);
else
responses.add(req.completed(body, now));
@@ -801,7 +811,7 @@ public boolean isUpdateDue(long now) {
public long maybeUpdate(long now) {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
- long waitForMetadataFetch = this.metadataFetchInProgress ?
requestTimeoutMs : 0;
+ long waitForMetadataFetch = this.metadataFetchInProgress ?
defaultRequestTimeoutMs : 0;
long metadataTimeout = Math.max(timeToNextMetadataUpdate,
waitForMetadataFetch);
if (metadataTimeout > 0) {
@@ -889,8 +899,7 @@ private long maybeUpdate(long now, Node node) {
log.debug("Sending metadata request {} to node {}",
metadataRequest, node);
- sendInternalMetadataRequest(metadataRequest, nodeConnectionId,
now);
- return requestTimeoutMs;
+ return sendInternalMetadataRequest(metadataRequest,
nodeConnectionId, now).timeoutMs();
}
// If there's any connection establishment underway, wait until it
completes. This prevents
@@ -926,8 +935,14 @@ public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?>
@Override
public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
boolean expectResponse,
RequestCompletionHandler callback) {
- return new ClientRequest(nodeId, requestBuilder, correlation++,
clientId, createdTimeMs, expectResponse,
- callback);
+ return newClientRequest(nodeId, requestBuilder, createdTimeMs,
expectResponse, callback, defaultRequestTimeoutMs);
+ }
+
+ @Override
+ public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
+ boolean expectResponse,
RequestCompletionHandler callback, int timeoutMs) {
+ return new ClientRequest(nodeId, requestBuilder, correlation++,
clientId, createdTimeMs, timeoutMs,
+ expectResponse, callback);
}
static class InFlightRequest {
@@ -936,29 +951,36 @@ public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?>
final RequestCompletionHandler callback;
final boolean expectResponse;
final AbstractRequest request;
- final boolean isInternalRequest; // used to flag requests which are
initiated internally by NetworkClient
+ final boolean isInternal; // used to flag requests which are initiated
internally by NetworkClient
final Send send;
final long sendTimeMs;
final long createdTimeMs;
+ final int timeoutMs;
public InFlightRequest(RequestHeader header,
long createdTimeMs,
String destination,
RequestCompletionHandler callback,
boolean expectResponse,
- boolean isInternalRequest,
+ boolean isInternal,
AbstractRequest request,
Send send,
- long sendTimeMs) {
+ long sendTimeMs,
+ int timeoutMs) {
this.header = header;
this.destination = destination;
this.callback = callback;
this.expectResponse = expectResponse;
- this.isInternalRequest = isInternalRequest;
+ this.isInternal = isInternal;
this.request = request;
this.send = send;
this.sendTimeMs = sendTimeMs;
this.createdTimeMs = createdTimeMs;
+ this.timeoutMs = timeoutMs;
+ }
+
+ public boolean hasExpired(long now) {
+ return now - sendTimeMs >= timeoutMs;
}
public ClientResponse completed(AbstractResponse response, long
timeMs) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d36f711509e..5f4c220e11c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -439,10 +439,21 @@ public void onFailure(RuntimeException e) {
metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder,
this.coordinator);
- return client.send(coordinator, requestBuilder)
+ return client.send(coordinator, requestBuilder, joinRequestTimeoutMs())
.compose(new JoinGroupResponseHandler());
}
+ /**
+ * The join request blocks until the rebalance is completed, so the
request timeout should be a little higher than
+ * the rebalance timeout.
+ */
+ private int joinRequestTimeoutMs() {
+ int result = rebalanceTimeoutMs + 5000;
+ if (result < 0)
+ return Integer.MAX_VALUE;
+ return result;
+ }
+
private class JoinGroupResponseHandler extends
CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse,
RequestFuture<ByteBuffer> future) {
@@ -526,8 +537,7 @@ public void handle(JoinGroupResponse joinResponse,
RequestFuture<ByteBuffer> fut
private RequestFuture<ByteBuffer>
sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
- return client.send(coordinator, requestBuilder)
- .compose(new SyncGroupResponseHandler());
+ return client.send(coordinator, requestBuilder).compose(new
SyncGroupResponseHandler());
}
private class SyncGroupResponseHandler extends
CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@@ -711,8 +721,7 @@ public synchronized void maybeLeaveGroup() {
log.debug("Sending LeaveGroup request to coordinator {} for group
{}", coordinator, groupId);
LeaveGroupRequest.Builder request =
new LeaveGroupRequest.Builder(groupId,
generation.memberId);
- client.send(coordinator, request)
- .compose(new LeaveGroupResponseHandler());
+ client.send(coordinator, request).compose(new
LeaveGroupResponseHandler());
client.pollNoWakeup();
}
@@ -738,8 +747,7 @@ public void handle(LeaveGroupResponse leaveResponse,
RequestFuture<Void> future)
log.debug("Sending Heartbeat request for group {} to coordinator {}",
groupId, coordinator);
HeartbeatRequest.Builder requestBuilder =
new HeartbeatRequest.Builder(this.groupId,
this.generation.generationId, this.generation.memberId);
- return client.send(coordinator, requestBuilder)
- .compose(new HeartbeatResponseHandler());
+ return client.send(coordinator, requestBuilder).compose(new
HeartbeatResponseHandler());
}
private class HeartbeatResponseHandler extends
CoordinatorResponseHandler<HeartbeatResponse, Void> {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 84e9a812f52..4044fd9a050 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -60,7 +60,7 @@
private final Metadata metadata;
private final Time time;
private final long retryBackoffMs;
- private final long unsentExpiryMs;
+ private final int unsentExpiryMs;
private final AtomicBoolean wakeupDisabled = new AtomicBoolean();
// when requests complete, they are transferred to this queue prior to
invocation. The purpose
@@ -75,12 +75,16 @@ public ConsumerNetworkClient(KafkaClient client,
Metadata metadata,
Time time,
long retryBackoffMs,
- long requestTimeoutMs) {
+ int defaultRequestTimeoutMs) {
this.client = client;
this.metadata = metadata;
this.time = time;
this.retryBackoffMs = retryBackoffMs;
- this.unsentExpiryMs = requestTimeoutMs;
+ this.unsentExpiryMs = defaultRequestTimeoutMs;
+ }
+
+ public RequestFuture<ClientResponse> send(Node node,
AbstractRequest.Builder<?> requestBuilder) {
+ return doSend(node, requestBuilder, null);
}
/**
@@ -95,13 +99,22 @@ public ConsumerNetworkClient(KafkaClient client,
* @param requestBuilder A builder for the request payload
* @return A future which indicates the result of the send.
*/
- public RequestFuture<ClientResponse> send(Node node,
AbstractRequest.Builder<?> requestBuilder) {
+ public RequestFuture<ClientResponse> send(Node node,
AbstractRequest.Builder<?> requestBuilder, int requestTimeoutMs) {
+ return doSend(node, requestBuilder, requestTimeoutMs);
+ }
+
+ private RequestFuture<ClientResponse> doSend(Node node,
AbstractRequest.Builder<?> requestBuilder,
+ Integer requestTimeoutMs) {
long now = time.milliseconds();
RequestFutureCompletionHandler completionHandler = new
RequestFutureCompletionHandler();
- ClientRequest clientRequest = client.newClientRequest(node.idString(),
requestBuilder, now, true,
- completionHandler);
+ ClientRequest clientRequest;
+ if (requestTimeoutMs == null)
+ clientRequest = client.newClientRequest(node.idString(),
requestBuilder, now, true,
+ completionHandler);
+ else
+ clientRequest = client.newClientRequest(node.idString(),
requestBuilder, now, true,
+ completionHandler, requestTimeoutMs);
unsent.put(node, clientRequest);
-
// wakeup the client in case it is blocking in poll so that we can
send the queued request
client.wakeup();
return completionHandler.future;
@@ -227,7 +240,7 @@ public void poll(long timeout, long now, PollCondition
pollCondition, boolean di
// handler), the client will be woken up.
if (pollCondition == null || pollCondition.shouldBlock()) {
// if there are no requests in flight, do not block longer
than the retry backoff
- if (client.inFlightRequestCount() == 0)
+ if (!client.hasInFlightRequests())
timeout = Math.min(timeout, retryBackoffMs);
client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
now = time.milliseconds();
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 29fae94f73d..d273d3570d2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -62,6 +62,8 @@ public FutureResponse(AbstractResponse responseBody, boolean
disconnected, Reque
}
+ private static final int DEFAUL_TIMEOUT_MS = 30_000;
+
private final Time time;
private final Metadata metadata;
private Set<String> unavailableTopics;
@@ -350,8 +352,14 @@ public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?>
@Override
public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
boolean expectResponse,
RequestCompletionHandler callback) {
+ return newClientRequest(nodeId, requestBuilder, createdTimeMs,
expectResponse, callback, DEFAUL_TIMEOUT_MS);
+ }
+
+ @Override
+ public ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
+ boolean expectResponse,
RequestCompletionHandler callback, int timeoutMs) {
return new ClientRequest(nodeId, requestBuilder, 0, "mockClientId",
createdTimeMs,
- expectResponse, callback);
+ timeoutMs, expectResponse, callback);
}
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 77960e1123d..663c2e39ff0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -255,7 +255,9 @@ public void testDisconnectDuringUserMetadataRequest() {
long now = time.milliseconds();
ClientRequest request = client.newClientRequest(node.idString(),
builder, now, true);
client.send(request, now);
- client.poll(requestTimeoutMs, now);
+ // MockSelector.poll increments MockTime by timeoutMs, so we pass a
value lower than the request timeout to
+ // avoid expiring the request
+ client.poll(requestTimeoutMs - 1, now);
assertEquals(1, client.inFlightRequestCount(node.idString()));
assertTrue(client.hasInFlightRequests(node.idString()));
assertTrue(client.hasInFlightRequests());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 219c3f69ad7..2f80e7aa1e7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1608,7 +1608,7 @@ private FetchResponse fetchResponse(TopicPartition
partition, long fetchOffset,
String groupId = "mock-group";
String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
- long requestTimeoutMs = 30000;
+ int requestTimeoutMs = 30000;
boolean excludeInternalTopics = true;
int minBytes = 1;
int maxBytes = Integer.MAX_VALUE;
@@ -1628,7 +1628,8 @@ private FetchResponse fetchResponse(TopicPartition
partition, long fetchOffset,
ConsumerMetrics metricsRegistry = new
ConsumerMetrics(metricGroupPrefix);
SubscriptionState subscriptions = new
SubscriptionState(autoResetStrategy);
- ConsumerNetworkClient consumerClient = new
ConsumerNetworkClient(client, metadata, time, retryBackoffMs, requestTimeoutMs);
+ ConsumerNetworkClient consumerClient = new
ConsumerNetworkClient(client, metadata, time, retryBackoffMs,
+ requestTimeoutMs);
ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(
consumerClient,
groupId,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 8a934398928..b5ca068a7a8 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -57,7 +57,7 @@
private static final int SESSION_TIMEOUT_MS = 10000;
private static final int HEARTBEAT_INTERVAL_MS = 3000;
private static final long RETRY_BACKOFF_MS = 100;
- private static final long REQUEST_TIMEOUT_MS = 40000;
+ private static final int REQUEST_TIMEOUT_MS = 40000;
private static final String GROUP_ID = "dummy-group";
private static final String METRIC_GROUP_PREFIX = "consumer";
@@ -74,8 +74,8 @@ public void setupCoordinator() {
this.mockClient = new MockClient(mockTime);
Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true);
- this.consumerClient = new ConsumerNetworkClient(mockClient, metadata,
mockTime,
- RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS);
+ this.consumerClient = new ConsumerNetworkClient(mockClient, metadata,
mockTime, RETRY_BACKOFF_MS,
+ REQUEST_TIMEOUT_MS);
Metrics metrics = new Metrics();
Cluster cluster = TestUtils.singletonCluster("topic", 1);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index b46b65746b2..d623f33c3db 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -107,7 +107,7 @@ public void blockWhenPollConditionNotSatisfied() {
NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
ConsumerNetworkClient consumerClient = new
ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000);
- EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(1);
+
EasyMock.expect(mockNetworkClient.hasInFlightRequests()).andReturn(true);
EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(timeout),
EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
EasyMock.replay(mockNetworkClient);
@@ -127,9 +127,10 @@ public void blockOnlyForRetryBackoffIfNoInflightRequests()
{
long retryBackoffMs = 100L;
NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
- ConsumerNetworkClient consumerClient = new
ConsumerNetworkClient(mockNetworkClient, metadata, time, retryBackoffMs, 1000L);
+ ConsumerNetworkClient consumerClient = new
ConsumerNetworkClient(mockNetworkClient, metadata, time,
+ retryBackoffMs, 1000);
- EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0);
+
EasyMock.expect(mockNetworkClient.hasInFlightRequests()).andReturn(false);
EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs),
EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
EasyMock.replay(mockNetworkClient);
@@ -166,7 +167,7 @@ public void testAwaitForMetadataUpdateWithTimeout() {
@Test
public void sendExpiry() throws InterruptedException {
- long unsentExpiryMs = 10;
+ int unsentExpiryMs = 10;
final AtomicBoolean isReady = new AtomicBoolean();
final AtomicBoolean disconnected = new AtomicBoolean();
client = new MockClient(time) {
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala
b/core/src/main/scala/kafka/admin/AdminClient.scala
index 7bd626a6319..9be7dd5716a 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -473,7 +473,7 @@ object AdminClient {
metadata,
time,
retryBackoffMs,
- requestTimeoutMs.toLong)
+ requestTimeoutMs)
new AdminClient(
time,
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index c6ebdd17c36..f76da17e616 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -58,7 +58,7 @@ class InterBrokerSendThreadTest {
override def generateRequests() =
List[RequestAndCompletionHandler](handler)
}
- val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true,
handler.handler)
+ val clientRequest = new ClientRequest("dest", request, 0, "1", 0, 10,
true, handler.handler)
EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
EasyMock.same(handler.request),
@@ -92,7 +92,7 @@ class InterBrokerSendThreadTest {
override def generateRequests() =
List[RequestAndCompletionHandler](requestAndCompletionHandler)
}
- val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true,
requestAndCompletionHandler.handler)
+ val clientRequest = new ClientRequest("dest", request, 0, "1", 0, 10,
true, requestAndCompletionHandler.handler)
EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
EasyMock.same(requestAndCompletionHandler.request),
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Join request's timeout should be slightly higher than the rebalance timeout
> ---------------------------------------------------------------------------
>
> Key: KAFKA-5570
> URL: https://issues.apache.org/jira/browse/KAFKA-5570
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ismael Juma
> Assignee: Ismael Juma
> Priority: Major
>
> This is necessary because the join request blocks until the rebalance
> completes. The join request currently uses the same timeout as other consumer
> requests and this is the reason why the default request timeout for the
> consumer is 305 seconds (while the default rebalance timeout is 300 seconds).
> This JIRA is related to KAFKA-4237, but it doesn't include the part where we
> change the default request timeout for the consumer (as that has additional
> compatibility implications and requires a KIP).
> In order to implement this, we need to add to the NetworkClient the ability
> to do per request timeouts.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)