poorbarcode opened a new pull request, #23853:
URL: https://github.com/apache/pulsar/pull/23853

   ### Motivation
   
   **background 1:** producer's reconnection<sup>[1]</sup>
   - set its connection to `null`
   - check the state is not `closing | closed` 
   - set the state to `connecting`
   
   **background 2:** steps of producer closing<sup>[2]</sup>
   - if its `connection` is null:
     - fails all pending sends
     - set state to `closed`
   - if its `connection` is present:
     - remove producer in broker
     - remove the producer in the connection
     - ails all pending sends
     - set state to `closed`
   
   ---
   
   **Issue 1: closed producers were reverted mistakenly**: the steps to 
reproduce the issue are as follows
   
   | time | thread `reconnection` | thread `close producer` |
   | --- | --- | --- |
   | 1 | set its connection to `null` | |
   | 2 | check state is not `closing | closed` | |
   | 3 | | set state to `closing` |
   | 4 | | connection is `null` now |
   | 5 | resend pending messages | fails all pending sends |
   | 6 | encounters a recycled pending message | 
   | 7 | | set state to `close` |
   | 8 | set state to `connecting` |
   | 9 | reconnect successfully |
   
   ---
   
   **Issue 2: resending messages encountered a recycled pending message**
   | time | thread `reconnection` | thread `close producer` |
   | --- | --- | --- |
   | 1 | set its connection to `null` | |
   | 2 | check state is not `closing | closed` | |
   | 3 | | set state to `closing` |
   | 4 | | connection is `null` now |
   | 5 | set state to `connecting` |
   | 6 | reconnect successfully |
   | 7 | resend pending messages | fails all pending sends |
   | 8 | encounters a recycled pending message | 
   | 9 | | set state to `close`|
   
   ---
   
   - **[1]:** producer's reconnection
     - 
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L194-L201
   
   ```java
       public void connectionClosed(ClientCnx cnx, Optional<Long> 
initialConnectionDelayMs, Optional<URI> hostUrl) {
           if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) { // set its 
connection to `null`
               if (!isValidStateForReconnection()) { // check the state is not 
`closing | closed` 
                   return;
               }
               state.setState(State.Connecting); // set the state to 
`connecting`
               grabCnx(hostUrl); // do reconnect.
       }
   ```
   - **[2]**: producer's closing
     - 
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1170-L1174
   
   ```
           if (cnx == null || currentState != State.Ready) {
               log.info("[{}] [{}] Closed Producer (not connected)", topic, 
producerName);
               closeAndClearPendingMessages();
               return CompletableFuture.completedFuture(null);
           }
   ``` 
   
   **logs that encountered the issue 2**
   
   ```
   2025-01-09T11:04:41,172+0000 [pulsar-web-43-5] WARN  
org.apache.pulsar.client.impl.ProducerImpl - 
[persistent://xxx/xxx/xx-partition-0] [pulsar.repl.prod-->prod-repl] Got 
exception while completing the callback for msg 458:
   java.lang.NullPointerException: Cannot read field "replicatorId" because 
"x0" is null
        at 
org.apache.pulsar.broker.service.persistent.PersistentReplicator.access$000(PersistentReplicator.java:71)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.broker.service.persistent.PersistentReplicator$ProducerSendCallback.sendComplete(PersistentReplicator.java:356)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1526)
 ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:2077)
 ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
        at java.util.ArrayDeque.forEach(ArrayDeque.java:888) ~[?:?]
        at 
org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1617)
 ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:2067)
 ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.client.impl.ProducerImpl.closeAndClearPendingMessages(ProducerImpl.java:1113)
 ~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.client.impl.ProducerImpl.closeAsync(ProducerImpl.java:1080) 
~[io.streamnative-pulsar-client-original-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.broker.service.AbstractReplicator.doCloseProducerAsync(AbstractReplicator.java:365)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.broker.service.AbstractReplicator.terminate(AbstractReplicator.java:387)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$close$47(PersistentTopic.java:1598)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:563)
 ~[io.streamnative-pulsar-common-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:277)
 ~[io.streamnative-pulsar-common-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.broker.service.persistent.PersistentTopic.close(PersistentTopic.java:1598)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadNonPartitionedTopicAsync$126(PersistentTopicsBase.java:1174)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) 
~[?:?]
        at 
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalUnloadNonPartitionedTopicAsync(PersistentTopicsBase.java:1174)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalUnloadTopic$110(PersistentTopicsBase.java:962)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:757) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:735)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2182) 
~[?:?]
        at 
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalUnloadTopic(PersistentTopicsBase.java:956)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.broker.admin.v2.PersistentTopics.unloadTopic(PersistentTopics.java:1120)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:?]
        at 
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 ~[?:?]
        at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:569) ~[?:?]
        at 
org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
 ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at 
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146)
 ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at 
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189)
 ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at 
org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
 ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at 
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93)
 ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at 
org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478)
 ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at 
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400)
 ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at 
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
 ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at 
org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256) 
~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) 
~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) 
~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292) 
~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274) 
~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:244) 
~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
        at 
org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
 ~[org.glassfish.jersey.core-jersey-common-2.42.jar:?]
        at 
org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235) 
~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at 
org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
 ~[org.glassfish.jersey.core-jersey-server-2.42.jar:?]
        at 
org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) 
~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
        at 
org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) 
~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
        at 
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:359)
 ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
        at 
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:312)
 ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
        at 
org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
 ~[org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar:?]
        at 
org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) 
~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1656)
 ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.apache.pulsar.broker.web.ResponseHandlerFilter.doFilter(ResponseHandlerFilter.java:66)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) 
~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
 ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.apache.pulsar.broker.web.AuthenticationFilter.doFilter(AuthenticationFilter.java:65)
 ~[io.streamnative-pulsar-broker-common-3.0.6.8.jar:3.0.6.8]
        at 
org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) 
~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
 ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.apache.pulsar.broker.intercept.BrokerInterceptor.onFilter(BrokerInterceptor.java:224)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.apache.pulsar.broker.web.ProcessHandlerFilter.doFilter(ProcessHandlerFilter.java:46)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) 
~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
 ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.apache.pulsar.broker.web.PreInterceptFilter.doFilter(PreInterceptFilter.java:73)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) 
~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
 ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.apache.pulsar.broker.web.WebService$FilterInitializer$WaitUntilPulsarServiceIsReadyForIncomingRequestsFilter.doFilter(WebService.java:325)
 ~[io.streamnative-pulsar-broker-3.0.6.8.jar:3.0.6.8]
        at 
org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) 
~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
 ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at org.eclipse.jetty.servlets.QoSFilter.doFilter(QoSFilter.java:202) 
~[org.eclipse.jetty-jetty-servlets-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) 
~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)
 ~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552) 
~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505) 
~[org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) 
~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:722) 
~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
 ~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) 
~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at org.eclipse.jetty.server.Server.handle(Server.java:516) 
~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487) 
~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732) 
~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479) 
~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) 
~[org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
 ~[org.eclipse.jetty-jetty-io-9.4.54.v20240208.jar:9.4.54.v20240208]
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) 
~[org.eclipse.jetty-jetty-io-9.4.54.v20240208.jar:9.4.54.v20240208]
        at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) 
~[org.eclipse.jetty-jetty-io-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
 ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
 ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
 ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
 ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
 ~[org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar:9.4.54.v20240208]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 
~[?:?]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 
~[?:?]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[io.netty-netty-common-4.1.113.Final.jar:4.1.113.Final]
        at java.lang.Thread.run(Thread.java:840) ~[?:?]
   
   2025-01-09T11:04:41,171+0000 [pulsar-web-43-5] INFO  
org.apache.pulsar.client.impl.ProducerImpl - 
[persistent://xxx/xxx/xxx-partition-0] [pulsar.repl.prod-->prod-repl] Closed 
Producer (not connected)
   ```
   
   ### Modifications
   - update `state` atomically when calling `reconnection`, see 
https://github.com/apache/pulsar/compare/master...poorbarcode:fix/producer_race_condition?expand=1#diff-bcd53f63180847515f1fe1d5b00deb218d023cbfe9cbfade19b44c2babd734ffR194-L196
   - adds lost locks when modifying `producer. pendingMessages`
   - move `fails all pending sends when producer.connection is null` to 
reconnect successfully, which was introduced by 
https://github.com/apache/pulsar/pull/23761
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: x
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to