dajac commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r1100295718
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ########## @@ -447,6 +430,33 @@ private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builde return client.send(node, request); } + /** + * Send Fetch Request to Kafka cluster asynchronously. + * + * This method is visible for testing. + * + * @return A future that indicates result of sent Fetch request + */ + private RequestFuture<ClientResponse> sendFetchRequestToNode(final FetchSessionHandler.FetchRequestData requestData, + final Node fetchTarget) { Review Comment: nit: Indentation is off here. ########## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ########## @@ -2425,17 +2426,38 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer< return clusterResourceListeners; } - private void close(long timeoutMs, boolean swallowException) { + private Timer createTimerForRequest(final Duration timeout) { + // this.time could be null if an exception occurs in constructor prior to setting the this.time field + final Time localTime = (time == null) ? Time.SYSTEM : time; + return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs)); + } + + private void close(Duration timeout, boolean swallowException) { log.trace("Closing the Kafka consumer"); AtomicReference<Throwable> firstException = new AtomicReference<>(); - try { - if (coordinator != null) - coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs))); - } catch (Throwable t) { - firstException.compareAndSet(null, t); - log.error("Failed to close coordinator", t); + + final Timer closeTimer = createTimerForRequest(timeout); + // Close objects with a timeout. The timeout is required because the coordinator & the fetcher send requests to + // the server in the process of closing which may not respect the overall timeout defined for closing the + // consumer. 
+ if (coordinator != null) { + // This is a blocking call bound by the time remaining in closeTimer + LambdaUtils.swallow(() -> coordinator.close(closeTimer), firstException); Review Comment: We have `closeQuietly` in `Utils` that is pretty close to this one. I think that `swallow` should be moved there as well. We don't need another `*Utils` class. ########## clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Lambda helpers. + */ +@FunctionalInterface +public interface LambdaUtils { + /** + * Run some code, possibly throw some exceptions. + * + * @throws Exception + */ + void run() throws Exception; + + /** + * Provide an idempotent instance of the supplied code - ensure that the supplied code gets run only once, no + * matter how many times .run() is called. + */ + static Runnable idempotent(final Runnable code) { Review Comment: Is this one used anywhere? I can't find it. 
########## clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Lambda helpers. + */ +@FunctionalInterface +public interface LambdaUtils { + /** + * Run some code, possibly throw some exceptions. + * + * @throws Exception + */ + void run() throws Exception; + + /** + * Provide an idempotent instance of the supplied code - ensure that the supplied code gets run only once, no + * matter how many times .run() is called. + */ + static Runnable idempotent(final Runnable code) { + return new Runnable() { + boolean run = false; + + public void run() { + if (run) + return; + + run = true; + code.run(); + } + }; + } + + /** + * Run the supplied code. If an exception is thrown, it is swallowed and registered to the firstException parameter. 
+ */ + static void swallow(final LambdaUtils code, final AtomicReference<Throwable> firstException) { + if (code != null) { + try { + code.run(); + } catch (Exception t) { + firstException.compareAndSet(null, t); + } + } + } + + static RuntimeException wrap(final Exception ex) { + return ex instanceof RuntimeException ? (RuntimeException) ex : new RuntimeException(ex); + } + + /** + * Run the supplied callable, wrapping non-runtime exceptions in runtime exceptions. + */ + static <T> T wrapThrow(final Callable<T> code) { Review Comment: Same question for this one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org