hellozepp commented on issue #4391: pulsar consumption ack strange behavior URL: https://github.com/apache/pulsar/issues/4391#issuecomment-501591371 Hi~, @KannarFr and @sijie ,I tried write a unit test, but can not reproduce this issure. What have I lost? ` @Test(timeOut = testTimeout) public void testReRunExclusiveAckedNormalTopic() throws Exception { String key = "testExclusiveAckedNormalTopic"; final String topicName = "persistent://prop/use/ns-abc/topic-" + key; final String subscriptionName = "my-ex-subscription-" + key; final String messagePredicate = "my-message-" + key + "-"; final String rerunMessagePredicate = "my-rerun-message-" + key + "-"; final int totalMessages = 15; // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) .enableBatching(false) .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) .receiverQueueSize(50).ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .subscriptionType(SubscriptionType.Exclusive).subscribe(); // 3. producer publish messages for (int i = 0; i < totalMessages / 3; i++) { String message = messagePredicate + i; log.info("Producer produced: " + message); producer.send(message.getBytes()); } // 4. Receiver receives the message, do ack Message<byte[]> message = consumer.receive(); while (message != null) { String data = new String(message.getData()); log.info("Consumer received : " + data); consumer.acknowledge(message); message = consumer.receive(100, TimeUnit.MILLISECONDS); } long size = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); assertEquals(size, 0); consumer.unsubscribe(); consumer.close(); // 5. Simulate ackTimeout Thread.sleep(ackTimeOutMillis); // 6. Create consumer again consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) .receiverQueueSize(50).ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .subscriptionType(SubscriptionType.Exclusive).subscribe(); // 7. producer publish more messages for (int i = 0; i < totalMessages / 3; i++) { String m = rerunMessagePredicate + i; log.info("Producer produced: " + m); producer.send(m.getBytes()); } // 8. Receiver receives the message, doesn't ack message = consumer.receive(); int redelivered = 0; while (message != null) { redelivered++; String data = new String(message.getData()); log.info("Consumer received : " + data); message = consumer.receive(100, TimeUnit.MILLISECONDS); } assertEquals(redelivered, 5); size = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); assertEquals(size, 5); Thread.sleep(ackTimeOutMillis); // 9. Receiver receives redelivered messages message = consumer.receive(); int redelivered1 = 0; while (message != null) { redelivered1++; String data = new String(message.getData()); log.info("Consumer received : " + data); consumer.acknowledge(message); message = consumer.receive(100, TimeUnit.MILLISECONDS); } assertEquals(redelivered1, 5); size = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); assertEquals(size, 0); } ` The unit test can be passed.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
