This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 89f60158971 [improve][broker] Fix replicated subscriptions race condition with mark delete update and snapshot completion (#16651)
89f60158971 is described below
commit 89f60158971d6e0cee6f54d50b5749659b4313da
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 9 19:19:50 2025 +0200
[improve][broker] Fix replicated subscriptions race condition with mark delete update and snapshot completion (#16651)
---
.../service/persistent/PersistentSubscription.java | 60 ++++++++++++++--------
.../ReplicatedSubscriptionSnapshotCache.java | 9 +++-
.../ReplicatedSubscriptionsController.java | 15 ++++--
.../org/apache/pulsar/broker/BrokerTestUtil.java | 12 +++--
.../broker/auth/MockedPulsarServiceBaseTest.java | 6 ++-
.../broker/service/ReplicatedSubscriptionTest.java | 33 ++++++++----
.../pulsar/broker/service/ReplicatorTestBase.java | 2 +-
7 files changed, 92 insertions(+), 45 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 056e9a25fda..a0be5c7c945 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -467,21 +467,6 @@ public class PersistentSubscription extends
AbstractSubscription {
}
}
- if
(!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
- this.updateLastMarkDeleteAdvancedTimestamp();
-
- // Mark delete position advance
- ReplicatedSubscriptionSnapshotCache snapshotCache =
this.replicatedSubscriptionSnapshotCache;
- if (snapshotCache != null) {
- ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
-
.advancedMarkDeletePosition(cursor.getMarkDeletedPosition());
- if (snapshot != null) {
- topic.getReplicatedSubscriptionController()
- .ifPresent(c ->
c.localSubscriptionUpdated(subName, snapshot));
- }
- }
- }
-
if (topic.getManagedLedger().isTerminated() &&
cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached
if (dispatcher != null) {
@@ -514,7 +499,7 @@ public class PersistentSubscription extends
AbstractSubscription {
dispatcher.afterAckMessages(null, ctx);
}
// Signal the dispatchers to give chance to take extra actions
- notifyTheMarkDeletePositionMoveForwardIfNeeded(oldMD);
+ notifyTheMarkDeletePositionChanged(oldMD);
}
@Override
@@ -541,7 +526,7 @@ public class PersistentSubscription extends
AbstractSubscription {
if (dispatcher != null) {
dispatcher.afterAckMessages(null, context);
}
- notifyTheMarkDeletePositionMoveForwardIfNeeded((Position) context);
+ notifyTheMarkDeletePositionChanged((Position) context);
}
@Override
@@ -554,11 +539,42 @@ public class PersistentSubscription extends
AbstractSubscription {
}
};
- private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position
oldPosition) {
- Position oldMD = oldPosition;
+ /**
+ * This method is called after acknowledgements (such as individual acks)
have been processed and the mark-delete
+ * position has possibly been updated and advanced after "ack holes" have
been filled up by the latest individual
+ * acknowledgements.
+ * @param oldPosition previous mark-delete position before the update
+ */
+ private void notifyTheMarkDeletePositionChanged(Position oldPosition) {
Position newMD = cursor.getMarkDeletedPosition();
- if (dispatcher != null && newMD.compareTo(oldMD) > 0) {
- dispatcher.markDeletePositionMoveForward();
+
+ // check if the mark delete position has changed since the last call
+ if (newMD.compareTo(oldPosition) != 0) {
+ updateLastMarkDeleteAdvancedTimestamp();
+ handleReplicatedSubscriptionsUpdate(newMD);
+
+ if (dispatcher != null) {
+ dispatcher.markDeletePositionMoveForward();
+ }
+ }
+ }
+
+ /**
+ * Checks the snapshot cache for a snapshot that corresponds to the given
mark-delete position.
+ * If a snapshot is found, it will notify the replicated subscription
controller that the local subscription
+ * has been updated.
+ * This method is called when the mark-delete position is advanced or when
a new snapshot is added to the cache.
+ * When the new snapshot is added, it might be suitable for the current
mark-delete position.
+ * @param markDeletePosition the mark delete position to check for a
snapshot
+ */
+ private void handleReplicatedSubscriptionsUpdate(Position
markDeletePosition) {
+ ReplicatedSubscriptionSnapshotCache snapshotCache =
this.replicatedSubscriptionSnapshotCache;
+ if (snapshotCache != null) {
+ ReplicatedSubscriptionsSnapshot snapshot =
snapshotCache.advancedMarkDeletePosition(markDeletePosition);
+ if (snapshot != null) {
+ topic.getReplicatedSubscriptionController()
+ .ifPresent(c -> c.localSubscriptionUpdated(subName,
snapshot));
+ }
}
}
@@ -1569,6 +1585,8 @@ public class PersistentSubscription extends
AbstractSubscription {
ReplicatedSubscriptionSnapshotCache snapshotCache =
this.replicatedSubscriptionSnapshotCache;
if (snapshotCache != null) {
snapshotCache.addNewSnapshot(new
ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
+ // check if the newly added snapshot can be used with the current
mark delete position
+
handleReplicatedSubscriptionsUpdate(cursor.getMarkDeletedPosition());
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
index f78aabfd821..6b4e60dc63b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import lombok.extern.slf4j.Slf4j;
@@ -65,9 +66,15 @@ public class ReplicatedSubscriptionSnapshotCache {
public synchronized ReplicatedSubscriptionsSnapshot
advancedMarkDeletePosition(Position pos) {
ReplicatedSubscriptionsSnapshot snapshot = null;
while (!snapshots.isEmpty()) {
- Position first = snapshots.firstKey();
+ Map.Entry<Position, ReplicatedSubscriptionsSnapshot> firstEntry =
+ snapshots.firstEntry();
+ Position first = firstEntry.getKey();
if (first.compareTo(pos) > 0) {
// Snapshot is associated which an higher position, so it
cannot be used now
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Snapshot {} is associated with an higher
position {} so it cannot be used for mark "
+ + "delete position {}", subscription,
firstEntry.getValue(), first, pos);
+ }
break;
} else {
// This snapshot is potentially good. Continue the search for
to see if there is a higher snapshot we
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index b21fe7acfdb..7ae48f7976b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -128,7 +128,8 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
public void localSubscriptionUpdated(String subscriptionName,
ReplicatedSubscriptionsSnapshot snapshot) {
if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Updating subscription to snapshot {}", topic,
subscriptionName,
+ log.debug("[{}][{}][{}] Updating subscription to snapshot {}",
+ topic.getBrokerService().pulsar().getBrokerId(), topic,
subscriptionName,
snapshot.getClustersList().stream()
.map(cmid -> String.format("%s -> %d:%d",
cmid.getCluster(),
cmid.getMessageId().getLedgerId(),
cmid.getMessageId().getEntryId()))
@@ -157,7 +158,8 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
// message id.
Position lastMsgId = topic.getLastPosition();
if (log.isDebugEnabled()) {
- log.debug("[{}] Received snapshot request. Last msg id: {}",
topic.getName(), lastMsgId);
+ log.debug("[{}][{}] Received snapshot request. Last msg id: {}",
+ topic.getBrokerService().pulsar().getBrokerId(),
topic.getName(), lastMsgId);
}
ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
@@ -242,7 +244,8 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
|| topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
// There was no message written since the last snapshot, we can
skip creating a new snapshot
if (log.isDebugEnabled()) {
- log.debug("[{}] There is no new data in topic. Skipping
snapshot creation.", topic.getName());
+ log.debug("[{}][{}] There is no new data in topic. Skipping
snapshot creation.",
+ topic.getBrokerService().pulsar().getBrokerId(),
topic.getName());
}
return;
}
@@ -264,7 +267,8 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
}
if (log.isDebugEnabled()) {
- log.debug("[{}] Starting snapshot creation.", topic.getName());
+ log.debug("[{}][{}] Starting snapshot creation.",
topic.getBrokerService().pulsar().getBrokerId(),
+ topic.getName());
}
pendingSnapshotsMetric.inc();
@@ -328,7 +332,8 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
// Nothing to do in case of publish errors since the retry logic is
applied upstream after a snapshot is not
// closed
if (log.isDebugEnabled()) {
- log.debug("[{}] Published marker at {}:{}. Exception: {}",
topic.getName(), ledgerId, entryId, e);
+ log.debug("[{}][{}] Published marker at {}:{}. Exception: {}",
+ topic.getBrokerService().pulsar().getBrokerId(),
topic.getName(), ledgerId, entryId, e);
}
this.positionOfLastLocalMarker = PositionFactory.create(ledgerId,
entryId);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index 39de3f9fc02..92c93dd0112 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -130,14 +130,16 @@ public class BrokerTestUtil {
/**
* Logs the topic stats and internal stats for the given topic.
- * @param logger logger to use
+ *
+ * @param logger logger to use
* @param pulsarAdmin PulsarAdmin client to use
- * @param topic topic name
+ * @param topic topic name
+ * @param description
*/
- public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin,
String topic) {
+ public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin,
String topic, String description) {
try {
- logger.info("[{}] stats: {}", topic,
toJson(pulsarAdmin.topics().getStats(topic)));
- logger.info("[{}] internalStats: {}", topic,
+ logger.info("[{}] {} stats: {}", topic, description,
toJson(pulsarAdmin.topics().getStats(topic)));
+ logger.info("[{}] {} internalStats: {}", topic, description,
toJson(pulsarAdmin.topics().getInternalStats(topic,
true)));
} catch (PulsarAdminException e) {
logger.warn("Failed to get stats for topic {}", topic, e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index b349ba71922..73bb407f52a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -806,7 +806,11 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
}
protected void logTopicStats(String topic) {
- BrokerTestUtil.logTopicStats(log, admin, topic);
+ logTopicStats(topic, "");
+ }
+
+ protected void logTopicStats(String topic, String description) {
+ BrokerTestUtil.logTopicStats(log, admin, topic, description);
}
@DataProvider(name = "trueFalse")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
index 45026ce61bf..74dbc5e5291 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
@@ -106,11 +106,7 @@ public class ReplicatedSubscriptionTest extends
ReplicatorTestBase {
String namespace =
BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
String topicName = "persistent://" + namespace + "/mytopic";
String subscriptionName = "cluster-subscription";
- // Subscription replication produces duplicates,
https://github.com/apache/pulsar/issues/10054
- // TODO: duplications shouldn't be allowed, change to "false" when
fixing the issue
- boolean allowDuplicates = true;
- // this setting can be used to manually run the test with subscription
replication disabled
- // it shows that subscription replication has no impact in behavior
for this test case
+ boolean allowDuplicates = false;
boolean replicateSubscriptionState = true;
admin1.namespaces().createNamespace(namespace);
@@ -134,7 +130,8 @@ public class ReplicatedSubscriptionTest extends
ReplicatorTestBase {
Set<String> sentMessages = new LinkedHashSet<>();
- // send messages in r1
+ log.info("Send messages in r1");
+
{
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
@@ -144,32 +141,41 @@ public class ReplicatedSubscriptionTest extends
ReplicatorTestBase {
int numMessages = 6;
for (int i = 0; i < numMessages; i++) {
String body = "message" + i;
- producer.send(body.getBytes(StandardCharsets.UTF_8));
+ MessageId messageId =
producer.send(body.getBytes(StandardCharsets.UTF_8));
+ log.info("Sent message: {} with msgId: {}", body, messageId);
sentMessages.add(body);
+ if (i == 2) {
+ // wait for subscription snapshot to be created
+ Thread.sleep(2 *
config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+ }
}
}
Set<String> receivedMessages = new LinkedHashSet<>();
+ log.info("Consuming 3 messages in r1");
+
// consume 3 messages in r1
try (Consumer<byte[]> consumer1 = client1.newConsumer()
.topic(topicName)
+ .receiverQueueSize(2)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(replicateSubscriptionState)
.subscribe()) {
readMessages(consumer1, receivedMessages, 3, allowDuplicates);
+ log.info("Waiting after reading 3 messages in r1.");
+
Thread.sleep(config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
}
- // wait for subscription to be replicated
- Thread.sleep(2 *
config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
-
- // consume remaining messages in r2
+ log.info("Consume remaining messages in r2");
try (Consumer<byte[]> consumer2 = client2.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.replicateSubscriptionState(replicateSubscriptionState)
.subscribe()) {
readMessages(consumer2, receivedMessages, -1, allowDuplicates);
+ } finally {
+ printStats(topicName);
}
// assert that all messages have been received
@@ -188,6 +194,11 @@ public class ReplicatedSubscriptionTest extends
ReplicatorTestBase {
histogramPoint ->
histogramPoint.hasSumGreaterThan(0.0))));
}
+ private void printStats(String topicName) throws PulsarAdminException {
+ BrokerTestUtil.logTopicStats(log, admin1, topicName, "admin1");
+ BrokerTestUtil.logTopicStats(log, admin2, topicName, "admin2");
+ }
+
/**
* Tests replicated subscriptions across two regions and can read
successful.
*/
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 676a6f125e6..fc5dff49f80 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -128,7 +128,7 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
protected final String cluster2 = "r2";
protected final String cluster3 = "r3";
protected final String cluster4 = "r4";
- protected String loadManagerClassName;
+ protected String loadManagerClassName =
"org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl";
protected String getLoadManagerClassName() {
return loadManagerClassName;