This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e7859bbabe8 KAFKA-18274 Failed to restart controller in testing due to closed socket channel [2/2] (#18337) e7859bbabe8 is described below commit e7859bbabe868fee8ef57f831397f67d8f845c80 Author: Peter Lee <peterx...@gmail.com> AuthorDate: Tue Jan 14 03:12:42 2025 +0800 KAFKA-18274 Failed to restart controller in testing due to closed socket channel [2/2] (#18337) Reviewers: TengYao Chi <kiting...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/common/test/PreboundSocketFactoryManager.java | 15 +++++++++++++++ .../kafka/common/test/api/ClusterTestExtensionsTest.java | 16 ++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java b/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java index 5358b211c77..fa082001d64 100644 --- a/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java +++ b/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java @@ -49,7 +49,22 @@ public class PreboundSocketFactoryManager implements AutoCloseable { ServerSocketChannel socketChannel = getSocketForListenerAndMarkAsUsed( nodeId, listenerName); + if (socketChannel != null) { + if (socketChannel.isOpen()) { + return socketChannel; + } + // When restarting components(e.g. controllers, brokers) in tests, we want to reuse the same + // port that was previously allocated to maintain consistent addressing + // so the client can reconnect to the same port. + // Since those components would close the socket when they are restarted, + // we need to rebind the socket to the same port. + socketAddress = new InetSocketAddress(socketAddress.getHostString(), socketChannel.socket().getLocalPort()); + socketChannel = ServerSocketFactory.INSTANCE.openServerSocket( + listenerName, + socketAddress, + listenBacklogSize, + recvBufferSize); return socketChannel; } return ServerSocketFactory.INSTANCE.openServerSocket( diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java index c680ff004ae..60203649274 100644 --- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java +++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.common.test.api; +import kafka.server.ControllerServer; + import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.Config; @@ -345,6 +347,20 @@ public class ClusterTestExtensionsTest { } } + @ClusterTest(types = {Type.KRAFT}) + public void testControllerRestart(ClusterInstance cluster) throws ExecutionException, InterruptedException { + try (Admin admin = cluster.admin()) { + + ControllerServer controller = cluster.controllers().values().iterator().next(); + controller.shutdown(); + controller.awaitShutdown(); + + controller.startup(); + + assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size()); + } + } + @ClusterTest( types = {Type.KRAFT, Type.CO_KRAFT}, brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,