This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new ca77ac91 fix(ClientImpl.java): broadcast SyncLiteSubscriptionRequest
to all route endpoints (#1245)
ca77ac91 is described below
commit ca77ac914fe98fc82320aa6c0d6a4f581c35421e
Author: Quan <[email protected]>
AuthorDate: Thu May 14 14:44:01 2026 +0800
fix(ClientImpl.java): broadcast SyncLiteSubscriptionRequest to all route
endpoints (#1245)
LiteSubscriptionManager#syncLiteSubscription previously sent the request
only to the client's initial endpoint, leaving lite subscription state
inconsistent across brokers in a multi-broker cluster.
Broadcast the request to all route endpoints and aggregate the results:
the returned future succeeds only if the request succeeds on every endpoint.
Widen ClientImpl#getTotalRouteEndpoints() visibility to public accordingly.
---
.../rocketmq/client/java/impl/ClientImpl.java | 2 +-
.../impl/consumer/LiteSubscriptionManager.java | 23 ++++++++++++++--------
.../impl/consumer/LiteSubscriptionManagerTest.java | 4 +++-
3 files changed, 19 insertions(+), 10 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 691056e1..4d772104 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -669,7 +669,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}, MoreExecutors.directExecutor());
}
- protected Set<Endpoints> getTotalRouteEndpoints() {
+ public Set<Endpoints> getTotalRouteEndpoints() {
Set<Endpoints> totalRouteEndpoints = new HashSet<>();
for (TopicRouteData topicRouteData : topicRouteCache.values()) {
totalRouteEndpoints.addAll(topicRouteData.getTotalEndpoints());
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManager.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManager.java
index 07881144..7ce9a4aa 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManager.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManager.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.client.java.impl.consumer;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.LiteSubscriptionAction;
import apache.rocketmq.v2.NotifyUnsubscribeLiteCommand;
-import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.Subscription;
import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
@@ -30,8 +29,10 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import
org.apache.rocketmq.client.java.exception.LiteSubscriptionQuotaExceededEx
import org.apache.rocketmq.client.java.exception.StatusChecker;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.misc.ProtobufUtils;
+import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.rpc.RpcFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,14 +161,19 @@ public class LiteSubscriptionManager {
builder.setOffsetOption(ProtobufUtils.toProtobufOffsetOption(offsetOption));
}
+ final SyncLiteSubscriptionRequest request = builder.build();
final Duration requestTimeout =
consumerImpl.getClientConfiguration().getRequestTimeout();
- RpcFuture<SyncLiteSubscriptionRequest, SyncLiteSubscriptionResponse>
future = consumerImpl.getClientManager()
- .syncLiteSubscription(consumerImpl.getEndpoints(),
builder.build(), requestTimeout);
- return Futures.transformAsync(future, response -> {
- final Status status = response.getStatus();
- StatusChecker.check(status, future);
- return Futures.immediateVoidFuture();
- }, MoreExecutors.directExecutor());
+ final Set<Endpoints> totalRouteEndpoints =
consumerImpl.getTotalRouteEndpoints();
+ List<ListenableFuture<Void>> futures = new ArrayList<>();
+ for (Endpoints endpoints : totalRouteEndpoints) {
+ final RpcFuture<SyncLiteSubscriptionRequest,
SyncLiteSubscriptionResponse> rpcFuture =
+
consumerImpl.getClientManager().syncLiteSubscription(endpoints, request,
requestTimeout);
+ futures.add(Futures.transformAsync(rpcFuture, response -> {
+ StatusChecker.check(response.getStatus(), rpcFuture);
+ return Futures.immediateVoidFuture();
+ }, MoreExecutors.directExecutor()));
+ }
+ return Futures.transform(Futures.allAsList(futures), input -> null,
MoreExecutors.directExecutor());
}
void onNotifyUnsubscribeLiteCommand(NotifyUnsubscribeLiteCommand command) {
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManagerTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManagerTest.java
index 6087b6d7..b8e18a97 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManagerTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/LiteSubscriptionManagerTest.java
@@ -37,6 +37,7 @@ import apache.rocketmq.v2.SyncLiteSubscriptionRequest;
import apache.rocketmq.v2.SyncLiteSubscriptionResponse;
import com.google.common.util.concurrent.ListenableFuture;
import java.time.Duration;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
@@ -84,7 +85,8 @@ public class LiteSubscriptionManagerTest {
when(consumerImpl.getClientConfiguration()).thenReturn(clientConfiguration);
when(clientConfiguration.getRequestTimeout()).thenReturn(Duration.ofSeconds(30));
when(consumerImpl.getClientManager()).thenReturn(clientManager);
- when(consumerImpl.getEndpoints()).thenReturn(endpoints);
+ lenient().when(consumerImpl.getEndpoints()).thenReturn(endpoints);
+
when(consumerImpl.getTotalRouteEndpoints()).thenReturn(Collections.singleton(endpoints));
// Mock successful response
SyncLiteSubscriptionResponse successResponse =
SyncLiteSubscriptionResponse.newBuilder()