This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b0923eff32835162953d010da69f3df6e72039b4
Author: fengyubiao <[email protected]>
AuthorDate: Tue Aug 13 00:47:04 2024 +0800

    [improve] [client]Add new ServiceUrlProvider implementation: 
SameAuthParamsAutoClusterFailover (#23129)
---
 .../client/api/NonDurableSubscriptionTest.java     | 26 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 20407295ccb..8bd95bc93d9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -32,8 +32,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
@@ -45,10 +45,12 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandFlow;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
+import org.testng.AssertJUnit;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -541,18 +543,34 @@ public class NonDurableSubscriptionTest extends 
ProducerConsumerBase {
                     .getStats(topicName, true, true, 
true).getSubscriptions().get("s1");
             log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
             assertEquals(subscriptionStats.getMsgBacklog(), 0);
-            ManagedLedgerInternalStats.CursorStats cursorStats =
-                    
admin.topics().getInternalStats(topicName).cursors.get("s1");
+            PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topicName);
+            ManagedLedgerInternalStats.CursorStats cursorStats = 
internalStats.cursors.get("s1");
             String[] ledgerIdAndEntryId = 
cursorStats.markDeletePosition.split(":");
             PositionImpl actMarkDeletedPos =
                     PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), 
Long.valueOf(ledgerIdAndEntryId[1]));
             PositionImpl expectedMarkDeletedPos =
                     PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), 
msgIdInDeletedLedger5.getEntryId());
+            log.info("LAC: {}", internalStats.lastConfirmedEntry);
             log.info("Expected mark deleted position: {}", 
expectedMarkDeletedPos);
             log.info("Actual mark deleted position: {}", 
cursorStats.markDeletePosition);
-            assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 
0);
+            
AssertJUnit.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 
0);
         });
 
+        admin.topics().createSubscription(topicName, "s2", MessageId.earliest);
+        admin.topics().createSubscription(topicName, "s3", MessageId.latest);
+        PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topicName);
+        ManagedLedgerInternalStats.CursorStats cursorStats2 = 
internalStats.cursors.get("s2");
+        String[] ledgerIdAndEntryId2 = 
cursorStats2.markDeletePosition.split(":");
+        PositionImpl actMarkDeletedPos2 =
+                PositionImpl.get(Long.valueOf(ledgerIdAndEntryId2[0]), 
Long.valueOf(ledgerIdAndEntryId2[1]));
+        ManagedLedgerInternalStats.CursorStats cursorStats3 = 
internalStats.cursors.get("s3");
+        String[] ledgerIdAndEntryId3 = 
cursorStats3.markDeletePosition.split(":");
+        PositionImpl actMarkDeletedPos3 =
+                PositionImpl.get(Long.valueOf(ledgerIdAndEntryId3[0]), 
Long.valueOf(ledgerIdAndEntryId3[1]));
+        log.info("LAC: {}", internalStats.lastConfirmedEntry);
+        log.info("Actual mark deleted position 2: {}", actMarkDeletedPos2);
+        log.info("Actual mark deleted position 3: {}", actMarkDeletedPos3);
+        pulsar.getBrokerService().getTopic(topicName, false).join().get();
         // cleanup.
         reader.close();
         producer.close();

Reply via email to