crossoverJie opened a new issue, #20262: URL: https://github.com/apache/pulsar/issues/20262
### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Version latest version ### Minimal reproduce step ```java @Test(dataProvider = "topicName") public void testSkipMessages(String topicName) throws Exception { final String subName = topicName; assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>()); final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName; // Force to create a topic publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 0); assertEquals(admin.topics().getList("prop-xyz/ns1"), List.of("persistent://prop-xyz/ns1/" + topicName)); // create consumer and subscription @Cleanup PulsarClient client = PulsarClient.builder() .serviceUrl(pulsar.getWebServiceAddress()) .statsInterval(0, TimeUnit.SECONDS) .build(); AtomicInteger total = new AtomicInteger(); Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName) .messageListener(new MessageListener<byte[]>() { @SneakyThrows @Override public void received(Consumer<byte[]> consumer, Message<byte[]> msg) { if (total.get() %2 !=0){ log.info("msg_id{}",msg.getMessageId().toString()); consumer.acknowledge(msg); } total.incrementAndGet(); } }) .subscriptionName(subName) .subscriptionType(SubscriptionType.Exclusive).subscribe(); assertEquals(admin.topics().getSubscriptions(persistentTopicName), List.of(subName)); publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 100); TimeUnit.SECONDS.sleep(3); TopicStats topicStats = admin.topics().getStats(persistentTopicName); long msgBacklog = topicStats.getSubscriptions().get(subName).getMsgBacklog(); log.info("back={}",msgBacklog); int skipNumber = 20; admin.topics().skipMessages(persistentTopicName, subName, skipNumber); topicStats = admin.topics().getStats(persistentTopicName); assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), msgBacklog - skipNumber); } ``` **When I artificially created 50 hollow messages** and skipped 20 messages, only 14 messages were actually skipped. ### What did you expect to see? The expected number of skipped messages is equal to the actual number of skipped messages. ### What did you see instead? The actual number of skips is less than the expected number of skips. <img width="241" alt="image" src="https://user-images.githubusercontent.com/15684156/236889435-cfb6098b-eb90-42b2-bce1-fc3273fc583e.png"> ### Anything else? https://github.com/apache/pulsar/blob/e956db729f5098ff319237fd4220dbfb234c1b18/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L1772-L1785 <img width="1052" alt="image" src="https://user-images.githubusercontent.com/15684156/236890402-dd9a5023-e597-432f-b188-abd6443a3965.png"> The reason for this issue is that the `recycle()` function reuses objects, causing the object referenced by r to change during runtime. When the loop count is greater than 8, the else block is entered, leading to an incorrect calculation of the message count. I believe there are two methods to fix this issue. The first method is to perform a copy before assignment, similar to the following: ```java public void setStartPosition(PositionImpl startPosition) { PositionImpl cp = new PositionImpl(startPosition.ledgerId, startPosition.entryId); this.startPosition = cp; } public void setEndPosition(PositionImpl endPosition) { PositionImpl cp = new PositionImpl(endPosition.ledgerId, endPosition.entryId); this.endPosition = cp; } // state.endPosition = r.lowerEndpoint(); state.setEndPosition(r.lowerEndpoint()); // state.startPosition = r.upperEndpoint(); state.setStartPosition(r.upperEndpoint()); ``` The second method is to remove `recyclePositionRangeConverter`. <img width="1697" alt="image" src="https://user-images.githubusercontent.com/15684156/236891897-138ec453-a2df-4e4b-862d-eb65a437e7c9.png"> any other suggestions for a better solution? ### Are you willing to submit a PR? - [X] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
