This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 209b222d502 Revert "[fix][broker] Fix NPE when reset Replicator's cursor by position. (#20597)"
209b222d502 is described below

commit 209b222d5025a0e5c78011290a76dd5435b79f56
Author: xiangying <[email protected]>
AuthorDate: Tue Jul 11 19:14:48 2023 +0800

    Revert "[fix][broker] Fix NPE when reset Replicator's cursor by position. (#20597)"
    
    This reverts commit 18f89b68e5f273a13aeeda45d7d7815379406053.
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 42 ++++++++--------------
 .../persistent/PersistentMessageExpiryMonitor.java | 16 ++++-----
 .../service/persistent/PersistentReplicator.java   |  2 +-
 .../service/persistent/PersistentSubscription.java |  2 +-
 .../service/PersistentMessageFinderTest.java       | 20 +++--------
 .../pulsar/broker/service/ReplicatorTest.java      | 30 ----------------
 6 files changed, 28 insertions(+), 84 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index da37d1b7e54..44a5a890331 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3794,43 +3794,31 @@ public class PersistentTopicsBase extends AdminResource 
{
                 return;
             }
             try {
-                PersistentSubscription sub = null;
-                PersistentReplicator repl = null;
-
-                if (subName.startsWith(topic.getReplicatorPrefix())) {
-                    String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
-                    repl = (PersistentReplicator)
-                            topic.getPersistentReplicator(remoteCluster);
-                    if (repl == null) {
-                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                "Replicator not found"));
-                        return;
-                    }
-                } else {
-                    sub = topic.getSubscription(subName);
-                    if (sub == null) {
-                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
-                                
getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                        return;
-                    }
+                PersistentSubscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                            "Subscription not found"));
+                    return;
                 }
-
                 CompletableFuture<Integer> batchSizeFuture = new 
CompletableFuture<>();
                 getEntryBatchSize(batchSizeFuture, topic, messageId, 
batchIndex);
-
-                PersistentReplicator finalRepl = repl;
-                PersistentSubscription finalSub = sub;
-
                 batchSizeFuture.thenAccept(bi -> {
                     PositionImpl position = 
calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
                     boolean issued;
                     try {
                         if (subName.startsWith(topic.getReplicatorPrefix())) {
-                            issued = finalRepl.expireMessages(position);
+                            String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
+                            PersistentReplicator repl = (PersistentReplicator)
+                                    
topic.getPersistentReplicator(remoteCluster);
+                            if (repl == null) {
+                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                        "Replicator not found"));
+                                return;
+                            }
+                            issued = repl.expireMessages(position);
                         } else {
-                            issued = finalSub.expireMessages(position);
+                            issued = sub.expireMessages(position);
                         }
-
                         if (issued) {
                             log.info("[{}] Message expire started up to {} on 
{} {}", clientAppId(), position,
                                     topicName, subName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index d3b52c52677..4335762a21b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -22,7 +22,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
-import javax.annotation.Nullable;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -44,7 +43,6 @@ import org.slf4j.LoggerFactory;
 public class PersistentMessageExpiryMonitor implements FindEntryCallback {
     private final ManagedCursor cursor;
     private final String subName;
-    private final PersistentTopic topic;
     private final String topicName;
     private final Rate msgExpired;
     private final LongAdder totalMsgExpired;
@@ -59,10 +57,9 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
             expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater
             .newUpdater(PersistentMessageExpiryMonitor.class, 
"expirationCheckInProgress");
 
-    public PersistentMessageExpiryMonitor(PersistentTopic topic, String 
subscriptionName, ManagedCursor cursor,
-                                          @Nullable PersistentSubscription 
subscription) {
-        this.topic = topic;
-        this.topicName = topic.getName();
+    public PersistentMessageExpiryMonitor(String topicName, String 
subscriptionName, ManagedCursor cursor,
+                                          PersistentSubscription subscription) 
{
+        this.topicName = topicName;
         this.cursor = cursor;
         this.subName = subscriptionName;
         this.subscription = subscription;
@@ -101,12 +98,11 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
 
     public boolean expireMessages(Position messagePosition) {
         // If it's beyond last position of this topic, do nothing.
-        PositionImpl topicLastPosition = (PositionImpl) 
this.topic.getLastPosition();
-        if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) {
+        if (((PositionImpl) 
subscription.getTopic().getLastPosition()).compareTo((PositionImpl) 
messagePosition) < 0) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Ignore expire-message scheduled task, 
given position {} is beyond "
-                                + "current topic's last position {}", 
topicName, subName, messagePosition,
-                        topicLastPosition);
+                         + "current topic's last position {}", topicName, 
subName, messagePosition,
+                        subscription.getTopic().getLastPosition());
             }
             return false;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index adfae64489d..8254e512afc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -114,7 +114,7 @@ public class PersistentReplicator extends AbstractReplicator
                 replicationClient);
         this.topic = topic;
         this.cursor = cursor;
-        this.expiryMonitor = new 
PersistentMessageExpiryMonitor((PersistentTopic) localTopic,
+        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
                 Codec.decode(cursor.getName()), cursor, null);
         HAVE_PENDING_READ_UPDATER.set(this, FALSE);
         PENDING_MESSAGES_UPDATER.set(this, 0);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a14a5108dff..9654915685b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -140,7 +140,7 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
         this.topicName = topic.getName();
         this.subName = subscriptionName;
         this.fullName = MoreObjects.toStringHelper(this).add("topic", 
topicName).add("name", subName).toString();
-        this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, 
subscriptionName, cursor, this);
+        this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, 
subscriptionName, cursor, this);
         this.setReplicated(replicated);
         this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
                 ? Collections.emptyMap() : 
Collections.unmodifiableMap(subscriptionProperties);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 5287d842727..05c07d8f44e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -62,7 +62,6 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
@@ -231,11 +230,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
                 });
         assertTrue(ex.get());
 
-        PersistentTopic mock = mock(PersistentTopic.class);
-        when(mock.getName()).thenReturn("topicname");
-        when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
-
-        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
         monitor.findEntryFailed(new 
ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
                 Optional.empty(), null);
         Field field = 
monitor.getClass().getDeclaredField("expirationCheckInProgress");
@@ -412,11 +407,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         bkc.deleteLedger(ledgers.get(1).getLedgerId());
         bkc.deleteLedger(ledgers.get(2).getLedgerId());
 
-        PersistentTopic mock = mock(PersistentTopic.class);
-        when(mock.getName()).thenReturn("topicname");
-        when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
-
-        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
         Position previousMarkDelete = null;
         for (int i = 0; i < totalEntries; i++) {
             monitor.expireMessages(1);
@@ -453,16 +444,15 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
 
         PersistentSubscription subscription = 
mock(PersistentSubscription.class);
-        PersistentTopic topic = mock(PersistentTopic.class);
+        Topic topic = mock(Topic.class);
         when(subscription.getTopic()).thenReturn(topic);
-        when(topic.getName()).thenReturn("topicname");
 
         for (int i = 0; i < totalEntries; i++) {
             positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + 
i)));
         }
         
when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));
 
-        PersistentMessageExpiryMonitor monitor = spy(new 
PersistentMessageExpiryMonitor(topic,
+        PersistentMessageExpiryMonitor monitor = spy(new 
PersistentMessageExpiryMonitor("topicname",
                 cursor.getName(), cursor, subscription));
         assertEquals(cursor.getMarkDeletedPosition(), 
PositionImpl.get(positions.get(0).getLedgerId(), -1));
         boolean issued;
@@ -501,7 +491,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         clearInvocations(monitor);
 
         ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
-        PersistentMessageExpiryMonitor mockMonitor = spy(new 
PersistentMessageExpiryMonitor(topic,
+        PersistentMessageExpiryMonitor mockMonitor = spy(new 
PersistentMessageExpiryMonitor("topicname",
                 cursor.getName(), mockCursor, subscription));
         // Not calling findEntryComplete to clear expirationCheckInProgress 
condition, so following call to
         // expire message shouldn't issue.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 85c8eca8fa0..158d223336b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -726,36 +726,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertEquals(status.getReplicationBacklog(), 0);
     }
 
-
-    @Test(timeOut = 30000)
-    public void testResetReplicatorSubscriptionPosition() throws Exception {
-        final TopicName dest = TopicName
-                
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription"));
-
-        @Cleanup
-        MessageProducer producer1 = new MessageProducer(url1, dest);
-
-        // Produce from cluster1 and consume from the rest
-        for (int i = 0; i < 10; i++) {
-            producer1.produce(2);
-        }
-
-        PersistentTopic topic = (PersistentTopic) 
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
-
-        PersistentReplicator replicator = (PersistentReplicator) spy(
-                
topic.getReplicators().get(topic.getReplicators().keys().get(0)));
-
-        MessageId id = topic.getLastMessageId().get();
-        admin1.topics().expireMessages(dest.getPartitionedTopicName(),
-                replicator.getCursor().getName(),
-                id,false);
-
-        replicator.updateRates();
-
-        ReplicatorStats status = replicator.getStats();
-        assertEquals(status.getReplicationBacklog(), 0);
-    }
-
     @Test(timeOut = 30000)
     public void testResetCursorNotFail() throws Exception {
 

Reply via email to