This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 610acb5da2 [ISSUE #10384] Enforce Lite exclusive subscription eviction
via server-side tombstones (#10386)
610acb5da2 is described below
commit 610acb5da2d6449d781bf43fd9a01129a526c29e
Author: Quan <[email protected]>
AuthorDate: Tue Jun 2 20:10:32 2026 +0800
[ISSUE #10384] Enforce Lite exclusive subscription eviction via server-side
tombstones (#10386)
---
.../broker/lite/ExclusiveEvictionTombstones.java | 93 +++++++++
.../broker/lite/LiteSubscriptionRegistry.java | 2 +
.../broker/lite/LiteSubscriptionRegistryImpl.java | 36 ++++
.../broker/processor/PopLiteMessageProcessor.java | 7 +
.../lite/ExclusiveEvictionTombstonesTest.java | 158 +++++++++++++++
.../lite/LiteSubscriptionRegistryImplTest.java | 225 +++++++++++++++++++++
.../processor/PopLiteMessageProcessorTest.java | 70 +++++++
7 files changed, 591 insertions(+)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstones.java
b/broker/src/main/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstones.java
new file mode 100644
index 0000000000..0efaf03c67
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstones.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.lite;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.lite.LiteUtil;
+
+/**
+ * Manages tombstones for exclusive subscription eviction.
+ * <p>
+ * When a client is evicted from a liteTopic (exclusive mode), an entry is placed here
+ * to prevent the evicted client from pulling messages until the tombstone is cleared
+ * during the next full subscription sync or client removal.
+ * <p>
+ * Entries are stored as composite string keys in a set created by
+ * {@code ConcurrentHashMap.newKeySet()}.
+ * <p>
+ * Key format: clientId$lmqName, i.e. {@code clientId + KEY_SEPARATOR + lmqName}
+ * (this assumes LiteUtil.SEPARATOR is '$'; its value is not visible in this file).
+ * <p>
+ * NOTE(review): keys are built by plain concatenation without escaping, and the
+ * per-client operations select keys with {@code startsWith(clientId + KEY_SEPARATOR)}.
+ * If a clientId can itself contain the separator, that prefix may also match keys
+ * that belong to a different client - confirm the allowed clientId format.
+ */
+public class ExclusiveEvictionTombstones {
+
+ private static final char KEY_SEPARATOR = LiteUtil.SEPARATOR; // delimiter between clientId and lmqName inside a key
+
+ private final Set<String> tombstones = ConcurrentHashMap.newKeySet(); // one composite key per tombstoned (clientId, lmqName) pair
+
+ /**
+ * Check whether a tombstone exists for the given client and lmqName.
+ *
+ * @param clientId the client to look up
+ * @param lmqName the lmqName to look up
+ * @return true if a key for exactly this clientId/lmqName pair is present
+ */
+ public boolean contains(String clientId, String lmqName) {
+ return tombstones.contains(buildKey(clientId, lmqName));
+ }
+
+ /**
+ * Add a tombstone for the given client and lmqName.
+ * Adding a pair that is already present leaves the set unchanged.
+ *
+ * @param clientId the evicted client
+ * @param lmqName the lmqName the client was evicted from
+ */
+ public void add(String clientId, String lmqName) {
+ tombstones.add(buildKey(clientId, lmqName));
+ }
+
+ /**
+ * Remove the tombstone for the given client and lmqName, if present.
+ *
+ * @param clientId the client whose tombstone is removed
+ * @param lmqName the lmqName whose tombstone is removed
+ */
+ public void remove(String clientId, String lmqName) {
+ tombstones.remove(buildKey(clientId, lmqName));
+ }
+
+ /**
+ * Remove all tombstones belonging to the specified client.
+ * A key is treated as belonging to the client when it starts with
+ * {@code clientId + KEY_SEPARATOR}.
+ *
+ * @param clientId the client whose tombstones are dropped
+ */
+ public void removeAllOf(String clientId) {
+ String prefix = clientId + KEY_SEPARATOR;
+ tombstones.removeIf(key -> key.startsWith(prefix));
+ }
+
+ /**
+ * For a given client, remove tombstones whose lmqName is NOT in the provided active set.
+ * This is used during full subscription sync to clear stale tombstones.
+ * Keys of other clients (those not starting with {@code clientId + KEY_SEPARATOR})
+ * are left untouched.
+ *
+ * @param clientId the client whose tombstones are examined
+ * @param activeLmqNames lmqNames whose tombstones are kept; it is dereferenced via
+ * contains() for each key of this client, so presumably it must not be null
+ */
+ public void removeStale(String clientId, Set<String> activeLmqNames) {
+ String prefix = clientId + KEY_SEPARATOR;
+ tombstones.removeIf(key -> {
+ if (!key.startsWith(prefix)) {
+ return false;
+ }
+ String lmqName = key.substring(prefix.length());
+ return !activeLmqNames.contains(lmqName);
+ });
+ }
+
+ /**
+ * Return the current number of tombstones (for monitoring/testing).
+ *
+ * @return the number of keys currently held, across all clients
+ */
+ public int size() {
+ return tombstones.size();
+ }
+
+ private static String buildKey(String clientId, String lmqName) {
+ return clientId + KEY_SEPARATOR + lmqName; // plain concatenation; neither part is escaped
+ }
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java
index 50fbc373f2..965ed180fc 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistry.java
@@ -50,6 +50,8 @@ public interface LiteSubscriptionRegistry {
void cleanSubscription(String lmqName, boolean notifyClient);
+ boolean hasExclusiveEvictionTombstone(String clientId, String lmqName);
+
void start();
void shutdown();
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java
index 3ced933127..b487b8757f 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImpl.java
@@ -61,6 +61,8 @@ public class LiteSubscriptionRegistryImpl extends
ServiceThread implements LiteS
private final BrokerController brokerController;
private final AbstractLiteLifecycleManager liteLifecycleManager;
+ private final ExclusiveEvictionTombstones exclusiveEvictionTombstones =
new ExclusiveEvictionTombstones();
+
public LiteSubscriptionRegistryImpl(BrokerController brokerController,
AbstractLiteLifecycleManager liteLifecycleManager) {
this.brokerController = brokerController;
@@ -99,6 +101,10 @@ public class LiteSubscriptionRegistryImpl extends
ServiceThread implements LiteS
// First remove the old subscription
if (LiteMetadataUtil.isSubLiteExclusive(group, brokerController)) {
excludeClientByLmqName(clientId, group, lmqName);
+ // Boundary case: this client may have a stale tombstone from
a previous eviction.
+ // Since it is now actively re-claiming the lmqName, clear its
own tombstone so
+ // subsequent popLiteTopic is not blocked by the stale mark.
+ exclusiveEvictionTombstones.remove(clientId, lmqName);
}
resetOffset(lmqName, group, clientId, offsetOption);
addTopicGroup(clientGroup, lmqName);
@@ -144,12 +150,31 @@ public class LiteSubscriptionRegistryImpl extends
ServiceThread implements LiteS
thisSub.addLiteTopic(lmqName);
addTopicGroup(clientGroup, lmqName);
});
+ // Tombstone operations only apply to exclusive groups.
+ if (LiteMetadataUtil.isSubLiteExclusive(group, brokerController)) {
+ // Boundary case: if any lmqName in the client's reported full
subscription still has
+ // a tombstone, the previous notifyUnsubscribeLite was likely
lost. Re-send the
+ // unsubscribe notification to drive the client's local state to
converge.
+ lmqNameNew.stream()
+ .filter(lmqName ->
exclusiveEvictionTombstones.contains(clientId, lmqName))
+ .forEach(lmqName -> {
+ LOGGER.info("re-notify unsubscribe for tombstoned lmqName,
clientId:{}, group:{}, lmqName:{}",
+ clientId, group, lmqName);
+ notifyUnsubscribeLite(clientId, group, lmqName);
+ });
+ // Clean exclusive-eviction tombstones for liteTopics no longer in
the client's full subscription set
+ exclusiveEvictionTombstones.removeStale(clientId, lmqNameNew);
+ }
}
@Override
public void removeCompleteSubscription(String clientId) {
clientChannels.remove(clientId);
LiteSubscription thisSub = client2Subscription.remove(clientId);
+ // Only clean tombstones for exclusive groups.
+ if (thisSub == null ||
LiteMetadataUtil.isSubLiteExclusive(thisSub.getGroup(), brokerController)) {
+ exclusiveEvictionTombstones.removeAllOf(clientId);
+ }
if (thisSub == null) {
return;
}
@@ -299,6 +324,7 @@ public class LiteSubscriptionRegistryImpl extends
ServiceThread implements LiteS
client2Subscription.remove(clientGroup.clientId);
}
}
+ exclusiveEvictionTombstones.add(clientGroup.clientId, lmqName);
notifyUnsubscribeLite(clientGroup.clientId, clientGroup.group,
lmqName);
boolean resetOffset =
LiteMetadataUtil.isResetOffsetInExclusiveMode(group, brokerController);
LOGGER.info("excludeClientByLmqName group:{}, lmqName:{},
resetOffset:{}, clientId:{} -> {}",
@@ -464,6 +490,16 @@ public class LiteSubscriptionRegistryImpl extends
ServiceThread implements LiteS
LOGGER.info("Remove expired LiteSubscription, topic: {}, group:
{}, clientId: {}, timeout: {}ms, expired: {}ms",
topic, group, clientId, checkTimeout,
System.currentTimeMillis() - liteSubscription.getUpdateTime());
});
+
+ int tombstoneSize = exclusiveEvictionTombstones.size();
+ if (tombstoneSize > 0) {
+ LOGGER.info("ExclusiveEvictionTombstones size: {}", tombstoneSize);
+ }
+ }
+
+ @Override
+ public boolean hasExclusiveEvictionTombstone(String clientId, String
lmqName) {
+ return exclusiveEvictionTombstones.contains(clientId, lmqName); // read-only lookup of the (clientId, lmqName) pair in the tombstone store
}
}
\ No newline at end of file
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
index 9314dab734..7baca87694 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
@@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.lite.LiteEventDispatcher;
+import org.apache.rocketmq.broker.lite.LiteMetadataUtil;
import org.apache.rocketmq.broker.longpolling.PollingResult;
import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
import org.apache.rocketmq.broker.metrics.LiteConsumerLagCalculator;
@@ -253,6 +254,7 @@ public class PopLiteMessageProcessor implements
NettyRequestProcessor {
StringBuilder orderCountInfoAll = new StringBuilder();
AtomicLong total = new AtomicLong(0);
+ boolean isExclusiveGroup = LiteMetadataUtil.isSubLiteExclusive(group,
brokerController);
Set<String> processed = new HashSet<>(); // deduplication in one
request
Iterator<String> iterator =
liteEventDispatcher.getEventIterator(clientId);
while (total.get() < maxNum && iterator.hasNext()) {
@@ -263,6 +265,11 @@ public class PopLiteMessageProcessor implements
NettyRequestProcessor {
if (!processed.add(lmqName)) {
continue; // wait for next pop request or re-fetch in current
process, here prefer the former approach
}
+ // Tombstone check: reject pull if this client was evicted from
the liteTopic (exclusive mode)
+ if (isExclusiveGroup &&
brokerController.getLiteSubscriptionRegistry().hasExclusiveEvictionTombstone(clientId,
lmqName)) {
+ LOGGER.info("popLiteTopic rejected by tombstone: clientId={},
group={}, lmqName={}", clientId, group, lmqName);
+ continue;
+ }
Pair<StringBuilder, GetMessageResult> pair =
popLiteTopic(parentTopic, clientHost, group, lmqName,
maxNum - total.get(), popTime, invisibleTime, attemptId);
if (null == pair || pair.getObject2().getMessageCount() <= 0) {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstonesTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstonesTest.java
new file mode 100644
index 0000000000..70f88ee14f
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/lite/ExclusiveEvictionTombstonesTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.lite;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ExclusiveEvictionTombstonesTest {
+
+ private ExclusiveEvictionTombstones tombstones;
+
+ @Before
+ public void setUp() {
+ tombstones = new ExclusiveEvictionTombstones();
+ }
+
+ @Test
+ public void testAddAndContains() {
+ // a freshly created store holds nothing
+ assertFalse(tombstones.contains("client1", "lmq1"));
+ assertEquals(0, tombstones.size());
+
+ // basic add: only the exact (clientId, lmqName) pair matches
+ tombstones.add("client1", "lmq1");
+ assertTrue(tombstones.contains("client1", "lmq1"));
+ assertFalse(tombstones.contains("client1", "lmq2")); // same client,
different lmq
+ assertFalse(tombstones.contains("client2", "lmq1")); // different
client, same lmq
+ assertEquals(1, tombstones.size());
+
+ // duplicate add is idempotent (size stays at 1)
+ tombstones.add("client1", "lmq1");
+ assertEquals(1, tombstones.size());
+ }
+
+ @Test
+ public void testRemoveAllOf() {
+ tombstones.add("client1", "lmq1");
+ tombstones.add("client1", "lmq2");
+ tombstones.add("client2", "lmq1");
+
+ // removing an unknown client is a no-op
+ tombstones.removeAllOf("client3");
+ assertEquals(3, tombstones.size());
+
+ // removes only the target client's entries; client2 keeps its tombstone
+ tombstones.removeAllOf("client1");
+ assertFalse(tombstones.contains("client1", "lmq1"));
+ assertFalse(tombstones.contains("client1", "lmq2"));
+ assertTrue(tombstones.contains("client2", "lmq1"));
+ assertEquals(1, tombstones.size());
+ }
+
+ @Test
+ public void testRemoveStale() {
+ tombstones.add("client1", "lmq1");
+ tombstones.add("client1", "lmq2");
+ tombstones.add("client1", "lmq3");
+ tombstones.add("client2", "lmq1");
+
+ // removes stale, keeps active, does not affect other clients
+ Set<String> activeSet = new HashSet<>();
+ activeSet.add("lmq2");
+ activeSet.add("lmq3");
+ tombstones.removeStale("client1", activeSet);
+
+ assertFalse(tombstones.contains("client1", "lmq1")); // removed (not
in activeSet)
+ assertTrue(tombstones.contains("client1", "lmq2")); // retained
+ assertTrue(tombstones.contains("client1", "lmq3")); // retained
+ assertTrue(tombstones.contains("client2", "lmq1")); // unaffected
+ assertEquals(3, tombstones.size());
+
+ // an empty activeSet clears every tombstone of that client
+ tombstones.removeStale("client1", new HashSet<>());
+ assertFalse(tombstones.contains("client1", "lmq2"));
+ assertFalse(tombstones.contains("client1", "lmq3"));
+ assertTrue(tombstones.contains("client2", "lmq1")); // still unaffected
+ }
+
+ @Test
+ public void testSize() {
+ assertEquals(0, tombstones.size());
+
+ tombstones.add("c1", "l1");
+ assertEquals(1, tombstones.size());
+
+ tombstones.add("c1", "l2");
+ assertEquals(2, tombstones.size());
+
+ tombstones.add("c2", "l1");
+ assertEquals(3, tombstones.size());
+
+ tombstones.removeAllOf("c1");
+ assertEquals(1, tombstones.size());
+ }
+
+ /**
+ * Verifies that real RocketMQ clientId formats (containing '@') and lmqName formats
+ * (e.g. topic@group for wildcard) do not cause cross-client collisions, including
+ * the case where one clientId is a textual prefix of another.
+ * <p>
+ * NOTE(review): only '@' is exercised here. A clientId that contains the actual key
+ * separator used by ExclusiveEvictionTombstones is not covered by this test -
+ * confirm whether such ids can occur.
+ */
+ @Test
+ public void testRealClientIdFormat_NoCrossClientCollision() {
+ // RocketMQ clientId: IP@instanceName vs IP@instanceName@unitName
+ // clientA is a strict prefix of clientB if '@' were the separator
+ String clientA = "10.0.0.1@DEFAULT";
+ String clientB = "10.0.0.1@DEFAULT@unit1";
+ String lmqPlain = "lmq1";
+ String lmqWildcard = "parentTopic@wildcardGroup"; // lmqName also
contains '@'
+
+ tombstones.add(clientA, lmqPlain);
+ tombstones.add(clientA, lmqWildcard);
+ tombstones.add(clientB, lmqPlain);
+
+ // basic isolation
+ assertTrue(tombstones.contains(clientA, lmqPlain));
+ assertTrue(tombstones.contains(clientA, lmqWildcard));
+ assertTrue(tombstones.contains(clientB, lmqPlain));
+ assertFalse(tombstones.contains(clientA, "parentTopic")); // partial
lmqName no match
+ assertFalse(tombstones.contains("10.0.0.1@DEFAULT", lmqPlain +
"@extra")); // no false match
+ assertEquals(3, tombstones.size());
+
+ // removeStale for clientA does NOT affect clientB
+ Set<String> activeSet = new HashSet<>();
+ activeSet.add(lmqWildcard);
+ tombstones.removeStale(clientA, activeSet);
+
+ assertFalse(tombstones.contains(clientA, lmqPlain)); // removed
+ assertTrue(tombstones.contains(clientA, lmqWildcard)); // retained
+ assertTrue(tombstones.contains(clientB, lmqPlain)); // unaffected
+ assertEquals(2, tombstones.size());
+
+ // removeAllOf clientA does NOT affect clientB
+ tombstones.removeAllOf(clientA);
+ assertFalse(tombstones.contains(clientA, lmqWildcard));
+ assertTrue(tombstones.contains(clientB, lmqPlain));
+ assertEquals(1, tombstones.size());
+ }
+}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImplTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImplTest.java
index d301e13463..7645a47096 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImplTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteSubscriptionRegistryImplTest.java
@@ -699,4 +699,229 @@ public class LiteSubscriptionRegistryImplTest {
assertFalse(registry.client2Subscription.containsKey(clientId));
assertEquals(0, registry.getActiveSubscriptionNum());
}
+
+ // ==================== Exclusive Eviction Tombstone Tests
====================
+
+ /**
+ * Test: When clientB takes over lmq in exclusive mode, clientA gets a tombstone.
+ * <p>
+ * Both clients subscribe to the same lmqName through addPartialSubscription in a group
+ * configured with liteSubExclusive=true. After the takeover only the evicted client
+ * (clientA) must report a tombstone; the new owner (clientB) must not.
+ */
+ @Test
+ public void testExclusiveEviction_TombstoneCreatedOnEviction() {
+ String clientA = "clientA";
+ String clientB = "clientB";
+ String group = "exclusiveGroup";
+ String topic = "testTopic";
+ String lmqName = "lmq1";
+ Set<String> lmqNameSet = Collections.singleton(lmqName);
+
+ // Configure exclusive mode for the group and mark the lmq as active
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(group);
+ groupConfig.setLiteSubExclusive(true);
+
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+ when(mockLifecycleManager.isSubscriptionActive(topic,
lmqName)).thenReturn(true);
+
+ // clientA subscribes first; nobody has been evicted yet
+ registry.updateClientChannel(clientA, mock(Channel.class));
+ registry.addPartialSubscription(clientA, group, topic, lmqNameSet,
null);
+
+ assertFalse(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+
+ // clientB takes over → clientA should get tombstone
+ registry.updateClientChannel(clientB, mock(Channel.class));
+ registry.addPartialSubscription(clientB, group, topic, lmqNameSet,
null);
+
+ assertTrue(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+ assertFalse(registry.hasExclusiveEvictionTombstone(clientB, lmqName));
+ }
+
+ /**
+ * Test: addCompleteSubscription clears stale tombstones but keeps active ones.
+ * <p>
+ * clientA is evicted from lmq1 and lmq2 by clientB, then performs a full sync that
+ * reports lmq2 and lmq3. The tombstone for lmq1 (absent from the reported set) must
+ * be removed, while the tombstone for lmq2 (still reported) must remain.
+ */
+ @Test
+ public void testExclusiveEviction_CompleteSyncClearsStale() {
+ String clientA = "clientA";
+ String clientB = "clientB";
+ String group = "exclusiveGroup";
+ String topic = "testTopic";
+
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(group);
+ groupConfig.setLiteSubExclusive(true);
+
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+ when(mockLifecycleManager.isSubscriptionActive(eq(topic),
anyString())).thenReturn(true);
+
+ // clientA subscribes to lmq1 and lmq2
+ Set<String> initialSet = new HashSet<>();
+ initialSet.add("lmq1");
+ initialSet.add("lmq2");
+ registry.updateClientChannel(clientA, mock(Channel.class));
+ registry.addPartialSubscription(clientA, group, topic, initialSet,
null);
+
+ // clientB takes over lmq1 and lmq2 from clientA (one partial subscription each)
+ registry.updateClientChannel(clientB, mock(Channel.class));
+ registry.addPartialSubscription(clientB, group, topic,
Collections.singleton("lmq1"), null);
+ registry.addPartialSubscription(clientB, group, topic,
Collections.singleton("lmq2"), null);
+
+ assertTrue(registry.hasExclusiveEvictionTombstone(clientA, "lmq1"));
+ assertTrue(registry.hasExclusiveEvictionTombstone(clientA, "lmq2"));
+
+ // clientA does full sync reporting lmq2 and lmq3 (lmq1 omitted) → lmq1
tombstone should be cleaned
+ Set<String> newActiveSet = new HashSet<>();
+ newActiveSet.add("lmq2");
+ newActiveSet.add("lmq3");
+ registry.addCompleteSubscription(clientA, group, topic, newActiveSet,
2L);
+
+ assertFalse(registry.hasExclusiveEvictionTombstone(clientA, "lmq1"));
// cleared (not in newActiveSet)
+ assertTrue(registry.hasExclusiveEvictionTombstone(clientA, "lmq2"));
// retained (in newActiveSet)
+ }
+
+ /**
+ * Test: removeCompleteSubscription clears all tombstones for the client.
+ * <p>
+ * clientA is evicted from lmq1 and lmq2 by clientB and is then removed via
+ * removeCompleteSubscription; afterwards neither tombstone may remain for clientA.
+ */
+ @Test
+ public void testExclusiveEviction_RemoveClientClearsTombstones() {
+ String clientA = "clientA";
+ String clientB = "clientB";
+ String group = "exclusiveGroup";
+ String topic = "testTopic";
+
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(group);
+ groupConfig.setLiteSubExclusive(true);
+
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+ when(mockLifecycleManager.isSubscriptionActive(eq(topic),
anyString())).thenReturn(true);
+
+ // Setup: clientA subscribes then gets evicted by clientB
+ registry.updateClientChannel(clientA, mock(Channel.class));
+ registry.addPartialSubscription(clientA, group, topic,
Collections.singleton("lmq1"), null);
+ registry.addPartialSubscription(clientA, group, topic,
Collections.singleton("lmq2"), null);
+
+ registry.updateClientChannel(clientB, mock(Channel.class));
+ registry.addPartialSubscription(clientB, group, topic,
Collections.singleton("lmq1"), null);
+ registry.addPartialSubscription(clientB, group, topic,
Collections.singleton("lmq2"), null);
+
+ assertTrue(registry.hasExclusiveEvictionTombstone(clientA, "lmq1"));
+ assertTrue(registry.hasExclusiveEvictionTombstone(clientA, "lmq2"));
+
+ // clientA disconnects (simulated by removing its complete subscription)
+ registry.removeCompleteSubscription(clientA);
+
+ assertFalse(registry.hasExclusiveEvictionTombstone(clientA, "lmq1"));
+ assertFalse(registry.hasExclusiveEvictionTombstone(clientA, "lmq2"));
+ // clientB never had a tombstone and still has none
+ assertFalse(registry.hasExclusiveEvictionTombstone(clientB, "lmq1"));
+ }
+
+ /**
+ * Test: expired subscription cleanup also clears tombstones.
+ * <p>
+ * After clientA is evicted, an artificially expired LiteSubscription is put back
+ * for clientA and cleanupExpiredSubscriptions(10000L) is run; the tombstone must be
+ * gone afterwards.
+ * <p>
+ * NOTE(review): the body of cleanupExpiredSubscriptions is not visible here;
+ * presumably it removes the client through removeCompleteSubscription, which is
+ * what drops the tombstones - confirm.
+ */
+ @Test
+ public void testExclusiveEviction_ExpiredCleanupClearsTombstones() {
+ String clientA = "clientA";
+ String clientB = "clientB";
+ String group = "exclusiveGroup";
+ String topic = "testTopic";
+ String lmqName = "lmq1";
+
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(group);
+ groupConfig.setLiteSubExclusive(true);
+
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+ when(mockLifecycleManager.isSubscriptionActive(topic,
lmqName)).thenReturn(true);
+
+ // clientA subscribes, then gets evicted
+ registry.updateClientChannel(clientA, mock(Channel.class));
+ registry.addPartialSubscription(clientA, group, topic,
Collections.singleton(lmqName), null);
+
+ registry.updateClientChannel(clientB, mock(Channel.class));
+ registry.addPartialSubscription(clientB, group, topic,
Collections.singleton(lmqName), null);
+
+ assertTrue(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+
+ // Simulate clientA becoming expired (sub was already removed,
manually add back for timeout test)
+ LiteSubscription expiredSub = new LiteSubscription();
+ expiredSub.setGroup(group);
+ expiredSub.setTopic(topic);
+ expiredSub.setUpdateTime(System.currentTimeMillis() - 60000L);
+ registry.client2Subscription.put(clientA, expiredSub);
+
+ registry.cleanupExpiredSubscriptions(10000L);
+
+ assertFalse(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+ }
+
+ /**
+ * Test: addPartialSubscription clears stale tombstone when client re-claims an lmqName.
+ * <p>
+ * Sequence: clientA subscribes, clientB takes over (clientA is tombstoned), then
+ * clientA subscribes to the same lmqName again. After the re-claim clientA must no
+ * longer have a tombstone for that lmqName.
+ */
+ @Test
+ public void testExclusiveEviction_ReClaimClearsStaleTombstone() {
+ String clientA = "clientA";
+ String clientB = "clientB";
+ String group = "exclusiveGroup";
+ String topic = "testTopic";
+ String lmqName = "lmq1";
+ Set<String> lmqNameSet = Collections.singleton(lmqName);
+
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(group);
+ groupConfig.setLiteSubExclusive(true);
+
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+ when(mockLifecycleManager.isSubscriptionActive(topic,
lmqName)).thenReturn(true);
+
+ // clientA subscribes first
+ registry.updateClientChannel(clientA, mock(Channel.class));
+ registry.addPartialSubscription(clientA, group, topic, lmqNameSet,
null);
+
+ // clientB takes over → clientA gets tombstone
+ registry.updateClientChannel(clientB, mock(Channel.class));
+ registry.addPartialSubscription(clientB, group, topic, lmqNameSet,
null);
+
+ assertTrue(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+
+ // clientA re-claims the lmqName → tombstone should be cleared
+ registry.addPartialSubscription(clientA, group, topic, lmqNameSet,
null);
+
+ assertFalse(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+ }
+
+ /**
+ * Test: addCompleteSubscription re-sends unsubscribe for tombstoned lmqNames still in active set.
+ * <p>
+ * clientA is evicted from lmq1 and then performs a full sync that still reports lmq1.
+ * The test expects notifyUnsubscribeLite to have been sent to clientA's channel at
+ * least twice: once for the eviction and once for the re-notification.
+ * <p>
+ * NOTE(review): the verification uses atLeast(2) and never inspects the captured
+ * headers, so it does not pin the exact number of notifications nor their
+ * lmqName/group content - consider tightening.
+ */
+ @Test
+ public void testExclusiveEviction_CompleteSyncReNotifiesForTombstonedLmq()
{
+ String clientA = "clientA";
+ String clientB = "clientB";
+ String group = "exclusiveGroup";
+ String topic = "testTopic";
+ String lmqName = "lmq1";
+
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(group);
+ groupConfig.setLiteSubExclusive(true);
+
when(mockSubscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+ when(mockLifecycleManager.isSubscriptionActive(eq(topic),
anyString())).thenReturn(true);
+
+ Channel clientAChannel = mock(Channel.class); // kept in a variable so the verify below can match on it
+ registry.updateClientChannel(clientA, clientAChannel);
+ registry.addPartialSubscription(clientA, group, topic,
Collections.singleton(lmqName), null);
+
+ // clientB takes over → clientA gets tombstone
+ registry.updateClientChannel(clientB, mock(Channel.class));
+ registry.addPartialSubscription(clientB, group, topic,
Collections.singleton(lmqName), null);
+
+ assertTrue(registry.hasExclusiveEvictionTombstone(clientA, lmqName));
+
+ // clientA does full sync still reporting lmq1 → re-notify should be
triggered
+ Set<String> fullSet = new HashSet<>();
+ fullSet.add(lmqName);
+ registry.addCompleteSubscription(clientA, group, topic, fullSet, 2L);
+
+ // Verify notifyUnsubscribeLite was called for the tombstoned lmqName
during full sync
+ // The first call is during eviction, the second is during re-notify
+ ArgumentCaptor<NotifyUnsubscribeLiteRequestHeader> captor =
+ ArgumentCaptor.forClass(NotifyUnsubscribeLiteRequestHeader.class);
+ verify(mockBroker2Client, org.mockito.Mockito.atLeast(2))
+ .notifyUnsubscribeLite(eq(clientAChannel), captor.capture());
+ }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
index 453cb8fd14..fa9f2dbfaf 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.lite.AbstractLiteLifecycleManager;
import org.apache.rocketmq.broker.lite.LiteEventDispatcher;
+import org.apache.rocketmq.broker.lite.LiteSubscriptionRegistry;
import org.apache.rocketmq.broker.longpolling.PopLiteLongPollingService;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
@@ -96,6 +97,8 @@ public class PopLiteMessageProcessorTest {
private SubscriptionGroupManager subscriptionGroupManager;
@Mock
private AbstractLiteLifecycleManager liteLifecycleManager;
+ @Mock
+ private LiteSubscriptionRegistry liteSubscriptionRegistry;
private BrokerConfig brokerConfig;
private PopLiteMessageProcessor popLiteMessageProcessor;
@@ -110,6 +113,7 @@ public class PopLiteMessageProcessorTest {
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
when(brokerController.getLiteLifecycleManager()).thenReturn(liteLifecycleManager);
+
when(brokerController.getLiteSubscriptionRegistry()).thenReturn(liteSubscriptionRegistry);
PopLiteMessageProcessor testObject = new
PopLiteMessageProcessor(brokerController, liteEventDispatcher);
FieldUtils.writeDeclaredField(testObject, "popLiteLongPollingService",
popLiteLongPollingService, true);
@@ -487,4 +491,70 @@ public class PopLiteMessageProcessorTest {
}
return getMessageResult;
}
+
+ @SuppressWarnings("unchecked") // mock(Iterator.class) below yields a raw Iterator
+ @Test
+ public void testPopByClientId_tombstoneRejectsPull() {
+ String clientId = "clientId";
+ String group = "exclusiveGroup";
+ String lmqName = "lmqName";
+
+ // Configure exclusive mode so the tombstone check in popByClientId is active
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(group);
+ groupConfig.setLiteSubExclusive(true);
+
when(subscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+
+ // Tombstone exists for this client+lmqName
+ when(liteSubscriptionRegistry.hasExclusiveEvictionTombstone(clientId,
lmqName)).thenReturn(true);
+
+ // The event iterator yields exactly one lmqName and is then exhausted
+ Iterator<String> mockIterator = mock(Iterator.class);
+ when(mockIterator.hasNext()).thenReturn(true, false);
+ when(mockIterator.next()).thenReturn(lmqName);
+
when(liteEventDispatcher.getEventIterator(clientId)).thenReturn(mockIterator);
+
+ Pair<StringBuilder, GetMessageResult> result =
popLiteMessageProcessor.popByClientId(
+ "clientHost", "parentTopic", group, clientId,
System.currentTimeMillis(), 6000L, 32, "attemptId");
+
+ // Should return empty result since tombstone blocks the pull
+ assertEquals(0, result.getObject2().getMessageCount());
+ // popLiteTopic should never be called for the tombstoned lmqName
+ // (verify on popLiteMessageProcessor presumably works because it is a Mockito spy - confirm in setup)
+ verify(popLiteMessageProcessor, never())
+ .popLiteTopic(anyString(), anyString(), anyString(), anyString(),
anyLong(), anyLong(), anyLong(), anyString());
+ }
+
+ @SuppressWarnings("unchecked") // mock(Iterator.class) below yields a raw Iterator
+ @Test
+ public void testPopByClientId_noTombstoneAllowsPull() {
+ String clientId = "clientId";
+ String group = "exclusiveGroup";
+ String lmqName = "lmqName";
+ int msgCount = 1;
+ GetMessageResult mockResult =
mockGetMessageResult(GetMessageStatus.FOUND, msgCount, 100L);
+ long pollTime = System.currentTimeMillis(); // captured once so the exact value can be verified below
+
+ // Configure exclusive mode so the tombstone check in popByClientId is active
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setGroupName(group);
+ groupConfig.setLiteSubExclusive(true);
+
when(subscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
+
+ // No tombstone for this client+lmqName
+ when(liteSubscriptionRegistry.hasExclusiveEvictionTombstone(clientId,
lmqName)).thenReturn(false);
+
+ // The event iterator yields exactly one lmqName and is then exhausted
+ Iterator<String> mockIterator = mock(Iterator.class);
+ when(mockIterator.hasNext()).thenReturn(true, false);
+ when(mockIterator.next()).thenReturn(lmqName);
+
when(liteEventDispatcher.getEventIterator(clientId)).thenReturn(mockIterator);
+ doReturn(new Pair<>(new StringBuilder("0"), mockResult))
+ .when(popLiteMessageProcessor)
+ .popLiteTopic(anyString(), anyString(), anyString(), anyString(),
anyLong(), anyLong(), anyLong(), anyString());
+
+ Pair<StringBuilder, GetMessageResult> result =
popLiteMessageProcessor.popByClientId(
+ "clientHost", "parentTopic", group, clientId, pollTime, 6000L, 32,
"attemptId");
+
+ // Should return the message since no tombstone blocks; popLiteTopic is invoked once with the request's arguments
+ assertEquals(msgCount, result.getObject2().getMessageCount());
+ verify(popLiteMessageProcessor).popLiteTopic("parentTopic",
"clientHost", group, lmqName, 32L, pollTime, 6000L, "attemptId");
+ }
}