This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 209b222d502 Revert "[fix][broker] Fix NPE when reset Replicator's cursor by position. (#20597)"
209b222d502 is described below
commit 209b222d5025a0e5c78011290a76dd5435b79f56
Author: xiangying <[email protected]>
AuthorDate: Tue Jul 11 19:14:48 2023 +0800
Revert "[fix][broker] Fix NPE when reset Replicator's cursor by position. (#20597)"
This reverts commit 18f89b68e5f273a13aeeda45d7d7815379406053.
---
.../broker/admin/impl/PersistentTopicsBase.java | 42 ++++++++--------------
.../persistent/PersistentMessageExpiryMonitor.java | 16 ++++-----
.../service/persistent/PersistentReplicator.java | 2 +-
.../service/persistent/PersistentSubscription.java | 2 +-
.../service/PersistentMessageFinderTest.java | 20 +++--------
.../pulsar/broker/service/ReplicatorTest.java | 30 ----------------
6 files changed, 28 insertions(+), 84 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index da37d1b7e54..44a5a890331 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3794,43 +3794,31 @@ public class PersistentTopicsBase extends AdminResource
{
return;
}
try {
- PersistentSubscription sub = null;
- PersistentReplicator repl = null;
-
- if (subName.startsWith(topic.getReplicatorPrefix())) {
- String remoteCluster =
PersistentReplicator.getRemoteCluster(subName);
- repl = (PersistentReplicator)
- topic.getPersistentReplicator(remoteCluster);
- if (repl == null) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
- "Replicator not found"));
- return;
- }
- } else {
- sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new
RestException(Status.NOT_FOUND,
-
getSubNotFoundErrorMessage(topicName.toString(), subName)));
- return;
- }
+ PersistentSubscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Subscription not found"));
+ return;
}
-
CompletableFuture<Integer> batchSizeFuture = new
CompletableFuture<>();
getEntryBatchSize(batchSizeFuture, topic, messageId,
batchIndex);
-
- PersistentReplicator finalRepl = repl;
- PersistentSubscription finalSub = sub;
-
batchSizeFuture.thenAccept(bi -> {
PositionImpl position =
calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
boolean issued;
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
- issued = finalRepl.expireMessages(position);
+ String remoteCluster =
PersistentReplicator.getRemoteCluster(subName);
+ PersistentReplicator repl = (PersistentReplicator)
+
topic.getPersistentReplicator(remoteCluster);
+ if (repl == null) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+ "Replicator not found"));
+ return;
+ }
+ issued = repl.expireMessages(position);
} else {
- issued = finalSub.expireMessages(position);
+ issued = sub.expireMessages(position);
}
-
if (issued) {
log.info("[{}] Message expire started up to {} on
{} {}", clientAppId(), position,
topicName, subName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index d3b52c52677..4335762a21b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -22,7 +22,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
-import javax.annotation.Nullable;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -44,7 +43,6 @@ import org.slf4j.LoggerFactory;
public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private final ManagedCursor cursor;
private final String subName;
- private final PersistentTopic topic;
private final String topicName;
private final Rate msgExpired;
private final LongAdder totalMsgExpired;
@@ -59,10 +57,9 @@ public class PersistentMessageExpiryMonitor implements
FindEntryCallback {
expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater
.newUpdater(PersistentMessageExpiryMonitor.class,
"expirationCheckInProgress");
- public PersistentMessageExpiryMonitor(PersistentTopic topic, String
subscriptionName, ManagedCursor cursor,
- @Nullable PersistentSubscription
subscription) {
- this.topic = topic;
- this.topicName = topic.getName();
+ public PersistentMessageExpiryMonitor(String topicName, String
subscriptionName, ManagedCursor cursor,
+ PersistentSubscription subscription)
{
+ this.topicName = topicName;
this.cursor = cursor;
this.subName = subscriptionName;
this.subscription = subscription;
@@ -101,12 +98,11 @@ public class PersistentMessageExpiryMonitor implements
FindEntryCallback {
public boolean expireMessages(Position messagePosition) {
// If it's beyond last position of this topic, do nothing.
- PositionImpl topicLastPosition = (PositionImpl)
this.topic.getLastPosition();
- if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) {
+ if (((PositionImpl)
subscription.getTopic().getLastPosition()).compareTo((PositionImpl)
messagePosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore expire-message scheduled task,
given position {} is beyond "
- + "current topic's last position {}",
topicName, subName, messagePosition,
- topicLastPosition);
+ + "current topic's last position {}", topicName,
subName, messagePosition,
+ subscription.getTopic().getLastPosition());
}
return false;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index adfae64489d..8254e512afc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -114,7 +114,7 @@ public class PersistentReplicator extends AbstractReplicator
replicationClient);
this.topic = topic;
this.cursor = cursor;
- this.expiryMonitor = new
PersistentMessageExpiryMonitor((PersistentTopic) localTopic,
+ this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
Codec.decode(cursor.getName()), cursor, null);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a14a5108dff..9654915685b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -140,7 +140,7 @@ public class PersistentSubscription extends
AbstractSubscription implements Subs
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic",
topicName).add("name", subName).toString();
- this.expiryMonitor = new PersistentMessageExpiryMonitor(topic,
subscriptionName, cursor, this);
+ this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
subscriptionName, cursor, this);
this.setReplicated(replicated);
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() :
Collections.unmodifiableMap(subscriptionProperties);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 5287d842727..05c07d8f44e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -62,7 +62,6 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
@@ -231,11 +230,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
});
assertTrue(ex.get());
- PersistentTopic mock = mock(PersistentTopic.class);
- when(mock.getName()).thenReturn("topicname");
- when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
-
- PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+ PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
monitor.findEntryFailed(new
ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
Optional.empty(), null);
Field field =
monitor.getClass().getDeclaredField("expirationCheckInProgress");
@@ -412,11 +407,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
bkc.deleteLedger(ledgers.get(1).getLedgerId());
bkc.deleteLedger(ledgers.get(2).getLedgerId());
- PersistentTopic mock = mock(PersistentTopic.class);
- when(mock.getName()).thenReturn("topicname");
- when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
-
- PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+ PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
Position previousMarkDelete = null;
for (int i = 0; i < totalEntries; i++) {
monitor.expireMessages(1);
@@ -453,16 +444,15 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
ManagedCursorImpl cursor = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
PersistentSubscription subscription =
mock(PersistentSubscription.class);
- PersistentTopic topic = mock(PersistentTopic.class);
+ Topic topic = mock(Topic.class);
when(subscription.getTopic()).thenReturn(topic);
- when(topic.getName()).thenReturn("topicname");
for (int i = 0; i < totalEntries; i++) {
positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" +
i)));
}
when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));
- PersistentMessageExpiryMonitor monitor = spy(new
PersistentMessageExpiryMonitor(topic,
+ PersistentMessageExpiryMonitor monitor = spy(new
PersistentMessageExpiryMonitor("topicname",
cursor.getName(), cursor, subscription));
assertEquals(cursor.getMarkDeletedPosition(),
PositionImpl.get(positions.get(0).getLedgerId(), -1));
boolean issued;
@@ -501,7 +491,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
clearInvocations(monitor);
ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
- PersistentMessageExpiryMonitor mockMonitor = spy(new
PersistentMessageExpiryMonitor(topic,
+ PersistentMessageExpiryMonitor mockMonitor = spy(new
PersistentMessageExpiryMonitor("topicname",
cursor.getName(), mockCursor, subscription));
// Not calling findEntryComplete to clear expirationCheckInProgress
condition, so following call to
// expire message shouldn't issue.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 85c8eca8fa0..158d223336b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -726,36 +726,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertEquals(status.getReplicationBacklog(), 0);
}
-
- @Test(timeOut = 30000)
- public void testResetReplicatorSubscriptionPosition() throws Exception {
- final TopicName dest = TopicName
-
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription"));
-
- @Cleanup
- MessageProducer producer1 = new MessageProducer(url1, dest);
-
- // Produce from cluster1 and consume from the rest
- for (int i = 0; i < 10; i++) {
- producer1.produce(2);
- }
-
- PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
-
- PersistentReplicator replicator = (PersistentReplicator) spy(
-
topic.getReplicators().get(topic.getReplicators().keys().get(0)));
-
- MessageId id = topic.getLastMessageId().get();
- admin1.topics().expireMessages(dest.getPartitionedTopicName(),
- replicator.getCursor().getName(),
- id,false);
-
- replicator.updateRates();
-
- ReplicatorStats status = replicator.getStats();
- assertEquals(status.getReplicationBacklog(), 0);
- }
-
@Test(timeOut = 30000)
public void testResetCursorNotFail() throws Exception {