This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d2b818d993 Revert "[ISSUE #7707] Refector Context with link node
implementation (#7708)" (#7742)
d2b818d993 is described below
commit d2b818d99366ad3ef8ba83d77780b7d15c318d13
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Jan 12 10:24:07 2024 +0800
Revert "[ISSUE #7707] Refector Context with link node implementation
(#7708)" (#7742)
This reverts commit 3f99b1e96bedb0dc6854c92b2f753cdf9fa68197.
---
.../common/{context => }/ContextVariable.java | 2 +-
.../apache/rocketmq/proxy/common/ProxyContext.java | 97 +++++++++++-----------
.../rocketmq/proxy/common/context/ContextNode.java | 55 ------------
.../proxy/grpc/v2/GrpcMessagingApplication.java | 16 ++--
.../proxy/processor/ReceiptHandleProcessor.java | 2 +-
.../activity/AbstractRemotingActivity.java | 36 ++++----
.../rocketmq/proxy/service/relay/ProxyChannel.java | 4 +-
.../org/apache/rocketmq/proxy/ContextNodeTest.java | 69 ---------------
.../rocketmq/proxy/common/ProxyContextTest.java | 48 -----------
.../rocketmq/proxy/grpc/v2/BaseActivityTest.java | 12 +--
.../grpc/v2/channel/GrpcClientChannelTest.java | 2 +-
.../v2/common/GrpcClientSettingsManagerTest.java | 6 +-
.../v2/consumer/ReceiveMessageActivityTest.java | 12 +--
.../service/message/LocalMessageServiceTest.java | 6 +-
.../mqclient/ProxyClientRemotingProcessorTest.java | 2 +-
.../receipt/DefaultReceiptHandleManagerTest.java | 60 ++++++-------
.../service/sysmessage/HeartbeatSyncerTest.java | 4 +-
17 files changed, 129 insertions(+), 304 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
similarity index 96%
rename from proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java
rename to proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
index 727f4c9bbc..0760826de7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextVariable.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.proxy.common.context;
+package org.apache.rocketmq.proxy.common;
public class ContextVariable {
public static final String REMOTE_ADDRESS = "remote-address";
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
index 3e602d5ad0..77a6791f04 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
@@ -18,118 +18,117 @@
package org.apache.rocketmq.proxy.common;
import io.netty.channel.Channel;
-import org.apache.rocketmq.proxy.common.context.ContextNode;
-import org.apache.rocketmq.proxy.common.context.ContextVariable;
+import java.util.HashMap;
+import java.util.Map;
public class ProxyContext {
public static final String INNER_ACTION_PREFIX = "Inner";
- private final ContextNode contextNode;
-
- ProxyContext() {
- this.contextNode = new ContextNode();
- }
-
- ProxyContext(ContextNode parent) {
- this.contextNode = parent;
- }
-
- ProxyContext(ProxyContext that) {
- this.contextNode = that.contextNode;
- }
+ private final Map<String, Object> value = new HashMap<>();
public static ProxyContext create() {
return new ProxyContext();
}
public static ProxyContext createForInner(String actionName) {
- return create().withAction(INNER_ACTION_PREFIX + actionName);
+ return create().setAction(INNER_ACTION_PREFIX + actionName);
}
public static ProxyContext createForInner(Class<?> clazz) {
return createForInner(clazz.getSimpleName());
}
- public ProxyContext withValue(String key, Object val) {
- return new ProxyContext(contextNode.withValue(key, val));
+ public Map<String, Object> getValue() {
+ return this.value;
}
- public <T> T getValue(String key) {
- return (T) contextNode.getValue(key);
+ public ProxyContext withVal(String key, Object val) {
+ this.value.put(key, val);
+ return this;
}
- public <T> T getValue(String key, Class<T> classType) {
- return (T) contextNode.getValue(key, classType);
+ public <T> T getVal(String key) {
+ return (T) this.value.get(key);
}
- public ProxyContext withLocalAddress(String localAddress) {
- return this.withValue(ContextVariable.LOCAL_ADDRESS, localAddress);
+ public ProxyContext setLocalAddress(String localAddress) {
+ this.withVal(ContextVariable.LOCAL_ADDRESS, localAddress);
+ return this;
}
public String getLocalAddress() {
- return contextNode.getValue(ContextVariable.LOCAL_ADDRESS,
String.class);
+ return this.getVal(ContextVariable.LOCAL_ADDRESS);
}
- public ProxyContext withRemoteAddress(String remoteAddress) {
- return this.withValue(ContextVariable.REMOTE_ADDRESS, remoteAddress);
+ public ProxyContext setRemoteAddress(String remoteAddress) {
+ this.withVal(ContextVariable.REMOTE_ADDRESS, remoteAddress);
+ return this;
}
public String getRemoteAddress() {
- return contextNode.getValue(ContextVariable.REMOTE_ADDRESS,
String.class);
+ return this.getVal(ContextVariable.REMOTE_ADDRESS);
}
- public ProxyContext withClientID(String clientID) {
- return this.withValue(ContextVariable.CLIENT_ID, clientID);
+ public ProxyContext setClientID(String clientID) {
+ this.withVal(ContextVariable.CLIENT_ID, clientID);
+ return this;
}
public String getClientID() {
- return contextNode.getValue(ContextVariable.CLIENT_ID, String.class);
+ return this.getVal(ContextVariable.CLIENT_ID);
}
- public ProxyContext withChannel(Channel channel) {
- return this.withValue(ContextVariable.CHANNEL, channel);
+ public ProxyContext setChannel(Channel channel) {
+ this.withVal(ContextVariable.CHANNEL, channel);
+ return this;
}
public Channel getChannel() {
- return contextNode.getValue(ContextVariable.CHANNEL, Channel.class);
+ return this.getVal(ContextVariable.CHANNEL);
}
- public ProxyContext withLanguage(String language) {
- return this.withValue(ContextVariable.LANGUAGE, language);
+ public ProxyContext setLanguage(String language) {
+ this.withVal(ContextVariable.LANGUAGE, language);
+ return this;
}
public String getLanguage() {
- return contextNode.getValue(ContextVariable.LANGUAGE, String.class);
+ return this.getVal(ContextVariable.LANGUAGE);
}
- public ProxyContext withClientVersion(String clientVersion) {
- return this.withValue(ContextVariable.CLIENT_VERSION, clientVersion);
+ public ProxyContext setClientVersion(String clientVersion) {
+ this.withVal(ContextVariable.CLIENT_VERSION, clientVersion);
+ return this;
}
public String getClientVersion() {
- return contextNode.getValue(ContextVariable.CLIENT_VERSION,
String.class);
+ return this.getVal(ContextVariable.CLIENT_VERSION);
}
- public ProxyContext withRemainingMs(Long remainingMs) {
- return this.withValue(ContextVariable.REMAINING_MS, remainingMs);
+ public ProxyContext setRemainingMs(Long remainingMs) {
+ this.withVal(ContextVariable.REMAINING_MS, remainingMs);
+ return this;
}
public Long getRemainingMs() {
- return contextNode.getValue(ContextVariable.REMAINING_MS, Long.class);
+ return this.getVal(ContextVariable.REMAINING_MS);
}
- public ProxyContext withAction(String action) {
- return this.withValue(ContextVariable.ACTION, action);
+ public ProxyContext setAction(String action) {
+ this.withVal(ContextVariable.ACTION, action);
+ return this;
}
public String getAction() {
- return contextNode.getValue(ContextVariable.ACTION, String.class);
+ return this.getVal(ContextVariable.ACTION);
}
- public ProxyContext withProtocolType(String protocol) {
- return this.withValue(ContextVariable.PROTOCOL_TYPE, protocol);
+ public ProxyContext setProtocolType(String protocol) {
+ this.withVal(ContextVariable.PROTOCOL_TYPE, protocol);
+ return this;
}
public String getProtocolType() {
- return contextNode.getValue(ContextVariable.PROTOCOL_TYPE,
String.class);
+ return this.getVal(ContextVariable.PROTOCOL_TYPE);
}
+
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java
deleted file mode 100644
index 7b418516b0..0000000000
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/context/ContextNode.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.proxy.common.context;
-
-public class ContextNode {
- private final String key;
- private final Object value;
- private final ContextNode parent;
-
- public ContextNode() {
- this(null, null, null);
- }
-
- public ContextNode(ContextNode parent, String key, Object value) {
- this.parent = parent;
- this.key = key;
- this.value = value;
- }
-
- public ContextNode withValue(String key, Object value) {
- return new ContextNode(this, key, value);
- }
-
- public Object getValue(String key) {
- for (ContextNode current = this; current != null; current =
current.parent) {
- if (key.equals(current.key)) {
- return current.value;
- }
- }
- return null;
- }
-
- public <T> T getValue(String key, Class<T> classType) {
- Object value = getValue(key);
- if (classType.isInstance(value)) {
- return classType.cast(value);
- }
- return null;
- }
-}
\ No newline at end of file
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
index 3cd664ec55..2cb395ad60 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
@@ -169,15 +169,15 @@ public class GrpcMessagingApplication extends
MessagingServiceGrpc.MessagingServ
Context ctx = Context.current();
Metadata headers = InterceptorConstants.METADATA.get(ctx);
ProxyContext context = ProxyContext.create()
- .withLocalAddress(getDefaultStringMetadataInfo(headers,
InterceptorConstants.LOCAL_ADDRESS))
- .withRemoteAddress(getDefaultStringMetadataInfo(headers,
InterceptorConstants.REMOTE_ADDRESS))
- .withClientID(getDefaultStringMetadataInfo(headers,
InterceptorConstants.CLIENT_ID))
- .withProtocolType(ChannelProtocolType.GRPC_V2.getName())
- .withLanguage(getDefaultStringMetadataInfo(headers,
InterceptorConstants.LANGUAGE))
- .withClientVersion(getDefaultStringMetadataInfo(headers,
InterceptorConstants.CLIENT_VERSION))
- .withAction(getDefaultStringMetadataInfo(headers,
InterceptorConstants.SIMPLE_RPC_NAME));
+ .setLocalAddress(getDefaultStringMetadataInfo(headers,
InterceptorConstants.LOCAL_ADDRESS))
+ .setRemoteAddress(getDefaultStringMetadataInfo(headers,
InterceptorConstants.REMOTE_ADDRESS))
+ .setClientID(getDefaultStringMetadataInfo(headers,
InterceptorConstants.CLIENT_ID))
+ .setProtocolType(ChannelProtocolType.GRPC_V2.getName())
+ .setLanguage(getDefaultStringMetadataInfo(headers,
InterceptorConstants.LANGUAGE))
+ .setClientVersion(getDefaultStringMetadataInfo(headers,
InterceptorConstants.CLIENT_VERSION))
+ .setAction(getDefaultStringMetadataInfo(headers,
InterceptorConstants.SIMPLE_RPC_NAME));
if (ctx.getDeadline() != null) {
- context =
context.withRemainingMs(ctx.getDeadline().timeRemaining(TimeUnit.MILLISECONDS));
+
context.setRemainingMs(ctx.getDeadline().timeRemaining(TimeUnit.MILLISECONDS));
}
return context;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 71ebfe8af1..5e1be93218 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -37,7 +37,7 @@ public class ReceiptHandleProcessor extends AbstractProcessor
{
super(messagingProcessor, serviceManager);
StateEventListener<RenewEvent> eventListener = event -> {
ProxyContext context = createContext(event.getEventType().name())
- .withChannel(event.getKey().getChannel());
+ .setChannel(event.getKey().getChannel());
MessageReceiptHandle messageReceiptHandle =
event.getMessageReceiptHandle();
ReceiptHandle handle =
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
messagingProcessor.changeInvisibleTime(context, handle,
messageReceiptHandle.getMessageId(),
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
index 73779eaaf0..ce4a633976 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
@@ -19,8 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -42,11 +40,14 @@ import
org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.AttributeKeys;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
public abstract class AbstractRemotingActivity implements
NettyRequestProcessor {
protected final static Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected final MessagingProcessor messagingProcessor;
@@ -123,23 +124,18 @@ public abstract class AbstractRemotingActivity implements
NettyRequestProcessor
protected ProxyContext createContext(ChannelHandlerContext ctx,
RemotingCommand request) {
ProxyContext context = ProxyContext.create();
Channel channel = ctx.channel();
- LanguageCode languageCode =
RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY, channel);
- String clientId =
RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY, channel);
- Integer version =
RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY, channel);
- context =
context.withAction(RemotingHelper.getRequestCodeDesc(request.getCode()))
- .withProtocolType(ChannelProtocolType.REMOTING.getName())
- .withChannel(channel)
-
.withLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress()))
-
.withRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- if (languageCode != null) {
- context = context.withLanguage(languageCode.name());
- }
- if (clientId != null) {
- context = context.withClientID(clientId);
- }
- if (version != null) {
- context =
context.withClientVersion(MQVersion.getVersionDesc(version));
- }
+ context.setAction(RemotingHelper.getRequestCodeDesc(request.getCode()))
+ .setProtocolType(ChannelProtocolType.REMOTING.getName())
+ .setChannel(channel)
+
.setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress()))
+
.setRemoteAddress(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.LANGUAGE_CODE_KEY,
channel))
+ .ifPresent(language -> context.setLanguage(language.name()));
+
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.CLIENT_ID_KEY,
channel))
+ .ifPresent(context::setClientID);
+
Optional.ofNullable(RemotingHelper.getAttributeValue(AttributeKeys.VERSION_KEY,
channel))
+ .ifPresent(version ->
context.setClientVersion(MQVersion.getVersionDesc(version)));
return context;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java
index 277b8f1586..5a1185a81e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/ProxyChannel.java
@@ -77,8 +77,8 @@ public abstract class ProxyChannel extends SimpleChannel {
try {
if (msg instanceof RemotingCommand) {
ProxyContext context =
ProxyContext.createForInner(this.getClass())
- .withRemoteAddress(remoteAddress)
- .withLocalAddress(localAddress);
+ .setRemoteAddress(remoteAddress)
+ .setLocalAddress(localAddress);
RemotingCommand command = (RemotingCommand) msg;
if (command.getExtFields() == null) {
command.setExtFields(new HashMap<>());
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java
deleted file mode 100644
index 19cf179c3d..0000000000
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/ContextNodeTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.proxy;
-
-import org.apache.rocketmq.proxy.common.context.ContextNode;
-import org.junit.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class ContextNodeTest {
- private ContextNode contextNode;
-
- @Test
- public void testWithValue() {
- String key = "key";
- String value = "value";
- contextNode = new ContextNode();
- ContextNode newContextNode = contextNode.withValue(key, value);
- assertThat(newContextNode.getValue(key,
String.class)).isEqualTo(value);
- assertThat(newContextNode.getValue(key)).isEqualTo(value);
-
- assertThat(contextNode.getValue(key, String.class)).isNull();
- }
-
- @Test
- public void testRepeatedKeyForTwoContext() {
- String key1 = "key1";
- String value1 = "value1";
- String value2 = "value2";
- contextNode = new ContextNode();
- ContextNode newContextNode1 = contextNode.withValue(key1, value1);
- ContextNode newContextNode2 = contextNode.withValue(key1, value2);
- assertThat(newContextNode1.getValue(key1,
String.class)).isEqualTo(value1);
- assertThat(newContextNode1.getValue(key1)).isEqualTo(value1);
- assertThat(newContextNode2.getValue(key1,
String.class)).isEqualTo(value2);
- assertThat(newContextNode2.getValue(key1)).isEqualTo(value2);
-
- assertThat(contextNode.getValue(key1, String.class)).isNull();
- }
-
- @Test
- public void testRepeatedKeyForContextChain() {
- String key1 = "key1";
- String value1 = "value1";
- String value2 = "value2";
- contextNode = new ContextNode();
- ContextNode newContextNode1 = contextNode.withValue(key1, value1);
- ContextNode newContextNode2 = newContextNode1.withValue(key1, value2);
- assertThat(newContextNode1.getValue(key1,
String.class)).isEqualTo(value1);
- assertThat(newContextNode2.getValue(key1,
String.class)).isEqualTo(value2);
-
- assertThat(contextNode.getValue(key1, String.class)).isNull();
- }
-}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java
deleted file mode 100644
index 0999440cde..0000000000
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ProxyContextTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.proxy.common;
-
-import org.junit.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class ProxyContextTest {
- private ProxyContext proxyContext;
-
- @Test
- public void testWithValue() {
- String key = "key";
- String value = "value";
- proxyContext = ProxyContext.create();
- ProxyContext newContext = proxyContext.withValue(key, value);
- assertThat(newContext.getValue(key, String.class)).isEqualTo(value);
- String actualValue = newContext.getValue(key);
- assertThat(actualValue).isEqualTo(value);
-
- assertThat(proxyContext.getValue(key, String.class)).isNull();
- }
-
- @Test
- public void testSetLocalAddress() {
- String address = "address";
- proxyContext = ProxyContext.create();
- ProxyContext newProxyContext = proxyContext.withLocalAddress(address);
- assertThat(proxyContext.getLocalAddress()).isNull();
- assertThat(newProxyContext.getLocalAddress()).isEqualTo(address);
- }
-}
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java
index f29d59fe4c..524945bd6f 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/BaseActivityTest.java
@@ -21,7 +21,7 @@ import io.grpc.Metadata;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
-import org.apache.rocketmq.proxy.common.context.ContextVariable;
+import org.apache.rocketmq.proxy.common.ContextVariable;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.InitConfigTest;
import org.apache.rocketmq.proxy.grpc.interceptor.InterceptorConstants;
@@ -76,11 +76,11 @@ public class BaseActivityTest extends InitConfigTest {
protected ProxyContext createContext() {
return ProxyContext.create()
- .withValue(ContextVariable.CLIENT_ID, CLIENT_ID)
- .withValue(ContextVariable.LANGUAGE, JAVA)
- .withValue(ContextVariable.REMOTE_ADDRESS, REMOTE_ADDR)
- .withValue(ContextVariable.LOCAL_ADDRESS, LOCAL_ADDR)
- .withValue(ContextVariable.REMAINING_MS,
Duration.ofSeconds(10).toMillis());
+ .withVal(ContextVariable.CLIENT_ID, CLIENT_ID)
+ .withVal(ContextVariable.LANGUAGE, JAVA)
+ .withVal(ContextVariable.REMOTE_ADDRESS, REMOTE_ADDR)
+ .withVal(ContextVariable.LOCAL_ADDRESS, LOCAL_ADDR)
+ .withVal(ContextVariable.REMAINING_MS,
Duration.ofSeconds(10).toMillis());
}
protected static String buildReceiptHandle(String topic, long popTime,
long invisibleTime) {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
index af5e3e10dc..1bdbdd9bef 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
@@ -58,7 +58,7 @@ public class GrpcClientChannelTest extends InitConfigTest {
super.before();
this.clientId = RandomStringUtils.randomAlphabetic(10);
this.grpcClientChannel = new GrpcClientChannel(proxyRelayService,
grpcClientSettingsManager, grpcChannelManager,
-
ProxyContext.create().withRemoteAddress("10.152.39.53:9768").withLocalAddress("11.193.0.1:1210"),
+
ProxyContext.create().setRemoteAddress("10.152.39.53:9768").setLocalAddress("11.193.0.1:1210"),
this.clientId);
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
index 3c3f5bf28f..6742f094c8 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcClientSettingsManagerTest.java
@@ -25,7 +25,7 @@ import apache.rocketmq.v2.RetryPolicy;
import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.Subscription;
import com.google.protobuf.util.Durations;
-import org.apache.rocketmq.proxy.common.context.ContextVariable;
+import org.apache.rocketmq.proxy.common.ContextVariable;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
import
org.apache.rocketmq.remoting.protocol.subscription.CustomizedRetryPolicy;
@@ -52,7 +52,7 @@ public class GrpcClientSettingsManagerTest extends
BaseActivityTest {
@Test
public void testGetProducerData() {
- ProxyContext context =
ProxyContext.create().withValue(ContextVariable.CLIENT_ID, CLIENT_ID);
+ ProxyContext context =
ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID);
this.grpcClientSettingsManager.updateClientSettings(context,
CLIENT_ID, Settings.newBuilder()
.setBackoffPolicy(RetryPolicy.getDefaultInstance())
@@ -65,7 +65,7 @@ public class GrpcClientSettingsManagerTest extends
BaseActivityTest {
@Test
public void testGetSubscriptionData() {
- ProxyContext context =
ProxyContext.create().withValue(ContextVariable.CLIENT_ID, CLIENT_ID);
+ ProxyContext context =
ProxyContext.create().withVal(ContextVariable.CLIENT_ID, CLIENT_ID);
SubscriptionGroupConfig subscriptionGroupConfig = new
SubscriptionGroupConfig();
when(this.messagingProcessor.getSubscriptionGroupConfig(any(), any()))
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 70460a9419..77ae5e4d11 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -94,8 +94,9 @@ public class ReceiveMessageActivityTest extends
BaseActivityTest {
.thenReturn(CompletableFuture.completedFuture(new
PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
ProxyContext context = createContext();
+ context.setRemainingMs(1L);
this.receiveMessageActivity.receiveMessage(
- context.withRemainingMs(1L),
+ context,
ReceiveMessageRequest.newBuilder()
.setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
@@ -120,9 +121,9 @@ public class ReceiveMessageActivityTest extends
BaseActivityTest {
when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType());
- final ProxyContext context = createContext()
- .withClientVersion("5.0.2")
- .withRemainingMs(-1L);
+ final ProxyContext context = createContext();
+ context.setClientVersion("5.0.2");
+ context.setRemainingMs(-1L);
final ReceiveMessageRequest request =
ReceiveMessageRequest.newBuilder()
.setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
@@ -143,8 +144,9 @@ public class ReceiveMessageActivityTest extends
BaseActivityTest {
ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor1 =
ArgumentCaptor.forClass(ReceiveMessageResponse.class);
doNothing().when(receiveStreamObserver).onNext(responseArgumentCaptor1.capture());
+ context.setClientVersion("5.0.3");
this.receiveMessageActivity.receiveMessage(
- context.withClientVersion("5.0.3"),
+ context,
request,
receiveStreamObserver
);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
index e959244dec..3e3d37086b 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/LocalMessageServiceTest.java
@@ -46,7 +46,7 @@ import
org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.proxy.common.context.ContextVariable;
+import org.apache.rocketmq.proxy.common.ContextVariable;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
@@ -123,8 +123,8 @@ public class LocalMessageServiceTest extends InitConfigTest
{
Mockito.when(brokerControllerMock.getEndTransactionProcessor()).thenReturn(endTransactionProcessorMock);
Mockito.when(brokerControllerMock.getBrokerConfig()).thenReturn(new
BrokerConfig());
localMessageService = new LocalMessageService(brokerControllerMock,
channelManager, null);
- proxyContext =
ProxyContext.create().withValue(ContextVariable.REMOTE_ADDRESS, "0.0.0.1")
- .withValue(ContextVariable.LOCAL_ADDRESS, "0.0.0.2");
+ proxyContext =
ProxyContext.create().withVal(ContextVariable.REMOTE_ADDRESS, "0.0.0.1")
+ .withVal(ContextVariable.LOCAL_ADDRESS, "0.0.0.2");
}
@Test
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
index 7ebad93722..a6d807937e 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
@@ -79,7 +79,7 @@ public class ProxyClientRemotingProcessorTest {
proxyRelayResultFuture));
GrpcClientChannel grpcClientChannel = new
GrpcClientChannel(proxyRelayService, grpcClientSettingsManager, null,
-
ProxyContext.create().withRemoteAddress("127.0.0.1:8888").withLocalAddress("127.0.0.1:10911"),
"clientId");
+
ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"),
"clientId");
when(producerManager.getAvailableChannel(anyString()))
.thenReturn(grpcClientChannel);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
index 86a529178f..25ae1509a9 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
@@ -35,7 +35,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.state.StateEventListener;
import org.apache.rocketmq.proxy.common.RenewEvent;
-import org.apache.rocketmq.proxy.common.context.ContextVariable;
+import org.apache.rocketmq.proxy.common.ContextVariable;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
@@ -71,7 +71,7 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
@Mock
protected ConsumerManager consumerManager;
- private static ProxyContext proxyContext = ProxyContext.create();
+ private static final ProxyContext PROXY_CONTEXT = ProxyContext.create();
private static final String GROUP = "group";
private static final String TOPIC = "topic";
private static final String BROKER_NAME = "broker";
@@ -92,7 +92,7 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
public void fireEvent(RenewEvent event) {
MessageReceiptHandle messageReceiptHandle =
event.getMessageReceiptHandle();
ReceiptHandle handle =
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
- messagingProcessor.changeInvisibleTime(proxyContext, handle,
messageReceiptHandle.getMessageId(),
+ messagingProcessor.changeInvisibleTime(PROXY_CONTEXT, handle,
messageReceiptHandle.getMessageId(),
messageReceiptHandle.getGroup(),
messageReceiptHandle.getTopic(), event.getRenewTime())
.whenComplete((v, t) -> {
if (t != null) {
@@ -115,8 +115,8 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
.offset(OFFSET)
.commitLogOffset(0L)
.build().encode();
- proxyContext = proxyContext.withValue(ContextVariable.CLIENT_ID,
"channel-id");
- proxyContext = proxyContext.withValue(ContextVariable.CHANNEL, new
LocalChannel());
+ PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id");
+ PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new LocalChannel());
Mockito.doNothing().when(consumerManager).appendConsumerIdsChangeListener(Mockito.any(ConsumerIdsChangeListener.class));
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
@@ -125,7 +125,7 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testAddReceiptHandle() {
Channel channel = new LocalChannel();
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
receiptHandleManager.scheduleRenewTask();
@@ -137,7 +137,7 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testAddDuplicationMessage() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
{
String receiptHandle = ReceiptHandle.builder()
.startOffset(0L)
@@ -152,9 +152,9 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
.build().encode();
MessageReceiptHandle messageReceiptHandle = new
MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
- receiptHandleManager.addReceiptHandle(proxyContext, channel,
GROUP, MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel,
GROUP, MSG_ID, messageReceiptHandle);
}
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
receiptHandleManager.scheduleRenewTask();
@@ -169,8 +169,8 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testRenewReceiptHandle() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
@@ -214,9 +214,9 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testRenewExceedMaxRenewTimes() {
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
CompletableFuture<AckResult> ackResultFuture = new
CompletableFuture<>();
ackResultFuture.completeExceptionally(new MQClientException(0,
"error"));
@@ -244,9 +244,9 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testRenewWithInvalidHandle() {
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
CompletableFuture<AckResult> ackResultFuture = new
CompletableFuture<>();
ackResultFuture.completeExceptionally(new
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
@@ -268,9 +268,9 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testRenewWithErrorThenOK() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
AtomicInteger count = new AtomicInteger(0);
List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
@@ -347,8 +347,8 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
.build().encode();
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
@@ -381,8 +381,8 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
.build().encode();
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(null);
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(),
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyLong()))
@@ -417,8 +417,8 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
.build().encode();
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
@@ -430,9 +430,9 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testRemoveReceiptHandle() {
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
- receiptHandleManager.removeReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, receiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ receiptHandleManager.removeReceiptHandle(PROXY_CONTEXT, channel,
GROUP, MSG_ID, receiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
receiptHandleManager.scheduleRenewTask();
@@ -443,8 +443,8 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
@Test
public void testClearGroup() {
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
receiptHandleManager.clearGroup(new ReceiptHandleGroupKey(channel,
GROUP));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
@@ -458,8 +458,8 @@ public class DefaultReceiptHandleManagerTest extends
BaseServiceTest {
public void testClientOffline() {
ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor =
ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
Mockito.verify(consumerManager,
Mockito.times(1)).appendConsumerIdsChangeListener(listenerArgumentCaptor.capture());
- Channel channel = proxyContext.getValue(ContextVariable.CHANNEL);
- receiptHandleManager.addReceiptHandle(proxyContext, channel, GROUP,
MSG_ID, messageReceiptHandle);
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, messageReceiptHandle);
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER,
GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty());
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 7e4df145df..9a2c5e3437 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -146,7 +146,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class);
GrpcClientChannel grpcClientChannel = new GrpcClientChannel(
proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
-
ProxyContext.create().withRemoteAddress(remoteAddress).withLocalAddress(localAddress),
+
ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress),
clientId);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
grpcClientChannel,
@@ -345,7 +345,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
GrpcChannelManager grpcChannelManager = mock(GrpcChannelManager.class);
GrpcClientChannel grpcClientChannel = new GrpcClientChannel(
proxyRelayService, grpcClientSettingsManager, grpcChannelManager,
-
ProxyContext.create().withRemoteAddress(remoteAddress).withLocalAddress(localAddress),
+
ProxyContext.create().setRemoteAddress(remoteAddress).setLocalAddress(localAddress),
clientId);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
grpcClientChannel,