programmerahul opened a new issue, #25294: URL: https://github.com/apache/pulsar/issues/25294
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of the Pulsar client and Pulsar broker.

### User environment

- Pulsar: 3.0.12
- Operating system: Linux
- Java: 17.0.18

### Issue Description

The topic is not loaded because the reader of the `__change_events` topic is stuck at an offset inside the compacted ledger.

Sequence of events:

1. Producer and consumer clients try to produce to and consume from the topic persistent://t/n/t.
2. The topic is not loaded in broker A.
3. The produce and consume calls trigger topic loading in broker A.
4. This triggers loading of the topic policies cache by creating a reader on the persistent://t/n/__change_events topic.
5. The topic persistent://t/n/__change_events is owned by broker B.
6. While reading from persistent://t/n/__change_events, the connection closes because broker B unloaded that topic (due to a restart).
7. The topic persistent://t/n/__change_events is now owned by broker A.
8. The reader reconnects after a 0.1 s timeout and triggers subscription creation on persistent://t/n/__change_events at offset x.
9. This reader gets stuck at offset x,
10. whereas all other readers of persistent://t/n/__change_events reconnect at offset y and work normally.
11. The compaction horizon at that time was z, such that x < z and y = z.
12. The topic never gets loaded until we manually restart broker A, and all produce and consume calls to this topic fail until then.

Impact: this makes all topics of the namespace t/n that are owned by broker A unavailable.

Further details:

1. The client gets this error while calling consume (every ~1 min with default configurations):
   `broker-A x:x:x:x [http-0] ERROR org.apache.pulsar.broker.service.ServerCnx - A failed consumer with id is already present on the connection. consumerId: 32, remoteAddress: /x.x.x.x:54598, subscription: s/p1`
2. The producer gets this error (every ~1 min with default configurations):
   `org.apache.pulsar.client.impl.ProducerImpl - [persistent://t/n/t-partition-0] [p1] Temporary error in creating producer: request timeout {'durationMs': '30000', 'reqId':'2961594912657351684', 'remote':'x.x.x.x/x.x.x.x:6650', 'local':'/x.x.x.x:48342'}`
3. On the broker we get the following error for the producer (every ~1 min until the broker was restarted):
   `12:24:01.620 [pulsar-io-4-9] INFO org.apache.pulsar.broker.service.ServerCnx - [/x.x.x.x:59296] Closed producer before its creation was completed. producerId=220`
4. On the broker we get the following error for the consumer (every ~1 min until the broker was restarted):
   `12:29:04.579 [http-1] ERROR org.apache.pulsar.broker.service.ServerCnx - A failed consumer with id is already present on the connection. consumerId: 121, remoteAddress: /x.x.x.x:59016, subscription: s/p1 java.util.concurrent.CompletionException: java.lang.IllegalStateException: Closed consumer before creation was complete Caused by: java.lang.IllegalStateException: Closed consumer before creation was complete`

### Error messages

```text
12:29:04.579 [http-1] ERROR org.apache.pulsar.broker.service.ServerCnx - A failed consumer with id is already present on the connection. consumerId: 121, remoteAddress: /x.x.x.x:59016, subscription: s/p1
java.util.concurrent.CompletionException: java.lang.IllegalStateException: Closed consumer before creation was complete
	at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:413) ~[?:?]
	at java.util.concurrent.CompletableFuture.getNow(CompletableFuture.java:2134) ~[?:?]
	at org.apache.pulsar.broker.service.ServerCnx.getErrorCodeWithErrorLog(ServerCnx.java:3339) ~[org.apache.pulsar-pulsar-broker-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$24(ServerCnx.java:1262) ~[org.apache.pulsar-pulsar-broker-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1163) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.IllegalStateException: Closed consumer before creation was complete
	at org.apache.pulsar.broker.service.ServerCnx.handleCloseConsumer(ServerCnx.java:2132) ~[org.apache.pulsar-pulsar-broker-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:171) ~[org.apache.pulsar-pulsar-common-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202) ~[io.netty-netty-handler-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164) ~[io.netty-netty-handler-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[io.netty-netty-codec-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[io.netty-netty-transport-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) ~[io.netty-netty-transport-classes-epoll-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501) ~[io.netty-netty-transport-classes-epoll-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399) ~[io.netty-netty-transport-classes-epoll-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998) ~[io.netty-netty-common-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.122.Final.jar:4.1.122.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.122.Final.jar:4.1.122.Final]
	... 1 more
```

### Reproducing the issue

Right now we don't know how to reproduce this issue. It has appeared 3-4 times during broker rollouts. It happens in clusters where:

- a broker rollout is in progress, and
- the number of topics per namespace is high (~200-300).

### Additional information

On the producer side:

- When producer creation times out because of the client-side timeout, the producer instance is removed from the producer list in the broker, but the future created to load the system-topic cache remains. The next produce call creates a new producer and also a new future to load the system-topic cache. So for every produce call from the client, a producer is created and then removed after the timeout, but the future is not.
- These futures accumulate and are only released when we restart the broker, with the following error (for all topics which were not loaded, ~50 times each, since there were ~50 produce calls per topic before the broker was restarted, so we got ~450 of these errors instantly on restart):

```text
13:20:02.306 [broker-client-shared-internal-executor-6-1] ERROR org.apache.pulsar.broker.service.BrokerService - Topic creation encountered an exception by initialize topic policies service. topic_name=persistent://t/n/t-partition-0 error_message=The consumer which subscribes the topic persistent://t/n/__change_events with subscription name reader-4f8aabbb2e was already closed when cleaning and closing the consumers
org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: The consumer which subscribes the topic persistent://t/n/__change_events with subscription name reader-4f8aabbb2e was already closed when cleaning and closing the consumers
	at org.apache.pulsar.client.impl.ConsumerBase.failPendingReceives(ConsumerBase.java:374) ~[org.apache.pulsar-pulsar-client-original-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
	at org.apache.pulsar.client.impl.ConsumerBase.lambda$failPendingReceive$1(ConsumerBase.java:355) ~[org.apache.pulsar-pulsar-client-original-3.0.13-SNAPSHOT.jar:3.0.13-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.122.Final.jar:4.1.122.Final]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
```

- It throws AlreadyClosedException on the reader because restarting the broker triggered reader.close.
- This amounts to a memory leak: every producer-creation call creates a future, and all these futures accumulate over time.

On the consumer side:

- Consumer creation fails with the following error:
  `12:47:01.765 [http-0] ERROR org.apache.pulsar.broker.service.ServerCnx - A failed consumer with id is already present on the connection. consumerId: 75, remoteAddress: /x.x.x.x:58204, subscription: s/p1 java.util.concurrent.CompletionException: java.lang.IllegalStateException: Closed consumer before creation was complete`
- Here all consume calls fail directly, because the broker does not remove the first consumer from the consumer list until that creation fails, so there is no memory leak.

Topic policy cache loading:

- Right now there is no timeout around the `initPolicesCache(reader, stageFuture)` call in SystemTopicBasedTopicPoliciesService.java, so if a reader is unable to load topic policies, it can get stuck indefinitely and silently.

So we propose the following changes: add a timeout and retries to the `initPolicesCache` call, and if it still fails after a certain number of retries, unload the topic to a different broker; also emit metrics and logs for this failure.

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
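The proposed timeout-and-retry guard could look roughly like the sketch below. This is a generic illustration, not the actual Pulsar implementation: `PoliciesCacheInitSketch` and `withTimeoutAndRetries` are hypothetical names, and a real patch inside `SystemTopicBasedTopicPoliciesService` would additionally close the stuck reader, unload the topic, and emit the metrics mentioned above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hedged sketch: a timeout-plus-retry wrapper of the kind proposed for the
// initPolicesCache future. All names here are hypothetical stand-ins.
public class PoliciesCacheInitSketch {

    static <T> CompletableFuture<T> withTimeoutAndRetries(
            Supplier<CompletableFuture<T>> attempt,
            int retriesLeft, long timeout, TimeUnit unit) {
        return attempt.get()
                // Fail this attempt instead of hanging forever, which is the
                // silent stuck behavior described in this issue.
                .orTimeout(timeout, unit)
                .handle((value, err) -> {
                    if (err == null) {
                        return CompletableFuture.completedFuture(value);
                    }
                    if (retriesLeft > 0) {
                        // A real patch would log, emit a metric, and likely
                        // back off before retrying.
                        return withTimeoutAndRetries(attempt, retriesLeft - 1, timeout, unit);
                    }
                    // Retries exhausted: surface the failure so the caller can
                    // unload the topic to another broker rather than leaving
                    // the namespace unavailable.
                    return CompletableFuture.<T>failedFuture(err);
                })
                .thenCompose(f -> f);
    }
}
```

Failing the future after exhausted retries also addresses the producer-side accumulation described above, since callers can then clean up the cached future instead of waiting on it forever.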
