dajac commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1044643791
##########
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java:
##########
@@ -590,6 +595,14 @@ public boolean handleResponse(FetchResponse response,
short version) {
}
}
+ /**
+ * The client will initiate the session close on next fetch request.
+ */
+ public void notifyClose() {
+ log.info("Set the metadata for next fetch request to close the
existing session ID={}", nextMetadata.sessionId());
Review Comment:
nit: Should we lower it to debug to be consistent with the other logs in
KafkaConsumer?
##########
clients/src/main/java/org/apache/kafka/common/requests/FetchMetadata.java:
##########
@@ -114,9 +114,21 @@ public boolean equals(Object o) {
}
/**
- * Return the metadata for the next error response.
+ * Return the metadata for the next request. The metadata is set to
indicate that the client wants to close the
+ * existing session.
*/
public FetchMetadata nextCloseExisting() {
+ return new FetchMetadata(sessionId, FINAL_EPOCH);
+ }
+
+ /**
+ * Return the metadata for the next request.
Review Comment:
Should we just say `The metadata is set to indicate that the client
wants to close the existing session and create a new one if possible.`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1943,79 @@ private Map<String, String>
topicPartitionTags(TopicPartition tp) {
}
}
+ // Visible for testing
+ void maybeCloseFetchSessions(final Timer timer) {
+ final Cluster cluster = metadata.fetch();
+ final List<RequestFuture<ClientResponse>> requestFutures = new
ArrayList<>();
+ for (final Map.Entry<Integer, FetchSessionHandler> entry :
sessionHandlers.entrySet()) {
+ final FetchSessionHandler sessionHandler = entry.getValue();
+ // set the session handler to notify close. This will set the next
metadata request to send close message.
+ sessionHandler.notifyClose();
+
+ final int sessionId = sessionHandler.sessionId();
+ final Integer fetchTargetNodeId = entry.getKey();
+ // FetchTargetNode may not be available as it may have
disconnected the connection. In such cases, we will
+ // skip sending the close request.
+ final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+ if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+ log.debug("Skip sending close session request to broker {}
since it is not reachable", fetchTarget);
+ continue;
+ }
+
+ final RequestFuture<ClientResponse> responseFuture =
sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+ responseFuture.addListener(new
RequestFutureListener<ClientResponse>() {
+ @Override
+ public void onSuccess(ClientResponse value) {
+ log.debug("Successfully sent a close message for fetch
session: {} to node: {}", sessionId, fetchTarget);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ log.debug("Unable to a close message for fetch session: {}
to node: {}. " +
+ "This may result in unnecessary fetch sessions at the
broker.", sessionId, fetchTarget, e);
+ }
+ });
+
+ requestFutures.add(responseFuture);
+ }
+
+ // Poll to ensure that request has been written to the socket. Wait
until either the timer has expired or until
+ // all requests have received a response.
+ do {
+ client.poll(timer, null, true);
+ } while (timer.notExpired() &&
!requestFutures.stream().allMatch(RequestFuture::isDone));
+
+ if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
+ // we ran out of time before completing all futures. It is ok
since we don't want to block the shutdown
+ // here.
+ log.debug("All requests couldn't be sent in the specific timeout
period {}ms. " +
+ "This may result in unnecessary fetch sessions at the broker.
Consider increasing the timeout passed for " +
+ "KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
+ }
+ }
+
+ public void close(final Timer timer) {
+ if (!isClosed.compareAndSet(false, true)) {
+ log.info("Fetcher {} is already closed.", this);
+ return;
+ }
+
+ // Shared states (e.g. sessionHandlers) could be accessed by multiple
threads (such as heartbeat thread), hence,
+ // it is necessary to acquire a lock on the fetcher instance before
modifying the states.
+ synchronized (Fetcher.this) {
+ // we do not need to re-enable wakeups since we are closing already
+ client.disableWakeups();
Review Comment:
Could you elaborate on why we need this here? It is something that we did
not have before.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1571,7 +1575,8 @@ public void testGracefulClose() throws Exception {
response.put(tp0, Errors.NONE);
OffsetCommitResponse commitResponse = offsetCommitResponse(response);
LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(new
LeaveGroupResponseData().setErrorCode(Errors.NONE.code()));
- consumerCloseTest(5000, Arrays.asList(commitResponse,
leaveGroupResponse), 0, false);
+ FetchResponse closeResponse = FetchResponse.of(Errors.NONE, 0,
INVALID_SESSION_ID, new LinkedHashMap<>());
+ consumerCloseTest(5000, Arrays.asList(commitResponse,
leaveGroupResponse, closeResponse), 0, false);
Review Comment:
nit: should we also add a test where close response is not received?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1539,30 +1543,30 @@ public void testOffsetOfPausedPartitions() {
client.requests().clear();
consumer.unsubscribe();
- consumer.close();
+ consumer.close(Duration.ZERO);
}
@Test
public void testPollWithNoSubscription() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String)
null)) {
- assertThrows(IllegalStateException.class, () ->
consumer.poll(Duration.ZERO));
- }
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
+ assertThrows(IllegalStateException.class, () ->
consumer.poll(Duration.ZERO));
+ consumer.close(Duration.ZERO);
}
@Test
public void testPollWithEmptySubscription() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
- consumer.subscribe(Collections.emptyList());
- assertThrows(IllegalStateException.class, () ->
consumer.poll(Duration.ZERO));
- }
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
+ consumer.subscribe(Collections.emptyList());
+ assertThrows(IllegalStateException.class, () ->
consumer.poll(Duration.ZERO));
+ consumer.close(Duration.ZERO);
}
@Test
public void testPollWithEmptyUserAssignment() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
Review Comment:
Are we losing anything by removing all those try resources? It basically
means that the consumer is not closed in case of an exception.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1600,9 +1605,8 @@ public void testCloseInterrupt() throws Exception {
@Test
public void testCloseShouldBeIdempotent() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
- consumer.close();
- consumer.close();
- consumer.close();
Review Comment:
Is the third one not necessary?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -5171,27 +5222,27 @@ private <K, V> void buildFetcher(MetricConfig
metricConfig,
SubscriptionState subscriptionState,
LogContext logContext) {
buildDependencies(metricConfig, metadataExpireMs, subscriptionState,
logContext);
- fetcher = new Fetcher<>(
- new LogContext(),
- consumerClient,
- minBytes,
- maxBytes,
- maxWaitMs,
- fetchSize,
- maxPollRecords,
- true, // check crc
- "",
- keyDeserializer,
- valueDeserializer,
- metadata,
- subscriptions,
- metrics,
- metricsRegistry,
- time,
- retryBackoffMs,
- requestTimeoutMs,
- isolationLevel,
- apiVersions);
+ fetcher = spy(new Fetcher<>(
+ new LogContext(),
+ consumerClient,
+ minBytes,
Review Comment:
If we are doing this, could we adopt the new style directly:
```
new Fetcher(
a,
b,
....
);
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1941,79 @@ private Map<String, String>
topicPartitionTags(TopicPartition tp) {
}
}
+ // Visible for testing
+ void maybeCloseFetchSessions(final Timer timer) {
+ final Cluster cluster = metadata.fetch();
+ final List<RequestFuture<ClientResponse>> requestFutures = new
ArrayList<>();
+ for (final Map.Entry<Integer, FetchSessionHandler> entry :
sessionHandlers.entrySet()) {
Review Comment:
It seems that this was not addressed.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1943,79 @@ private Map<String, String>
topicPartitionTags(TopicPartition tp) {
}
}
+ // Visible for testing
+ void maybeCloseFetchSessions(final Timer timer) {
+ final Cluster cluster = metadata.fetch();
+ final List<RequestFuture<ClientResponse>> requestFutures = new
ArrayList<>();
+ for (final Map.Entry<Integer, FetchSessionHandler> entry :
sessionHandlers.entrySet()) {
+ final FetchSessionHandler sessionHandler = entry.getValue();
+ // set the session handler to notify close. This will set the next
metadata request to send close message.
+ sessionHandler.notifyClose();
+
+ final int sessionId = sessionHandler.sessionId();
+ final Integer fetchTargetNodeId = entry.getKey();
+ // FetchTargetNode may not be available as it may have
disconnected the connection. In such cases, we will
+ // skip sending the close request.
+ final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+ if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+ log.debug("Skip sending close session request to broker {}
since it is not reachable", fetchTarget);
+ continue;
+ }
+
+ final RequestFuture<ClientResponse> responseFuture =
sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+ responseFuture.addListener(new
RequestFutureListener<ClientResponse>() {
+ @Override
+ public void onSuccess(ClientResponse value) {
+ log.debug("Successfully sent a close message for fetch
session: {} to node: {}", sessionId, fetchTarget);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ log.debug("Unable to a close message for fetch session: {}
to node: {}. " +
+ "This may result in unnecessary fetch sessions at the
broker.", sessionId, fetchTarget, e);
+ }
+ });
+
+ requestFutures.add(responseFuture);
+ }
+
+ // Poll to ensure that request has been written to the socket. Wait
until either the timer has expired or until
+ // all requests have received a response.
+ do {
+ client.poll(timer, null, true);
+ } while (timer.notExpired() &&
!requestFutures.stream().allMatch(RequestFuture::isDone));
+
+ if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
+ // we ran out of time before completing all futures. It is ok
since we don't want to block the shutdown
+ // here.
+ log.debug("All requests couldn't be sent in the specific timeout
period {}ms. " +
+ "This may result in unnecessary fetch sessions at the broker.
Consider increasing the timeout passed for " +
+ "KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
+ }
+ }
+
+ public void close(final Timer timer) {
+ if (!isClosed.compareAndSet(false, true)) {
+ log.info("Fetcher {} is already closed.", this);
+ return;
+ }
+
+ // Shared states (e.g. sessionHandlers) could be accessed by multiple
threads (such as heartbeat thread), hence,
+ // it is necessary to acquire a lock on the fetcher instance before
modifying the states.
Review Comment:
I am trying to understand how the heartbeat thread interacts with the
fetcher here. Could you elaborate? Also, note that the heartbeat thread is
stopped at this point.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]