This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch xiangying/branch-2.10/20597 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c4f855b8c251d0374528029c9e16f75f32bdddd7 Author: lifepuzzlefun <[email protected]> AuthorDate: Fri Jun 30 14:01:24 2023 +0800 [fix][broker] Fix NPE when reset Replicator's cursor by position. (#20597) (cherry picked from commit 5abadbeddac4fddf780d093a5cedf6c375ee0b29) --- .../broker/admin/impl/PersistentTopicsBase.java | 42 ++++++++++++++-------- .../persistent/PersistentMessageExpiryMonitor.java | 14 +++++--- .../service/persistent/PersistentReplicator.java | 2 +- .../service/persistent/PersistentSubscription.java | 2 +- .../service/PersistentMessageFinderTest.java | 20 ++++++++--- .../pulsar/broker/service/ReplicatorTest.java | 30 ++++++++++++++++ 6 files changed, 83 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 44a5a890331..da37d1b7e54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -3794,31 +3794,43 @@ public class PersistentTopicsBase extends AdminResource { return; } try { - PersistentSubscription sub = topic.getSubscription(subName); - if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Subscription not found")); - return; + PersistentSubscription sub = null; + PersistentReplicator repl = null; + + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + repl = (PersistentReplicator) + topic.getPersistentReplicator(remoteCluster); + if (repl == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Replicator not found")); + return; + } + } else { + sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); + return; + } } + CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>(); getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex); + + PersistentReplicator finalRepl = repl; + PersistentSubscription finalSub = sub; + batchSizeFuture.thenAccept(bi -> { PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId); boolean issued; try { if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = (PersistentReplicator) - topic.getPersistentReplicator(remoteCluster); - if (repl == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Replicator not found")); - return; - } - issued = repl.expireMessages(position); + issued = finalRepl.expireMessages(position); } else { - issued = sub.expireMessages(position); + issued = finalSub.expireMessages(position); } + if (issued) { log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position, topicName, subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 4335762a21b..4be391ea31f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; +import javax.annotation.Nullable; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory; public class PersistentMessageExpiryMonitor implements FindEntryCallback { private final ManagedCursor cursor; private final String subName; + private final PersistentTopic topic; private final String topicName; private final Rate msgExpired; private final LongAdder totalMsgExpired; @@ -57,9 +59,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater .newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress"); - public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor, + public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, PersistentSubscription subscription) { - this.topicName = topicName; + this.topic = topic; + this.topicName = topic.getName(); this.cursor = cursor; this.subName = subscriptionName; this.subscription = subscription; @@ -98,11 +101,12 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { public boolean expireMessages(Position messagePosition) { // If it's beyond last position of this topic, do nothing. - if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) { + PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition(); + if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond " - + "current topic's last position {}", topicName, subName, messagePosition, - subscription.getTopic().getLastPosition()); + + "current topic's last position {}", topicName, subName, messagePosition, + topicLastPosition); } return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 8254e512afc..adfae64489d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -114,7 +114,7 @@ public class PersistentReplicator extends AbstractReplicator replicationClient); this.topic = topic; this.cursor = cursor; - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, + this.expiryMonitor = new PersistentMessageExpiryMonitor((PersistentTopic) localTopic, Codec.decode(cursor.getName()), cursor, null); HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 9654915685b..a14a5108dff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -140,7 +140,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); + this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this); this.setReplicated(replicated); this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 05c07d8f44e..5287d842727 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -62,6 +62,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.ResetCursorData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; @@ -230,7 +231,11 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { }); assertTrue(ex.get()); - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); + PersistentTopic mock = mock(PersistentTopic.class); + when(mock.getName()).thenReturn("topicname"); + when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); + + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), Optional.empty(), null); Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress"); @@ -407,7 +412,11 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { bkc.deleteLedger(ledgers.get(1).getLedgerId()); bkc.deleteLedger(ledgers.get(2).getLedgerId()); - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); + PersistentTopic mock = mock(PersistentTopic.class); + when(mock.getName()).thenReturn("topicname"); + when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); + + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); Position previousMarkDelete = null; for (int i = 0; i < totalEntries; i++) { monitor.expireMessages(1); @@ -444,15 +453,16 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); PersistentSubscription subscription = mock(PersistentSubscription.class); - Topic topic = mock(Topic.class); + PersistentTopic topic = mock(PersistentTopic.class); when(subscription.getTopic()).thenReturn(topic); + when(topic.getName()).thenReturn("topicname"); for (int i = 0; i < totalEntries; i++) { positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i))); } when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1)); - PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname", + PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor(topic, cursor.getName(), cursor, subscription)); assertEquals(cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1)); boolean issued; @@ -491,7 +501,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { clearInvocations(monitor); ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class); - PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor("topicname", + PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic, cursor.getName(), mockCursor, subscription)); // Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to // expire message shouldn't issue. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 158d223336b..85c8eca8fa0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -726,6 +726,36 @@ public class ReplicatorTest extends ReplicatorTestBase { assertEquals(status.getReplicationBacklog(), 0); } + + @Test(timeOut = 30000) + public void testResetReplicatorSubscriptionPosition() throws Exception { + final TopicName dest = TopicName + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription")); + + @Cleanup + MessageProducer producer1 = new MessageProducer(url1, dest); + + // Produce from cluster1 and consume from the rest + for (int i = 0; i < 10; i++) { + producer1.produce(2); + } + + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); + + PersistentReplicator replicator = (PersistentReplicator) spy( + topic.getReplicators().get(topic.getReplicators().keys().get(0))); + + MessageId id = topic.getLastMessageId().get(); + admin1.topics().expireMessages(dest.getPartitionedTopicName(), + replicator.getCursor().getName(), + id,false); + + replicator.updateRates(); + + ReplicatorStats status = replicator.getStats(); + assertEquals(status.getReplicationBacklog(), 0); + } + @Test(timeOut = 30000) public void testResetCursorNotFail() throws Exception {
