dajac commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1044643791


##########
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java:
##########
@@ -590,6 +595,14 @@ public boolean handleResponse(FetchResponse response, short version) {
         }
     }
 
+    /**
+     * The client will initiate the session close on next fetch request.
+     */
+    public void notifyClose() {
+        log.info("Set the metadata for next fetch request to close the existing session ID={}", nextMetadata.sessionId());

Review Comment:
   nit: Should we lower it to debug to be consistent with the other logs in KafkaConsumer?



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchMetadata.java:
##########
@@ -114,9 +114,21 @@ public boolean equals(Object o) {
     }
 
     /**
-     * Return the metadata for the next error response.
+     * Return the metadata for the next request. The metadata is set to indicate that the client
+     * wants to close the existing session.
      */
     public FetchMetadata nextCloseExisting() {
+        return new FetchMetadata(sessionId, FINAL_EPOCH);
+    }
+
+    /**
+     * Return the metadata for the next request.

Review Comment:
   Should we just say `The metadata is set to indicate that the client wants to close the existing session and create a new one if possible.`?
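   The close semantics under discussion can be sketched as follows. This is a simplified, hypothetical model of `FetchMetadata`, not the actual Kafka class; the `FINAL_EPOCH` value here is an illustrative stand-in for the real sentinel constant.

   ```java
   // Simplified, hypothetical sketch of the FetchMetadata close semantics
   // discussed above; not the actual Kafka class.
   public class FetchMetadataSketch {
       // Sentinel epoch signalling that the client wants to close the session.
       // Illustrative value; the real constant lives in FetchMetadata.
       public static final int FINAL_EPOCH = -1;

       private final int sessionId;
       private final int epoch;

       public FetchMetadataSketch(int sessionId, int epoch) {
           this.sessionId = sessionId;
           this.epoch = epoch;
       }

       // Keep the session id so the broker knows which session to close,
       // but move to the final epoch to signal the close.
       public FetchMetadataSketch nextCloseExisting() {
           return new FetchMetadataSketch(sessionId, FINAL_EPOCH);
       }

       public int sessionId() { return sessionId; }
       public int epoch() { return epoch; }
   }
   ```

   The key property is that `nextCloseExisting()` preserves the session id while replacing the epoch, so the broker can tell which session the close applies to.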



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1943,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {
+            final FetchSessionHandler sessionHandler = entry.getValue();
+            // Set the session handler to notify close. This sets the metadata of the next fetch request to send a close message.
+            sessionHandler.notifyClose();
+
+            final int sessionId = sessionHandler.sessionId();
+            final Integer fetchTargetNodeId = entry.getKey();
+            // The fetch target node may not be available as the connection may have been disconnected.
+            // In such cases, we will skip sending the close request.
+            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+            if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+                log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
+                continue;
+            }
+
+            final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+            responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
+                @Override
+                public void onSuccess(ClientResponse value) {
+                    log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    log.debug("Unable to send a close message for fetch session: {} to node: {}. " +
+                        "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
+                }
+            });
+
+            requestFutures.add(responseFuture);
+        }
+
+        // Poll to ensure that the request has been written to the socket. Wait until either the timer has expired or
+        // all requests have received a response.
+        do {
+            client.poll(timer, null, true);
+        } while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone));
+
+        if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
+            // We ran out of time before completing all futures. That is OK since we don't want to block the shutdown
+            // here.
+            log.debug("Not all requests could be sent within the specified timeout of {}ms. " +
+                "This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed to " +
+                "KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
+        }
+    }
+
+    public void close(final Timer timer) {
+        if (!isClosed.compareAndSet(false, true)) {
+            log.info("Fetcher {} is already closed.", this);
+            return;
+        }
+
+        // Shared state (e.g. sessionHandlers) may be accessed by multiple threads (such as the heartbeat thread),
+        // so it is necessary to acquire a lock on the fetcher instance before modifying it.
+        synchronized (Fetcher.this) {
+            // we do not need to re-enable wakeups since we are closing already
+            client.disableWakeups();

Review Comment:
   Could you elaborate on why we need this here? It is something that we did not have before.
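   The bounded-wait loop in the hunk above (poll at least once, then keep polling until the timer expires or every in-flight request has completed) can be sketched in isolation. This is a hypothetical illustration using `CompletableFuture`, not the Kafka `RequestFuture`/`ConsumerNetworkClient` API; `awaitAll` and `pollOnce` are invented names.

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical sketch of the bounded-wait pattern in maybeCloseFetchSessions:
   // poll at least once, then keep polling until either the deadline passes or
   // every in-flight request future has completed.
   public class BoundedWait {
       public static boolean awaitAll(List<? extends CompletableFuture<?>> futures,
                                      long deadlineMs,
                                      Runnable pollOnce) {
           do {
               pollOnce.run(); // stands in for client.poll(timer, null, true)
           } while (System.currentTimeMillis() < deadlineMs
                   && !futures.stream().allMatch(CompletableFuture::isDone));
           // true only if every request completed before the deadline
           return futures.stream().allMatch(CompletableFuture::isDone);
       }
   }
   ```

   Returning a boolean instead of throwing mirrors the design choice in the PR: running out of time is logged but does not block shutdown.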



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1571,7 +1575,8 @@ public void testGracefulClose() throws Exception {
         response.put(tp0, Errors.NONE);
         OffsetCommitResponse commitResponse = offsetCommitResponse(response);
        LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()));
-        consumerCloseTest(5000, Arrays.asList(commitResponse, leaveGroupResponse), 0, false);
+        FetchResponse closeResponse = FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>());
+        consumerCloseTest(5000, Arrays.asList(commitResponse, leaveGroupResponse, closeResponse), 0, false);

Review Comment:
   nit: should we also add a test where the close response is not received?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1539,30 +1543,30 @@ public void testOffsetOfPausedPartitions() {
 
         client.requests().clear();
         consumer.unsubscribe();
-        consumer.close();
+        consumer.close(Duration.ZERO);
     }
 
     @Test
     public void testPollWithNoSubscription() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
-            assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO));
-        }
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
+        assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO));
+        consumer.close(Duration.ZERO);
     }
 
     @Test
     public void testPollWithEmptySubscription() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
-            consumer.subscribe(Collections.emptyList());
-            assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO));
-        }
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
+        consumer.subscribe(Collections.emptyList());
+        assertThrows(IllegalStateException.class, () -> consumer.poll(Duration.ZERO));
+        consumer.close(Duration.ZERO);
     }
 
     @Test
     public void testPollWithEmptyUserAssignment() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {

Review Comment:
   Are we losing anything by removing all those try-with-resources blocks? It basically means that the consumer is not closed in case of an exception.
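   The point in the comment above can be shown with a minimal demo: with try-with-resources the resource is closed even when the body throws, whereas a trailing `close()` call is skipped when an unexpected exception fires first. The `Resource` class here is hypothetical, standing in for `KafkaConsumer`.

   ```java
   // Minimal illustration: try-with-resources closes the resource even when
   // the body throws; a manual trailing close() call does not run.
   public class TryWithResourcesDemo {
       static class Resource implements AutoCloseable {
           boolean closed = false;
           @Override public void close() { closed = true; }
       }

       // Resource is closed despite the simulated test failure.
       public static Resource failInsideTryWithResources() {
           Resource r = new Resource();
           try (Resource inScope = r) {
               throw new IllegalStateException("simulated test failure");
           } catch (IllegalStateException e) {
               // swallowed for the demo; close() has already run
           }
           return r;
       }

       // Without the try-with-resources block, close() is never reached.
       public static Resource failBeforeManualClose() {
           Resource r = new Resource();
           try {
               if (true) throw new IllegalStateException("simulated test failure");
               r.close(); // skipped when the exception fires first
           } catch (IllegalStateException e) {
               // swallowed for the demo
           }
           return r;
       }
   }
   ```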



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -1600,9 +1605,8 @@ public void testCloseInterrupt() throws Exception {
     @Test
     public void testCloseShouldBeIdempotent() {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
-        consumer.close();
-        consumer.close();
-        consumer.close();

Review Comment:
   Is the third one not necessary?
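   On the question above: with a compareAndSet guard, idempotency is already exercised by the second call, so the third is redundant as a test, though harmless. A minimal sketch of the pattern (illustrative names, not the Fetcher code):

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Sketch of the compareAndSet guard used in Fetcher.close: only the first
   // call wins the flag flip and performs the shutdown work, so a second (or
   // third) close() is a harmless no-op.
   public class IdempotentClose {
       private final AtomicBoolean isClosed = new AtomicBoolean(false);
       private int closeCount = 0; // stands in for the real shutdown work

       public void close() {
           if (!isClosed.compareAndSet(false, true)) {
               return; // already closed, nothing to do
           }
           closeCount++;
       }

       public int closeCount() { return closeCount; }
   }
   ```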



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -5171,27 +5222,27 @@ private <K, V> void buildFetcher(MetricConfig metricConfig,
                                     SubscriptionState subscriptionState,
                                     LogContext logContext) {
        buildDependencies(metricConfig, metadataExpireMs, subscriptionState, logContext);
-        fetcher = new Fetcher<>(
-                new LogContext(),
-                consumerClient,
-                minBytes,
-                maxBytes,
-                maxWaitMs,
-                fetchSize,
-                maxPollRecords,
-                true, // check crc
-                "",
-                keyDeserializer,
-                valueDeserializer,
-                metadata,
-                subscriptions,
-                metrics,
-                metricsRegistry,
-                time,
-                retryBackoffMs,
-                requestTimeoutMs,
-                isolationLevel,
-                apiVersions);
+        fetcher = spy(new Fetcher<>(
+                        new LogContext(),
+                        consumerClient,
+                        minBytes,

Review Comment:
   If we are doing this, could we adopt the new style directly:
   
   ```
   new Fetcher(
       a,
       b,
       ....
   );
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1941,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {

Review Comment:
   It seems that this was not addressed.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1943,79 @@ private Map<String, String> 
topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new 
ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : 
sessionHandlers.entrySet()) {
+            final FetchSessionHandler sessionHandler = entry.getValue();
+            // Set the session handler to notify close. This sets the metadata of the next fetch request to send a close message.
+            sessionHandler.notifyClose();
+
+            final int sessionId = sessionHandler.sessionId();
+            final Integer fetchTargetNodeId = entry.getKey();
+            // The fetch target node may not be available as the connection may have been disconnected.
+            // In such cases, we will skip sending the close request.
+            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+            if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+                log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
+                continue;
+            }
+
+            final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+            responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
+                @Override
+                public void onSuccess(ClientResponse value) {
+                    log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    log.debug("Unable to send a close message for fetch session: {} to node: {}. " +
+                        "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
+                }
+            });
+
+            requestFutures.add(responseFuture);
+        }
+
+        // Poll to ensure that the request has been written to the socket. Wait until either the timer has expired or
+        // all requests have received a response.
+        do {
+            client.poll(timer, null, true);
+        } while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone));
+
+        if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
+            // We ran out of time before completing all futures. That is OK since we don't want to block the shutdown
+            // here.
+            log.debug("Not all requests could be sent within the specified timeout of {}ms. " +
+                "This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed to " +
+                "KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
+        }
+    }
+
+    public void close(final Timer timer) {
+        if (!isClosed.compareAndSet(false, true)) {
+            log.info("Fetcher {} is already closed.", this);
+            return;
+        }
+
+        // Shared state (e.g. sessionHandlers) may be accessed by multiple threads (such as the heartbeat thread),
+        // so it is necessary to acquire a lock on the fetcher instance before modifying it.

Review Comment:
   I am trying to understand how the heartbeat thread interacts with the fetcher here. Could you elaborate? Also, note that the heartbeat thread is stopped at this point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
