eolivelli opened a new issue, #19579: URL: https://github.com/apache/pulsar/issues/19579
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Motivation

When the broker shuts down, it has to shut down many subcomponents, in particular the Protocol Handlers. Most of the broker's shutdown (close) procedure is asynchronous, but closing the Protocol Handlers is still a blocking operation.

Protocol Handlers, like KOP/Starlight for Kafka, often start thread pools and Pulsar clients. To attempt a graceful shutdown, a Protocol Handler may wait for some resources to be disposed, but that disposal can be deferred for a long time: the broker is shutting down as well, so some resources are no longer available, which leads to errors and backoffs.

The broker's shutdown procedure should be as quick as possible, to prevent latency spikes and other unwanted consequences of having a partially working broker.

[This](https://github.com/datastax/starlight-for-kafka/actions/runs/4224366992/jobs/7335231134) is an example of a Starlight for Kafka test that failed due to a timeout.

```
Error: testConnectListenerNotExist(io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest)  Time elapsed: 20.037 s  <<< FAILURE!
org.testng.internal.thread.ThreadTimeoutException: Method io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest.testConnectListenerNotExist() didn't finish within the time-out 20000
    at [email protected]/java.lang.StackStreamFactory$AbstractStackWalker.callStackWalk(Native Method)
    at [email protected]/java.lang.StackStreamFactory$AbstractStackWalker.beginStackWalk(StackStreamFactory.java:370)
    at [email protected]/java.lang.StackStreamFactory$AbstractStackWalker.walk(StackStreamFactory.java:243)
    at [email protected]/java.lang.StackWalker.walk(StackWalker.java:498)
    at app//org.apache.logging.log4j.util.StackLocator.calcLocation(StackLocator.java:96)
    at app//org.apache.logging.log4j.util.StackLocatorUtil.calcLocation(StackLocatorUtil.java:99)
    at app//org.apache.logging.log4j.spi.AbstractLogger.getLocation(AbstractLogger.java:2216)
    at app//org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2159)
    at app//org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2142)
    at app//org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2040)
    at app//org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1907)
    at app//org.apache.logging.slf4j.Log4jLogger.warn(Log4jLogger.java:249)
    at app//io.streamnative.pulsar.handlers.kop.AbstractPulsarClient.close(AbstractPulsarClient.java:49)
    at app//io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler.close(KafkaProtocolHandler.java:578)
    at app//org.apache.pulsar.broker.protocol.ProtocolHandlerWithClassLoader.close(ProtocolHandlerWithClassLoader.java:90)
    at app//org.apache.pulsar.broker.protocol.ProtocolHandlers$$Lambda$1438/0x0000000100b38040.accept(Unknown Source)
    at [email protected]/java.lang.Iterable.forEach(Iterable.java:75)
    at app//org.apache.pulsar.broker.protocol.ProtocolHandlers.close(ProtocolHandlers.java:154)
    at app//org.apache.pulsar.broker.PulsarService.closeAsync(PulsarService.java:458)
    at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.closeAsync$accessor$pCM5WTps(Unknown Source)
    at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588$auxiliary$4dJQHnOb.call(Unknown Source)
    at app//org.mockito.internal.invocation.RealMethod$FromCallable$1.call(RealMethod.java:40)
    at app//org.mockito.internal.invocation.RealMethod$FromBehavior.invoke(RealMethod.java:62)
    at app//org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:127)
    at app//org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:43)
    at app//org.mockito.Answers.answer(Answers.java:100)
    at app//org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
    at app//org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
    at app//org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:35)
    at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:63)
    at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:49)
    at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor$DispatcherDefaultingToRealMethod.interceptSuperCallable(MockMethodInterceptor.java:110)
    at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.closeAsync(Unknown Source)
    at app//org.apache.pulsar.broker.PulsarService.close(PulsarService.java:380)
    at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.close$accessor$pCM5WTps(Unknown Source)
    at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588$auxiliary$Lvw9o3WA.call(Unknown Source)
    at app//org.mockito.internal.invocation.RealMethod$FromCallable$1.call(RealMethod.java:40)
    at app//org.mockito.internal.invocation.RealMethod$FromBehavior.invoke(RealMethod.java:62)
    at app//org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:127)
    at app//org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:43)
    at app//org.mockito.Answers.answer(Answers.java:100)
    at app//org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:103)
    at app//org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
    at app//org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:35)
    at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:63)
    at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:49)
    at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor$DispatcherDefaultingToRealMethod.interceptSuperCallable(MockMethodInterceptor.java:110)
    at app//org.apache.pulsar.broker.PulsarService$MockitoMock$252149588.close(Unknown Source)
    at app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.stopBroker(KopProtocolHandlerTestBase.java:411)
    at app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.stopBroker(KopProtocolHandlerTestBase.java:415)
    at app//io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase.internalCleanup(KopProtocolHandlerTestBase.java:379)
    at app//io.streamnative.pulsar.handlers.kop.KafkaListenerNameTest.testConnectListenerNotExist(KafkaListenerNameTest.java:211)
    at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at [email protected]/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at [email protected]/java.lang.reflect.Method.invoke(Method.java:566)
    at app//org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
    at app//org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
    at app//org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
    at [email protected]/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at [email protected]/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at [email protected]/java.lang.Thread.run(Thread.java:829)
```

### Solution

The ProtocolHandler API should provide a closeAsync() method instead of a blocking close(). We could make the API backward compatible by leveraging Java default methods:

```
public default CompletableFuture<?> closeAsync() {
    CompletableFuture<?> result = new CompletableFuture<>();
    try {
        // Existing handlers only implement the blocking close()
        this.close();
        result.complete(null);
    } catch (Throwable t) {
        // TODO handle InterruptedException here
        result.completeExceptionally(t);
    }
    return result;
}
```

### Alternatives

Remove the close() API entirely and break compatibility. Rejected because there are already a few ProtocolHandlers, and breaking compatibility would harm the community.

### Anything else?

We should port this small API change to the stable branches, especially 2.10.x, which is the latest version that supports JDK 8. The change is needed to improve the shutdown procedure, which can otherwise cause huge latency spikes.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
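
To illustrate how the proposed closeAsync() could be consumed, here is a minimal, self-contained sketch. Everything in it is an assumption made for illustration: `CloseAsyncSketch`, `Handler` and `closeAll` are hypothetical names, and `Handler` is only a stand-in for the ProtocolHandler interface, not the real Pulsar API. It shows a legacy handler that relies on the default method next to a handler that overrides closeAsync(), with both futures combined through `CompletableFuture.allOf`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CloseAsyncSketch {

    /** Hypothetical stand-in for the ProtocolHandler interface (not the real Pulsar API). */
    public interface Handler {
        void close();

        // Backward-compatible default: wraps the blocking close() in a future.
        // close() still runs on the caller's thread; only handlers that
        // override this method become truly non-blocking.
        default CompletableFuture<Void> closeAsync() {
            CompletableFuture<Void> result = new CompletableFuture<>();
            try {
                close();
                result.complete(null);
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
            return result;
        }
    }

    /** Starts closing every handler and returns a single future covering all of them. */
    public static CompletableFuture<Void> closeAll(List<Handler> handlers) {
        CompletableFuture<?>[] futures = handlers.stream()
                // Swallow individual failures so one handler cannot fail the whole shutdown.
                .map(h -> h.closeAsync().exceptionally(t -> null))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    public static void main(String[] args) {
        // Legacy handler: only implements the blocking close().
        Handler legacy = () -> System.out.println("legacy handler closed");

        // Handler that overrides closeAsync() and releases resources on another thread.
        Handler async = new Handler() {
            @Override
            public void close() {
                closeAsync().join();
            }

            @Override
            public CompletableFuture<Void> closeAsync() {
                return CompletableFuture.runAsync(
                        () -> System.out.println("async handler closed"));
            }
        };

        closeAll(Arrays.asList(legacy, async)).join();
        System.out.println("all handlers closed");
    }
}
```

The sketch sticks to JDK 8 APIs, in line with the 2.10.x backport mentioned above. Whether the broker should also bound the combined future with a timeout is a separate design choice and is not covered here.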
