This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 967e1e10f1d [fix][broker] Fix skip message API when hole messages 
exists (#20326)
967e1e10f1d is described below

commit 967e1e10f1d7a06c862d6a240698685809403d31
Author: crossoverJie <[email protected]>
AuthorDate: Thu Jun 1 11:48:11 2023 +0800

    [fix][broker] Fix skip message API when hole messages exists (#20326)
    
    (cherry picked from commit c35b820bb323c8e52bd9cd8ccd29565c23764117)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  1 -
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 49 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 42775ca6404..e8bdc1e474e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1598,7 +1598,6 @@ public class ManagedCursorImpl implements ManagedCursor {
                 } finally {
                     if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
                         ((PositionImplRecyclable) r.lowerEndpoint()).recycle();
-                        ((PositionImplRecyclable) r.upperEndpoint()).recycle();
                     }
                 }
             }, recyclePositionRangeConverter);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 7ea016dffae..70737d8e2ac 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -53,11 +53,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Response.Status;
 import lombok.Builder;
 import lombok.Cleanup;
+import lombok.SneakyThrows;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
@@ -86,6 +88,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -914,6 +917,52 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(admin.topics().getList("prop-xyz/ns1"), 
Lists.newArrayList());
     }
 
+    @Test(dataProvider = "topicName") // checks that skipMessages lowers the backlog by exactly the skipped count when acked holes exist
+    public void testSkipHoleMessages(String topicName) throws Exception {
+        final String subName = topicName;
+        assertEquals(admin.topics().getList("prop-xyz/ns1"), new 
ArrayList<>());
+
+        final String persistentTopicName = "persistent://prop-xyz/ns1/" + 
topicName;
+        // Force topic creation: the publish helper is invoked with a message count of 0
+        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + 
topicName, 0);
+        assertEquals(admin.topics().getList("prop-xyz/ns1"),
+                Collections.singletonList("persistent://prop-xyz/ns1/" + 
topicName));
+
+        // Create a client and an Exclusive subscription whose listener acknowledges selectively
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        AtomicInteger total = new AtomicInteger();
+        Consumer<byte[]> consumer = 
client.newConsumer().topic(persistentTopicName)
+                .messageListener(new MessageListener<byte[]>() {
+                    @SneakyThrows
+                    @Override
+                    public void received(Consumer<byte[]> consumer, 
Message<byte[]> msg) {
+                        if (total.get() %2 !=0){
+                            // Ack only when the pre-increment count is odd (every other message), leaving unacked gaps between acked ones
+                            consumer.acknowledge(msg);
+                        }
+                        total.incrementAndGet();
+                    }
+                })
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Exclusive).subscribe();
+
+        assertEquals(admin.topics().getSubscriptions(persistentTopicName), 
Collections.singletonList(subName));
+
+        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + 
topicName, 100);
+        TimeUnit.SECONDS.sleep(2); // NOTE(review): fixed wait, presumably so the listener can receive/ack before stats are read - confirm
+        TopicStats topicStats = admin.topics().getStats(persistentTopicName);
+        long msgBacklog = 
topicStats.getSubscriptions().get(subName).getMsgBacklog();
+        log.info("back={}",msgBacklog);
+        int skipNumber = 20;
+        admin.topics().skipMessages(persistentTopicName, subName, skipNumber);
+        topicStats = admin.topics().getStats(persistentTopicName);
+        
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 
msgBacklog - skipNumber); // backlog re-read after the skip must equal the earlier backlog minus skipNumber
+    }
+
     @Test(dataProvider = "topicNamesForAllTypes")
     public void partitionedTopics(String topicType, String topicName) throws 
Exception {
         final String namespace = "prop-xyz/ns1";

Reply via email to