dajac commented on code in PR #13856:
URL: https://github.com/apache/kafka/pull/13856#discussion_r1233637338


##########
server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * An inter-broker send thread that utilizes a non-blocking network client.
+ */
+public abstract class InterBrokerSendThread extends ShutdownableThread {
+
+    protected volatile KafkaClient networkClient;
+
+    private final int requestTimeoutMs;
+    private final Time time;
+    private final UnsentRequests unsentRequests;
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time
+    ) {
+        this(name, networkClient, requestTimeoutMs, time, true);
+    }
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time,
+        boolean isInterruptible
+    ) {
+        super(name, isInterruptible);
+        this.networkClient = networkClient;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.time = time;
+        this.unsentRequests = new UnsentRequests();
+    }
+
+    public abstract Collection<RequestAndCompletionHandler> generateRequests();
+
+    public boolean hasUnsentRequests() {
+        return unsentRequests.iterator().hasNext();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        initiateShutdown();
+        networkClient.initiateClose();
+        awaitShutdown();
+        try {
+            networkClient.close();
+        } catch (IOException e) {

Review Comment:
   I was a bit surprised by this one. What's the reasoning here? I was 
wondering if we should just use `closeQuietly`. What do you think?



##########
server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * An inter-broker send thread that utilizes a non-blocking network client.
+ */
+public abstract class InterBrokerSendThread extends ShutdownableThread {
+
+    protected volatile KafkaClient networkClient;
+
+    private final int requestTimeoutMs;
+    private final Time time;
+    private final UnsentRequests unsentRequests;
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time
+    ) {
+        this(name, networkClient, requestTimeoutMs, time, true);
+    }
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time,
+        boolean isInterruptible
+    ) {
+        super(name, isInterruptible);
+        this.networkClient = networkClient;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.time = time;
+        this.unsentRequests = new UnsentRequests();
+    }
+
+    public abstract Collection<RequestAndCompletionHandler> generateRequests();
+
+    public boolean hasUnsentRequests() {
+        return unsentRequests.iterator().hasNext();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        initiateShutdown();
+        networkClient.initiateClose();
+        awaitShutdown();
+        try {
+            networkClient.close();
+        } catch (IOException e) {
+            // Not expected - currently no Kafka client implementations throw 
on close()
+            log.error("exception while trying to close Kafka client", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void drainGeneratedRequests() {
+        generateRequests().forEach(request ->
+            unsentRequests.put(
+                request.destination,
+                networkClient.newClientRequest(
+                    request.destination.idString(),
+                    request.request,
+                    request.creationTimeMs,
+                    true,
+                    requestTimeoutMs,
+                    request.handler
+                )));

Review Comment:
   nit: You mix two code styles here. If you put the closing parenthesis on a 
new line like here, you should do it for all of them.
   
   ```
           generateRequests().forEach(request ->
               unsentRequests.put(
                   request.destination,
                   networkClient.newClientRequest(
                       request.destination.idString(),
                       request.request,
                       request.creationTimeMs,
                       true,
                       requestTimeoutMs,
                       request.handler
                   )
               )
           );
   ```
   
   Alternatively, you can keep all the closing parenthesis after 
`request.handler`. I personally prefer the former style but I leave it up to 
you.



##########
server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * An inter-broker send thread that utilizes a non-blocking network client.
+ */
+public abstract class InterBrokerSendThread extends ShutdownableThread {
+
+    protected volatile KafkaClient networkClient;
+
+    private final int requestTimeoutMs;
+    private final Time time;
+    private final UnsentRequests unsentRequests;
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time
+    ) {
+        this(name, networkClient, requestTimeoutMs, time, true);
+    }
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time,
+        boolean isInterruptible
+    ) {
+        super(name, isInterruptible);
+        this.networkClient = networkClient;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.time = time;
+        this.unsentRequests = new UnsentRequests();
+    }
+
+    public abstract Collection<RequestAndCompletionHandler> generateRequests();
+
+    public boolean hasUnsentRequests() {
+        return unsentRequests.iterator().hasNext();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        initiateShutdown();
+        networkClient.initiateClose();
+        awaitShutdown();
+        try {
+            networkClient.close();
+        } catch (IOException e) {
+            // Not expected - currently no Kafka client implementations throw 
on close()
+            log.error("exception while trying to close Kafka client", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void drainGeneratedRequests() {
+        generateRequests().forEach(request ->
+            unsentRequests.put(
+                request.destination,
+                networkClient.newClientRequest(
+                    request.destination.idString(),
+                    request.request,
+                    request.creationTimeMs,
+                    true,
+                    requestTimeoutMs,
+                    request.handler
+                )));
+    }
+
+    protected void pollOnce(long maxTimeoutMs) {
+        try {
+            drainGeneratedRequests();
+            long now = time.milliseconds();
+            final long timeout = sendRequests(now, maxTimeoutMs);
+            networkClient.poll(timeout, now);
+            now = time.milliseconds();
+            checkDisconnects(now);
+            failExpiredRequests(now);
+            unsentRequests.clean();
+        } catch (FatalExitError fee) {
+            throw fee;
+        } catch (Throwable t) {
+            if (t instanceof DisconnectException && !networkClient.active()) {
+                // DisconnectException is expected when 
NetworkClient#initiateClose is called
+                return;
+            }
+            log.error("unhandled exception caught in InterBrokerSendThread", 
t);
+            // rethrow any unhandled exceptions as FatalExitError so the JVM 
will be terminated
+            // as we will be in an unknown state with potentially some 
requests dropped and not
+            // being able to make progress. Known and expected Errors should 
have been appropriately
+            // dealt with already.
+            throw new FatalExitError();
+        }
+    }
+
+    @Override
+    public void doWork() {
+        pollOnce(Long.MAX_VALUE);
+    }
+
+    private long sendRequests(long now, long maxTimeoutMs) {
+        long pollTimeout = maxTimeoutMs;
+        for (Node node : unsentRequests.nodes()) {
+            final Iterator<ClientRequest> requestIterator = 
unsentRequests.requestIterator(node);
+            while (requestIterator.hasNext()) {
+                final ClientRequest request = requestIterator.next();
+                if (networkClient.ready(node, now)) {
+                    networkClient.send(request, now);
+                    requestIterator.remove();
+                } else {
+                    pollTimeout = Math.min(pollTimeout, 
networkClient.connectionDelay(node, now));
+                }
+            }
+        }
+        return pollTimeout;
+    }
+
+    private void checkDisconnects(long now) {
+        // any disconnects affecting requests that have already been 
transmitted will be handled
+        // by NetworkClient, so we just need to check whether connections for 
any of the unsent
+        // requests have been disconnected; if they have, then we complete the 
corresponding future
+        // and set the disconnect flag in the ClientResponse
+        final Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> iterator = 
unsentRequests.iterator();
+        while (iterator.hasNext()) {
+            final Map.Entry<Node, ArrayDeque<ClientRequest>> entry = 
iterator.next();
+            final Node node = entry.getKey();
+            final ArrayDeque<ClientRequest> requests = entry.getValue();
+            if (!requests.isEmpty() && networkClient.connectionFailed(node)) {
+                iterator.remove();
+                for (ClientRequest request : requests) {
+                    final AuthenticationException authenticationException = 
networkClient.authenticationException(node);
+                    if (authenticationException != null) {
+                        log.error("Failed to send the following request due to 
authentication error: {}", request);
+                    }
+                    completeWithDisconnect(request, now, 
authenticationException);
+                }
+            }
+        }
+    }
+
+    private void failExpiredRequests(long now) {
+        // clear all expired unsent requests
+        final Collection<ClientRequest> timedOutRequests = 
unsentRequests.removeAllTimedOut(now);
+        for (ClientRequest request : timedOutRequests) {
+            log.debug("Failed to send the following request after {} ms: {}", 
request.requestTimeoutMs(), request);
+            completeWithDisconnect(request, now, null);
+        }
+    }
+
+    void completeWithDisconnect(ClientRequest request,
+        long now,
+        AuthenticationException authenticationException) {
+        final RequestCompletionHandler handler = request.callback();
+        handler.onComplete(
+            new ClientResponse(
+                
request.makeHeader(request.requestBuilder().latestAllowedVersion()),
+                handler,
+                request.destination(),
+                now /* createdTimeMs */,
+                now /* receivedTimeMs */,
+                true /* disconnected */,
+                null /* versionMismatch */,
+                authenticationException,
+                null));

Review Comment:
   nit: On closing parenthesis, whatever style you decide to use, let's try to 
be consistent in the file.



##########
server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * An inter-broker send thread that utilizes a non-blocking network client.
+ */
+public abstract class InterBrokerSendThread extends ShutdownableThread {
+
+    protected volatile KafkaClient networkClient;
+
+    private final int requestTimeoutMs;
+    private final Time time;
+    private final UnsentRequests unsentRequests;
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time
+    ) {
+        this(name, networkClient, requestTimeoutMs, time, true);
+    }
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time,
+        boolean isInterruptible
+    ) {
+        super(name, isInterruptible);
+        this.networkClient = networkClient;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.time = time;
+        this.unsentRequests = new UnsentRequests();
+    }
+
+    public abstract Collection<RequestAndCompletionHandler> generateRequests();
+
+    public boolean hasUnsentRequests() {
+        return unsentRequests.iterator().hasNext();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        initiateShutdown();
+        networkClient.initiateClose();
+        awaitShutdown();
+        try {
+            networkClient.close();
+        } catch (IOException e) {
+            // Not expected - currently no Kafka client implementations throw 
on close()
+            log.error("exception while trying to close Kafka client", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void drainGeneratedRequests() {
+        generateRequests().forEach(request ->
+            unsentRequests.put(
+                request.destination,
+                networkClient.newClientRequest(
+                    request.destination.idString(),
+                    request.request,
+                    request.creationTimeMs,
+                    true,
+                    requestTimeoutMs,
+                    request.handler
+                )));
+    }
+
+    protected void pollOnce(long maxTimeoutMs) {
+        try {
+            drainGeneratedRequests();
+            long now = time.milliseconds();
+            final long timeout = sendRequests(now, maxTimeoutMs);
+            networkClient.poll(timeout, now);
+            now = time.milliseconds();
+            checkDisconnects(now);
+            failExpiredRequests(now);
+            unsentRequests.clean();
+        } catch (FatalExitError fee) {
+            throw fee;
+        } catch (Throwable t) {
+            if (t instanceof DisconnectException && !networkClient.active()) {
+                // DisconnectException is expected when 
NetworkClient#initiateClose is called
+                return;
+            }
+            log.error("unhandled exception caught in InterBrokerSendThread", 
t);
+            // rethrow any unhandled exceptions as FatalExitError so the JVM 
will be terminated
+            // as we will be in an unknown state with potentially some 
requests dropped and not
+            // being able to make progress. Known and expected Errors should 
have been appropriately
+            // dealt with already.
+            throw new FatalExitError();
+        }
+    }
+
+    @Override
+    public void doWork() {
+        pollOnce(Long.MAX_VALUE);
+    }
+
+    private long sendRequests(long now, long maxTimeoutMs) {
+        long pollTimeout = maxTimeoutMs;
+        for (Node node : unsentRequests.nodes()) {
+            final Iterator<ClientRequest> requestIterator = 
unsentRequests.requestIterator(node);
+            while (requestIterator.hasNext()) {
+                final ClientRequest request = requestIterator.next();
+                if (networkClient.ready(node, now)) {
+                    networkClient.send(request, now);
+                    requestIterator.remove();
+                } else {
+                    pollTimeout = Math.min(pollTimeout, 
networkClient.connectionDelay(node, now));
+                }
+            }
+        }
+        return pollTimeout;
+    }
+
+    private void checkDisconnects(long now) {
+        // any disconnects affecting requests that have already been 
transmitted will be handled
+        // by NetworkClient, so we just need to check whether connections for 
any of the unsent
+        // requests have been disconnected; if they have, then we complete the 
corresponding future
+        // and set the disconnect flag in the ClientResponse
+        final Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> iterator = 
unsentRequests.iterator();
+        while (iterator.hasNext()) {
+            final Map.Entry<Node, ArrayDeque<ClientRequest>> entry = 
iterator.next();
+            final Node node = entry.getKey();
+            final ArrayDeque<ClientRequest> requests = entry.getValue();
+            if (!requests.isEmpty() && networkClient.connectionFailed(node)) {
+                iterator.remove();
+                for (ClientRequest request : requests) {
+                    final AuthenticationException authenticationException = 
networkClient.authenticationException(node);
+                    if (authenticationException != null) {
+                        log.error("Failed to send the following request due to 
authentication error: {}", request);
+                    }
+                    completeWithDisconnect(request, now, 
authenticationException);
+                }
+            }
+        }
+    }
+
+    private void failExpiredRequests(long now) {
+        // clear all expired unsent requests
+        final Collection<ClientRequest> timedOutRequests = 
unsentRequests.removeAllTimedOut(now);
+        for (ClientRequest request : timedOutRequests) {
+            log.debug("Failed to send the following request after {} ms: {}", 
request.requestTimeoutMs(), request);
+            completeWithDisconnect(request, now, null);
+        }
+    }
+
+    void completeWithDisconnect(ClientRequest request,
+        long now,
+        AuthenticationException authenticationException) {
+        final RequestCompletionHandler handler = request.callback();
+        handler.onComplete(
+            new ClientResponse(
+                
request.makeHeader(request.requestBuilder().latestAllowedVersion()),
+                handler,
+                request.destination(),
+                now /* createdTimeMs */,
+                now /* receivedTimeMs */,
+                true /* disconnected */,
+                null /* versionMismatch */,
+                authenticationException,
+                null));
+    }
+
+    public void wakeup() {
+        networkClient.wakeup();
+    }
+
+    private static final class UnsentRequests {
+
+        private final Map<Node, ArrayDeque<ClientRequest>> unsent = new 
HashMap<>();
+
+        void put(Node node, ClientRequest request) {
+            ArrayDeque<ClientRequest> requests = unsent.computeIfAbsent(node, 
n -> new ArrayDeque<>());
+            requests.add(request);
+        }
+
+        Collection<ClientRequest> removeAllTimedOut(long now) {
+            final List<ClientRequest> expiredRequests = new ArrayList<>();
+            for (ArrayDeque<ClientRequest> requests : unsent.values()) {
+                final Iterator<ClientRequest> requestIterator = 
requests.iterator();
+                boolean foundExpiredRequest = false;
+                while (requestIterator.hasNext() && !foundExpiredRequest) {
+                    final ClientRequest request = requestIterator.next();
+                    final long elapsedMs = Math.max(0, now - 
request.createdTimeMs());
+                    if (elapsedMs > request.requestTimeoutMs()) {
+                        expiredRequests.add(request);
+                        requestIterator.remove();
+                        foundExpiredRequest = true;
+                    }
+                }
+            }
+            return expiredRequests;
+        }
+
+        void clean() {
+            unsent.values().removeIf(ArrayDeque::isEmpty);

Review Comment:
   Nice one! I was not aware of `removeIf`.



##########
server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+
+public class InterBrokerSendThreadTest {
+
+    private final Time time = new MockTime();
+    private final KafkaClient networkClient = mock(KafkaClient.class);
+    private final StubCompletionHandler completionHandler = new 
StubCompletionHandler();
+    private final int requestTimeoutMs = 1000;
+
+    class TestInterBrokerSendThread extends InterBrokerSendThread {
+
+        private final Consumer<Throwable> exceptionCallback;
+        private final Queue<RequestAndCompletionHandler> queue = new 
ArrayDeque<>();
+
+        TestInterBrokerSendThread() {
+            this(
+                InterBrokerSendThreadTest.this.networkClient,
+                t -> {
+                    throw (t instanceof RuntimeException)
+                        ? ((RuntimeException) t)
+                        : new RuntimeException(t);
+                });
+        }
+
+        TestInterBrokerSendThread(KafkaClient networkClient, 
Consumer<Throwable> exceptionCallback) {
+            super("name", networkClient, requestTimeoutMs, time);
+            this.exceptionCallback = exceptionCallback;
+        }
+
+        void enqueue(RequestAndCompletionHandler request) {
+            queue.offer(request);
+        }
+
+        @Override
+        public Collection<RequestAndCompletionHandler> generateRequests() {
+            return queue.isEmpty() ? Collections.emptyList() : 
Collections.singletonList(queue.poll());
+        }
+
+        @Override
+        protected void pollOnce(long maxTimeoutMs) {
+            try {
+                super.pollOnce(maxTimeoutMs);
+            } catch (Throwable t) {
+                exceptionCallback.accept(t);
+            }
+        }
+    }
+
+    @Test
+    public void shutdownThreadShouldNotCauseException() throws 
InterruptedException, IOException {
+        // InterBrokerSendThread#shutdown calls NetworkClient#initiateClose 
first so NetworkClient#poll
+        // can throw DisconnectException when thread is running
+        when(networkClient.poll(anyLong(), anyLong())).thenThrow(new 
DisconnectException());
+        when(networkClient.active()).thenReturn(false);
+
+        AtomicReference<Throwable> exception = new AtomicReference<>();
+        final InterBrokerSendThread thread =
+            new TestInterBrokerSendThread(networkClient, exception::getAndSet);
+        thread.shutdown();
+        thread.pollOnce(100);
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).initiateClose();
+        verify(networkClient).close();
+        verify(networkClient).active();
+        verifyNoMoreInteractions(networkClient);
+
+        assertNull(exception.get());
+    }
+
+    @Test
+    public void disconnectWithoutShutdownShouldCauseException() {
+        DisconnectException de = new DisconnectException();
+        when(networkClient.poll(anyLong(), anyLong())).thenThrow(de);
+        when(networkClient.active()).thenReturn(true);
+
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
+        final InterBrokerSendThread thread =
+            new TestInterBrokerSendThread(networkClient, throwable::getAndSet);
+        thread.pollOnce(100);
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).active();
+        verifyNoMoreInteractions(networkClient);
+
+        Throwable thrown = throwable.get();
+        assertNotNull(thrown);
+        assertTrue(thrown instanceof FatalExitError);
+    }
+
+    @Test
+    public void shouldNotSendAnythingWhenNoRequests() {
+        final InterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        // poll is always called but there should be no further invocations on 
NetworkClient
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        sendThread.doWork();
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verifyNoMoreInteractions(networkClient);
+
+        assertFalse(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void shouldCreateClientRequestAndSendWhenNodeIsReady() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            anyLong(),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);
+
+        when(networkClient.ready(node, time.milliseconds())).thenReturn(true);
+
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        sendThread.enqueue(handler);
+        sendThread.doWork();
+
+        verify(networkClient)
+            .newClientRequest(
+                ArgumentMatchers.eq("1"),
+                same(handler.request),
+                anyLong(),
+                ArgumentMatchers.eq(true),
+                ArgumentMatchers.eq(requestTimeoutMs),
+                same(handler.handler));
+        verify(networkClient).ready(any(), anyLong());
+        verify(networkClient).send(same(clientRequest), anyLong());
+        verify(networkClient).poll(anyLong(), anyLong());
+        verifyNoMoreInteractions(networkClient);
+
+        assertFalse(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void 
shouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotReady() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            anyLong(),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);
+
+        when(networkClient.ready(node, time.milliseconds())).thenReturn(false);
+
+        when(networkClient.connectionDelay(any(), anyLong())).thenReturn(0L);
+
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());

Review Comment:
   nit: Could we use `Collections.emptyList()`?



##########
server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+
+public class InterBrokerSendThreadTest {
+
+    private final Time time = new MockTime();
+    private final KafkaClient networkClient = mock(KafkaClient.class);
+    private final StubCompletionHandler completionHandler = new 
StubCompletionHandler();
+    private final int requestTimeoutMs = 1000;
+
+    class TestInterBrokerSendThread extends InterBrokerSendThread {
+
+        private final Consumer<Throwable> exceptionCallback;
+        private final Queue<RequestAndCompletionHandler> queue = new 
ArrayDeque<>();
+
+        TestInterBrokerSendThread() {
+            this(
+                InterBrokerSendThreadTest.this.networkClient,
+                t -> {
+                    throw (t instanceof RuntimeException)
+                        ? ((RuntimeException) t)
+                        : new RuntimeException(t);
+                });
+        }
+
+        TestInterBrokerSendThread(KafkaClient networkClient, 
Consumer<Throwable> exceptionCallback) {
+            super("name", networkClient, requestTimeoutMs, time);
+            this.exceptionCallback = exceptionCallback;
+        }
+
+        void enqueue(RequestAndCompletionHandler request) {
+            queue.offer(request);
+        }
+
+        @Override
+        public Collection<RequestAndCompletionHandler> generateRequests() {
+            return queue.isEmpty() ? Collections.emptyList() : 
Collections.singletonList(queue.poll());
+        }
+
+        @Override
+        protected void pollOnce(long maxTimeoutMs) {
+            try {
+                super.pollOnce(maxTimeoutMs);
+            } catch (Throwable t) {
+                exceptionCallback.accept(t);
+            }
+        }
+    }
+
+    @Test
+    public void shutdownThreadShouldNotCauseException() throws 
InterruptedException, IOException {
+        // InterBrokerSendThread#shutdown calls NetworkClient#initiateClose 
first so NetworkClient#poll
+        // can throw DisconnectException when thread is running
+        when(networkClient.poll(anyLong(), anyLong())).thenThrow(new 
DisconnectException());
+        when(networkClient.active()).thenReturn(false);
+
+        AtomicReference<Throwable> exception = new AtomicReference<>();
+        final InterBrokerSendThread thread =
+            new TestInterBrokerSendThread(networkClient, exception::getAndSet);
+        thread.shutdown();
+        thread.pollOnce(100);
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).initiateClose();
+        verify(networkClient).close();
+        verify(networkClient).active();
+        verifyNoMoreInteractions(networkClient);
+
+        assertNull(exception.get());
+    }
+
+    @Test
+    public void disconnectWithoutShutdownShouldCauseException() {
+        DisconnectException de = new DisconnectException();
+        when(networkClient.poll(anyLong(), anyLong())).thenThrow(de);
+        when(networkClient.active()).thenReturn(true);
+
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
+        final InterBrokerSendThread thread =
+            new TestInterBrokerSendThread(networkClient, throwable::getAndSet);
+        thread.pollOnce(100);
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).active();
+        verifyNoMoreInteractions(networkClient);
+
+        Throwable thrown = throwable.get();
+        assertNotNull(thrown);
+        assertTrue(thrown instanceof FatalExitError);
+    }
+
+    @Test
+    public void shouldNotSendAnythingWhenNoRequests() {
+        final InterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        // poll is always called but there should be no further invocations on 
NetworkClient
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        sendThread.doWork();
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verifyNoMoreInteractions(networkClient);
+
+        assertFalse(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void shouldCreateClientRequestAndSendWhenNodeIsReady() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            anyLong(),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);
+
+        when(networkClient.ready(node, time.milliseconds())).thenReturn(true);
+
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        sendThread.enqueue(handler);
+        sendThread.doWork();
+
+        verify(networkClient)
+            .newClientRequest(
+                ArgumentMatchers.eq("1"),
+                same(handler.request),
+                anyLong(),
+                ArgumentMatchers.eq(true),
+                ArgumentMatchers.eq(requestTimeoutMs),
+                same(handler.handler));
+        verify(networkClient).ready(any(), anyLong());
+        verify(networkClient).send(same(clientRequest), anyLong());
+        verify(networkClient).poll(anyLong(), anyLong());
+        verifyNoMoreInteractions(networkClient);
+
+        assertFalse(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void 
shouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotReady() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            anyLong(),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);
+
+        when(networkClient.ready(node, time.milliseconds())).thenReturn(false);
+
+        when(networkClient.connectionDelay(any(), anyLong())).thenReturn(0L);
+
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        when(networkClient.connectionFailed(node)).thenReturn(true);
+
+        when(networkClient.authenticationException(node)).thenReturn(new 
AuthenticationException(""));
+
+        sendThread.enqueue(handler);
+        sendThread.doWork();
+
+        verify(networkClient)
+            .newClientRequest(
+                ArgumentMatchers.eq("1"),
+                same(handler.request),
+                anyLong(),
+                ArgumentMatchers.eq(true),
+                ArgumentMatchers.eq(requestTimeoutMs),
+                same(handler.handler));
+        verify(networkClient).ready(any(), anyLong());
+        verify(networkClient).connectionDelay(any(), anyLong());
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).connectionFailed(any());
+        verify(networkClient).authenticationException(any());
+        verifyNoMoreInteractions(networkClient);
+
+        assertTrue(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void testFailingExpiredRequests() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest(
+                "dest", request, 0, "1", time.milliseconds(), true, 
requestTimeoutMs, handler.handler);
+        time.sleep(1500L);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            ArgumentMatchers.eq(handler.creationTimeMs),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);
+
+        // make the node unready so the request is not cleared
+        when(networkClient.ready(node, time.milliseconds())).thenReturn(false);
+
+        when(networkClient.connectionDelay(any(), anyLong())).thenReturn(0L);
+
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());

Review Comment:
   nit: `emptyList()`?



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -37,7 +39,8 @@ class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransac
 
 
 class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time)
-  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time)
+  with Logging {

Review Comment:
   Do we need to set the log prefix?



##########
server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * An inter-broker send thread that utilizes a non-blocking network client.
+ */
+public abstract class InterBrokerSendThread extends ShutdownableThread {
+
+    protected volatile KafkaClient networkClient;
+
+    private final int requestTimeoutMs;
+    private final Time time;
+    private final UnsentRequests unsentRequests;
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time
+    ) {
+        this(name, networkClient, requestTimeoutMs, time, true);
+    }
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time,
+        boolean isInterruptible
+    ) {
+        super(name, isInterruptible);
+        this.networkClient = networkClient;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.time = time;
+        this.unsentRequests = new UnsentRequests();
+    }
+
+    public abstract Collection<RequestAndCompletionHandler> generateRequests();
+
+    public boolean hasUnsentRequests() {
+        return unsentRequests.iterator().hasNext();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        initiateShutdown();
+        networkClient.initiateClose();
+        awaitShutdown();
+        try {
+            networkClient.close();
+        } catch (IOException e) {
+            // Not expected - currently no Kafka client implementations throw 
on close()
+            log.error("exception while trying to close Kafka client", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void drainGeneratedRequests() {
+        generateRequests().forEach(request ->
+            unsentRequests.put(
+                request.destination,
+                networkClient.newClientRequest(
+                    request.destination.idString(),
+                    request.request,
+                    request.creationTimeMs,
+                    true,
+                    requestTimeoutMs,
+                    request.handler
+                )));
+    }
+
+    protected void pollOnce(long maxTimeoutMs) {
+        try {
+            drainGeneratedRequests();
+            long now = time.milliseconds();
+            final long timeout = sendRequests(now, maxTimeoutMs);
+            networkClient.poll(timeout, now);
+            now = time.milliseconds();
+            checkDisconnects(now);
+            failExpiredRequests(now);
+            unsentRequests.clean();
+        } catch (FatalExitError fee) {
+            throw fee;
+        } catch (Throwable t) {
+            if (t instanceof DisconnectException && !networkClient.active()) {
+                // DisconnectException is expected when 
NetworkClient#initiateClose is called
+                return;
+            }
+            log.error("unhandled exception caught in InterBrokerSendThread", 
t);
+            // rethrow any unhandled exceptions as FatalExitError so the JVM 
will be terminated
+            // as we will be in an unknown state with potentially some 
requests dropped and not
+            // being able to make progress. Known and expected Errors should 
have been appropriately
+            // dealt with already.
+            throw new FatalExitError();
+        }
+    }
+
+    @Override
+    public void doWork() {
+        pollOnce(Long.MAX_VALUE);
+    }
+
+    private long sendRequests(long now, long maxTimeoutMs) {
+        long pollTimeout = maxTimeoutMs;
+        for (Node node : unsentRequests.nodes()) {
+            final Iterator<ClientRequest> requestIterator = 
unsentRequests.requestIterator(node);
+            while (requestIterator.hasNext()) {
+                final ClientRequest request = requestIterator.next();
+                if (networkClient.ready(node, now)) {
+                    networkClient.send(request, now);
+                    requestIterator.remove();
+                } else {
+                    pollTimeout = Math.min(pollTimeout, 
networkClient.connectionDelay(node, now));
+                }
+            }
+        }
+        return pollTimeout;
+    }
+
+    private void checkDisconnects(long now) {
+        // any disconnects affecting requests that have already been 
transmitted will be handled
+        // by NetworkClient, so we just need to check whether connections for 
any of the unsent
+        // requests have been disconnected; if they have, then we complete the 
corresponding future
+        // and set the disconnect flag in the ClientResponse
+        final Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> iterator = 
unsentRequests.iterator();
+        while (iterator.hasNext()) {
+            final Map.Entry<Node, ArrayDeque<ClientRequest>> entry = 
iterator.next();
+            final Node node = entry.getKey();
+            final ArrayDeque<ClientRequest> requests = entry.getValue();
+            if (!requests.isEmpty() && networkClient.connectionFailed(node)) {
+                iterator.remove();
+                for (ClientRequest request : requests) {
+                    final AuthenticationException authenticationException = 
networkClient.authenticationException(node);
+                    if (authenticationException != null) {
+                        log.error("Failed to send the following request due to 
authentication error: {}", request);
+                    }
+                    completeWithDisconnect(request, now, 
authenticationException);
+                }
+            }
+        }
+    }
+
+    private void failExpiredRequests(long now) {
+        // clear all expired unsent requests
+        final Collection<ClientRequest> timedOutRequests = 
unsentRequests.removeAllTimedOut(now);
+        for (ClientRequest request : timedOutRequests) {
+            log.debug("Failed to send the following request after {} ms: {}", 
request.requestTimeoutMs(), request);
+            completeWithDisconnect(request, now, null);
+        }
+    }
+
+    void completeWithDisconnect(ClientRequest request,
+        long now,
+        AuthenticationException authenticationException) {

Review Comment:
   nit: We never format method like this. The following would be better.
   
   ```
       void completeWithDisconnect(
           ClientRequest request,
           long now,
           AuthenticationException authenticationException
       ) {
   ```



##########
server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.AbstractRequest;
+
+public final class RequestAndCompletionHandler {

Review Comment:
   I was wondering about whether we should define this one in 
`InterBrokerSendThread` like it was in Scala. What do you think?



##########
server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+
+public class InterBrokerSendThreadTest {
+
+    private final Time time = new MockTime();
+    private final KafkaClient networkClient = mock(KafkaClient.class);
+    private final StubCompletionHandler completionHandler = new 
StubCompletionHandler();
+    private final int requestTimeoutMs = 1000;
+
+    class TestInterBrokerSendThread extends InterBrokerSendThread {
+
+        private final Consumer<Throwable> exceptionCallback;
+        private final Queue<RequestAndCompletionHandler> queue = new 
ArrayDeque<>();
+
+        TestInterBrokerSendThread() {
+            this(
+                InterBrokerSendThreadTest.this.networkClient,
+                t -> {
+                    throw (t instanceof RuntimeException)
+                        ? ((RuntimeException) t)
+                        : new RuntimeException(t);
+                });
+        }
+
+        TestInterBrokerSendThread(KafkaClient networkClient, 
Consumer<Throwable> exceptionCallback) {
+            super("name", networkClient, requestTimeoutMs, time);
+            this.exceptionCallback = exceptionCallback;
+        }
+
+        void enqueue(RequestAndCompletionHandler request) {
+            queue.offer(request);
+        }
+
+        @Override
+        public Collection<RequestAndCompletionHandler> generateRequests() {
+            return queue.isEmpty() ? Collections.emptyList() : 
Collections.singletonList(queue.poll());
+        }
+
+        @Override
+        protected void pollOnce(long maxTimeoutMs) {
+            try {
+                super.pollOnce(maxTimeoutMs);
+            } catch (Throwable t) {
+                exceptionCallback.accept(t);
+            }
+        }
+    }
+
+    @Test
+    public void shutdownThreadShouldNotCauseException() throws 
InterruptedException, IOException {

Review Comment:
   nit: We usually prefix all tests with `test`.



##########
server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+
+public class InterBrokerSendThreadTest {
+
+    private final Time time = new MockTime();
+    private final KafkaClient networkClient = mock(KafkaClient.class);
+    private final StubCompletionHandler completionHandler = new 
StubCompletionHandler();
+    private final int requestTimeoutMs = 1000;
+
+    class TestInterBrokerSendThread extends InterBrokerSendThread {
+
+        private final Consumer<Throwable> exceptionCallback;
+        private final Queue<RequestAndCompletionHandler> queue = new 
ArrayDeque<>();
+
+        TestInterBrokerSendThread() {
+            this(
+                InterBrokerSendThreadTest.this.networkClient,
+                t -> {
+                    throw (t instanceof RuntimeException)
+                        ? ((RuntimeException) t)
+                        : new RuntimeException(t);
+                });
+        }
+
+        TestInterBrokerSendThread(KafkaClient networkClient, 
Consumer<Throwable> exceptionCallback) {
+            super("name", networkClient, requestTimeoutMs, time);
+            this.exceptionCallback = exceptionCallback;
+        }
+
+        void enqueue(RequestAndCompletionHandler request) {
+            queue.offer(request);
+        }
+
+        @Override
+        public Collection<RequestAndCompletionHandler> generateRequests() {
+            return queue.isEmpty() ? Collections.emptyList() : 
Collections.singletonList(queue.poll());
+        }
+
+        @Override
+        protected void pollOnce(long maxTimeoutMs) {
+            try {
+                super.pollOnce(maxTimeoutMs);
+            } catch (Throwable t) {
+                exceptionCallback.accept(t);
+            }
+        }
+    }
+
+    @Test
+    public void shutdownThreadShouldNotCauseException() throws 
InterruptedException, IOException {
+        // InterBrokerSendThread#shutdown calls NetworkClient#initiateClose 
first so NetworkClient#poll
+        // can throw DisconnectException when thread is running
+        when(networkClient.poll(anyLong(), anyLong())).thenThrow(new 
DisconnectException());
+        when(networkClient.active()).thenReturn(false);
+
+        AtomicReference<Throwable> exception = new AtomicReference<>();
+        final InterBrokerSendThread thread =
+            new TestInterBrokerSendThread(networkClient, exception::getAndSet);
+        thread.shutdown();
+        thread.pollOnce(100);
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).initiateClose();
+        verify(networkClient).close();
+        verify(networkClient).active();
+        verifyNoMoreInteractions(networkClient);
+
+        assertNull(exception.get());
+    }
+
+    @Test
+    public void disconnectWithoutShutdownShouldCauseException() {
+        DisconnectException de = new DisconnectException();
+        when(networkClient.poll(anyLong(), anyLong())).thenThrow(de);
+        when(networkClient.active()).thenReturn(true);
+
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
+        final InterBrokerSendThread thread =
+            new TestInterBrokerSendThread(networkClient, throwable::getAndSet);
+        thread.pollOnce(100);
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).active();
+        verifyNoMoreInteractions(networkClient);
+
+        Throwable thrown = throwable.get();
+        assertNotNull(thrown);
+        assertTrue(thrown instanceof FatalExitError);
+    }
+
+    @Test
+    public void shouldNotSendAnythingWhenNoRequests() {
+        final InterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        // poll is always called but there should be no further invocations on 
NetworkClient
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        sendThread.doWork();
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verifyNoMoreInteractions(networkClient);
+
+        assertFalse(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void shouldCreateClientRequestAndSendWhenNodeIsReady() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            anyLong(),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);

Review Comment:
   nit: The format is weird here. How about?
   
   ```
       .....
       same(handler.handler)
   )).thenReturn(clientRequest);
   ```



##########
server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * An inter-broker send thread that utilizes a non-blocking network client.
+ */
+public abstract class InterBrokerSendThread extends ShutdownableThread {
+
+    protected volatile KafkaClient networkClient;
+
+    private final int requestTimeoutMs;
+    private final Time time;
+    private final UnsentRequests unsentRequests;
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time
+    ) {
+        this(name, networkClient, requestTimeoutMs, time, true);
+    }
+
+    public InterBrokerSendThread(
+        String name,
+        KafkaClient networkClient,
+        int requestTimeoutMs,
+        Time time,
+        boolean isInterruptible
+    ) {
+        super(name, isInterruptible);
+        this.networkClient = networkClient;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.time = time;
+        this.unsentRequests = new UnsentRequests();
+    }
+
+    public abstract Collection<RequestAndCompletionHandler> generateRequests();
+
+    public boolean hasUnsentRequests() {
+        return unsentRequests.iterator().hasNext();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        initiateShutdown();
+        networkClient.initiateClose();
+        awaitShutdown();
+        try {
+            networkClient.close();
+        } catch (IOException e) {
+            // Not expected - currently no Kafka client implementations throw 
on close()
+            log.error("exception while trying to close Kafka client", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void drainGeneratedRequests() {
+        generateRequests().forEach(request ->
+            unsentRequests.put(
+                request.destination,
+                networkClient.newClientRequest(
+                    request.destination.idString(),
+                    request.request,
+                    request.creationTimeMs,
+                    true,
+                    requestTimeoutMs,
+                    request.handler
+                )));
+    }
+
+    protected void pollOnce(long maxTimeoutMs) {
+        try {
+            drainGeneratedRequests();
+            long now = time.milliseconds();
+            final long timeout = sendRequests(now, maxTimeoutMs);
+            networkClient.poll(timeout, now);
+            now = time.milliseconds();
+            checkDisconnects(now);
+            failExpiredRequests(now);
+            unsentRequests.clean();
+        } catch (FatalExitError fee) {
+            throw fee;
+        } catch (Throwable t) {
+            if (t instanceof DisconnectException && !networkClient.active()) {
+                // DisconnectException is expected when 
NetworkClient#initiateClose is called
+                return;
+            }
+            log.error("unhandled exception caught in InterBrokerSendThread", 
t);
+            // rethrow any unhandled exceptions as FatalExitError so the JVM 
will be terminated
+            // as we will be in an unknown state with potentially some 
requests dropped and not
+            // being able to make progress. Known and expected Errors should 
have been appropriately
+            // dealt with already.
+            throw new FatalExitError();
+        }
+    }
+
+    @Override
+    public void doWork() {
+        pollOnce(Long.MAX_VALUE);
+    }
+
+    private long sendRequests(long now, long maxTimeoutMs) {
+        long pollTimeout = maxTimeoutMs;
+        for (Node node : unsentRequests.nodes()) {
+            final Iterator<ClientRequest> requestIterator = 
unsentRequests.requestIterator(node);
+            while (requestIterator.hasNext()) {
+                final ClientRequest request = requestIterator.next();
+                if (networkClient.ready(node, now)) {
+                    networkClient.send(request, now);
+                    requestIterator.remove();
+                } else {
+                    pollTimeout = Math.min(pollTimeout, 
networkClient.connectionDelay(node, now));
+                }
+            }
+        }
+        return pollTimeout;
+    }
+
+    private void checkDisconnects(long now) {
+        // any disconnects affecting requests that have already been 
transmitted will be handled
+        // by NetworkClient, so we just need to check whether connections for 
any of the unsent
+        // requests have been disconnected; if they have, then we complete the 
corresponding future
+        // and set the disconnect flag in the ClientResponse
+        final Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> iterator = 
unsentRequests.iterator();
+        while (iterator.hasNext()) {
+            final Map.Entry<Node, ArrayDeque<ClientRequest>> entry = 
iterator.next();
+            final Node node = entry.getKey();
+            final ArrayDeque<ClientRequest> requests = entry.getValue();
+            if (!requests.isEmpty() && networkClient.connectionFailed(node)) {
+                iterator.remove();
+                for (ClientRequest request : requests) {
+                    final AuthenticationException authenticationException = 
networkClient.authenticationException(node);
+                    if (authenticationException != null) {
+                        log.error("Failed to send the following request due to 
authentication error: {}", request);
+                    }
+                    completeWithDisconnect(request, now, 
authenticationException);
+                }
+            }
+        }
+    }
+
+    private void failExpiredRequests(long now) {
+        // clear all expired unsent requests
+        final Collection<ClientRequest> timedOutRequests = 
unsentRequests.removeAllTimedOut(now);
+        for (ClientRequest request : timedOutRequests) {
+            log.debug("Failed to send the following request after {} ms: {}", 
request.requestTimeoutMs(), request);
+            completeWithDisconnect(request, now, null);
+        }
+    }
+
+    void completeWithDisconnect(ClientRequest request,

Review Comment:
   Could we make it private?



##########
server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.AbstractRequest;
+
+public final class RequestAndCompletionHandler {
+
+    public final long creationTimeMs;
+    public final Node destination;
+    public final AbstractRequest.Builder<? extends AbstractRequest> request;
+    public final RequestCompletionHandler handler;
+
+    public RequestAndCompletionHandler(
+        long creationTimeMs,
+        Node destination,
+        AbstractRequest.Builder<? extends AbstractRequest> request,
+        RequestCompletionHandler handler
+    ) {
+        this.creationTimeMs = creationTimeMs;
+        this.destination = destination;
+        this.request = request;
+        this.handler = handler;
+    }

Review Comment:
   nit: Should we define a `toString`? case classes in scala have it by default 
so it may be useful here.



##########
server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+
+public class InterBrokerSendThreadTest {
+
+    private final Time time = new MockTime();
+    private final KafkaClient networkClient = mock(KafkaClient.class);
+    private final StubCompletionHandler completionHandler = new 
StubCompletionHandler();
+    private final int requestTimeoutMs = 1000;
+
+    class TestInterBrokerSendThread extends InterBrokerSendThread {
+
+        private final Consumer<Throwable> exceptionCallback;
+        private final Queue<RequestAndCompletionHandler> queue = new 
ArrayDeque<>();
+
+        TestInterBrokerSendThread() {
+            this(
+                InterBrokerSendThreadTest.this.networkClient,
+                t -> {
+                    throw (t instanceof RuntimeException)
+                        ? ((RuntimeException) t)
+                        : new RuntimeException(t);
+                });
+        }
+
+        TestInterBrokerSendThread(KafkaClient networkClient, 
Consumer<Throwable> exceptionCallback) {
+            super("name", networkClient, requestTimeoutMs, time);
+            this.exceptionCallback = exceptionCallback;
+        }
+
+        void enqueue(RequestAndCompletionHandler request) {
+            queue.offer(request);
+        }
+
+        @Override
+        public Collection<RequestAndCompletionHandler> generateRequests() {
+            return queue.isEmpty() ? Collections.emptyList() : 
Collections.singletonList(queue.poll());
+        }
+
+        @Override
+        protected void pollOnce(long maxTimeoutMs) {
+            try {
+                super.pollOnce(maxTimeoutMs);
+            } catch (Throwable t) {
+                exceptionCallback.accept(t);
+            }
+        }
+    }
+
+    @Test
+    public void shutdownThreadShouldNotCauseException() throws 
InterruptedException, IOException {
+        // InterBrokerSendThread#shutdown calls NetworkClient#initiateClose 
first so NetworkClient#poll
+        // can throw DisconnectException when thread is running
+        when(networkClient.poll(anyLong(), anyLong())).thenThrow(new 
DisconnectException());
+        when(networkClient.active()).thenReturn(false);
+
+        AtomicReference<Throwable> exception = new AtomicReference<>();
+        final InterBrokerSendThread thread =
+            new TestInterBrokerSendThread(networkClient, exception::getAndSet);
+        thread.shutdown();
+        thread.pollOnce(100);
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).initiateClose();
+        verify(networkClient).close();
+        verify(networkClient).active();
+        verifyNoMoreInteractions(networkClient);
+
+        assertNull(exception.get());
+    }
+
+    @Test
+    public void disconnectWithoutShutdownShouldCauseException() {
+        DisconnectException de = new DisconnectException();
+        when(networkClient.poll(anyLong(), anyLong())).thenThrow(de);
+        when(networkClient.active()).thenReturn(true);
+
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
+        final InterBrokerSendThread thread =
+            new TestInterBrokerSendThread(networkClient, throwable::getAndSet);
+        thread.pollOnce(100);
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).active();
+        verifyNoMoreInteractions(networkClient);
+
+        Throwable thrown = throwable.get();
+        assertNotNull(thrown);
+        assertTrue(thrown instanceof FatalExitError);
+    }
+
+    @Test
+    public void shouldNotSendAnythingWhenNoRequests() {
+        final InterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        // poll is always called but there should be no further invocations on 
NetworkClient
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        sendThread.doWork();
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verifyNoMoreInteractions(networkClient);
+
+        assertFalse(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void shouldCreateClientRequestAndSendWhenNodeIsReady() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            anyLong(),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);
+
+        when(networkClient.ready(node, time.milliseconds())).thenReturn(true);
+
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        sendThread.enqueue(handler);
+        sendThread.doWork();
+
+        verify(networkClient)
+            .newClientRequest(
+                ArgumentMatchers.eq("1"),
+                same(handler.request),
+                anyLong(),
+                ArgumentMatchers.eq(true),
+                ArgumentMatchers.eq(requestTimeoutMs),
+                same(handler.handler));
+        verify(networkClient).ready(any(), anyLong());
+        verify(networkClient).send(same(clientRequest), anyLong());
+        verify(networkClient).poll(anyLong(), anyLong());
+        verifyNoMoreInteractions(networkClient);
+
+        assertFalse(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void 
shouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotReady() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            anyLong(),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);
+
+        when(networkClient.ready(node, time.milliseconds())).thenReturn(false);
+
+        when(networkClient.connectionDelay(any(), anyLong())).thenReturn(0L);
+
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        when(networkClient.connectionFailed(node)).thenReturn(true);
+
+        when(networkClient.authenticationException(node)).thenReturn(new 
AuthenticationException(""));
+
+        sendThread.enqueue(handler);
+        sendThread.doWork();
+
+        verify(networkClient)
+            .newClientRequest(
+                ArgumentMatchers.eq("1"),
+                same(handler.request),
+                anyLong(),
+                ArgumentMatchers.eq(true),
+                ArgumentMatchers.eq(requestTimeoutMs),
+                same(handler.handler));
+        verify(networkClient).ready(any(), anyLong());
+        verify(networkClient).connectionDelay(any(), anyLong());
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).connectionFailed(any());
+        verify(networkClient).authenticationException(any());
+        verifyNoMoreInteractions(networkClient);
+
+        assertTrue(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void testFailingExpiredRequests() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest(
+                "dest", request, 0, "1", time.milliseconds(), true, 
requestTimeoutMs, handler.handler);
+        time.sleep(1500L);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            ArgumentMatchers.eq(handler.creationTimeMs),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);

Review Comment:
   nit: format.



##########
server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+
+public class InterBrokerSendThreadTest {
+
+    private final Time time = new MockTime();
+    private final KafkaClient networkClient = mock(KafkaClient.class);
+    private final StubCompletionHandler completionHandler = new 
StubCompletionHandler();
+    private final int requestTimeoutMs = 1000;
+
+    class TestInterBrokerSendThread extends InterBrokerSendThread {
+
+        private final Consumer<Throwable> exceptionCallback;
+        private final Queue<RequestAndCompletionHandler> queue = new 
ArrayDeque<>();
+
+        TestInterBrokerSendThread() {
+            this(
+                InterBrokerSendThreadTest.this.networkClient,
+                t -> {
+                    throw (t instanceof RuntimeException)
+                        ? ((RuntimeException) t)
+                        : new RuntimeException(t);
+                });
+        }
+
+        TestInterBrokerSendThread(KafkaClient networkClient, 
Consumer<Throwable> exceptionCallback) {
+            super("name", networkClient, requestTimeoutMs, time);
+            this.exceptionCallback = exceptionCallback;
+        }
+
+        void enqueue(RequestAndCompletionHandler request) {
+            queue.offer(request);
+        }
+
+        @Override
+        public Collection<RequestAndCompletionHandler> generateRequests() {
+            return queue.isEmpty() ? Collections.emptyList() : 
Collections.singletonList(queue.poll());
+        }
+
+        @Override
+        protected void pollOnce(long maxTimeoutMs) {
+            try {
+                super.pollOnce(maxTimeoutMs);
+            } catch (Throwable t) {
+                exceptionCallback.accept(t);
+            }
+        }
+    }
+
+    @Test
+    public void shutdownThreadShouldNotCauseException() throws 
InterruptedException, IOException {
+        // InterBrokerSendThread#shutdown calls NetworkClient#initiateClose 
first so NetworkClient#poll
+        // can throw DisconnectException when thread is running
+        when(networkClient.poll(anyLong(), anyLong())).thenThrow(new 
DisconnectException());
+        when(networkClient.active()).thenReturn(false);
+
+        AtomicReference<Throwable> exception = new AtomicReference<>();
+        final InterBrokerSendThread thread =
+            new TestInterBrokerSendThread(networkClient, exception::getAndSet);
+        thread.shutdown();
+        thread.pollOnce(100);
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).initiateClose();
+        verify(networkClient).close();
+        verify(networkClient).active();
+        verifyNoMoreInteractions(networkClient);
+
+        assertNull(exception.get());
+    }
+
+    @Test
+    public void disconnectWithoutShutdownShouldCauseException() {
+        DisconnectException de = new DisconnectException();
+        when(networkClient.poll(anyLong(), anyLong())).thenThrow(de);
+        when(networkClient.active()).thenReturn(true);
+
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
+        final InterBrokerSendThread thread =
+            new TestInterBrokerSendThread(networkClient, throwable::getAndSet);
+        thread.pollOnce(100);
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verify(networkClient).active();
+        verifyNoMoreInteractions(networkClient);
+
+        Throwable thrown = throwable.get();
+        assertNotNull(thrown);
+        assertTrue(thrown instanceof FatalExitError);
+    }
+
+    @Test
+    public void shouldNotSendAnythingWhenNoRequests() {
+        final InterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        // poll is always called but there should be no further invocations on 
NetworkClient
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        sendThread.doWork();
+
+        verify(networkClient).poll(anyLong(), anyLong());
+        verifyNoMoreInteractions(networkClient);
+
+        assertFalse(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void shouldCreateClientRequestAndSendWhenNodeIsReady() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            anyLong(),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);
+
+        when(networkClient.ready(node, time.milliseconds())).thenReturn(true);
+
+        when(networkClient.poll(anyLong(), anyLong())).thenReturn(new 
ArrayList<>());
+
+        sendThread.enqueue(handler);
+        sendThread.doWork();
+
+        verify(networkClient)
+            .newClientRequest(
+                ArgumentMatchers.eq("1"),
+                same(handler.request),
+                anyLong(),
+                ArgumentMatchers.eq(true),
+                ArgumentMatchers.eq(requestTimeoutMs),
+                same(handler.handler));
+        verify(networkClient).ready(any(), anyLong());
+        verify(networkClient).send(same(clientRequest), anyLong());
+        verify(networkClient).poll(anyLong(), anyLong());
+        verifyNoMoreInteractions(networkClient);
+
+        assertFalse(completionHandler.executedWithDisconnectedResponse);
+    }
+
+    @Test
+    public void 
shouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotReady() {
+        final AbstractRequest.Builder<?> request = new StubRequestBuilder<>();
+        final Node node = new Node(1, "", 8080);
+        final RequestAndCompletionHandler handler =
+            new RequestAndCompletionHandler(time.milliseconds(), node, 
request, completionHandler);
+        final TestInterBrokerSendThread sendThread = new 
TestInterBrokerSendThread();
+
+        final ClientRequest clientRequest =
+            new ClientRequest("dest", request, 0, "1", 0, true, 
requestTimeoutMs, handler.handler);
+
+        when(networkClient.newClientRequest(
+            ArgumentMatchers.eq("1"),
+            same(handler.request),
+            anyLong(),
+            ArgumentMatchers.eq(true),
+            ArgumentMatchers.eq(requestTimeoutMs),
+            same(handler.handler)))
+            .thenReturn(clientRequest);

Review Comment:
   ditto.



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -306,8 +307,8 @@ class BrokerToControllerRequestThread(
   initialNetworkClient,
   Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, 
retryTimeoutMs)).toInt,
   time,
-  isInterruptible = false
-) {
+  false
+) with Logging {

Review Comment:
   ditto?



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -366,16 +367,16 @@ class BrokerToControllerRequestThread(
         val controllerAddress = activeControllerAddress()
         if (controllerAddress.isDefined) {
           requestIter.remove()
-          return Some(RequestAndCompletionHandler(
+          return util.Collections.singletonList(new 
RequestAndCompletionHandler(
             time.milliseconds(),
             controllerAddress.get,
             request.request,
-            handleResponse(request)
+            response => handleResponse(request)(response)
           ))
         }
       }
     }
-    None
+    util.Collections.emptyList()

Review Comment:
   nit: Is there a reason why we need `util.` prefix?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to