This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 643ffadb7a8 [fix][broker][branch-2.10] Fix NPE when reset Replicator's cursor by position. (#20597) (#20781)
643ffadb7a8 is described below
commit 643ffadb7a868164b62508acef5b3ebb01694965
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Jul 12 12:43:41 2023 +0800
[fix][broker][branch-2.10] Fix NPE when reset Replicator's cursor by position. (#20597) (#20781)
---
.../broker/admin/impl/PersistentTopicsBase.java | 42 ++++++++++++++--------
.../persistent/PersistentMessageExpiryMonitor.java | 23 ++++++++++--
.../service/persistent/PersistentReplicator.java | 2 +-
.../service/persistent/PersistentSubscription.java | 2 +-
.../service/PersistentMessageFinderTest.java | 20 ++++++++---
.../pulsar/broker/service/ReplicatorTest.java | 30 ++++++++++++++++
6 files changed, 94 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 44a5a890331..da37d1b7e54 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3794,31 +3794,43 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
try {
- PersistentSubscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- "Subscription not found"));
- return;
+ PersistentSubscription sub = null;
+ PersistentReplicator repl = null;
+
+ if (subName.startsWith(topic.getReplicatorPrefix())) {
+ String remoteCluster =
PersistentReplicator.getRemoteCluster(subName);
+ repl = (PersistentReplicator)
+ topic.getPersistentReplicator(remoteCluster);
+ if (repl == null) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+ "Replicator not found"));
+ return;
+ }
+ } else {
+ sub = topic.getSubscription(subName);
+ if (sub == null) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+
getSubNotFoundErrorMessage(topicName.toString(), subName)));
+ return;
+ }
}
+
CompletableFuture<Integer> batchSizeFuture = new
CompletableFuture<>();
getEntryBatchSize(batchSizeFuture, topic, messageId,
batchIndex);
+
+ PersistentReplicator finalRepl = repl;
+ PersistentSubscription finalSub = sub;
+
batchSizeFuture.thenAccept(bi -> {
PositionImpl position =
calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
boolean issued;
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
- String remoteCluster =
PersistentReplicator.getRemoteCluster(subName);
- PersistentReplicator repl = (PersistentReplicator)
-
topic.getPersistentReplicator(remoteCluster);
- if (repl == null) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
- "Replicator not found"));
- return;
- }
- issued = repl.expireMessages(position);
+ issued = finalRepl.expireMessages(position);
} else {
- issued = sub.expireMessages(position);
+ issued = finalSub.expireMessages(position);
}
+
if (issued) {
log.info("[{}] Message expire started up to {} on
{} {}", clientAppId(), position,
topicName, subName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 4335762a21b..35c6b807589 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private final ManagedCursor cursor;
private final String subName;
+ private final PersistentTopic topic;
private final String topicName;
private final Rate msgExpired;
private final LongAdder totalMsgExpired;
@@ -57,9 +58,24 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater
.newUpdater(PersistentMessageExpiryMonitor.class,
"expirationCheckInProgress");
+ public PersistentMessageExpiryMonitor(PersistentTopic topic, String
subscriptionName, ManagedCursor cursor,
+ PersistentSubscription subscription)
{
+ this.topic = topic;
+ this.topicName = topic.getName();
+ this.cursor = cursor;
+ this.subName = subscriptionName;
+ this.subscription = subscription;
+ this.msgExpired = new Rate();
+ this.totalMsgExpired = new LongAdder();
+ // check to avoid test failures
+ this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() !=
null
+ &&
this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
+ }
+
public PersistentMessageExpiryMonitor(String topicName, String
subscriptionName, ManagedCursor cursor,
PersistentSubscription subscription)
{
this.topicName = topicName;
+ this.topic = subscription.topic;
this.cursor = cursor;
this.subName = subscriptionName;
this.subscription = subscription;
@@ -98,11 +114,12 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
public boolean expireMessages(Position messagePosition) {
// If it's beyond last position of this topic, do nothing.
- if (((PositionImpl)
subscription.getTopic().getLastPosition()).compareTo((PositionImpl)
messagePosition) < 0) {
+ PositionImpl topicLastPosition = (PositionImpl)
this.topic.getLastPosition();
+ if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore expire-message scheduled task,
given position {} is beyond "
- + "current topic's last position {}", topicName,
subName, messagePosition,
- subscription.getTopic().getLastPosition());
+ + "current topic's last position {}",
topicName, subName, messagePosition,
+ topicLastPosition);
}
return false;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 8254e512afc..adfae64489d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -114,7 +114,7 @@ public class PersistentReplicator extends AbstractReplicator
replicationClient);
this.topic = topic;
this.cursor = cursor;
- this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
+ this.expiryMonitor = new
PersistentMessageExpiryMonitor((PersistentTopic) localTopic,
Codec.decode(cursor.getName()), cursor, null);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 9654915685b..a14a5108dff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -140,7 +140,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic",
topicName).add("name", subName).toString();
- this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
subscriptionName, cursor, this);
+ this.expiryMonitor = new PersistentMessageExpiryMonitor(topic,
subscriptionName, cursor, this);
this.setReplicated(replicated);
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() :
Collections.unmodifiableMap(subscriptionProperties);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 05c07d8f44e..5287d842727 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
@@ -230,7 +231,11 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
});
assertTrue(ex.get());
- PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
+ PersistentTopic mock = mock(PersistentTopic.class);
+ when(mock.getName()).thenReturn("topicname");
+ when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
+
+ PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
monitor.findEntryFailed(new
ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
Optional.empty(), null);
Field field =
monitor.getClass().getDeclaredField("expirationCheckInProgress");
@@ -407,7 +412,11 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
bkc.deleteLedger(ledgers.get(1).getLedgerId());
bkc.deleteLedger(ledgers.get(2).getLedgerId());
- PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
+ PersistentTopic mock = mock(PersistentTopic.class);
+ when(mock.getName()).thenReturn("topicname");
+ when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
+
+ PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
Position previousMarkDelete = null;
for (int i = 0; i < totalEntries; i++) {
monitor.expireMessages(1);
@@ -444,15 +453,16 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
PersistentSubscription subscription =
mock(PersistentSubscription.class);
- Topic topic = mock(Topic.class);
+ PersistentTopic topic = mock(PersistentTopic.class);
when(subscription.getTopic()).thenReturn(topic);
+ when(topic.getName()).thenReturn("topicname");
for (int i = 0; i < totalEntries; i++) {
positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" +
i)));
}
when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));
- PersistentMessageExpiryMonitor monitor = spy(new
PersistentMessageExpiryMonitor("topicname",
+ PersistentMessageExpiryMonitor monitor = spy(new
PersistentMessageExpiryMonitor(topic,
cursor.getName(), cursor, subscription));
assertEquals(cursor.getMarkDeletedPosition(),
PositionImpl.get(positions.get(0).getLedgerId(), -1));
boolean issued;
@@ -491,7 +501,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
clearInvocations(monitor);
ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
- PersistentMessageExpiryMonitor mockMonitor = spy(new
PersistentMessageExpiryMonitor("topicname",
+ PersistentMessageExpiryMonitor mockMonitor = spy(new
PersistentMessageExpiryMonitor(topic,
cursor.getName(), mockCursor, subscription));
// Not calling findEntryComplete to clear expirationCheckInProgress
condition, so following call to
// expire message shouldn't issue.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 158d223336b..85c8eca8fa0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -726,6 +726,36 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertEquals(status.getReplicationBacklog(), 0);
}
+
+ @Test(timeOut = 30000)
+ public void testResetReplicatorSubscriptionPosition() throws Exception {
+ final TopicName dest = TopicName
+
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription"));
+
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest);
+
+ // Produce from cluster1 and consume from the rest
+ for (int i = 0; i < 10; i++) {
+ producer1.produce(2);
+ }
+
+ PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
+
+ PersistentReplicator replicator = (PersistentReplicator) spy(
+
topic.getReplicators().get(topic.getReplicators().keys().get(0)));
+
+ MessageId id = topic.getLastMessageId().get();
+ admin1.topics().expireMessages(dest.getPartitionedTopicName(),
+ replicator.getCursor().getName(),
+ id,false);
+
+ replicator.updateRates();
+
+ ReplicatorStats status = replicator.getStats();
+ assertEquals(status.getReplicationBacklog(), 0);
+ }
+
@Test(timeOut = 30000)
public void testResetCursorNotFail() throws Exception {