youurayy opened a new issue #6157: Failing to cumulative acknowledge messages as old as 15 seconds
URL: https://github.com/apache/pulsar/issues/6157

Observed behavior: Topic backlog is filling up and never cleared.

Expected behavior: Topic backlog is cleared.

In further tests, the same behavior was observed with as little as 5 seconds / 5 consecutive messages.

- Pulsar 2.4.2, cluster of 4 * brokers/proxies/bookies/zk
- Single persistent non-partitioned topic
- Failover subscription
- Single Producer
- Single Consumer

Namespace creation:

```sh
bin/pulsar-admin tenants create cb
bin/pulsar-admin namespaces create cb/test
bin/pulsar-admin namespaces set-backlog-quota cb/test --limit 300M --policy consumer_backlog_eviction
bin/pulsar-admin namespaces set-deduplication cb/test --enable
```

Client (shared code):

```java
public class PulsarTestBase {

    protected SystemLogger logger;
    protected String clientId;
    protected PulsarClient pulsar;
    protected String pulsarUrl = "pulsar://3.234.220.124:30004/";
    protected String topicName = "persistent://cb/test/topic-1";

    PulsarTestBase(String clientId) {
        this.clientId = clientId;
        logger = new SystemLogger.Builder(clientId).withCloudwatch(false).withPrometheus(false).build();
    }

    protected void createClient() throws PulsarClientException {
        pulsar = PulsarClient.builder()
            .serviceUrl(pulsarUrl)
            .connectionsPerBroker(1) // default: 1
            .connectionTimeout(10, TimeUnit.SECONDS)
            .enableTcpNoDelay(true)
            .keepAliveInterval(999, TimeUnit.DAYS)
            .maxBackoffInterval(5, TimeUnit.SECONDS)
            .startingBackoffInterval(1, TimeUnit.SECONDS)
            .statsInterval(60, TimeUnit.SECONDS)
            // number of threads to be used for handling connections to brokers
            .ioThreads(1) // default: 1
            // number of threads to be used for message listeners
            .listenerThreads(1) // default: 1
            // number of concurrent lookup-requests allowed to send on each broker-connection
            .maxConcurrentLookupRequests(5000) // default: 5000
            .maxLookupRequests(50000) // default: 50000
            // how many broker-rejects within 30 secs before connection to broker is recycled
            .maxNumberOfRejectedRequestPerConnection(50) // default: 50
            // how long to retry a broker op before the op is marked as failed
            .operationTimeout(30, TimeUnit.SECONDS) // default: 30 sec
            .build();
    }
}
```

Producer code:

```java
public class TestProducer extends PulsarTestBase {

    public static void main(String[] args) throws Exception {
        new TestProducer(args[0]);
    }

    TestProducer(String clientId) throws Exception {
        super(clientId);
        createClient();

        Producer<Order> producer = pulsar
            .newProducer(Schema.PROTOBUF(Order.class))
            .producerName(clientId)
            .topic(topicName)
            .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) // default: 1 ms
            .batchingMaxMessages(1000) // default: 1000
            // batching disabled, as we produce far more rarely than 1ms range
            .enableBatching(false) // default: true
            // block vs throw exception if pending-ack queue is full
            .blockIfQueueFull(false) // default: false
            // how many non-acked msgs to buffer before block or throw-exception
            .maxPendingMessages(1000) // default: 1000
            .sendTimeout(30, TimeUnit.SECONDS) // default: 30 sec
            // .initialSequenceId() // TODO what if non-existent ID passed?
            .hashingScheme(HashingScheme.JavaStringHash) // default: java string
            .create();

        V3.log("msg", "producing messages");

        long counter = 0L;

        while (true) {
            Thread.sleep(1000L);
            V3.log("msg", "producing message", "counter", counter);
            producer
                .newMessage()
                .value(Order.newBuilder().setPrice(1d).setSize(counter).build())
                .send();
            counter++;
        }
    }

    private final BiConsumer<? super MessageId, ? super Throwable> producerResultHandler =
        new BiConsumer<MessageId, Throwable>() {
            @Override
            public void accept(MessageId messageId, Throwable ex) {
                if (ex != null) {
                    V3.log(ex, "msg", "pulsar producer submit error", "*metric", "DroppedData", "value", 1);
                }
            }
        };
}
```

Consumer code:

```java
public class TestConsumer extends PulsarTestBase {

    public static void main(String[] args) throws Exception {
        new TestConsumer(args[0]);
    }

    TestConsumer(String clientId) throws Exception {
        super(clientId);
        createClient();

        Consumer<Order> consumer = pulsar
            .newConsumer(Schema.PROTOBUF(Order.class))
            .consumerName(clientId)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
            .subscriptionType(SubscriptionType.Failover)
            .subscriptionName("failover-subscription")
            .topic(topicName)
            .subscribe();

        MessageId firstMsgIdToAck = null;
        int msgAckDelayCounter = 0;

        V3.log("msg", "waiting for messages");

        while (true) {
            Message<Order> message = consumer.receive();
            Order order = message.getValue();

            V3.log(
                "msg", "received message",
                "sequenceId", message.getSequenceId(),
                "price", order.getPrice(),
                "size", order.getSize(),
                "producer", message.getProducerName(),
                "messageId", Pulsar.msgIdToHex(message.getMessageId()));

            if (firstMsgIdToAck == null) {
                firstMsgIdToAck = message.getMessageId();
                V3.log("msg", "storing ack message id", "messageId", Pulsar.msgIdToHex(firstMsgIdToAck));
            }

            if (++msgAckDelayCounter == 15) {
                V3.log("msg", "acknowledging message id", "messageId", Pulsar.msgIdToHex(firstMsgIdToAck));
                consumer.acknowledgeCumulativeAsync(firstMsgIdToAck);
                firstMsgIdToAck = null;
                msgAckDelayCounter = 0;
            }
        }
    }
}
```

Producer log screenshot:

Consumer log screenshot:

Admin console screenshot:
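
For reference, the consumer loop above can be replayed against an idealized model of cumulative acknowledgement to see what backlog a correctly behaving subscription would show. The following is a minimal plain-Java sketch, not Pulsar code. It assumes that a cumulative ack removes every message up to and including the acknowledged ID, and that message IDs are consecutive integers. The names `CumulativeAckModel`, `backlogAfterEachMessage`, and `markDelete` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of cumulative acknowledgement; it does not use the Pulsar client. */
public class CumulativeAckModel {

    /**
     * Replays the consumer loop from the report against an idealized subscription
     * and returns the backlog size observed after each received message.
     */
    public static List<Integer> backlogAfterEachMessage(int totalMessages, int window) {
        List<Integer> backlog = new ArrayList<>();
        long markDelete = -1;    // highest cumulatively acknowledged id; -1 means nothing acked yet
        long firstIdToAck = -1;  // first id of the current window; -1 means not stored yet
        int delayCounter = 0;

        for (long id = 0; id < totalMessages; id++) {
            if (firstIdToAck < 0) {
                firstIdToAck = id; // same as "storing ack message id" in the report
            }
            if (++delayCounter == window) {
                // Assumed semantics: everything up to and including firstIdToAck is acknowledged.
                markDelete = Math.max(markDelete, firstIdToAck);
                firstIdToAck = -1;
                delayCounter = 0;
            }
            // Messages published so far (0..id) minus those at or below the ack position.
            backlog.add((int) (id - markDelete));
        }
        return backlog;
    }

    public static void main(String[] args) {
        List<Integer> backlog = backlogAfterEachMessage(60, 15);
        int max = backlog.stream().mapToInt(Integer::intValue).max().orElse(0);
        System.out.println("backlog after last message: " + backlog.get(backlog.size() - 1));
        System.out.println("largest backlog observed:   " + max);
    }
}
```

Under these assumptions the model's backlog drops to 14 after every acknowledgement and never exceeds 28 with a window of 15, because only the first message of each window is acknowledged. The model therefore never reaches zero, but it stays bounded, which is the contrast with the ever-growing backlog described in the report.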
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
