This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7c96a36c587 [fix] [admin] Fix get topic stats fail if a subscription
catch up concurrently (#20971)
7c96a36c587 is described below
commit 7c96a36c58768e71cc445371bb3d98c5ac6e05cd
Author: fengyubiao <[email protected]>
AuthorDate: Mon Aug 21 04:49:30 2023 +0800
[fix] [admin] Fix get topic stats fail if a subscription catch up
concurrently (#20971)
### Motivation
**Background**: when calling `pulsar-admin topics stats
--get-earliest-time-in-backlog <topic name>`, Pulsar will read the first entry
which is not acknowledged, and respond with the entry write time. The flow is
like this:
- get the mark deleted position of the subscription
- if there is no backlog, respond with `-1`
- else read the next position after the mark deleted position, and respond
with the entry write time.
**Issue**: if the command `pulsar-admin topics stats
--get-earliest-time-in-backlog <topic name>` and `consumer.acknowledge` are
executed at the same time, the backlog check (step 2 in the flow above) can pass
just before the acknowledgement advances the mark deleted position. The next
position calculated afterwards is then larger than the last confirmed (LAC)
position, which leads to a read entry error.
| time | `pulsar-admin topics stats --get-earliest-time-in-backlog <topic name>` | `consumer.acknowledge` |
| --- | --- | --- |
| 1 | mark deleted position is `3:1` and LAC is `3:2` now | |
| 2 | the check `whether has backlog` is passed | |
| 3 | | acknowledged `3:2`, mark deleted position is `3:2` now |
| 4 | calculate next position: `3:3` | |
| 5 | Read `3:3` and get an error: `read entry failed` | |
Note: the test in this PR is not intended to reproduce the issue.
### Modifications
Respond `-1` if the next position of the mark deleted position is larger
than the LAC
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 ++++
.../service/persistent/PersistentSubscription.java | 22 ++++++++++--------
.../apache/pulsar/broker/admin/AdminApi2Test.java | 27 ++++++++++++++++++++++
.../apache/pulsar/broker/admin/AdminApiTest.java | 4 ++--
4 files changed, 46 insertions(+), 11 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 14f4bfed871..c31a0c38cd3 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1240,6 +1240,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
PositionImpl nextPos = getNextValidPosition(pos);
+ if (nextPos.compareTo(lastConfirmedEntry) > 0) {
+ return CompletableFuture.completedFuture(-1L);
+ }
+
asyncReadEntry(nextPos, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 009d00cd89c..2f5485afbaa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1160,16 +1160,20 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
} else {
subStats.backlogSize = -1;
}
- if (getEarliestTimeInBacklog && subStats.msgBacklog > 0) {
- ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
- PositionImpl markDeletedPosition = (PositionImpl)
cursor.getMarkDeletedPosition();
- long result = 0;
- try {
- result =
managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
- } catch (InterruptedException | ExecutionException e) {
- result = -1;
+ if (getEarliestTimeInBacklog) {
+ if (subStats.msgBacklog > 0) {
+ ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
+ PositionImpl markDeletedPosition = (PositionImpl)
cursor.getMarkDeletedPosition();
+ long result = 0;
+ try {
+ result =
managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
+ } catch (InterruptedException | ExecutionException e) {
+ result = -1;
+ }
+ subStats.earliestMsgPublishTimeInBacklog = result;
+ } else {
+ subStats.earliestMsgPublishTimeInBacklog = -1;
}
- subStats.earliestMsgPublishTimeInBacklog = result;
}
subStats.msgBacklogNoDelayed = subStats.msgBacklog -
subStats.msgDelayed;
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index d6176966d85..0adf945a555 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -61,6 +61,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
@@ -3191,6 +3192,32 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
newPartitions);
}
+ /**
+ * Validate that the earliest message publish time in backlog is -1 when the subscription has no backlog.
+ * @throws Exception
+ */
+ @Test
+ public void testTopicStatsWithEarliestTimeInBacklogIfNoBacklog() throws
Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
+ final String subscriptionName = "s1";
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+
+ // Send one message.
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false)
+ .create();
+ MessageIdImpl messageId = (MessageIdImpl) producer.send("123");
+ // Catch up.
+ admin.topics().skipAllMessages(topicName, subscriptionName);
+ // Get topic stats with earliestTimeInBacklog
+ TopicStats topicStats = admin.topics().getStats(topicName, false,
false, true);
+
assertEquals(topicStats.getSubscriptions().get(subscriptionName).getEarliestMsgPublishTimeInBacklog(),
-1L);
+
+ // cleanup.
+ producer.close();
+ admin.topics().delete(topicName);
+ }
+
@Test(dataProvider = "topicType")
public void testPartitionedStatsAggregationByProducerName(String
topicType) throws Exception {
restartClusterIfReused();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 342a409c4ae..a4be829b977 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -1289,7 +1289,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
TopicStats topicStats = admin.topics().getStats(topic, false, false,
true);
assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0);
-
assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(),
0);
+
assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(),
-1);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), -1);
// publish several messages
@@ -1309,7 +1309,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
topicStats = admin.topics().getStats(topic, false, true, true);
assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0);
-
assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(),
0);
+
assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(),
-1);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 0);
}