dajac commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1100295718


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -447,6 +430,33 @@ private RequestFuture<ClientResponse> 
sendMetadataRequest(MetadataRequest.Builde
             return client.send(node, request);
     }
 
+    /**
+     * Send Fetch Request to Kafka cluster asynchronously.
+     *
+     * This method is visible for testing.
+     *
+     * @return A future that indicates result of sent Fetch request
+     */
+    private RequestFuture<ClientResponse> sendFetchRequestToNode(final 
FetchSessionHandler.FetchRequestData requestData,
+                                                         final Node 
fetchTarget) {

Review Comment:
   nit: Indentation is off here.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -2425,17 +2426,38 @@ private ClusterResourceListeners 
configureClusterResourceListeners(Deserializer<
         return clusterResourceListeners;
     }
 
-    private void close(long timeoutMs, boolean swallowException) {
+    private Timer createTimerForRequest(final Duration timeout) {
+        // this.time could be null if an exception occurs in constructor prior 
to setting the this.time field
+        final Time localTime = (time == null) ? Time.SYSTEM : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        try {
-            if (coordinator != null)
-                coordinator.close(time.timer(Math.min(timeoutMs, 
requestTimeoutMs)));
-        } catch (Throwable t) {
-            firstException.compareAndSet(null, t);
-            log.error("Failed to close coordinator", t);
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        // Close objects with a timeout. The timeout is required because the 
coordinator & the fetcher send requests to
+        // the server in the process of closing which may not respect the 
overall timeout defined for closing the
+        // consumer.
+        if (coordinator != null) {
+            // This is a blocking call bound by the time remaining in 
closeTimer
+            LambdaUtils.swallow(() -> coordinator.close(closeTimer), 
firstException);

Review Comment:
   We have `closeQuietly` in `Utils` that is pretty close to this one. I think 
that `swallow` should be move there as well. We don't need another `*Utils` 
class.



##########
clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Lambda helpers.
+ */
+@FunctionalInterface
+public interface LambdaUtils {
+    /**
+     * Run some code, possibly throw some exceptions.
+     *
+     * @throws Exception
+     */
+    void run() throws Exception;
+
+    /**
+     * Provide an idempotent instance of the supplied code - ensure that the 
supplied code gets run only once, no
+     * matter how many times .run() is called.
+     */
+    static Runnable idempotent(final Runnable code) {

Review Comment:
   Is this one used anywhere? I can't find it.



##########
clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Lambda helpers.
+ */
+@FunctionalInterface
+public interface LambdaUtils {
+    /**
+     * Run some code, possibly throw some exceptions.
+     *
+     * @throws Exception
+     */
+    void run() throws Exception;
+
+    /**
+     * Provide an idempotent instance of the supplied code - ensure that the 
supplied code gets run only once, no
+     * matter how many times .run() is called.
+     */
+    static Runnable idempotent(final Runnable code) {
+        return new Runnable() {
+            boolean run = false;
+
+            public void run() {
+                if (run)
+                    return;
+
+                run = true;
+                code.run();
+            }
+        };
+    }
+
+    /**
+     * Run the supplied code. If an exception is thrown, it is swallowed and 
registered to the firstException parameter.
+     */
+    static void swallow(final LambdaUtils code, final 
AtomicReference<Throwable> firstException) {
+        if (code != null) {
+            try {
+                code.run();
+            } catch (Exception t) {
+                firstException.compareAndSet(null, t);
+            }
+        }
+    }
+
+    static RuntimeException wrap(final Exception ex) {
+        return ex instanceof RuntimeException ? (RuntimeException) ex : new 
RuntimeException(ex);
+    }
+
+    /**
+     * Run the supplied callable, wrapping non-runtime exceptions in runtime 
exceptions.
+     */
+    static <T> T wrapThrow(final Callable<T> code) {

Review Comment:
   Same question for this one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to