This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 643ffadb7a8 [fix][broker][branch-2.10] Fix NPE when reset Replicator's 
cursor by position. (#20597) (#20781)
643ffadb7a8 is described below

commit 643ffadb7a868164b62508acef5b3ebb01694965
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Jul 12 12:43:41 2023 +0800

    [fix][broker][branch-2.10] Fix NPE when reset Replicator's cursor by 
position. (#20597) (#20781)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 42 ++++++++++++++--------
 .../persistent/PersistentMessageExpiryMonitor.java | 23 ++++++++++--
 .../service/persistent/PersistentReplicator.java   |  2 +-
 .../service/persistent/PersistentSubscription.java |  2 +-
 .../service/PersistentMessageFinderTest.java       | 20 ++++++++---
 .../pulsar/broker/service/ReplicatorTest.java      | 30 ++++++++++++++++
 6 files changed, 94 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 44a5a890331..da37d1b7e54 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3794,31 +3794,43 @@ public class PersistentTopicsBase extends AdminResource 
{
                 return;
             }
             try {
-                PersistentSubscription sub = topic.getSubscription(subName);
-                if (sub == null) {
-                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                            "Subscription not found"));
-                    return;
+                PersistentSubscription sub = null;
+                PersistentReplicator repl = null;
+
+                if (subName.startsWith(topic.getReplicatorPrefix())) {
+                    String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
+                    repl = (PersistentReplicator)
+                            topic.getPersistentReplicator(remoteCluster);
+                    if (repl == null) {
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                "Replicator not found"));
+                        return;
+                    }
+                } else {
+                    sub = topic.getSubscription(subName);
+                    if (sub == null) {
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                
getSubNotFoundErrorMessage(topicName.toString(), subName)));
+                        return;
+                    }
                 }
+
                 CompletableFuture<Integer> batchSizeFuture = new 
CompletableFuture<>();
                 getEntryBatchSize(batchSizeFuture, topic, messageId, 
batchIndex);
+
+                PersistentReplicator finalRepl = repl;
+                PersistentSubscription finalSub = sub;
+
                 batchSizeFuture.thenAccept(bi -> {
                     PositionImpl position = 
calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
                     boolean issued;
                     try {
                         if (subName.startsWith(topic.getReplicatorPrefix())) {
-                            String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
-                            PersistentReplicator repl = (PersistentReplicator)
-                                    
topic.getPersistentReplicator(remoteCluster);
-                            if (repl == null) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                        "Replicator not found"));
-                                return;
-                            }
-                            issued = repl.expireMessages(position);
+                            issued = finalRepl.expireMessages(position);
                         } else {
-                            issued = sub.expireMessages(position);
+                            issued = finalSub.expireMessages(position);
                         }
+
                         if (issued) {
                             log.info("[{}] Message expire started up to {} on 
{} {}", clientAppId(), position,
                                     topicName, subName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 4335762a21b..35c6b807589 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 public class PersistentMessageExpiryMonitor implements FindEntryCallback {
     private final ManagedCursor cursor;
     private final String subName;
+    private final PersistentTopic topic;
     private final String topicName;
     private final Rate msgExpired;
     private final LongAdder totalMsgExpired;
@@ -57,9 +58,24 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
             expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater
             .newUpdater(PersistentMessageExpiryMonitor.class, 
"expirationCheckInProgress");
 
+    public PersistentMessageExpiryMonitor(PersistentTopic topic, String 
subscriptionName, ManagedCursor cursor,
+                                          PersistentSubscription subscription) 
{
+        this.topic = topic;
+        this.topicName = topic.getName();
+        this.cursor = cursor;
+        this.subName = subscriptionName;
+        this.subscription = subscription;
+        this.msgExpired = new Rate();
+        this.totalMsgExpired = new LongAdder();
+        // check to avoid test failures
+        this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != 
null
+                && 
this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
+    }
+
     public PersistentMessageExpiryMonitor(String topicName, String 
subscriptionName, ManagedCursor cursor,
                                           PersistentSubscription subscription) 
{
         this.topicName = topicName;
+        this.topic = subscription.topic;
         this.cursor = cursor;
         this.subName = subscriptionName;
         this.subscription = subscription;
@@ -98,11 +114,12 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
 
     public boolean expireMessages(Position messagePosition) {
         // If it's beyond last position of this topic, do nothing.
-        if (((PositionImpl) 
subscription.getTopic().getLastPosition()).compareTo((PositionImpl) 
messagePosition) < 0) {
+        PositionImpl topicLastPosition = (PositionImpl) 
this.topic.getLastPosition();
+        if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Ignore expire-message scheduled task, 
given position {} is beyond "
-                         + "current topic's last position {}", topicName, 
subName, messagePosition,
-                        subscription.getTopic().getLastPosition());
+                                + "current topic's last position {}", 
topicName, subName, messagePosition,
+                        topicLastPosition);
             }
             return false;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 8254e512afc..adfae64489d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -114,7 +114,7 @@ public class PersistentReplicator extends AbstractReplicator
                 replicationClient);
         this.topic = topic;
         this.cursor = cursor;
-        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
+        this.expiryMonitor = new 
PersistentMessageExpiryMonitor((PersistentTopic) localTopic,
                 Codec.decode(cursor.getName()), cursor, null);
         HAVE_PENDING_READ_UPDATER.set(this, FALSE);
         PENDING_MESSAGES_UPDATER.set(this, 0);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 9654915685b..a14a5108dff 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -140,7 +140,7 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
         this.topicName = topic.getName();
         this.subName = subscriptionName;
         this.fullName = MoreObjects.toStringHelper(this).add("topic", 
topicName).add("name", subName).toString();
-        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, 
subscriptionName, cursor, this);
+        this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, 
subscriptionName, cursor, this);
         this.setReplicated(replicated);
         this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
                 ? Collections.emptyMap() : 
Collections.unmodifiableMap(subscriptionProperties);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 05c07d8f44e..5287d842727 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
@@ -230,7 +231,11 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
                 });
         assertTrue(ex.get());
 
-        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
+        PersistentTopic mock = mock(PersistentTopic.class);
+        when(mock.getName()).thenReturn("topicname");
+        when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
+
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
         monitor.findEntryFailed(new 
ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
                 Optional.empty(), null);
         Field field = 
monitor.getClass().getDeclaredField("expirationCheckInProgress");
@@ -407,7 +412,11 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         bkc.deleteLedger(ledgers.get(1).getLedgerId());
         bkc.deleteLedger(ledgers.get(2).getLedgerId());
 
-        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
+        PersistentTopic mock = mock(PersistentTopic.class);
+        when(mock.getName()).thenReturn("topicname");
+        when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
+
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
         Position previousMarkDelete = null;
         for (int i = 0; i < totalEntries; i++) {
             monitor.expireMessages(1);
@@ -444,15 +453,16 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
 
         PersistentSubscription subscription = 
mock(PersistentSubscription.class);
-        Topic topic = mock(Topic.class);
+        PersistentTopic topic = mock(PersistentTopic.class);
         when(subscription.getTopic()).thenReturn(topic);
+        when(topic.getName()).thenReturn("topicname");
 
         for (int i = 0; i < totalEntries; i++) {
             positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + 
i)));
         }
         
when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));
 
-        PersistentMessageExpiryMonitor monitor = spy(new 
PersistentMessageExpiryMonitor("topicname",
+        PersistentMessageExpiryMonitor monitor = spy(new 
PersistentMessageExpiryMonitor(topic,
                 cursor.getName(), cursor, subscription));
         assertEquals(cursor.getMarkDeletedPosition(), 
PositionImpl.get(positions.get(0).getLedgerId(), -1));
         boolean issued;
@@ -491,7 +501,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         clearInvocations(monitor);
 
         ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
-        PersistentMessageExpiryMonitor mockMonitor = spy(new 
PersistentMessageExpiryMonitor("topicname",
+        PersistentMessageExpiryMonitor mockMonitor = spy(new 
PersistentMessageExpiryMonitor(topic,
                 cursor.getName(), mockCursor, subscription));
         // Not calling findEntryComplete to clear expirationCheckInProgress 
condition, so following call to
         // expire message shouldn't issue.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 158d223336b..85c8eca8fa0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -726,6 +726,36 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertEquals(status.getReplicationBacklog(), 0);
     }
 
+
+    @Test(timeOut = 30000)
+    public void testResetReplicatorSubscriptionPosition() throws Exception {
+        final TopicName dest = TopicName
+                
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription"));
+
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest);
+
+        // Produce from cluster1 and consume from the rest
+        for (int i = 0; i < 10; i++) {
+            producer1.produce(2);
+        }
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
+
+        PersistentReplicator replicator = (PersistentReplicator) spy(
+                
topic.getReplicators().get(topic.getReplicators().keys().get(0)));
+
+        MessageId id = topic.getLastMessageId().get();
+        admin1.topics().expireMessages(dest.getPartitionedTopicName(),
+                replicator.getCursor().getName(),
+                id,false);
+
+        replicator.updateRates();
+
+        ReplicatorStats status = replicator.getStats();
+        assertEquals(status.getReplicationBacklog(), 0);
+    }
+
     @Test(timeOut = 30000)
     public void testResetCursorNotFail() throws Exception {
 

Reply via email to