This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 606b6a71efd [fix] [test] Revert the modification to
NonDurableSubscriptionTest caused by a mistake in the PR#23129 (#23168)
606b6a71efd is described below
commit 606b6a71efd76d7c695414aa61701c01a645f0f4
Author: fengyubiao <[email protected]>
AuthorDate: Wed Aug 14 20:57:49 2024 +0800
[fix] [test] Revert the modification to NonDurableSubscriptionTest caused
by a mistake in the PR#23129 (#23168)
---
.../client/api/NonDurableSubscriptionTest.java | 33 +++++-----------------
1 file changed, 7 insertions(+), 26 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 80adc79e6fe..bbac688d922 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -34,7 +34,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
-import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -47,12 +46,10 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
-import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -545,34 +542,18 @@ public class NonDurableSubscriptionTest extends ProducerConsumerBase {
.getStats(topicName, true, true,
true).getSubscriptions().get("s1");
log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
assertEquals(subscriptionStats.getMsgBacklog(), 0);
- PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topicName);
- ManagedLedgerInternalStats.CursorStats cursorStats =
internalStats.cursors.get("s1");
+ ManagedLedgerInternalStats.CursorStats cursorStats =
+ admin.topics().getInternalStats(topicName).cursors.get("s1");
String[] ledgerIdAndEntryId =
cursorStats.markDeletePosition.split(":");
- ImmutablePositionImpl actMarkDeletedPos =
- new
ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId[0]),
Long.valueOf(ledgerIdAndEntryId[1]));
- ImmutablePositionImpl expectedMarkDeletedPos =
- new
ImmutablePositionImpl(msgIdInDeletedLedger5.getLedgerId(),
msgIdInDeletedLedger5.getEntryId());
- log.info("LAC: {}", internalStats.lastConfirmedEntry);
+ Position actMarkDeletedPos =
+ PositionFactory.create(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
+ Position expectedMarkDeletedPos =
+ PositionFactory.create(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
log.info("Expected mark deleted position: {}",
expectedMarkDeletedPos);
log.info("Actual mark deleted position: {}",
cursorStats.markDeletePosition);
- AssertJUnit.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
+ assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
});
- admin.topics().createSubscription(topicName, "s2", MessageId.earliest);
- admin.topics().createSubscription(topicName, "s3", MessageId.latest);
- PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topicName);
- ManagedLedgerInternalStats.CursorStats cursorStats2 =
internalStats.cursors.get("s2");
- String[] ledgerIdAndEntryId2 =
cursorStats2.markDeletePosition.split(":");
- ImmutablePositionImpl actMarkDeletedPos2 =
- new
ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId2[0]),
Long.valueOf(ledgerIdAndEntryId2[1]));
- ManagedLedgerInternalStats.CursorStats cursorStats3 =
internalStats.cursors.get("s3");
- String[] ledgerIdAndEntryId3 =
cursorStats3.markDeletePosition.split(":");
- ImmutablePositionImpl actMarkDeletedPos3 =
- new
ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId3[0]),
Long.valueOf(ledgerIdAndEntryId3[1]));
- log.info("LAC: {}", internalStats.lastConfirmedEntry);
- log.info("Actual mark deleted position 2: {}", actMarkDeletedPos2);
- log.info("Actual mark deleted position 3: {}", actMarkDeletedPos3);
- pulsar.getBrokerService().getTopic(topicName, false).join().get();
// cleanup.
reader.close();
producer.close();