This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 89f60158971 [improve][broker] Fix replicated subscriptions race condition with mark delete update and snapshot completion (#16651)
89f60158971 is described below

commit 89f60158971d6e0cee6f54d50b5749659b4313da
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 9 19:19:50 2025 +0200

    [improve][broker] Fix replicated subscriptions race condition with mark delete update and snapshot completion (#16651)
---
 .../service/persistent/PersistentSubscription.java | 60 ++++++++++++++--------
 .../ReplicatedSubscriptionSnapshotCache.java       |  9 +++-
 .../ReplicatedSubscriptionsController.java         | 15 ++++--
 .../org/apache/pulsar/broker/BrokerTestUtil.java   | 12 +++--
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  6 ++-
 .../broker/service/ReplicatedSubscriptionTest.java | 33 ++++++++----
 .../pulsar/broker/service/ReplicatorTestBase.java  |  2 +-
 7 files changed, 92 insertions(+), 45 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 056e9a25fda..a0be5c7c945 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -467,21 +467,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
             }
         }
 
-        if 
(!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
-            this.updateLastMarkDeleteAdvancedTimestamp();
-
-            // Mark delete position advance
-            ReplicatedSubscriptionSnapshotCache snapshotCache = 
this.replicatedSubscriptionSnapshotCache;
-            if (snapshotCache != null) {
-                ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
-                        
.advancedMarkDeletePosition(cursor.getMarkDeletedPosition());
-                if (snapshot != null) {
-                    topic.getReplicatedSubscriptionController()
-                            .ifPresent(c -> 
c.localSubscriptionUpdated(subName, snapshot));
-                }
-            }
-        }
-
         if (topic.getManagedLedger().isTerminated() && 
cursor.getNumberOfEntriesInBacklog(false) == 0) {
             // Notify all consumer that the end of topic was reached
             if (dispatcher != null) {
@@ -514,7 +499,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
                 dispatcher.afterAckMessages(null, ctx);
             }
             // Signal the dispatchers to give chance to take extra actions
-            notifyTheMarkDeletePositionMoveForwardIfNeeded(oldMD);
+            notifyTheMarkDeletePositionChanged(oldMD);
         }
 
         @Override
@@ -541,7 +526,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
             if (dispatcher != null) {
                 dispatcher.afterAckMessages(null, context);
             }
-            notifyTheMarkDeletePositionMoveForwardIfNeeded((Position) context);
+            notifyTheMarkDeletePositionChanged((Position) context);
         }
 
         @Override
@@ -554,11 +539,42 @@ public class PersistentSubscription extends 
AbstractSubscription {
         }
     };
 
-    private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position 
oldPosition) {
-        Position oldMD = oldPosition;
+    /**
+     * This method is called after acknowledgements (such as individual acks) 
have been processed and the mark-delete
+     * position has possibly been updated and advanced after "ack holes" have 
been filled up by the latest individual
+     * acknowledgements.
+     * @param oldPosition previous mark-delete position before the update
+     */
+    private void notifyTheMarkDeletePositionChanged(Position oldPosition) {
         Position newMD = cursor.getMarkDeletedPosition();
-        if (dispatcher != null && newMD.compareTo(oldMD) > 0) {
-            dispatcher.markDeletePositionMoveForward();
+
+        // check if the mark delete position has changed since the last call
+        if (newMD.compareTo(oldPosition) != 0) {
+            updateLastMarkDeleteAdvancedTimestamp();
+            handleReplicatedSubscriptionsUpdate(newMD);
+
+            if (dispatcher != null) {
+                dispatcher.markDeletePositionMoveForward();
+            }
+        }
+    }
+
+    /**
+     * Checks the snapshot cache for a snapshot that corresponds to the given 
mark-delete position.
+     * If a snapshot is found, it will notify the replicated subscription 
controller that the local subscription
+     * has been updated.
+     * This method is called when the mark-delete position is advanced or when 
a new snapshot is added to the cache.
+     * When the new snapshot is added, it might be suitable for the current 
mark-delete position.
+     * @param markDeletePosition the mark delete position to check for a 
snapshot
+     */
+    private void handleReplicatedSubscriptionsUpdate(Position 
markDeletePosition) {
+        ReplicatedSubscriptionSnapshotCache snapshotCache = 
this.replicatedSubscriptionSnapshotCache;
+        if (snapshotCache != null) {
+            ReplicatedSubscriptionsSnapshot snapshot = 
snapshotCache.advancedMarkDeletePosition(markDeletePosition);
+            if (snapshot != null) {
+                topic.getReplicatedSubscriptionController()
+                        .ifPresent(c -> c.localSubscriptionUpdated(subName, 
snapshot));
+            }
         }
     }
 
@@ -1569,6 +1585,8 @@ public class PersistentSubscription extends 
AbstractSubscription {
         ReplicatedSubscriptionSnapshotCache snapshotCache = 
this.replicatedSubscriptionSnapshotCache;
         if (snapshotCache != null) {
             snapshotCache.addNewSnapshot(new 
ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
+            // check if the newly added snapshot can be used with the current 
mark delete position
+            
handleReplicatedSubscriptionsUpdate(cursor.getMarkDeletedPosition());
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
index f78aabfd821..6b4e60dc63b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import lombok.extern.slf4j.Slf4j;
@@ -65,9 +66,15 @@ public class ReplicatedSubscriptionSnapshotCache {
     public synchronized ReplicatedSubscriptionsSnapshot 
advancedMarkDeletePosition(Position pos) {
         ReplicatedSubscriptionsSnapshot snapshot = null;
         while (!snapshots.isEmpty()) {
-            Position first = snapshots.firstKey();
+            Map.Entry<Position, ReplicatedSubscriptionsSnapshot> firstEntry =
+                    snapshots.firstEntry();
+            Position first = firstEntry.getKey();
             if (first.compareTo(pos) > 0) {
                 // Snapshot is associated which an higher position, so it 
cannot be used now
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Snapshot {} is associated with an higher 
position {} so it cannot be used for mark "
+                            + "delete position {}", subscription, 
firstEntry.getValue(), first, pos);
+                }
                 break;
             } else {
                 // This snapshot is potentially good. Continue the search for 
to see if there is a higher snapshot we
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index b21fe7acfdb..7ae48f7976b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -128,7 +128,8 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
 
     public void localSubscriptionUpdated(String subscriptionName, 
ReplicatedSubscriptionsSnapshot snapshot) {
         if (log.isDebugEnabled()) {
-            log.debug("[{}][{}] Updating subscription to snapshot {}", topic, 
subscriptionName,
+            log.debug("[{}][{}][{}] Updating subscription to snapshot {}",
+                    topic.getBrokerService().pulsar().getBrokerId(), topic, 
subscriptionName,
                     snapshot.getClustersList().stream()
                             .map(cmid -> String.format("%s -> %d:%d", 
cmid.getCluster(),
                                     cmid.getMessageId().getLedgerId(), 
cmid.getMessageId().getEntryId()))
@@ -157,7 +158,8 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
         // message id.
         Position lastMsgId = topic.getLastPosition();
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Received snapshot request. Last msg id: {}", 
topic.getName(), lastMsgId);
+            log.debug("[{}][{}] Received snapshot request. Last msg id: {}",
+                    topic.getBrokerService().pulsar().getBrokerId(), 
topic.getName(), lastMsgId);
         }
 
         ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
@@ -242,7 +244,8 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
                 || topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
             // There was no message written since the last snapshot, we can 
skip creating a new snapshot
             if (log.isDebugEnabled()) {
-                log.debug("[{}] There is no new data in topic. Skipping 
snapshot creation.", topic.getName());
+                log.debug("[{}][{}] There is no new data in topic. Skipping 
snapshot creation.",
+                        topic.getBrokerService().pulsar().getBrokerId(), 
topic.getName());
             }
             return;
         }
@@ -264,7 +267,8 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
         }
 
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Starting snapshot creation.", topic.getName());
+            log.debug("[{}][{}] Starting snapshot creation.", 
topic.getBrokerService().pulsar().getBrokerId(),
+                    topic.getName());
         }
 
         pendingSnapshotsMetric.inc();
@@ -328,7 +332,8 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
         // Nothing to do in case of publish errors since the retry logic is 
applied upstream after a snapshot is not
         // closed
         if (log.isDebugEnabled()) {
-            log.debug("[{}] Published marker at {}:{}. Exception: {}", 
topic.getName(), ledgerId, entryId, e);
+            log.debug("[{}][{}] Published marker at {}:{}. Exception: {}",
+                    topic.getBrokerService().pulsar().getBrokerId(), 
topic.getName(), ledgerId, entryId, e);
         }
 
         this.positionOfLastLocalMarker = PositionFactory.create(ledgerId, 
entryId);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index 39de3f9fc02..92c93dd0112 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -130,14 +130,16 @@ public class BrokerTestUtil {
 
     /**
      * Logs the topic stats and internal stats for the given topic.
-     * @param logger logger to use
+     *
+     * @param logger      logger to use
      * @param pulsarAdmin PulsarAdmin client to use
-     * @param topic topic name
+     * @param topic       topic name
+     * @param description
      */
-    public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, 
String topic) {
+    public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, 
String topic, String description) {
         try {
-            logger.info("[{}] stats: {}", topic, 
toJson(pulsarAdmin.topics().getStats(topic)));
-            logger.info("[{}] internalStats: {}", topic,
+            logger.info("[{}] {} stats: {}", topic, description, 
toJson(pulsarAdmin.topics().getStats(topic)));
+            logger.info("[{}] {} internalStats: {}", topic, description,
                     toJson(pulsarAdmin.topics().getInternalStats(topic, 
true)));
         } catch (PulsarAdminException e) {
             logger.warn("Failed to get stats for topic {}", topic, e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index b349ba71922..73bb407f52a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -806,7 +806,11 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
     }
 
     protected void logTopicStats(String topic) {
-        BrokerTestUtil.logTopicStats(log, admin, topic);
+        logTopicStats(topic, "");
+    }
+
+    protected void logTopicStats(String topic, String description) {
+        BrokerTestUtil.logTopicStats(log, admin, topic, description);
     }
 
     @DataProvider(name = "trueFalse")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
index 45026ce61bf..74dbc5e5291 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java
@@ -106,11 +106,7 @@ public class ReplicatedSubscriptionTest extends 
ReplicatorTestBase {
         String namespace = 
BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
         String topicName = "persistent://" + namespace + "/mytopic";
         String subscriptionName = "cluster-subscription";
-        // Subscription replication produces duplicates, 
https://github.com/apache/pulsar/issues/10054
-        // TODO: duplications shouldn't be allowed, change to "false" when 
fixing the issue
-        boolean allowDuplicates = true;
-        // this setting can be used to manually run the test with subscription 
replication disabled
-        // it shows that subscription replication has no impact in behavior 
for this test case
+        boolean allowDuplicates = false;
         boolean replicateSubscriptionState = true;
 
         admin1.namespaces().createNamespace(namespace);
@@ -134,7 +130,8 @@ public class ReplicatedSubscriptionTest extends 
ReplicatorTestBase {
 
         Set<String> sentMessages = new LinkedHashSet<>();
 
-        // send messages in r1
+        log.info("Send messages in r1");
+
         {
             @Cleanup
             Producer<byte[]> producer = client1.newProducer().topic(topicName)
@@ -144,32 +141,41 @@ public class ReplicatedSubscriptionTest extends 
ReplicatorTestBase {
             int numMessages = 6;
             for (int i = 0; i < numMessages; i++) {
                 String body = "message" + i;
-                producer.send(body.getBytes(StandardCharsets.UTF_8));
+                MessageId messageId = 
producer.send(body.getBytes(StandardCharsets.UTF_8));
+                log.info("Sent message: {} with msgId: {}", body, messageId);
                 sentMessages.add(body);
+                if (i == 2) {
+                    // wait for subscription snapshot to be created
+                    Thread.sleep(2 * 
config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+                }
             }
         }
 
         Set<String> receivedMessages = new LinkedHashSet<>();
 
+        log.info("Consuming 3 messages in r1");
+
         // consume 3 messages in r1
         try (Consumer<byte[]> consumer1 = client1.newConsumer()
                 .topic(topicName)
+                .receiverQueueSize(2)
                 .subscriptionName(subscriptionName)
                 .replicateSubscriptionState(replicateSubscriptionState)
                 .subscribe()) {
             readMessages(consumer1, receivedMessages, 3, allowDuplicates);
+            log.info("Waiting after reading 3 messages in r1.");
+            
Thread.sleep(config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
         }
 
-        // wait for subscription to be replicated
-        Thread.sleep(2 * 
config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
-
-        // consume remaining messages in r2
+        log.info("Consume remaining messages in r2");
         try (Consumer<byte[]> consumer2 = client2.newConsumer()
                 .topic(topicName)
                 .subscriptionName(subscriptionName)
                 .replicateSubscriptionState(replicateSubscriptionState)
                 .subscribe()) {
             readMessages(consumer2, receivedMessages, -1, allowDuplicates);
+        } finally {
+            printStats(topicName);
         }
 
         // assert that all messages have been received
@@ -188,6 +194,11 @@ public class ReplicatedSubscriptionTest extends 
ReplicatorTestBase {
                                 histogramPoint -> 
histogramPoint.hasSumGreaterThan(0.0))));
     }
 
+    private void printStats(String topicName) throws PulsarAdminException {
+        BrokerTestUtil.logTopicStats(log, admin1, topicName, "admin1");
+        BrokerTestUtil.logTopicStats(log, admin2, topicName, "admin2");
+    }
+
     /**
      * Tests replicated subscriptions across two regions and can read 
successful.
      */
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 676a6f125e6..fc5dff49f80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -128,7 +128,7 @@ public abstract class ReplicatorTestBase extends 
TestRetrySupport {
     protected final String cluster2 = "r2";
     protected final String cluster3 = "r3";
     protected final String cluster4 = "r4";
-    protected String loadManagerClassName;
+    protected String loadManagerClassName = 
"org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl";
 
     protected String getLoadManagerClassName() {
         return loadManagerClassName;

Reply via email to