poorbarcode opened a new pull request, #23853: URL: https://github.com/apache/pulsar/pull/23853
### Motivation

**Background 1:** the producer's reconnection<sup>[1]</sup>
- set its connection to `null`
- check that the state is neither `closing` nor `closed`
- set the state to `connecting`

**Background 2:** the steps of closing a producer<sup>[2]</sup>
- if its `connection` is `null`:
  - fail all pending sends
  - set the state to `closed`
- if its `connection` is present:
  - remove the producer in the broker
  - remove the producer in the connection
  - fail all pending sends
  - set the state to `closed`

---

**Issue 1: closed producers were mistakenly reverted to `connecting`**

The steps to reproduce the issue are as follows:

| time | thread `reconnection` | thread `close producer` |
| --- | --- | --- |
| 1 | set its connection to `null` | |
| 2 | check that the state is neither `closing` nor `closed` | |
| 3 | | set the state to `closing` |
| 4 | | the connection is `null` now |
| 5 | resend pending messages | fail all pending sends |
| 6 | encounter a recycled pending message | |
| 7 | | set the state to `closed` |
| 8 | set the state to `connecting` | |
| 9 | reconnect successfully | |

---

**Issue 2: resending messages encountered a recycled pending message**

| time | thread `reconnection` | thread `close producer` |
| --- | --- | --- |
| 1 | set its connection to `null` | |
| 2 | check that the state is neither `closing` nor `closed` | |
| 3 | | set the state to `closing` |
| 4 | | the connection is `null` now |
| 5 | set the state to `connecting` | |
| 6 | reconnect successfully | |
| 7 | resend pending messages | fail all pending sends |
| 8 | encounter a recycled pending message | |
| 9 | | set the state to `closed` |

---

- **[1]**: the producer's reconnection: https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L194-L201

```java
public void connectionClosed(ClientCnx cnx, Optional<Long> initialConnectionDelayMs, Optional<URI> hostUrl) {
    if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) { // set its connection to `null`
        if (!isValidStateForReconnection()) { // check the state is not `closing | closed`
            return;
        }
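        // <-- race window: `closeAsync()` on another thread can set the state to `closing`/`closed`
        //     between the check above and the `setState` call below; the unconditional `setState`
        //     then overwrites it with `connecting` (issue 1).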
        state.setState(State.Connecting); // set the state to `connecting`
        grabCnx(hostUrl); // do reconnect
    }
}
```

- **[2]**: the producer's closing: https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1170-L1174

```java
if (cnx == null || currentState != State.Ready) {
    log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName);
    closeAndClearPendingMessages();
    return CompletableFuture.completedFuture(null);
}
```

**Logs showing issue 2**

```
2025-01-09T11:04:41,172+0000 [pulsar-web-43-5] WARN org.apache.pulsar.client.impl.ProducerImpl - [persistent://xxx/xxx/xx-partition-0] [pulsar.repl.prod-->prod-repl] Got exception while completing the callback for msg 458:
java.lang.NullPointerException: Cannot read field "replicatorId" because "x0" is null
    at org.apache.pulsar.broker.service.persistent.PersistentReplicator.access$000(PersistentReplicator.java:71) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.broker.service.persistent.PersistentReplicator$ProducerSendCallback.sendComplete(PersistentReplicator.java:356) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1526) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:2077) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
    at java.util.ArrayDeque.forEach(ArrayDeque.java:888) ~[?:?]
    at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1617) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:2067) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.client.impl.ProducerImpl.closeAndClearPendingMessages(ProducerImpl.java:1113) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.client.impl.ProducerImpl.closeAsync(ProducerImpl.java:1080) ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.broker.service.AbstractReplicator.doCloseProducerAsync(AbstractReplicator.java:365) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.broker.service.AbstractReplicator.terminate(AbstractReplicator.java:387) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$close$47(PersistentTopic.java:1598) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:563) ~[io.streamnative-pulsar-common-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:277) ~[io.streamnative-pulsar-common-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.close(PersistentTopic.java:1598) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadNonPartitionedTopicAsync$126(PersistentTopicsBase.java:1174) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
    at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
    at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalUnloadNonPartitionedTopicAsync(PersistentTopicsBase.java:1174) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadTopic$110(PersistentTopicsBase.java:962) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735) ~[?:?]
    at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) ~[?:?]
    at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalUnloadTopic(PersistentTopicsBase.java:956) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.broker.admin.v2.PersistentTopics.unloadTopic(PersistentTopics.java:1120) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:569) ~[?:?]
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684) ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:359) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:312) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:66) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.apache.pulsar.broker.web.AuthenticationFilter.doFilter(AuthenticationFilter.java:65) ~[io.streamnative-pulsar-broker-common-3.0.6.8.jar:3.0.6.8]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.apache.pulsar.broker.intercept.BrokerInterceptor.onFilter(BrokerInterceptor.java:224) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.apache.pulsar.broker.web.ProcessHandlerFilter.doFilter(ProcessHandlerFilter.java:46) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.apache.pulsar.broker.web.PreInterceptFilter.doFilter(PreInterceptFilter.java:73) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.apache.pulsar.broker.web.WebService$FilterInitializer$WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter.doFilter(WebService.java:325) ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlets.QoSFilter.doFilter(QoSFilter.java:202) ~[org.eclipse.jetty-jetty-servlets-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505) ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:722) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) ~[org.eclipse.jetty-jetty-io-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) ~[org.eclipse.jetty-jetty-io-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) ~[org.eclipse.jetty-jetty-io-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409) ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]
2025-01-09T11:04:41,171+0000 [pulsar-web-43-5] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://xxx/xxx/xxx-partition-0] [pulsar.repl.prod-->prod-repl] Closed Producer (not connected)
```

### Modifications

- update `state` atomically during reconnection, see https://github.com/apache/pulsar/compare/master...poorbarcode:fix/producer_race_condition?expand=1#diff-bcd53f63180847515f1fe1d5b00deb218d023cbfe9cbfade19b44c2babd734ffR194-L196
- add the missing locks when modifying `producer.pendingMessages`
- move the step "fail all pending sends when `producer.connection` is `null`", which was introduced by https://github.com/apache/pulsar/pull/23761, to the point after the reconnection succeeds

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes.
-->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: x

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org