visortelle commented on issue #22527:
URL: https://github.com/apache/pulsar/issues/22527#issuecomment-2067133567
@ragaur-tibco it seems we were both wrong here, and it works as expected.
I'm stupidly bad at Java concurrency, so I rewrote it in ZIO, which is
simpler for me to understand.
It seems the consumer simply didn't have time to update its list of
topics before the messages were sent.
Corrected code:
```scala
topics = Vector(
  "persistent://new-tenant/new-namespace/topic-a",
  "persistent://new-tenant/new-namespace/topic-b",
  "non-persistent://new-tenant/new-namespace/topic-c",
  "non-persistent://new-tenant/new-namespace/topic-d"
)
numMessagesPerTopic = 10
// Thread-safe counter
numMessagesReceivedRef <- Ref.make(0)
_ <- ZIO.attempt {
  // Cleanup
  pulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
    .foreach(pulsarAdmin.topics.delete(_, true))
  pulsarAdmin.topics.getPartitionedTopicList("new-tenant/new-namespace").asScala
    .foreach(pulsarAdmin.topics.deletePartitionedTopic(_, true))
}
consumer <- ZIO.attempt {
  pulsarClient.newConsumer()
    .topicsPattern("new-tenant/new-namespace/.*".r.pattern)
    .subscriptionName("new-subscription")
    .patternAutoDiscoveryPeriod(100, TimeUnit.MILLISECONDS)
    .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
    .subscribe()
}
// Consume messages in background
consumeInBackgroundFib <- (for {
  isMessageReceived <- ZIO.attempt {
    Option(consumer.receive(1, TimeUnit.SECONDS)) match
      case None => false
      case Some(msg) =>
        println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
        consumer.acknowledge(msg.getMessageId)
        true
  }
  _ <- numMessagesReceivedRef.update(_ + 1).when(isMessageReceived)
} yield ())
  .forever // like `while true`
  .fork // Run in background
producers <- ZIO.attempt {
  topics.map(topic => pulsarClient.newProducer.topic(topic).create())
}
// Wait for the expected number of consumers
_ <- ZIO.attempt {
  // Cast consumer to PatternMultiTopicsConsumerImpl
  // that has extra pattern-related methods
  val numConsumers = consumer
    .asInstanceOf[PatternMultiTopicsConsumerImpl[Array[Byte]]]
    .getConsumers
    .size
  if numConsumers != topics.size
  then throw new Exception(s"Expected ${topics.size} consumers, but got $numConsumers")
}
  .retry(Schedule.exponential(10.millis))
  .timeoutFail(new Exception("Consumers weren't created in time"))(10.seconds)
_ <- ZIO.attempt {
  for (i <- 0 until numMessagesPerTopic)
    producers.foreach(producer => producer.sendAsync(Array(i.toByte)))
}
// Wait for all messages to be received
_ <- (for {
  numMessagesReceived <- numMessagesReceivedRef.get
  _ <- ZIO.attempt {
    if numMessagesReceived != topics.size * numMessagesPerTopic
    then throw new Exception(s"Expected ${topics.size * numMessagesPerTopic} messages, but got $numMessagesReceived")
  }
} yield ())
  .retry(Schedule.spaced(250.millis))
  .timeoutFail(new Exception("Messages weren't received in time"))(10.seconds)
numMessagesReceived <- numMessagesReceivedRef.get
_ <- ZIO.logInfo(s"Messages received: $numMessagesReceived")
// The background fiber loops forever, so `join` would never return; stop it instead
_ <- consumeInBackgroundFib.interrupt
```
Logs:
```
22:56:51.304 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with
config:
{"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.382 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config:
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.415 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ProducerImpl -
[persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx
[id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.549 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ProducerImpl -
[persistent://new-tenant/new-namespace/topic-a] [standalone-0-147] Created
producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 -
R:localhost/127.0.0.1:6650]
22:56:51.567 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with
config:
{"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.568 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config:
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.573 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ProducerImpl -
[persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx
[id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.644 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ProducerImpl -
[persistent://new-tenant/new-namespace/topic-b] [standalone-0-148] Created
producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 -
R:localhost/127.0.0.1:6650]
22:56:51.663 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with
config:
{"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.664 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config:
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.668 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ProducerImpl -
[non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on
cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.695 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ProducerImpl -
[non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-149] Created
producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 -
R:localhost/127.0.0.1:6650]
22:56:51.719 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with
config:
{"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.720 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config:
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.729 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ProducerImpl -
[non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on
cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.755 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ProducerImpl -
[non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-150] Created
producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 -
R:localhost/127.0.0.1:6650]
22:56:52.190 [pulsar-client-io-3-3] WARN
o.a.pulsar.client.impl.ConsumerImpl -
[non-persistent://new-tenant/new-namespace/topic-c] Cannot create a [Durable]
subscription for a NonPersistentTopic, will use [NonDurable] to subscribe.
Subscription name: new-subscription
22:56:52.231 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder
with config:
{"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.232 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config:
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.244 [pulsar-client-io-3-3] WARN
o.a.pulsar.client.impl.ConsumerImpl -
[non-persistent://new-tenant/new-namespace/topic-d] Cannot create a [Durable]
subscription for a NonPersistentTopic, will use [NonDurable] to subscribe.
Subscription name: new-subscription
22:56:52.246 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder
with config:
{"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.246 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config:
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.249 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder
with config:
{"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.250 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config:
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.263 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder
with config:
{"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.263 [pulsar-client-io-3-3] INFO
o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config:
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.266 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ConsumerImpl -
[non-persistent://new-tenant/new-namespace/topic-c][new-subscription]
Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 -
R:localhost/127.0.0.1:6650], consumerId 0
22:56:52.281 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ConsumerImpl -
[non-persistent://new-tenant/new-namespace/topic-d][new-subscription]
Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 -
R:localhost/127.0.0.1:6650], consumerId 1
22:56:52.282 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ConsumerImpl -
[persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribing
to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 -
R:localhost/127.0.0.1:6650], consumerId 2
22:56:52.283 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ConsumerImpl -
[persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribing
to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 -
R:localhost/127.0.0.1:6650], consumerId 3
22:56:52.299 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ConsumerImpl -
[non-persistent://new-tenant/new-namespace/topic-c][new-subscription]
Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
22:56:52.304 [pulsar-client-io-3-3] INFO
o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f]
[new-subscription] Success subscribe new topic
non-persistent://new-tenant/new-namespace/topic-c in topics consumer,
partitions: 0, allTopicPartitionsNumber: 4
22:56:52.304 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ConsumerImpl -
[non-persistent://new-tenant/new-namespace/topic-d][new-subscription]
Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
22:56:52.305 [pulsar-client-io-3-3] INFO
o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f]
[new-subscription] Success subscribe new topic
non-persistent://new-tenant/new-namespace/topic-d in topics consumer,
partitions: 0, allTopicPartitionsNumber: 4
22:56:52.318 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ConsumerImpl -
[persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribed to
topic on localhost/127.0.0.1:6650 -- consumer: 2
22:56:52.320 [pulsar-client-io-3-3] INFO
o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f]
[new-subscription] Success subscribe new topic
persistent://new-tenant/new-namespace/topic-a in topics consumer, partitions:
0, allTopicPartitionsNumber: 4
22:56:52.329 [pulsar-client-io-3-3] INFO
o.a.pulsar.client.impl.ConsumerImpl -
[persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribed to
topic on localhost/127.0.0.1:6650 -- consumer: 3
22:56:52.330 [pulsar-client-io-3-3] INFO
o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f]
[new-subscription] Success subscribe new topic
persistent://new-tenant/new-namespace/topic-b in topics consumer, partitions:
0, allTopicPartitionsNumber: 4
Received: 0. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 0. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 0. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 1. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 2. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 3. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 4. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 5. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 6. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 7. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 8. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 9. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 1. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 2. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 3. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 4. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 5. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 6. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 7. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 8. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 9. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 0. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 1. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 2. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 3. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 4. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 5. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 6. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 7. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 8. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 9. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 1. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 2. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 3. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 4. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 5. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 6. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 7. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 8. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 9. From topic: persistent://new-tenant/new-namespace/topic-b
timestamp=2024-04-19T18:56:52.850936Z level=INFO thread=#zio-fiber-178
message="Messages received: 40"
location=consumer.consumer_session.ConsumerSessionTest.spec.runTest
file=ConsumerSessionTest.scala line=145
```
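For anyone not using ZIO, the "wait until the consumer has picked up all topics, then produce" step can be sketched in plain Scala 3. This is only an illustration under assumptions: `waitUntil` is a hypothetical helper (not part of Pulsar or ZIO), and `subscribedTopics` with its background thread merely stands in for the pattern consumer's asynchronous topic discovery. No Pulsar API is called.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration.*
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: polls `condition` every `interval` until it holds
// or `timeout` elapses. Returns true if the condition was met in time.
def waitUntil(timeout: FiniteDuration, interval: FiniteDuration)(condition: => Boolean): Boolean =
  val deadline = timeout.fromNow
  @tailrec
  def loop(): Boolean =
    if condition then true
    else if deadline.isOverdue() then false
    else
      Thread.sleep(interval.toMillis)
      loop()
  loop()

@main def demo(): Unit =
  // Stand-in for the consumer's internal topic list, which is filled in
  // asynchronously (assumption: discovery runs on another thread).
  val subscribedTopics = ConcurrentHashMap.newKeySet[String]()
  val expected = Set("topic-a", "topic-b", "topic-c", "topic-d")

  // Simulated discovery: topics show up a little after the consumer exists.
  val discovery = new Thread(() =>
    expected.foreach { topic =>
      Thread.sleep(20)
      subscribedTopics.add(topic)
    }
  )
  discovery.start()

  // Only start producing once every expected topic has a subscription.
  val ready = waitUntil(5.seconds, 10.millis)(subscribedTopics.size == expected.size)
  println(s"ready = $ready, subscribed = ${subscribedTopics.size}")
```

In a real test, the condition would be the check on the number of internal consumers used in the corrected code above, and sending would start only after `waitUntil` returns `true`.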