visortelle commented on issue #22527:
URL: https://github.com/apache/pulsar/issues/22527#issuecomment-2067133567

   @ragaur-tibco it seems we both were wrong here and it works as expected.
   
   I'm stupidly bad at Java concurrency, so I rewrote it in ZIO which is 
simpler to understand for me.
   
   It seems like the consumer simply doesn't have time to update the list of 
topics.
   
   Corrected code:
   
   ```scala
   
   topics = Vector(
       "persistent://new-tenant/new-namespace/topic-a",
       "persistent://new-tenant/new-namespace/topic-b",
       "non-persistent://new-tenant/new-namespace/topic-c",
       "non-persistent://new-tenant/new-namespace/topic-d"
   )
   numMessagesPerTopic = 10
   
   // Thread-safe counter
   numMessagesReceivedRef <- Ref.make(0)
   
   _ <- ZIO.attempt {
       // Cleanup
       pulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
           .foreach(pulsarAdmin.topics.delete(_, true))
       
pulsarAdmin.topics.getPartitionedTopicList("new-tenant/new-namespace").asScala
           .foreach(pulsarAdmin.topics.deletePartitionedTopic(_, true))
   }
   
   consumer <- ZIO.attempt {
       pulsarClient.newConsumer()
           .topicsPattern("new-tenant/new-namespace/.*".r.pattern)
           .subscriptionName("new-subscription")
           .patternAutoDiscoveryPeriod(100, TimeUnit.MILLISECONDS)
           .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
           .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
           .subscribe()
   }
   
   // Consume messages in background
   consumeInBackgroundFib <- (for {
       isMessageReceived <- ZIO.attempt {
           Option(consumer.receive(1, TimeUnit.SECONDS)) match
               case None => false
               case Some(msg) =>
                   println(s"Received: ${msg.getValue.mkString(",")}. From 
topic: ${msg.getTopicName}")
                   consumer.acknowledge(msg.getMessageId)
                   true
       }
   
       _ <- numMessagesReceivedRef.update(_ + 1).when(isMessageReceived)
   } yield ())
       .forever // like `while true`
       .fork // Run in background
   
   producers <- ZIO.attempt {
       topics.map(topic => pulsarClient.newProducer.topic(topic).create())
   }
   
   // Wait for the expected number of consumers
   _ <- ZIO.attempt {
       // Cast consumer to PatternMultiTopicsConsumerImpl
       // that has extra pattern-related methods
       val numConsumers = consumer
           .asInstanceOf[PatternMultiTopicsConsumerImpl[Array[Byte]]]
           .getConsumers
           .size
   
       if numConsumers != topics.size
       then throw new Exception(s"Expected $topics.size consumers, but got 
$numConsumers")
   }
       .retry(Schedule.exponential(10.millis))
       .timeoutFail(new Exception("Consumers weren't created in 
time"))(10.seconds)
   
   _ <- ZIO.attempt {
       for (i <- 0 until numMessagesPerTopic)
           producers.foreach(producer => producer.sendAsync(Array(i.toByte)))
   }
   
   // Wait for all messages are be received
   _ <- (for {
       numMessagesReceived <- numMessagesReceivedRef.get
       _ <- ZIO.attempt {
           if numMessagesReceived != topics.size * numMessagesPerTopic
           then throw new Exception(s"Expected ${topics.size * 
numMessagesPerTopic} messages, but got $numMessagesReceived")
       }
   } yield ())
       .retry(Schedule.spaced(250.millis))
       .timeoutFail(new Exception("Messages weren't received in 
time"))(10.seconds)
   
   numMessagesReceived <- numMessagesReceivedRef.get
   _ <- ZIO.logInfo(s"Messages received: $numMessagesReceived")
   
   _ <- consumeInBackgroundFib.join
   ```
   
   Logs:
   
   ```
   22:56:51.304 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with 
config: 
{"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
   22:56:51.382 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"
 
tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
   22:56:51.415 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ProducerImpl - 
[persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx 
[id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
   22:56:51.549 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ProducerImpl - 
[persistent://new-tenant/new-namespace/topic-a] [standalone-0-147] Created 
producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - 
R:localhost/127.0.0.1:6650]
   22:56:51.567 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with 
config: 
{"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
   22:56:51.568 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"
 
tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
   22:56:51.573 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ProducerImpl - 
[persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx 
[id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
   22:56:51.644 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ProducerImpl - 
[persistent://new-tenant/new-namespace/topic-b] [standalone-0-148] Created 
producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - 
R:localhost/127.0.0.1:6650]
   22:56:51.663 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with 
config: 
{"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
   22:56:51.664 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"
 
tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
   22:56:51.668 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ProducerImpl - 
[non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on 
cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
   22:56:51.695 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ProducerImpl - 
[non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-149] Created 
producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - 
R:localhost/127.0.0.1:6650]
   22:56:51.719 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with 
config: 
{"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
   22:56:51.720 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"
 
tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
   22:56:51.729 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ProducerImpl - 
[non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on 
cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
   22:56:51.755 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ProducerImpl - 
[non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-150] Created 
producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - 
R:localhost/127.0.0.1:6650]
   22:56:52.190 [pulsar-client-io-3-3] WARN  
o.a.pulsar.client.impl.ConsumerImpl - 
[non-persistent://new-tenant/new-namespace/topic-c] Cannot create a [Durable] 
subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. 
Subscription name: new-subscription
   22:56:52.231 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder 
with config: 
{"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdat
 
ePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
   22:56:52.232 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"
 
tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
   22:56:52.244 [pulsar-client-io-3-3] WARN  
o.a.pulsar.client.impl.ConsumerImpl - 
[non-persistent://new-tenant/new-namespace/topic-d] Cannot create a [Durable] 
subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. 
Subscription name: new-subscription
   22:56:52.246 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder 
with config: 
{"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdat
 
ePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
   22:56:52.246 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"
 
tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
   22:56:52.249 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder 
with config: 
{"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePa
 
rtitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
   22:56:52.250 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"
 
tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
   22:56:52.263 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder 
with config: 
{"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePa
 
rtitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
   22:56:52.263 [pulsar-client-io-3-3] INFO  
o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"
 
tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
   22:56:52.266 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ConsumerImpl - 
[non-persistent://new-tenant/new-namespace/topic-c][new-subscription] 
Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - 
R:localhost/127.0.0.1:6650], consumerId 0
   22:56:52.281 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ConsumerImpl - 
[non-persistent://new-tenant/new-namespace/topic-d][new-subscription] 
Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - 
R:localhost/127.0.0.1:6650], consumerId 1
   22:56:52.282 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ConsumerImpl - 
[persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribing 
to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - 
R:localhost/127.0.0.1:6650], consumerId 2
   22:56:52.283 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ConsumerImpl - 
[persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribing 
to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - 
R:localhost/127.0.0.1:6650], consumerId 3
   22:56:52.299 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ConsumerImpl - 
[non-persistent://new-tenant/new-namespace/topic-c][new-subscription] 
Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
   22:56:52.304 [pulsar-client-io-3-3] INFO  
o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] 
[new-subscription] Success subscribe new topic 
non-persistent://new-tenant/new-namespace/topic-c in topics consumer, 
partitions: 0, allTopicPartitionsNumber: 4
   22:56:52.304 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ConsumerImpl - 
[non-persistent://new-tenant/new-namespace/topic-d][new-subscription] 
Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
   22:56:52.305 [pulsar-client-io-3-3] INFO  
o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] 
[new-subscription] Success subscribe new topic 
non-persistent://new-tenant/new-namespace/topic-d in topics consumer, 
partitions: 0, allTopicPartitionsNumber: 4
   22:56:52.318 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ConsumerImpl - 
[persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribed to 
topic on localhost/127.0.0.1:6650 -- consumer: 2
   22:56:52.320 [pulsar-client-io-3-3] INFO  
o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] 
[new-subscription] Success subscribe new topic 
persistent://new-tenant/new-namespace/topic-a in topics consumer, partitions: 
0, allTopicPartitionsNumber: 4
   22:56:52.329 [pulsar-client-io-3-3] INFO  
o.a.pulsar.client.impl.ConsumerImpl - 
[persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribed to 
topic on localhost/127.0.0.1:6650 -- consumer: 3
   22:56:52.330 [pulsar-client-io-3-3] INFO  
o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] 
[new-subscription] Success subscribe new topic 
persistent://new-tenant/new-namespace/topic-b in topics consumer, partitions: 
0, allTopicPartitionsNumber: 4
   Received: 0. From topic: non-persistent://new-tenant/new-namespace/topic-d
   Received: 0. From topic: non-persistent://new-tenant/new-namespace/topic-c
   Received: 0. From topic: persistent://new-tenant/new-namespace/topic-a
   Received: 1. From topic: non-persistent://new-tenant/new-namespace/topic-d
   Received: 2. From topic: non-persistent://new-tenant/new-namespace/topic-d
   Received: 3. From topic: non-persistent://new-tenant/new-namespace/topic-d
   Received: 4. From topic: non-persistent://new-tenant/new-namespace/topic-d
   Received: 5. From topic: non-persistent://new-tenant/new-namespace/topic-d
   Received: 6. From topic: non-persistent://new-tenant/new-namespace/topic-d
   Received: 7. From topic: non-persistent://new-tenant/new-namespace/topic-d
   Received: 8. From topic: non-persistent://new-tenant/new-namespace/topic-d
   Received: 9. From topic: non-persistent://new-tenant/new-namespace/topic-d
   Received: 1. From topic: non-persistent://new-tenant/new-namespace/topic-c
   Received: 2. From topic: non-persistent://new-tenant/new-namespace/topic-c
   Received: 3. From topic: non-persistent://new-tenant/new-namespace/topic-c
   Received: 4. From topic: non-persistent://new-tenant/new-namespace/topic-c
   Received: 5. From topic: non-persistent://new-tenant/new-namespace/topic-c
   Received: 6. From topic: non-persistent://new-tenant/new-namespace/topic-c
   Received: 7. From topic: non-persistent://new-tenant/new-namespace/topic-c
   Received: 8. From topic: non-persistent://new-tenant/new-namespace/topic-c
   Received: 9. From topic: non-persistent://new-tenant/new-namespace/topic-c
   Received: 0. From topic: persistent://new-tenant/new-namespace/topic-b
   Received: 1. From topic: persistent://new-tenant/new-namespace/topic-a
   Received: 2. From topic: persistent://new-tenant/new-namespace/topic-a
   Received: 3. From topic: persistent://new-tenant/new-namespace/topic-a
   Received: 4. From topic: persistent://new-tenant/new-namespace/topic-a
   Received: 5. From topic: persistent://new-tenant/new-namespace/topic-a
   Received: 6. From topic: persistent://new-tenant/new-namespace/topic-a
   Received: 7. From topic: persistent://new-tenant/new-namespace/topic-a
   Received: 8. From topic: persistent://new-tenant/new-namespace/topic-a
   Received: 9. From topic: persistent://new-tenant/new-namespace/topic-a
   Received: 1. From topic: persistent://new-tenant/new-namespace/topic-b
   Received: 2. From topic: persistent://new-tenant/new-namespace/topic-b
   Received: 3. From topic: persistent://new-tenant/new-namespace/topic-b
   Received: 4. From topic: persistent://new-tenant/new-namespace/topic-b
   Received: 5. From topic: persistent://new-tenant/new-namespace/topic-b
   Received: 6. From topic: persistent://new-tenant/new-namespace/topic-b
   Received: 7. From topic: persistent://new-tenant/new-namespace/topic-b
   Received: 8. From topic: persistent://new-tenant/new-namespace/topic-b
   Received: 9. From topic: persistent://new-tenant/new-namespace/topic-b
   timestamp=2024-04-19T18:56:52.850936Z level=INFO thread=#zio-fiber-178 
message="Messages received: 40" 
location=consumer.consumer_session.ConsumerSessionTest.spec.runTest 
file=ConsumerSessionTest.scala line=145
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to