This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 168f50cca9c [fix][broker] Fix skip message API when hole messages
exists (#20326)
168f50cca9c is described below
commit 168f50cca9c633e5dd95c39c2cc07bac37869d3c
Author: crossoverJie <[email protected]>
AuthorDate: Thu Jun 1 11:48:11 2023 +0800
[fix][broker] Fix skip message API when hole messages exists (#20326)
(cherry picked from commit c35b820bb323c8e52bd9cd8ccd29565c23764117)
(cherry picked from commit 967e1e10f1d7a06c862d6a240698685809403d31)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 1 -
.../apache/pulsar/broker/admin/AdminApiTest.java | 49 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2d2597e15e2..3196a6f5c96 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1585,7 +1585,6 @@ public class ManagedCursorImpl implements ManagedCursor {
} finally {
if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
((PositionImplRecyclable) r.lowerEndpoint()).recycle();
- ((PositionImplRecyclable) r.upperEndpoint()).recycle();
}
}
}, recyclePositionRangeConverter);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ca88b31c338..b979df7d2b1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -53,11 +53,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import lombok.Builder;
import lombok.Cleanup;
+import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
@@ -87,6 +89,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -896,6 +899,52 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getList("prop-xyz/ns1"),
Lists.newArrayList());
}
+ @Test(dataProvider = "topicName")
+ public void testSkipHoleMessages(String topicName) throws Exception {
+ final String subName = topicName;
+ assertEquals(admin.topics().getList("prop-xyz/ns1"), new
ArrayList<>());
+
+ final String persistentTopicName = "persistent://prop-xyz/ns1/" +
topicName;
+ // Force to create a topic
+ publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" +
topicName, 0);
+ assertEquals(admin.topics().getList("prop-xyz/ns1"),
+ Collections.singletonList("persistent://prop-xyz/ns1/" +
topicName));
+
+ // create consumer and subscription
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getWebServiceAddress())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ AtomicInteger total = new AtomicInteger();
+ Consumer<byte[]> consumer =
client.newConsumer().topic(persistentTopicName)
+ .messageListener(new MessageListener<byte[]>() {
+ @SneakyThrows
+ @Override
+ public void received(Consumer<byte[]> consumer,
Message<byte[]> msg) {
+ if (total.get() %2 !=0){
+ // artificially created 50 hollow messages
+ consumer.acknowledge(msg);
+ }
+ total.incrementAndGet();
+ }
+ })
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Exclusive).subscribe();
+
+ assertEquals(admin.topics().getSubscriptions(persistentTopicName),
Collections.singletonList(subName));
+
+ publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" +
topicName, 100);
+ TimeUnit.SECONDS.sleep(2);
+ TopicStats topicStats = admin.topics().getStats(persistentTopicName);
+ long msgBacklog =
topicStats.getSubscriptions().get(subName).getMsgBacklog();
+ log.info("back={}",msgBacklog);
+ int skipNumber = 20;
+ admin.topics().skipMessages(persistentTopicName, subName, skipNumber);
+ topicStats = admin.topics().getStats(persistentTopicName);
+
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(),
msgBacklog - skipNumber);
+ }
+
@Test(dataProvider = "topicNamesForAllTypes")
public void partitionedTopics(String topicType, String topicName) throws
Exception {
final String namespace = "prop-xyz/ns1";