crossoverJie opened a new issue, #20262:
URL: https://github.com/apache/pulsar/issues/20262

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   latest version
   
   ### Minimal reproduce step
   
   ```java
       @Test(dataProvider = "topicName")
       public void testSkipMessages(String topicName) throws Exception {
           final String subName = topicName;
           assertEquals(admin.topics().getList("prop-xyz/ns1"), new 
ArrayList<>());
   
           final String persistentTopicName = "persistent://prop-xyz/ns1/" + 
topicName;
           // Force to create a topic
           publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + 
topicName, 0);
           assertEquals(admin.topics().getList("prop-xyz/ns1"),
                   List.of("persistent://prop-xyz/ns1/" + topicName));
   
           // create consumer and subscription
           @Cleanup
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl(pulsar.getWebServiceAddress())
                   .statsInterval(0, TimeUnit.SECONDS)
                   .build();
           AtomicInteger total = new AtomicInteger();
           Consumer<byte[]> consumer = 
client.newConsumer().topic(persistentTopicName)
                   .messageListener(new MessageListener<byte[]>() {
                       @SneakyThrows
                       @Override
                       public void received(Consumer<byte[]> consumer, 
Message<byte[]> msg) {
                           if (total.get() %2 !=0){
                               
log.info("msg_id{}",msg.getMessageId().toString());
                               consumer.acknowledge(msg);
                           }
                           total.incrementAndGet();
                       }
                   })
                   .subscriptionName(subName)
                   .subscriptionType(SubscriptionType.Exclusive).subscribe();
   
           assertEquals(admin.topics().getSubscriptions(persistentTopicName), 
List.of(subName));
   
           publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + 
topicName, 100);
           TimeUnit.SECONDS.sleep(3);
           TopicStats topicStats = admin.topics().getStats(persistentTopicName);
           long msgBacklog = 
topicStats.getSubscriptions().get(subName).getMsgBacklog();
           log.info("back={}",msgBacklog);
           int skipNumber = 20;
           admin.topics().skipMessages(persistentTopicName, subName, 
skipNumber);
           topicStats = admin.topics().getStats(persistentTopicName);
           
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 
msgBacklog - skipNumber);
       }
   ```
   
   **When I artificially created 50 hollow messages** and skipped 20 messages, 
only 14 messages were actually skipped.
   
   ### What did you expect to see?
   
   The expected number of skipped messages is equal to the actual number of 
skipped messages.
   
   ### What did you see instead?
   
   The actual number of skips is less than the expected number of skips.
   <img width="241" alt="image" 
src="https://user-images.githubusercontent.com/15684156/236889435-cfb6098b-eb90-42b2-bce1-fc3273fc583e.png";>
   
   
   ### Anything else?
   
   
https://github.com/apache/pulsar/blob/e956db729f5098ff319237fd4220dbfb234c1b18/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L1772-L1785
   <img width="1052" alt="image" 
src="https://user-images.githubusercontent.com/15684156/236890402-dd9a5023-e597-432f-b188-abd6443a3965.png";>
   
   The reason for this issue is that the `recycle()` function reuses objects, 
causing the object referenced by r to change during runtime. When the loop 
count is greater than 8, the else block is entered, leading to an incorrect 
calculation of the message count.
   
   
   I believe there are two methods to fix this issue.
   
   The first method is to perform a copy before assignment, similar to the 
following:
   
   ```java
           public void setStartPosition(PositionImpl startPosition) {
               PositionImpl cp = new PositionImpl(startPosition.ledgerId, 
startPosition.entryId);
               this.startPosition = cp;
           }
   
           public void setEndPosition(PositionImpl endPosition) {
               PositionImpl cp = new PositionImpl(endPosition.ledgerId, 
endPosition.entryId);
               this.endPosition = cp;
           }
        
        // state.endPosition = r.lowerEndpoint();
        state.setEndPosition(r.lowerEndpoint());
   
        // state.startPosition = r.upperEndpoint();
        state.setStartPosition(r.upperEndpoint());
   ```
   
   The second method is to remove `recyclePositionRangeConverter`.
   
   <img width="1697" alt="image" 
src="https://user-images.githubusercontent.com/15684156/236891897-138ec453-a2df-4e4b-862d-eb65a437e7c9.png";>
   
   any other suggestions for a better solution?
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to