This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new be1b7d2a2 [ISSUE #5313] Optimize ReceiptHandleProcessor and
GrpcChannelManager (#5275)
be1b7d2a2 is described below
commit be1b7d2a261e5a4fc876dcf5d3fcb699106bbfd6
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Oct 14 16:53:41 2022 +0800
[ISSUE #5313] Optimize ReceiptHandleProcessor and GrpcChannelManager (#5275)
* [ISSUE #5313] Optimize proxy module methods and interfaces
Use channel to manage MessageReceiptHandle
Remove group in GrpcChannelManager
* [ISSUE #5313] Add get method for ReceiptHandleGroup
---
.../rocketmq/broker/client/ConsumerManager.java | 8 +
.../processor/ReplyMessageProcessorTest.java | 2 +-
.../rocketmq/proxy/common/ContextVariable.java | 1 +
.../apache/rocketmq/proxy/common/ProxyContext.java | 10 +
.../rocketmq/proxy/common/ReceiptHandleGroup.java | 24 ++
.../proxy/grpc/v2/channel/GrpcChannelManager.java | 34 +--
.../proxy/grpc/v2/channel/GrpcClientChannel.java | 20 +-
.../proxy/grpc/v2/client/ClientActivity.java | 13 +-
.../proxy/grpc/v2/consumer/AckMessageActivity.java | 2 +-
.../consumer/ChangeInvisibleDurationActivity.java | 2 +-
.../grpc/v2/consumer/ReceiveMessageActivity.java | 2 +-
.../v2/producer/ForwardMessageToDLQActivity.java | 2 +-
.../rocketmq/proxy/processor/ClientProcessor.java | 4 +-
.../proxy/processor/DefaultMessagingProcessor.java | 4 +-
.../proxy/processor/MessagingProcessor.java | 2 +-
.../proxy/processor/ReceiptHandleProcessor.java | 35 ++-
.../proxy/common/ReceiptHandleGroupTest.java | 79 ++++++
.../proxy/grpc/v2/client/ClientActivityTest.java | 1 -
.../ChangeInvisibleDurationActivityTest.java | 2 +-
.../producer/ForwardMessageToDLQActivityTest.java | 2 +-
.../processor/ReceiptHandleProcessorTest.java | 312 ++++++++++++++++++---
.../mqclient/ProxyClientRemotingProcessorTest.java | 2 +-
22 files changed, 451 insertions(+), 112 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index ee11329e4..50c3729ba 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -65,6 +65,14 @@ public class ConsumerManager {
return null;
}
+ public ClientChannelInfo findChannel(final String group, final Channel
channel) {
+ ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+ if (consumerGroupInfo != null) {
+ return consumerGroupInfo.findChannel(channel);
+ }
+ return null;
+ }
+
public SubscriptionData findSubscriptionData(final String group, final
String topic) {
ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
if (consumerGroupInfo != null) {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
index e604b6336..899105873 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
@@ -96,7 +96,7 @@ public class ReplyMessageProcessorTest {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
brokerController.getProducerManager().registerProducer(group,
clientInfo);
final RemotingCommand request =
createSendMessageRequestHeaderCommand(RequestCode.SEND_REPLY_MESSAGE);
-
when(brokerController.getBroker2Client().callClient(any(Channel.class),
any(RemotingCommand.class))).thenReturn(createResponse(ResponseCode.SUCCESS,
request));
+ when(brokerController.getBroker2Client().callClient(any(),
any(RemotingCommand.class))).thenReturn(createResponse(ResponseCode.SUCCESS,
request));
RemotingCommand responseToReturn =
replyMessageProcessor.processRequest(handlerContext, request);
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
index dcfc52909..00b3e76c7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
@@ -21,6 +21,7 @@ public class ContextVariable {
public static final String REMOTE_ADDRESS = "remote-address";
public static final String LOCAL_ADDRESS = "local-address";
public static final String CLIENT_ID = "client-id";
+ public static final String CHANNEL = "channel";
public static final String LANGUAGE = "language";
public static final String CLIENT_VERSION = "client-version";
public static final String REMAINING_MS = "remaining-ms";
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
index 6a35993fe..8fb9f4d53 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.proxy.common;
+import io.netty.channel.Channel;
import java.util.HashMap;
import java.util.Map;
@@ -76,6 +77,15 @@ public class ProxyContext {
return this.getVal(ContextVariable.CLIENT_ID);
}
+ public ProxyContext setChannel(Channel channel) {
+ this.withVal(ContextVariable.CHANNEL, channel);
+ return this;
+ }
+
+ public Channel getChannel() {
+ return this.getVal(ContextVariable.CHANNEL);
+ }
+
public ProxyContext setLanguage(String language) {
this.withVal(ContextVariable.LANGUAGE, language);
return this;
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index 07d32445f..05867c334 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -100,6 +100,30 @@ public class ReceiptHandleGroup {
return this.receiptHandleMap.isEmpty();
}
+ public MessageReceiptHandle get(String msgID, String handle) {
+ Map<String, HandleData> handleMap = this.receiptHandleMap.get(msgID);
+ if (handleMap == null) {
+ return null;
+ }
+ long timeout =
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
+ AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
+ handleMap.computeIfPresent(handle, (handleKey, handleData) -> {
+ if (!handleData.lock(timeout)) {
+ throw new
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to get handle
failed");
+ }
+ try {
+ if (handleData.needRemove) {
+ return null;
+ }
+ res.set(handleData.messageReceiptHandle);
+ } finally {
+ handleData.unlock();
+ }
+ return handleData;
+ });
+ return res.get();
+ }
+
public MessageReceiptHandle remove(String msgID, String handle) {
Map<String, HandleData> handleMap = this.receiptHandleMap.get(msgID);
if (handleMap == null) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
index 57a7b1104..c25727743 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.proxy.grpc.v2.channel;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,7 +25,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.proxy.common.ProxyContext;
@@ -38,7 +36,7 @@ import
org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
public class GrpcChannelManager implements StartAndShutdown {
private final ProxyRelayService proxyRelayService;
- protected final ConcurrentMap<String /* group */, Map<String,
GrpcClientChannel>/* clientId */> groupClientIdChannelMap = new
ConcurrentHashMap<>();
+ protected final ConcurrentMap<String, GrpcClientChannel>
clientIdChannelMap = new ConcurrentHashMap<>();
protected final AtomicLong nonceIdGenerator = new AtomicLong(0);
protected final ConcurrentMap<String /* nonce */, ResultFuture>
resultNonceFutureMap = new ConcurrentHashMap<>();
@@ -58,35 +56,17 @@ public class GrpcChannelManager implements StartAndShutdown
{
);
}
- public GrpcClientChannel createChannel(ProxyContext ctx, String group,
String clientId) {
- this.groupClientIdChannelMap.compute(group, (groupKey, clientIdMap) ->
{
- if (clientIdMap == null) {
- clientIdMap = new ConcurrentHashMap<>();
- }
- clientIdMap.computeIfAbsent(clientId, clientIdKey -> new
GrpcClientChannel(proxyRelayService, this, ctx, group, clientId));
- return clientIdMap;
- });
- return getChannel(group, clientId);
+ public GrpcClientChannel createChannel(ProxyContext ctx, String clientId) {
+ return this.clientIdChannelMap.computeIfAbsent(clientId,
+ k -> new GrpcClientChannel(proxyRelayService, this, ctx,
clientId));
}
- public GrpcClientChannel getChannel(String group, String clientId) {
- Map<String, GrpcClientChannel> clientIdChannelMap =
this.groupClientIdChannelMap.get(group);
- if (clientIdChannelMap == null) {
- return null;
- }
+ public GrpcClientChannel getChannel(String clientId) {
return clientIdChannelMap.get(clientId);
}
- public GrpcClientChannel removeChannel(String group, String clientId) {
- AtomicReference<GrpcClientChannel> channelRef = new
AtomicReference<>();
- this.groupClientIdChannelMap.computeIfPresent(group, (groupKey,
clientIdMap) -> {
- channelRef.set(clientIdMap.remove(clientId));
- if (clientIdMap.isEmpty()) {
- return null;
- }
- return clientIdMap;
- });
- return channelRef.get();
+ public GrpcClientChannel removeChannel(String clientId) {
+ return this.clientIdChannelMap.remove(clientId);
}
public <T> String addResponseFuture(CompletableFuture<ProxyRelayResult<T>>
responseFuture) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
index 810534bd2..6459f8977 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
@@ -47,33 +47,27 @@ import
org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class GrpcClientChannel extends ProxyChannel {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- protected static final String SEPARATOR = "@";
private final GrpcChannelManager grpcChannelManager;
private final AtomicReference<StreamObserver<TelemetryCommand>>
telemetryCommandRef = new AtomicReference<>();
private final Object telemetryWriteLock = new Object();
- private final String group;
private final String clientId;
public GrpcClientChannel(ProxyRelayService proxyRelayService,
GrpcChannelManager grpcChannelManager,
- ProxyContext ctx,
- String group, String clientId) {
- super(proxyRelayService, null, new GrpcChannelId(group, clientId),
+ ProxyContext ctx, String clientId) {
+ super(proxyRelayService, null, new GrpcChannelId(clientId),
ctx.getRemoteAddress(),
ctx.getLocalAddress());
this.grpcChannelManager = grpcChannelManager;
- this.group = group;
this.clientId = clientId;
}
protected static class GrpcChannelId implements ChannelId {
- private final String group;
private final String clientId;
- public GrpcChannelId(String group, String clientId) {
- this.group = group;
+ public GrpcChannelId(String clientId) {
this.clientId = clientId;
}
@@ -84,7 +78,7 @@ public class GrpcClientChannel extends ProxyChannel {
@Override
public String asLongText() {
- return this.group + SEPARATOR + this.clientId;
+ return this.clientId;
}
@Override
@@ -95,7 +89,6 @@ public class GrpcClientChannel extends ProxyChannel {
if (o instanceof GrpcChannelId) {
GrpcChannelId other = (GrpcChannelId) o;
return ComparisonChain.start()
- .compare(this.group, other.group)
.compare(this.clientId, other.clientId)
.result();
}
@@ -184,10 +177,6 @@ public class GrpcClientChannel extends ProxyChannel {
return CompletableFuture.completedFuture(null);
}
- public String getGroup() {
- return group;
- }
-
public String getClientId() {
return clientId;
}
@@ -224,7 +213,6 @@ public class GrpcClientChannel extends ProxyChannel {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("group", group)
.add("clientId", clientId)
.add("remoteAddress", getRemoteAddress())
.add("localAddress", getLocalAddress())
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index 2192014b5..8bef2c852 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -140,8 +140,7 @@ public class ClientActivity extends AbstractMessingActivity
{
case PRODUCER:
for (Resource topic :
clientSettings.getPublishing().getTopicsList()) {
String topicName =
GrpcConverter.getInstance().wrapResourceWithNamespace(topic);
- // user topic name as producer group
- GrpcClientChannel channel =
this.grpcChannelManager.removeChannel(topicName, clientId);
+ GrpcClientChannel channel =
this.grpcChannelManager.removeChannel(clientId);
if (channel != null) {
ClientChannelInfo clientChannelInfo = new
ClientChannelInfo(channel, clientId, languageCode,
MQVersion.Version.V5_0_0.ordinal());
this.messagingProcessor.unRegisterProducer(ctx,
topicName, clientChannelInfo);
@@ -152,7 +151,7 @@ public class ClientActivity extends AbstractMessingActivity
{
case SIMPLE_CONSUMER:
validateConsumerGroup(request.getGroup());
String consumerGroup =
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
- GrpcClientChannel channel =
this.grpcChannelManager.removeChannel(consumerGroup, clientId);
+ GrpcClientChannel channel =
this.grpcChannelManager.removeChannel(clientId);
if (channel != null) {
ClientChannelInfo clientChannelInfo = new
ClientChannelInfo(channel, clientId, languageCode,
MQVersion.Version.V5_0_0.ordinal());
this.messagingProcessor.unRegisterConsumer(ctx,
consumerGroup, clientChannelInfo);
@@ -281,7 +280,7 @@ public class ClientActivity extends AbstractMessingActivity
{
String clientId = ctx.getClientID();
LanguageCode languageCode = LanguageCode.valueOf(ctx.getLanguage());
- GrpcClientChannel channel = this.grpcChannelManager.createChannel(ctx,
topicName, clientId);
+ GrpcClientChannel channel = this.grpcChannelManager.createChannel(ctx,
clientId);
// use topic name as producer group
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel,
clientId, languageCode, parseClientVersion(ctx.getClientVersion()));
this.messagingProcessor.registerProducer(ctx, topicName,
clientChannelInfo);
@@ -296,7 +295,7 @@ public class ClientActivity extends AbstractMessingActivity
{
String clientId = ctx.getClientID();
LanguageCode languageCode = LanguageCode.valueOf(ctx.getLanguage());
- GrpcClientChannel channel = this.grpcChannelManager.createChannel(ctx,
consumerGroup, clientId);
+ GrpcClientChannel channel = this.grpcChannelManager.createChannel(ctx,
clientId);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel,
clientId, languageCode, parseClientVersion(ctx.getClientVersion()));
this.messagingProcessor.registerConsumer(
@@ -420,7 +419,7 @@ public class ClientActivity extends AbstractMessingActivity
{
}
if (args[0] instanceof ClientChannelInfo) {
ClientChannelInfo clientChannelInfo = (ClientChannelInfo)
args[0];
- grpcChannelManager.removeChannel(group,
clientChannelInfo.getClientId());
+
grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
}
}
@@ -437,7 +436,7 @@ public class ClientActivity extends AbstractMessingActivity
{
@Override
public void handle(ProducerGroupEvent event, String group,
ClientChannelInfo clientChannelInfo) {
if (event == ProducerGroupEvent.CLIENT_UNREGISTER) {
- grpcChannelManager.removeChannel(group,
clientChannelInfo.getClientId());
+
grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index fa2241cb3..fb31a6062 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -98,7 +98,7 @@ public class AckMessageActivity extends
AbstractMessingActivity {
String handleString = ackMessageEntry.getReceiptHandle();
String group =
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
- MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group,
ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
+ MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
if (messageReceiptHandle != null) {
handleString = messageReceiptHandle.getReceiptHandleStr();
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
index 680364072..0f33cc7aa 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
@@ -55,7 +55,7 @@ public class ChangeInvisibleDurationActivity extends
AbstractMessingActivity {
ReceiptHandle receiptHandle =
ReceiptHandle.decode(request.getReceiptHandle());
String group =
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
- MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group,
request.getMessageId(), receiptHandle.getReceiptHandle());
+ MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
group, request.getMessageId(), receiptHandle.getReceiptHandle());
if (messageReceiptHandle != null) {
receiptHandle =
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 49763dcdb..76fca3bba 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -124,7 +124,7 @@ public class ReceiveMessageActivity extends
AbstractMessingActivity {
MessageReceiptHandle messageReceiptHandle =
new MessageReceiptHandle(group, topic,
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
messageExt.getQueueOffset(),
messageExt.getReconsumeTimes());
-
receiptHandleProcessor.addReceiptHandle(ctx.getClientID(), group,
messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
+
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
}
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
index dec52f3c2..789927d69 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
@@ -48,7 +48,7 @@ public class ForwardMessageToDLQActivity extends
AbstractMessingActivity {
String group =
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
String handleString = request.getReceiptHandle();
- MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group,
request.getMessageId(), request.getReceiptHandle());
+ MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
group, request.getMessageId(), request.getReceiptHandle());
if (messageReceiptHandle != null) {
handleString = messageReceiptHandle.getReceiptHandleStr();
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
index 922528982..26d13ae18 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
@@ -88,9 +88,9 @@ public class ClientProcessor extends AbstractProcessor {
public ClientChannelInfo findConsumerChannel(
ProxyContext ctx,
String consumerGroup,
- String clientId
+ Channel channel
) {
- return
this.serviceManager.getConsumerManager().findChannel(consumerGroup, clientId);
+ return
this.serviceManager.getConsumerManager().findChannel(consumerGroup, channel);
}
public void unRegisterConsumer(
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 5234237a2..00ef17f9a 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -246,8 +246,8 @@ public class DefaultMessagingProcessor extends
AbstractStartAndShutdown implemen
}
@Override
- public ClientChannelInfo findConsumerChannel(ProxyContext ctx, String
consumerGroup, String clientId) {
- return this.clientProcessor.findConsumerChannel(ctx, consumerGroup,
clientId);
+ public ClientChannelInfo findConsumerChannel(ProxyContext ctx, String
consumerGroup, Channel channel) {
+ return this.clientProcessor.findConsumerChannel(ctx, consumerGroup,
channel);
}
@Override
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index e0ae71471..f366d1357 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -266,7 +266,7 @@ public interface MessagingProcessor extends
StartAndShutdown {
ClientChannelInfo findConsumerChannel(
ProxyContext ctx,
String consumerGroup,
- String clientId
+ Channel channel
);
void unRegisterConsumer(
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 15c4385fd..d1186cc87 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.proxy.processor;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Stopwatch;
+import io.netty.channel.Channel;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -104,7 +105,7 @@ public class ReceiptHandleProcessor extends
AbstractStartAndShutdown {
}
if (args[0] instanceof ClientChannelInfo) {
ClientChannelInfo clientChannelInfo =
(ClientChannelInfo) args[0];
- clearGroup(new
ReceiptHandleGroupKey(clientChannelInfo.getClientId(), group));
+ clearGroup(new
ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
}
}
}
@@ -227,12 +228,12 @@ public class ReceiptHandleProcessor extends
AbstractStartAndShutdown {
}
protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
- return
this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"),
groupKey.group, groupKey.clientId) == null;
+ return
this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"),
groupKey.group, groupKey.channel) == null;
}
- public void addReceiptHandle(String clientID, String group, String msgID,
String receiptHandle,
+ public void addReceiptHandle(Channel channel, String group, String msgID,
String receiptHandle,
MessageReceiptHandle messageReceiptHandle) {
- this.addReceiptHandle(new ReceiptHandleGroupKey(clientID, group),
msgID, receiptHandle, messageReceiptHandle);
+ this.addReceiptHandle(new ReceiptHandleGroupKey(channel, group),
msgID, receiptHandle, messageReceiptHandle);
}
protected void addReceiptHandle(ReceiptHandleGroupKey key, String msgID,
String receiptHandle,
@@ -244,8 +245,8 @@ public class ReceiptHandleProcessor extends
AbstractStartAndShutdown {
k -> new ReceiptHandleGroup()).put(msgID, receiptHandle,
messageReceiptHandle);
}
- public MessageReceiptHandle removeReceiptHandle(String clientID, String
group, String msgID, String receiptHandle) {
- return this.removeReceiptHandle(new ReceiptHandleGroupKey(clientID,
group), msgID, receiptHandle);
+ public MessageReceiptHandle removeReceiptHandle(Channel channel, String
group, String msgID, String receiptHandle) {
+ return this.removeReceiptHandle(new ReceiptHandleGroupKey(channel,
group), msgID, receiptHandle);
}
protected MessageReceiptHandle removeReceiptHandle(ReceiptHandleGroupKey
key, String msgID, String receiptHandle) {
@@ -299,22 +300,26 @@ public class ReceiptHandleProcessor extends
AbstractStartAndShutdown {
}
public static class ReceiptHandleGroupKey {
- private final String clientId;
- private final String group;
+ protected final Channel channel;
+ protected final String group;
- public ReceiptHandleGroupKey(String clientId, String group) {
- this.clientId = clientId;
+ public ReceiptHandleGroupKey(Channel channel, String group) {
+ this.channel = channel;
this.group = group;
}
- public String getClientId() {
- return clientId;
+ protected String getChannelId() {
+ return channel.id().asLongText();
}
public String getGroup() {
return group;
}
+ public Channel getChannel() {
+ return channel;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -324,18 +329,18 @@ public class ReceiptHandleProcessor extends
AbstractStartAndShutdown {
return false;
}
ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o;
- return Objects.equal(clientId, key.clientId) &&
Objects.equal(group, key.group);
+ return Objects.equal(getChannelId(), key.getChannelId()) &&
Objects.equal(group, key.group);
}
@Override
public int hashCode() {
- return Objects.hashCode(clientId, group);
+ return Objects.hashCode(getChannelId(), group);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("clientId", clientId)
+ .add("channelId", getChannelId())
.add("group", group)
.toString();
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
index 6aca7ac6f..8bdff6566 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
@@ -33,6 +33,7 @@ import org.junit.Test;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -65,6 +66,84 @@ public class ReceiptHandleGroupTest extends
InitConfigAndLoggerTest {
.build().encode();
}
+ @Test
+ public void testGetWhenComputeIfPresent() {
+ String handle1 = createHandle();
+ String handle2 = createHandle();
+ AtomicReference<MessageReceiptHandle> getHandleRef = new
AtomicReference<>();
+
+ receiptHandleGroup.put(msgID, handle1,
createMessageReceiptHandle(handle1, msgID));
+ CountDownLatch latch = new CountDownLatch(2);
+ Thread getThread = new Thread(() -> {
+ try {
+ latch.countDown();
+ latch.await();
+ getHandleRef.set(receiptHandleGroup.get(msgID, handle1));
+ } catch (Exception ignored) {
+ }
+ }, "getThread");
+ Thread computeThread = new Thread(() -> {
+ try {
+ receiptHandleGroup.computeIfPresent(msgID, handle1,
messageReceiptHandle -> {
+ try {
+ latch.countDown();
+ latch.await();
+ } catch (Exception ignored) {
+ }
+ messageReceiptHandle.updateReceiptHandle(handle2);
+ return
FutureUtils.addExecutor(CompletableFuture.completedFuture(messageReceiptHandle),
Executors.newCachedThreadPool());
+ });
+ } catch (Exception ignored) {
+ }
+ }, "computeThread");
+ getThread.start();
+ computeThread.start();
+
+ await().atMost(Duration.ofSeconds(1)).until(() -> getHandleRef.get()
!= null);
+ assertEquals(handle2, getHandleRef.get().getReceiptHandleStr());
+ assertFalse(receiptHandleGroup.isEmpty());
+ }
+
+ @Test
+ public void testGetWhenComputeIfPresentReturnNull() {
+ String handle1 = createHandle();
+ AtomicBoolean getCalled = new AtomicBoolean(false);
+ AtomicReference<MessageReceiptHandle> getHandleRef = new
AtomicReference<>();
+
+ receiptHandleGroup.put(msgID, handle1,
createMessageReceiptHandle(handle1, msgID));
+ CountDownLatch latch = new CountDownLatch(2);
+ Thread getThread = new Thread(() -> {
+ try {
+ latch.countDown();
+ latch.await();
+ getHandleRef.set(receiptHandleGroup.get(msgID, handle1));
+ getCalled.set(true);
+ } catch (Exception ignored) {
+ }
+ }, "getThread");
+ Thread computeThread = new Thread(() -> {
+ try {
+ receiptHandleGroup.computeIfPresent(msgID, handle1,
messageReceiptHandle -> {
+ try {
+ latch.countDown();
+ latch.await();
+ } catch (Exception ignored) {
+ }
+ return
FutureUtils.addExecutor(CompletableFuture.completedFuture(null),
Executors.newCachedThreadPool());
+ });
+ } catch (Exception ignored) {
+ }
+ }, "computeThread");
+ getThread.start();
+ computeThread.start();
+
+ await().atMost(Duration.ofSeconds(1)).until(getCalled::get);
+ assertNull(getHandleRef.get());
+ assertTrue(receiptHandleGroup.isEmpty());
+ }
+
+
+
@Test
public void testRemoveWhenComputeIfPresent() {
String handle1 = createHandle();
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
index ea045774f..bfff5ee59 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
@@ -208,7 +208,6 @@ public class ClientActivityTest extends BaseActivityTest {
GrpcClientChannel channel = (GrpcClientChannel)
clientChannelInfo.getChannel();
assertEquals(REMOTE_ADDR, channel.getRemoteAddress());
assertEquals(LOCAL_ADDR, channel.getLocalAddress());
- assertEquals(group, channel.getGroup());
}
@Test
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
index 012649465..a861e8c13 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
@@ -92,7 +92,7 @@ public class ChangeInvisibleDurationActivityTest extends
BaseActivityTest {
when(this.messagingProcessor.changeInvisibleTime(
any(), receiptHandleCaptor.capture(), anyString(), anyString(),
anyString(), invisibleTimeArgumentCaptor.capture()
)).thenReturn(CompletableFuture.completedFuture(ackResult));
- when(receiptHandleProcessor.removeReceiptHandle(anyString(),
anyString(), anyString(), anyString()))
+ when(receiptHandleProcessor.removeReceiptHandle(any(), anyString(),
anyString(), anyString()))
.thenReturn(new MessageReceiptHandle("group", "topic", 0,
savedHandleStr, "msgId", 0, 0));
ChangeInvisibleDurationResponse response =
this.changeInvisibleDurationActivity.changeInvisibleDuration(
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
index cd3c48e1a..5603d6958 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
@@ -75,7 +75,7 @@ public class ForwardMessageToDLQActivityTest extends
BaseActivityTest {
.thenReturn(CompletableFuture.completedFuture(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
"")));
String savedHandleStr = buildReceiptHandle("topic",
System.currentTimeMillis(),3000);
- when(receiptHandleProcessor.removeReceiptHandle(anyString(),
anyString(), anyString(), anyString()))
+ when(receiptHandleProcessor.removeReceiptHandle(any(), anyString(),
anyString(), anyString()))
.thenReturn(new MessageReceiptHandle("group", "topic", 0,
savedHandleStr, "msgId", 0, 0));
ForwardMessageToDeadLetterQueueResponse response =
this.forwardMessageToDLQActivity.forwardMessageToDeadLetterQueue(
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 93ab0210c..cc294e859 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -17,6 +17,19 @@
package org.apache.rocketmq.proxy.processor;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -82,6 +95,7 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
.commitLogOffset(0L)
.build().encode();
PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id");
+ PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new MockChannel());
receiptHandleProcessor = new
ReceiptHandleProcessor(messagingProcessor);
Mockito.doNothing().when(messagingProcessor).registerConsumerListener(Mockito.any(ConsumerIdsChangeListener.class));
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
@@ -90,10 +104,10 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
@Test
public void testAddReceiptHandle() {
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(new
SubscriptionGroupConfig());
- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
.changeInvisibleTime(Mockito.any(ProxyContext.class),
Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
@@ -103,11 +117,11 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
@Test
public void testRenewReceiptHandle() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
long newInvisibleTime = 2000L;
ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder()
.startOffset(0L)
@@ -140,9 +154,9 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
@Test
public void testRenewExceedMaxRenewTimes() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
CompletableFuture<AckResult> ackResultFuture = new
CompletableFuture<>();
ackResultFuture.completeExceptionally(new MQClientException(0,
"error"));
@@ -166,9 +180,9 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
@Test
public void testRenewWithInvalidHandle() {
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
CompletableFuture<AckResult> ackResultFuture = new
CompletableFuture<>();
ackResultFuture.completeExceptionally(new
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
@@ -190,9 +204,9 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
@Test
public void testRenewWithErrorThenOK() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
AtomicInteger count = new AtomicInteger(0);
List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
@@ -261,9 +275,9 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
.build().encode();
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
newReceiptHandle, messageReceiptHandle);
- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
newReceiptHandle, messageReceiptHandle);
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(),
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyLong()))
@@ -293,9 +307,9 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
.build().encode();
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
newReceiptHandle, messageReceiptHandle);
- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
newReceiptHandle, messageReceiptHandle);
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(null);
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(),
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyLong()))
.thenReturn(CompletableFuture.completedFuture(new AckResult()));
@@ -329,11 +343,11 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
.build().encode();
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
newReceiptHandle, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
newReceiptHandle, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
- Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0))
.changeInvisibleTime(Mockito.any(ProxyContext.class),
Mockito.any(ReceiptHandle.class), Mockito.anyString(),
@@ -342,9 +356,9 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
@Test
public void testRemoveReceiptHandle() {
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
- receiptHandleProcessor.removeReceiptHandle(channelId, GROUP, MSG_ID,
receiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.removeReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
receiptHandleProcessor.scheduleRenewTask();
@@ -355,9 +369,9 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
@Test
public void testClearGroup() {
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
- receiptHandleProcessor.clearGroup(new
ReceiptHandleProcessor.ReceiptHandleGroupKey(channelId, GROUP));
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.clearGroup(new
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
receiptHandleProcessor.scheduleRenewTask();
@@ -370,9 +384,241 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
public void testClientOffline() {
ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor =
ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
Mockito.verify(messagingProcessor,
Mockito.times(1)).registerConsumerListener(listenerArgumentCaptor.capture());
- String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
- receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
-
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER,
GROUP, new ClientChannelInfo(null, channelId, LanguageCode.JAVA, 0));
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER,
GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
assertTrue(receiptHandleProcessor.receiptHandleGroupMap.isEmpty());
}
+
+ class MockChannel implements Channel {
+ @Override
+ public ChannelId id() {
+ return new ChannelId() {
+ @Override
+ public String asShortText() {
+ return "short";
+ }
+
+ @Override
+ public String asLongText() {
+ return "long";
+ }
+
+ @Override
+ public int compareTo(ChannelId o) {
+ return 1;
+ }
+ };
+ }
+
+ @Override
+ public EventLoop eventLoop() {
+ return null;
+ }
+
+ @Override
+ public Channel parent() {
+ return null;
+ }
+
+ @Override
+ public ChannelConfig config() {
+ return null;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return false;
+ }
+
+ @Override
+ public boolean isRegistered() {
+ return false;
+ }
+
+ @Override
+ public boolean isActive() {
+ return false;
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress localAddress() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress remoteAddress() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture closeFuture() {
+ return null;
+ }
+
+ @Override
+ public boolean isWritable() {
+ return false;
+ }
+
+ @Override
+ public long bytesBeforeUnwritable() {
+ return 0;
+ }
+
+ @Override
+ public long bytesBeforeWritable() {
+ return 0;
+ }
+
+ @Override
+ public Unsafe unsafe() {
+ return null;
+ }
+
+ @Override
+ public ChannelPipeline pipeline() {
+ return null;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
SocketAddress localAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress, ChannelPromise
promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public Channel read() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public Channel flush() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise)
{
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise newPromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelProgressivePromise newProgressivePromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newSucceededFuture() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newFailedFuture(Throwable cause) {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+ return null;
+ }
+
+ @Override
+ public <T> Attribute<T> attr(AttributeKey<T> key) {
+ return null;
+ }
+
+ @Override
+ public <T> boolean hasAttr(AttributeKey<T> key) {
+ return false;
+ }
+
+ @Override
+ public int compareTo(Channel o) {
+ return 1;
+ }
+ }
}
\ No newline at end of file
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
index c365aa9d0..0b5542fbe 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
@@ -75,7 +75,7 @@ public class ProxyClientRemotingProcessorTest {
proxyRelayResultFuture));
GrpcClientChannel grpcClientChannel = new
GrpcClientChannel(proxyRelayService, null,
-
ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"),
"group", "clientId");
+
ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"),
"clientId");
when(producerManager.getAvailableChannel(anyString()))
.thenReturn(grpcClientChannel);