This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 610acb5da2 [ISSUE #10384] Enforce Lite exclusive subscription eviction 
via server-side tombstones (#10386)
610acb5da2 is described below

commit 610acb5da2d6449d781bf43fd9a01129a526c29e
Author: Quan <[email protected]>
AuthorDate: Tue Jun 2 20:10:32 2026 +0800

    [ISSUE #10384] Enforce Lite exclusive subscription eviction via server-side 
tombstones (#10386)
---
 .../broker/lite/ExclusiveEvictionTombstones.java   |  93 +++++++++
 .../broker/lite/LiteSubscriptionRegistry.java      |   2 +
 .../broker/lite/LiteSubscriptionRegistryImpl.java  |  36 ++++
 .../broker/processor/PopLiteMessageProcessor.java  |   7 +
 .../lite/ExclusiveEvictionTombstonesTest.java      | 158 +++++++++++++++
 .../lite/LiteSubscriptionRegistryImplTest.java     | 225 +++++++++++++++++++++
 .../processor/PopLiteMessageProcessorTest.java     |  70 +++++++
 7 files changed, 591 insertions(+)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstones.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstones.java
new file mode 100644
index 0000000000..0efaf03c67
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstones.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.lite;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.lite.LiteUtil;
+
+/**
+ * Manages tombstones for exclusive subscription eviction.
+ * <p>
+ * When a client is evicted from a liteTopic (exclusive mode), an entry is 
placed here
+ * to prevent the evicted client from pulling messages until the tombstone is 
cleared
+ * during the next full subscription sync or client removal.
+ * <p>
+ * Key format: clientId$lmqName
+ */
+public class ExclusiveEvictionTombstones {
+
+    private static final char KEY_SEPARATOR = LiteUtil.SEPARATOR;
+
+    private final Set<String> tombstones = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Check whether a tombstone exists for the given client and lmqName.
+     */
+    public boolean contains(String clientId, String lmqName) {
+        return tombstones.contains(buildKey(clientId, lmqName));
+    }
+
+    /**
+     * Add a tombstone for the given client and lmqName.
+     */
+    public void add(String clientId, String lmqName) {
+        tombstones.add(buildKey(clientId, lmqName));
+    }
+
+    /**
+     * Remove the tombstone for the given client and lmqName, if present.
+     */
+    public void remove(String clientId, String lmqName) {
+        tombstones.remove(buildKey(clientId, lmqName));
+    }
+
+    /**
+     * Remove all tombstones belonging to the specified client.
+     */
+    public void removeAllOf(String clientId) {
+        String prefix = clientId + KEY_SEPARATOR;
+        tombstones.removeIf(key -> key.startsWith(prefix));
+    }
+
+    /**
+     * For a given client, remove tombstones whose lmqName is NOT in the 
provided active set.
+     * This is used during full subscription sync to clear stale tombstones.
+     */
+    public void removeStale(String clientId, Set<String> activeLmqNames) {
+        String prefix = clientId + KEY_SEPARATOR;
+        tombstones.removeIf(key -> {
+            if (!key.startsWith(prefix)) {
+                return false;
+            }
+            String lmqName = key.substring(prefix.length());
+            return !activeLmqNames.contains(lmqName);
+        });
+    }
+
+    /**
+     * Return the current number of tombstones (for monitoring/testing).
+     */
+    public int size() {
+        return tombstones.size();
+    }
+
+    private static String buildKey(String clientId, String lmqName) {
+        return clientId + KEY_SEPARATOR + lmqName;
+    }
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java
index 50fbc373f2..965ed180fc 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java
@@ -50,6 +50,8 @@ public interface LiteSubscriptionRegistry {
 
     void cleanSubscription(String lmqName, boolean notifyClient);
 
+    boolean hasExclusiveEvictionTombstone(String clientId, String lmqName);
+
     void start();
 
     void shutdown();
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java
index 3ced933127..b487b8757f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java
@@ -61,6 +61,8 @@ public class LiteSubscriptionRegistryImpl extends 
ServiceThread implements LiteS
     private final BrokerController brokerController;
     private final AbstractLiteLifecycleManager liteLifecycleManager;
 
+    private final ExclusiveEvictionTombstones exclusiveEvictionTombstones = 
new ExclusiveEvictionTombstones();
+
     public LiteSubscriptionRegistryImpl(BrokerController brokerController,
         AbstractLiteLifecycleManager liteLifecycleManager) {
         this.brokerController = brokerController;
@@ -99,6 +101,10 @@ public class LiteSubscriptionRegistryImpl extends 
ServiceThread implements LiteS
             // First remove the old subscription
             if (LiteMetadataUtil.isSubLiteExclusive(group, brokerController)) {
                 excludeClientByLmqName(clientId, group, lmqName);
+                // Boundary case: this client may have a stale tombstone from 
a previous eviction.
+                // Since it is now actively re-claiming the lmqName, clear its 
own tombstone so
+                // subsequent popLiteTopic is not blocked by the stale mark.
+                exclusiveEvictionTombstones.remove(clientId, lmqName);
             }
             resetOffset(lmqName, group, clientId, offsetOption);
             addTopicGroup(clientGroup, lmqName);
@@ -144,12 +150,31 @@ public class LiteSubscriptionRegistryImpl extends 
ServiceThread implements LiteS
             thisSub.addLiteTopic(lmqName);
             addTopicGroup(clientGroup, lmqName);
         });
+        // Tombstone operations only apply to exclusive groups.
+        if (LiteMetadataUtil.isSubLiteExclusive(group, brokerController)) {
+            // Boundary case: if any lmqName in the client's reported full 
subscription still has
+            // a tombstone, the previous notifyUnsubscribeLite was likely 
lost. Re-send the
+            // unsubscribe notification to drive the client's local state to 
converge.
+            lmqNameNew.stream()
+                .filter(lmqName -> 
exclusiveEvictionTombstones.contains(clientId, lmqName))
+                .forEach(lmqName -> {
+                    LOGGER.info("re-notify unsubscribe for tombstoned lmqName, 
clientId:{}, group:{}, lmqName:{}",
+                        clientId, group, lmqName);
+                    notifyUnsubscribeLite(clientId, group, lmqName);
+                });
+            // Clean exclusive-eviction tombstones for liteTopics no longer in 
the client's full subscription set
+            exclusiveEvictionTombstones.removeStale(clientId, lmqNameNew);
+        }
     }
 
     @Override
     public void removeCompleteSubscription(String clientId) {
         clientChannels.remove(clientId);
         LiteSubscription thisSub = client2Subscription.remove(clientId);
+        // Only clean tombstones for exclusive groups.
+        if (thisSub == null || 
LiteMetadataUtil.isSubLiteExclusive(thisSub.getGroup(), brokerController)) {
+            exclusiveEvictionTombstones.removeAllOf(clientId);
+        }
         if (thisSub == null) {
             return;
         }
@@ -299,6 +324,7 @@ public class LiteSubscriptionRegistryImpl extends 
ServiceThread implements LiteS
                     client2Subscription.remove(clientGroup.clientId);
                 }
             }
+            exclusiveEvictionTombstones.add(clientGroup.clientId, lmqName);
             notifyUnsubscribeLite(clientGroup.clientId, clientGroup.group, 
lmqName);
             boolean resetOffset = 
LiteMetadataUtil.isResetOffsetInExclusiveMode(group, brokerController);
             LOGGER.info("excludeClientByLmqName group:{}, lmqName:{}, 
resetOffset:{}, clientId:{} -> {}",
@@ -464,6 +490,16 @@ public class LiteSubscriptionRegistryImpl extends 
ServiceThread implements LiteS
             LOGGER.info("Remove expired LiteSubscription, topic: {}, group: 
{}, clientId: {}, timeout: {}ms, expired: {}ms",
                 topic, group, clientId, checkTimeout, 
System.currentTimeMillis() - liteSubscription.getUpdateTime());
         });
+
+        int tombstoneSize = exclusiveEvictionTombstones.size();
+        if (tombstoneSize > 0) {
+            LOGGER.info("ExclusiveEvictionTombstones size: {}", tombstoneSize);
+        }
+    }
+
+    @Override
+    public boolean hasExclusiveEvictionTombstone(String clientId, String 
lmqName) {
+        return exclusiveEvictionTombstones.contains(clientId, lmqName);
     }
 
 }
\ No newline at end of file
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
index 9314dab734..7baca87694 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
@@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.opentelemetry.api.common.Attributes;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.lite.LiteEventDispatcher;
+import org.apache.rocketmq.broker.lite.LiteMetadataUtil;
 import org.apache.rocketmq.broker.longpolling.PollingResult;
 import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
 import org.apache.rocketmq.broker.metrics.LiteConsumerLagCalculator;
@@ -253,6 +254,7 @@ public class PopLiteMessageProcessor implements 
NettyRequestProcessor {
         StringBuilder orderCountInfoAll = new StringBuilder();
         AtomicLong total = new AtomicLong(0);
 
+        boolean isExclusiveGroup = LiteMetadataUtil.isSubLiteExclusive(group, 
brokerController);
         Set<String> processed = new HashSet<>(); // deduplication in one 
request
         Iterator<String> iterator = 
liteEventDispatcher.getEventIterator(clientId);
         while (total.get() < maxNum && iterator.hasNext()) {
@@ -263,6 +265,11 @@ public class PopLiteMessageProcessor implements 
NettyRequestProcessor {
             if (!processed.add(lmqName)) {
                 continue; // wait for next pop request or re-fetch in current 
process, here prefer the former approach
             }
+            // Tombstone check: reject pull if this client was evicted from 
the liteTopic (exclusive mode)
+            if (isExclusiveGroup && 
brokerController.getLiteSubscriptionRegistry().hasExclusiveEvictionTombstone(clientId,
 lmqName)) {
+                LOGGER.info("popLiteTopic rejected by tombstone: clientId={}, 
group={}, lmqName={}", clientId, group, lmqName);
+                continue;
+            }
             Pair<StringBuilder, GetMessageResult> pair = 
popLiteTopic(parentTopic, clientHost, group, lmqName,
                 maxNum - total.get(), popTime, invisibleTime, attemptId);
             if (null == pair || pair.getObject2().getMessageCount() <= 0) {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstonesTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstonesTest.java
new file mode 100644
index 0000000000..70f88ee14f
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstonesTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.lite;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ExclusiveEvictionTombstonesTest {
+
+    private ExclusiveEvictionTombstones tombstones;
+
+    @Before
+    public void setUp() {
+        tombstones = new ExclusiveEvictionTombstones();
+    }
+
+    @Test
+    public void testAddAndContains() {
+        // empty store
+        assertFalse(tombstones.contains("client1", "lmq1"));
+        assertEquals(0, tombstones.size());
+
+        // basic add
+        tombstones.add("client1", "lmq1");
+        assertTrue(tombstones.contains("client1", "lmq1"));
+        assertFalse(tombstones.contains("client1", "lmq2")); // same client, 
different lmq
+        assertFalse(tombstones.contains("client2", "lmq1")); // different 
client, same lmq
+        assertEquals(1, tombstones.size());
+
+        // duplicate add is idempotent
+        tombstones.add("client1", "lmq1");
+        assertEquals(1, tombstones.size());
+    }
+
+    @Test
+    public void testRemoveAllOf() {
+        tombstones.add("client1", "lmq1");
+        tombstones.add("client1", "lmq2");
+        tombstones.add("client2", "lmq1");
+
+        // non-existent client is no-op
+        tombstones.removeAllOf("client3");
+        assertEquals(3, tombstones.size());
+
+        // removes only target client
+        tombstones.removeAllOf("client1");
+        assertFalse(tombstones.contains("client1", "lmq1"));
+        assertFalse(tombstones.contains("client1", "lmq2"));
+        assertTrue(tombstones.contains("client2", "lmq1"));
+        assertEquals(1, tombstones.size());
+    }
+
+    @Test
+    public void testRemoveStale() {
+        tombstones.add("client1", "lmq1");
+        tombstones.add("client1", "lmq2");
+        tombstones.add("client1", "lmq3");
+        tombstones.add("client2", "lmq1");
+
+        // removes stale, keeps active, does not affect other clients
+        Set<String> activeSet = new HashSet<>();
+        activeSet.add("lmq2");
+        activeSet.add("lmq3");
+        tombstones.removeStale("client1", activeSet);
+
+        assertFalse(tombstones.contains("client1", "lmq1")); // removed (not 
in activeSet)
+        assertTrue(tombstones.contains("client1", "lmq2"));  // retained
+        assertTrue(tombstones.contains("client1", "lmq3"));  // retained
+        assertTrue(tombstones.contains("client2", "lmq1"));  // unaffected
+        assertEquals(3, tombstones.size());
+
+        // empty activeSet clears all for that client
+        tombstones.removeStale("client1", new HashSet<>());
+        assertFalse(tombstones.contains("client1", "lmq2"));
+        assertFalse(tombstones.contains("client1", "lmq3"));
+        assertTrue(tombstones.contains("client2", "lmq1")); // still unaffected
+    }
+
+    @Test
+    public void testSize() {
+        assertEquals(0, tombstones.size());
+
+        tombstones.add("c1", "l1");
+        assertEquals(1, tombstones.size());
+
+        tombstones.add("c1", "l2");
+        assertEquals(2, tombstones.size());
+
+        tombstones.add("c2", "l1");
+        assertEquals(3, tombstones.size());
+
+        tombstones.removeAllOf("c1");
+        assertEquals(1, tombstones.size());
+    }
+
+    /**
+     * Verifies that real RocketMQ clientId formats (containing '@') and 
lmqName formats
+     * (e.g. topic@group for wildcard) do not cause cross-client collisions.
+     */
+    @Test
+    public void testRealClientIdFormat_NoCrossClientCollision() {
+        // RocketMQ clientId: IP@instanceName vs IP@instanceName@unitName
+        // clientA is a strict prefix of clientB if '@' were the separator
+        String clientA = "10.0.0.1@DEFAULT";
+        String clientB = "10.0.0.1@DEFAULT@unit1";
+        String lmqPlain = "lmq1";
+        String lmqWildcard = "parentTopic@wildcardGroup"; // lmqName also 
contains '@'
+
+        tombstones.add(clientA, lmqPlain);
+        tombstones.add(clientA, lmqWildcard);
+        tombstones.add(clientB, lmqPlain);
+
+        // basic isolation
+        assertTrue(tombstones.contains(clientA, lmqPlain));
+        assertTrue(tombstones.contains(clientA, lmqWildcard));
+        assertTrue(tombstones.contains(clientB, lmqPlain));
+        assertFalse(tombstones.contains(clientA, "parentTopic")); // partial 
lmqName no match
+        assertFalse(tombstones.contains("10.0.0.1@DEFAULT", lmqPlain + 
"@extra")); // no false match
+        assertEquals(3, tombstones.size());
+
+        // removeStale for clientA does NOT affect clientB
+        Set<String> activeSet = new HashSet<>();
+        activeSet.add(lmqWildcard);
+        tombstones.removeStale(clientA, activeSet);
+
+        assertFalse(tombstones.contains(clientA, lmqPlain)); // removed
+        assertTrue(tombstones.contains(clientA, lmqWildcard)); // retained
+        assertTrue(tombstones.contains(clientB, lmqPlain)); // unaffected
+        assertEquals(2, tombstones.size());
+
+        // removeAllOf clientA does NOT affect clientB
+        tombstones.removeAllOf(clientA);
+        assertFalse(tombstones.contains(clientA, lmqWildcard));
+        assertTrue(tombstones.contains(clientB, lmqPlain));
+        assertEquals(1, tombstones.size());
+    }
+}
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImplTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImplTest.java
index d301e13463..7645a47096 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImplTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImplTest.java
@@ -699,4 +699,229 @@ public class LiteSubscriptionRegistryImplTest {
         assertFalse(registry.client2Subscription.containsKey(clientId));
         assertEquals(0, registry.getActiveSubscriptionNum());
     }
+
+    // ==================== Exclusive Eviction Tombstone Tests 
====================
+
+    /**
+     * Test: When clientB takes over lmq in exclusive mode, clientA gets a 
tombstone
+     */
+    @Test
+    public void testExclusiveEviction_TombstoneCreatedOnEviction() {
+        String clientA = "clientA";
+        String clientB = "clientB";
+        String group = "exclusiveGroup";
+        String topic = "testTopic";
+        String lmqName = "lmq1";
+        Set<String> lmqNameSet = Collections.singleton(lmqName);
+
+        // Configure exclusive mode
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(group);
+        groupConfig.setLiteSubExclusive(true);
+        
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+        when(mockLifecycleManager.isSubscriptionActive(topic, 
lmqName)).thenReturn(true);
+
+        // clientA subscribes
+        registry.updateClientChannel(clientA, mock(Channel.class));
+        registry.addPartialSubscription(clientA, group, topic, lmqNameSet, 
null);
+
+        assertFalse(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+
+        // clientB takes over → clientA should get tombstone
+        registry.updateClientChannel(clientB, mock(Channel.class));
+        registry.addPartialSubscription(clientB, group, topic, lmqNameSet, 
null);
+
+        assertTrue(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+        assertFalse(registry.hasExclusiveEvictionTombstone(clientB, lmqName));
+    }
+
+    /**
+     * Test: addCompleteSubscription clears stale tombstones but keeps active 
ones
+     */
+    @Test
+    public void testExclusiveEviction_CompleteSyncClearsStale() {
+        String clientA = "clientA";
+        String clientB = "clientB";
+        String group = "exclusiveGroup";
+        String topic = "testTopic";
+
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(group);
+        groupConfig.setLiteSubExclusive(true);
+        
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+        when(mockLifecycleManager.isSubscriptionActive(eq(topic), 
anyString())).thenReturn(true);
+
+        // clientA subscribes to lmq1 and lmq2
+        Set<String> initialSet = new HashSet<>();
+        initialSet.add("lmq1");
+        initialSet.add("lmq2");
+        registry.updateClientChannel(clientA, mock(Channel.class));
+        registry.addPartialSubscription(clientA, group, topic, initialSet, 
null);
+
+        // clientB takes over lmq1 and lmq2 from clientA
+        registry.updateClientChannel(clientB, mock(Channel.class));
+        registry.addPartialSubscription(clientB, group, topic, 
Collections.singleton("lmq1"), null);
+        registry.addPartialSubscription(clientB, group, topic, 
Collections.singleton("lmq2"), null);
+
+        assertTrue(registry.hasExclusiveEvictionTombstone(clientA, "lmq1"));
+        assertTrue(registry.hasExclusiveEvictionTombstone(clientA, "lmq2"));
+
+        // clientA does full sync with only lmq2 in active set → lmq1 
tombstone should be cleaned
+        Set<String> newActiveSet = new HashSet<>();
+        newActiveSet.add("lmq2");
+        newActiveSet.add("lmq3");
+        registry.addCompleteSubscription(clientA, group, topic, newActiveSet, 
2L);
+
+        assertFalse(registry.hasExclusiveEvictionTombstone(clientA, "lmq1")); 
// cleared (not in newActiveSet)
+        assertTrue(registry.hasExclusiveEvictionTombstone(clientA, "lmq2"));  
// retained (in newActiveSet)
+    }
+
+    /**
+     * Test: removeCompleteSubscription clears all tombstones for the client
+     */
+    @Test
+    public void testExclusiveEviction_RemoveClientClearsTombstones() {
+        String clientA = "clientA";
+        String clientB = "clientB";
+        String group = "exclusiveGroup";
+        String topic = "testTopic";
+
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(group);
+        groupConfig.setLiteSubExclusive(true);
+        
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+        when(mockLifecycleManager.isSubscriptionActive(eq(topic), 
anyString())).thenReturn(true);
+
+        // Setup: clientA subscribes then gets evicted by clientB
+        registry.updateClientChannel(clientA, mock(Channel.class));
+        registry.addPartialSubscription(clientA, group, topic, 
Collections.singleton("lmq1"), null);
+        registry.addPartialSubscription(clientA, group, topic, 
Collections.singleton("lmq2"), null);
+
+        registry.updateClientChannel(clientB, mock(Channel.class));
+        registry.addPartialSubscription(clientB, group, topic, 
Collections.singleton("lmq1"), null);
+        registry.addPartialSubscription(clientB, group, topic, 
Collections.singleton("lmq2"), null);
+
+        assertTrue(registry.hasExclusiveEvictionTombstone(clientA, "lmq1"));
+        assertTrue(registry.hasExclusiveEvictionTombstone(clientA, "lmq2"));
+
+        // clientA disconnects
+        registry.removeCompleteSubscription(clientA);
+
+        assertFalse(registry.hasExclusiveEvictionTombstone(clientA, "lmq1"));
+        assertFalse(registry.hasExclusiveEvictionTombstone(clientA, "lmq2"));
+        // clientB unaffected
+        assertFalse(registry.hasExclusiveEvictionTombstone(clientB, "lmq1"));
+    }
+
+    /**
+     * Test: expired subscription cleanup also clears tombstones
+     */
+    @Test
+    public void testExclusiveEviction_ExpiredCleanupClearsTombstones() {
+        String clientA = "clientA";
+        String clientB = "clientB";
+        String group = "exclusiveGroup";
+        String topic = "testTopic";
+        String lmqName = "lmq1";
+
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(group);
+        groupConfig.setLiteSubExclusive(true);
+        
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+        when(mockLifecycleManager.isSubscriptionActive(topic, 
lmqName)).thenReturn(true);
+
+        // clientA subscribes, then gets evicted
+        registry.updateClientChannel(clientA, mock(Channel.class));
+        registry.addPartialSubscription(clientA, group, topic, 
Collections.singleton(lmqName), null);
+
+        registry.updateClientChannel(clientB, mock(Channel.class));
+        registry.addPartialSubscription(clientB, group, topic, 
Collections.singleton(lmqName), null);
+
+        assertTrue(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+
+        // Simulate clientA becoming expired (sub was already removed, 
manually add back for timeout test)
+        LiteSubscription expiredSub = new LiteSubscription();
+        expiredSub.setGroup(group);
+        expiredSub.setTopic(topic);
+        expiredSub.setUpdateTime(System.currentTimeMillis() - 60000L);
+        registry.client2Subscription.put(clientA, expiredSub);
+
+        registry.cleanupExpiredSubscriptions(10000L);
+
+        assertFalse(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+    }
+
+    /**
+     * Test: addPartialSubscription clears stale tombstone when client 
re-claims an lmqName
+     */
+    @Test
+    public void testExclusiveEviction_ReClaimClearsStaleTombstone() {
+        String clientA = "clientA";
+        String clientB = "clientB";
+        String group = "exclusiveGroup";
+        String topic = "testTopic";
+        String lmqName = "lmq1";
+        Set<String> lmqNameSet = Collections.singleton(lmqName);
+
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(group);
+        groupConfig.setLiteSubExclusive(true);
+        
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+        when(mockLifecycleManager.isSubscriptionActive(topic, 
lmqName)).thenReturn(true);
+
+        // clientA subscribes first
+        registry.updateClientChannel(clientA, mock(Channel.class));
+        registry.addPartialSubscription(clientA, group, topic, lmqNameSet, 
null);
+
+        // clientB takes over → clientA gets tombstone
+        registry.updateClientChannel(clientB, mock(Channel.class));
+        registry.addPartialSubscription(clientB, group, topic, lmqNameSet, 
null);
+
+        assertTrue(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+
+        // clientA re-claims the lmqName → tombstone should be cleared
+        registry.addPartialSubscription(clientA, group, topic, lmqNameSet, 
null);
+
+        assertFalse(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+    }
+
+    /**
+     * Test: addCompleteSubscription re-sends unsubscribe for tombstoned 
lmqNames still in active set
+     */
+    @Test
+    public void testExclusiveEviction_CompleteSyncReNotifiesForTombstonedLmq() 
{
+        String clientA = "clientA";
+        String clientB = "clientB";
+        String group = "exclusiveGroup";
+        String topic = "testTopic";
+        String lmqName = "lmq1";
+
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(group);
+        groupConfig.setLiteSubExclusive(true);
+        
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+        when(mockLifecycleManager.isSubscriptionActive(eq(topic), 
anyString())).thenReturn(true);
+
+        Channel clientAChannel = mock(Channel.class);
+        registry.updateClientChannel(clientA, clientAChannel);
+        registry.addPartialSubscription(clientA, group, topic, 
Collections.singleton(lmqName), null);
+
+        // clientB takes over → clientA gets tombstone
+        registry.updateClientChannel(clientB, mock(Channel.class));
+        registry.addPartialSubscription(clientB, group, topic, 
Collections.singleton(lmqName), null);
+
+        assertTrue(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+
+        // clientA does full sync still reporting lmq1 → re-notify should be 
triggered
+        Set<String> fullSet = new HashSet<>();
+        fullSet.add(lmqName);
+        registry.addCompleteSubscription(clientA, group, topic, fullSet, 2L);
+
+        // Verify notifyUnsubscribeLite was called for the tombstoned lmqName 
during full sync
+        // The first call is during eviction, the second is during re-notify
+        ArgumentCaptor<NotifyUnsubscribeLiteRequestHeader> captor =
+            ArgumentCaptor.forClass(NotifyUnsubscribeLiteRequestHeader.class);
+        verify(mockBroker2Client, org.mockito.Mockito.atLeast(2))
+            .notifyUnsubscribeLite(eq(clientAChannel), captor.capture());
+    }
 }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
index 453cb8fd14..fa9f2dbfaf 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.lite.AbstractLiteLifecycleManager;
 import org.apache.rocketmq.broker.lite.LiteEventDispatcher;
+import org.apache.rocketmq.broker.lite.LiteSubscriptionRegistry;
 import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.pop.PopConsumerLockService;
@@ -96,6 +97,8 @@ public class PopLiteMessageProcessorTest {
     private SubscriptionGroupManager subscriptionGroupManager;
     @Mock
     private AbstractLiteLifecycleManager liteLifecycleManager;
+    @Mock
+    private LiteSubscriptionRegistry liteSubscriptionRegistry;
 
     private BrokerConfig brokerConfig;
     private PopLiteMessageProcessor popLiteMessageProcessor;
@@ -110,6 +113,7 @@ public class PopLiteMessageProcessorTest {
         
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
         
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
         
when(brokerController.getLiteLifecycleManager()).thenReturn(liteLifecycleManager);
+        
when(brokerController.getLiteSubscriptionRegistry()).thenReturn(liteSubscriptionRegistry);
 
         PopLiteMessageProcessor testObject = new 
PopLiteMessageProcessor(brokerController, liteEventDispatcher);
         FieldUtils.writeDeclaredField(testObject, "popLiteLongPollingService", 
popLiteLongPollingService, true);
@@ -487,4 +491,70 @@ public class PopLiteMessageProcessorTest {
         }
         return getMessageResult;
     }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testPopByClientId_tombstoneRejectsPull() {
+        String clientId = "clientId";
+        String group = "exclusiveGroup";
+        String lmqName = "lmqName";
+
+        // Configure exclusive mode
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(group);
+        groupConfig.setLiteSubExclusive(true);
+        
when(subscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+
+        // Tombstone exists for this client+lmqName
+        when(liteSubscriptionRegistry.hasExclusiveEvictionTombstone(clientId, 
lmqName)).thenReturn(true);
+
+        Iterator<String> mockIterator = mock(Iterator.class);
+        when(mockIterator.hasNext()).thenReturn(true, false);
+        when(mockIterator.next()).thenReturn(lmqName);
+        
when(liteEventDispatcher.getEventIterator(clientId)).thenReturn(mockIterator);
+
+        Pair<StringBuilder, GetMessageResult> result = 
popLiteMessageProcessor.popByClientId(
+            "clientHost", "parentTopic", group, clientId, 
System.currentTimeMillis(), 6000L, 32, "attemptId");
+
+        // Should return empty result since tombstone blocks the pull
+        assertEquals(0, result.getObject2().getMessageCount());
+        // popLiteTopic should never be called for the tombstoned lmqName
+        verify(popLiteMessageProcessor, never())
+            .popLiteTopic(anyString(), anyString(), anyString(), anyString(), 
anyLong(), anyLong(), anyLong(), anyString());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testPopByClientId_noTombstoneAllowsPull() {
+        String clientId = "clientId";
+        String group = "exclusiveGroup";
+        String lmqName = "lmqName";
+        int msgCount = 1;
+        GetMessageResult mockResult = 
mockGetMessageResult(GetMessageStatus.FOUND, msgCount, 100L);
+        long pollTime = System.currentTimeMillis();
+
+        // Configure exclusive mode
+        SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+        groupConfig.setGroupName(group);
+        groupConfig.setLiteSubExclusive(true);
+        
when(subscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+
+        // No tombstone
+        when(liteSubscriptionRegistry.hasExclusiveEvictionTombstone(clientId, 
lmqName)).thenReturn(false);
+
+        Iterator<String> mockIterator = mock(Iterator.class);
+        when(mockIterator.hasNext()).thenReturn(true, false);
+        when(mockIterator.next()).thenReturn(lmqName);
+        
when(liteEventDispatcher.getEventIterator(clientId)).thenReturn(mockIterator);
+        doReturn(new Pair<>(new StringBuilder("0"), mockResult))
+            .when(popLiteMessageProcessor)
+            .popLiteTopic(anyString(), anyString(), anyString(), anyString(), 
anyLong(), anyLong(), anyLong(), anyString());
+
+        Pair<StringBuilder, GetMessageResult> result = 
popLiteMessageProcessor.popByClientId(
+            "clientHost", "parentTopic", group, clientId, pollTime, 6000L, 32, 
"attemptId");
+
+        // Should return the message since no tombstone blocks
+        assertEquals(msgCount, result.getObject2().getMessageCount());
+        verify(popLiteMessageProcessor).popLiteTopic("parentTopic", 
"clientHost", group, lmqName, 32L, pollTime, 6000L, "attemptId");
+    }
 }


Reply via email to