Technoboy- opened a new pull request, #20582: URL: https://github.com/apache/pulsar/pull/20582
Cherry-pick #20567

### Motivation

When the `replicator.producer` retries to start<sup>[1]</sup> while the topic close<sup>[2]</sup> is executed concurrently, an orphan replicator is left behind after the topic is closed. You can reproduce this with the test `testRetryStartProducerStoppedByTopicRemove`.

| time | `restart producer of a replicator` | `topic.close` |
| --- | --- | --- |
| 1 | Compare and set state: `Starting -> Stopped` | |
| 2 | Add a delayed task to try to start the producer again | |
| 3 | | Close topic |
| 4 | The delayed task is executed | |
| 5 | If it fails, retry again and loop | |

**[1]**: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java#L151

**[2]**: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java#L163

### Logs

```
2023-06-12T06:32:44,968+0000 [bookkeeper-ml-scheduler-OrderedScheduler-6-0] WARN org.apache.bookkeeper.mledger.impl.OpReadEntry - [public/default/persistent/bt1-fen-prod-router-5f9754654d][pulsar.repl.bt3] read failed from ledger at position:14152754:15112
org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerFencedException: java.lang.Exception: Attempted to use a fenced managed ledger
Caused by: java.lang.Exception: Attempted to use a fenced managed ledger
    at org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerFencedException.<init>(ManagedLedgerException.java:80) ~[io.streamnative-managed-ledger-2.10.3.5.jar:2.10.3.5]
    at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntries(ManagedLedgerImpl.java:1788) ~[io.streamnative-managed-ledger-2.10.3.5.jar:2.10.3.5]
    at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.checkForNewEntries(ManagedCursorImpl.java:920) ~[io.streamnative-managed-ledger-2.10.3.5.jar:2.10.3.5]
    at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.lambda$asyncReadEntriesOrWait$7(ManagedCursorImpl.java:878) ~[io.streamnative-managed-ledger-2.10.3.5.jar:2.10.3.5]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[org.apache.bookkeeper-bookkeeper-common-4.14.7.jar:4.14.7]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) ~[org.apache.bookkeeper-bookkeeper-common-4.14.7.jar:4.14.7]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) ~[org.apache.bookkeeper-bookkeeper-common-4.14.7.jar:4.14.7]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131) ~[com.google.guava-guava-31.0.1-jre.jar:?]
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74) ~[com.google.guava-guava-31.0.1-jre.jar:?]
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82) ~[com.google.guava-guava-31.0.1-jre.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.87.Final.jar:4.1.87.Final]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]
2023-06-12T06:32:47,668+0000 [pulsar-io-4-18] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0x86bba67c, L:/192.168.0.157:49602 - R:bt1-broker-13/192.168.0.15:6651] Received error from server: org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'pulsar.repl.bt1r-->bt1' is already connected to topic
2023-06-12T06:32:47,668+0000 [pulsar-io-4-18] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://public/default/bt3-fen-prod-gke-ane3-a-ping-96f58d448] [pulsar.repl.bt1r-->bt1] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'pulsar.repl.bt1r-->bt1' is already connected to topic","reqId":4570674733310375089, "remote":"bt1-broker-13/192.168.0.15:6651", "local":"/192.168.0.157:49602"}
2023-06-12T06:32:47,668+0000 [pulsar-io-4-18] WARN org.apache.pulsar.broker.service.AbstractReplicator - [persistent://public/default/bt3-fen-prod-gke-ane3-a-ping-96f58d448][bt1r -> bt1] Failed to create remote producer (org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'pulsar.repl.bt1r-->bt1' is already connected to topic","reqId":4570674733310375089, "remote":"bt1-broker-13/192.168.0.15:6651", "local":"/192.168.0.157:49602"}), retrying in 55.709 s
```

### Another case in heap dump

We can see that a delayed task to start the `replicator.producer` still exists after the topic is closed.

### Modifications

If the topic has already been closed, stop retrying.

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: x
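The idea in Modifications ("if the topic was already closed, stop retrying") can be illustrated with a minimal, self-contained sketch. This is a simplified model, not Pulsar's actual `AbstractReplicator`. The class `ReplicatorRetrySketch`, its `startProducer`/`closeTopic` methods, the `topicClosed` flag, and the fixed 10 ms retry delay are all hypothetical. Producer creation is simulated to always fail, which mirrors the `ProducerBusyException` loop in the logs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical, simplified model of a replicator's producer start/retry loop.
// Not Pulsar's AbstractReplicator: all names here are illustrative assumptions.
public class ReplicatorRetrySketch {
    enum State { Stopped, Starting, Started }

    private final AtomicReference<State> state = new AtomicReference<>(State.Stopped);
    private final AtomicBoolean topicClosed = new AtomicBoolean(false);
    private final AtomicInteger attempts = new AtomicInteger();
    private final ScheduledExecutorService scheduler;
    private final Supplier<CompletableFuture<Void>> createProducer;
    private final long retryDelayMs;

    ReplicatorRetrySketch(ScheduledExecutorService scheduler,
                          Supplier<CompletableFuture<Void>> createProducer,
                          long retryDelayMs) {
        this.scheduler = scheduler;
        this.createProducer = createProducer;
        this.retryDelayMs = retryDelayMs;
    }

    void startProducer() {
        // Guard 1: a delayed retry that fires after the close (timeline step 4) stops here.
        if (topicClosed.get()) {
            return;
        }
        if (!state.compareAndSet(State.Stopped, State.Starting)) {
            return; // already starting or started
        }
        attempts.incrementAndGet();
        createProducer.get().whenComplete((ignored, ex) -> {
            if (ex == null) {
                state.compareAndSet(State.Starting, State.Started);
                return;
            }
            // Timeline step 1: Starting -> Stopped before scheduling a retry (step 2).
            if (state.compareAndSet(State.Starting, State.Stopped)) {
                // Guard 2: do not schedule a new retry once the topic is closed.
                if (!topicClosed.get()) {
                    scheduler.schedule(this::startProducer, retryDelayMs, TimeUnit.MILLISECONDS);
                }
            }
        });
    }

    void closeTopic() {
        topicClosed.set(true);
        state.set(State.Stopped);
    }

    int attempts() {
        return attempts.get();
    }

    private static void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Producer creation always fails (a simulated ProducerBusy), so without
    // the guards the retry loop would keep running after the topic is closed.
    static int[] runDemo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ReplicatorRetrySketch r = new ReplicatorRetrySketch(scheduler,
                    () -> CompletableFuture.failedFuture(new RuntimeException("simulated ProducerBusy")),
                    10);
            r.startProducer();
            sleepMs(100);      // several failed retries happen here
            int beforeClose = r.attempts();
            r.closeTopic();
            sleepMs(30);       // at most one in-flight attempt can still finish
            int afterClose = r.attempts();
            sleepMs(100);      // a leaked retry loop would keep incrementing here
            int later = r.attempts();
            return new int[] {beforeClose, afterClose, later};
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] counts = runDemo();
        System.out.printf("before close=%d, after close=%d, later=%d%n",
                counts[0], counts[1], counts[2]);
    }
}
```

The sketch checks the closed flag in two places because the close can land between scheduling a retry and running it (steps 2 to 4 of the timeline). Guard 2 avoids scheduling a new retry. Guard 1 neutralizes a retry that was already scheduled before the topic closed.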
