This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5262358140 [ISSUE #7699] Refector NamespaceRpcHook (#7769)
5262358140 is described below
commit 5262358140bcf7b283754a71dd16c2a5c6dbf821
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Tue Jan 23 13:56:26 2024 +0800
[ISSUE #7699] Refector NamespaceRpcHook (#7769)
* [ISSUE #7699] Refector NamespaceRpcHook
* fix
---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 6 +++--
.../rocketmq/client/impl/MQClientAPIImpl.java | 6 +++--
.../client/impl/mqclient/MQClientAPIExt.java | 6 +++--
.../rocketmq/client/rpchook/NamespaceRpcHook.java | 13 ++++------
.../client/rpchook/NamespaceRpcHookTest.java | 8 +++----
.../java/org/apache/rocketmq/common/MixAll.java | 2 ++
.../protocol/header/LockBatchMqRequestHeader.java | 28 ++++++++++++++++++++++
.../header/UnlockBatchMqRequestHeader.java | 28 ++++++++++++++++++++++
8 files changed, 78 insertions(+), 19 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 3827beb5b6..01745a2b79 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -102,11 +102,13 @@ import
org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
@@ -906,7 +908,7 @@ public class BrokerOuterAPI {
final LockBatchRequestBody requestBody,
final long timeoutMillis,
final LockCallback callback) throws RemotingException,
InterruptedException {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new
LockBatchMqRequestHeader());
request.setBody(requestBody.encode());
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new
InvokeCallback() {
@@ -945,7 +947,7 @@ public class BrokerOuterAPI {
final UnlockBatchRequestBody requestBody,
final long timeoutMillis,
final UnlockCallback callback) throws RemotingException,
InterruptedException {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new
UnlockBatchMqRequestHeader());
request.setBody(requestBody.encode());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index f46dbe3124..1b4b3878c6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -173,6 +173,7 @@ import
org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetTopicsByClusterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
@@ -195,6 +196,7 @@ import
org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
@@ -1613,7 +1615,7 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
final String addr,
final LockBatchRequestBody requestBody,
final long timeoutMillis) throws RemotingException, MQBrokerException,
InterruptedException {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new
LockBatchMqRequestHeader());
request.setBody(requestBody.encode());
RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
addr),
@@ -1637,7 +1639,7 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
final long timeoutMillis,
final boolean oneway
) throws RemotingException, MQBrokerException, InterruptedException {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new
UnlockBatchMqRequestHeader());
request.setBody(requestBody.encode());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index 3d8625937c..b97e00c577 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -67,6 +67,7 @@ import
org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.HeartbeatRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.LockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
@@ -77,6 +78,7 @@ import
org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
@@ -543,7 +545,7 @@ public class MQClientAPIExt extends MQClientAPIImpl {
public CompletableFuture<Set<MessageQueue>> lockBatchMQWithFuture(String
brokerAddr,
LockBatchRequestBody requestBody, long timeoutMillis) {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, new
LockBatchMqRequestHeader());
request.setBody(requestBody.encode());
return this.getRemotingClient().invoke(brokerAddr, request,
timeoutMillis).thenCompose(response -> {
CompletableFuture<Set<MessageQueue>> future0 = new
CompletableFuture<>();
@@ -565,7 +567,7 @@ public class MQClientAPIExt extends MQClientAPIImpl {
public CompletableFuture<Void> unlockBatchMQOneway(String brokerAddr,
UnlockBatchRequestBody requestBody, long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, new
UnlockBatchMqRequestHeader());
request.setBody(requestBody.encode());
try {
this.getRemotingClient().invokeOneway(brokerAddr, request,
timeoutMillis);
diff --git a/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java b/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java
index 7deee0a9f3..0178b2ca91 100644
--- a/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java
+++ b/client/src/main/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHook.java
@@ -19,10 +19,9 @@ package org.apache.rocketmq.client.rpchook;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;
public class NamespaceRpcHook implements RPCHook {
private final ClientConfig clientConfig;
@@ -33,13 +32,9 @@ public class NamespaceRpcHook implements RPCHook {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
- CommandCustomHeader customHeader = request.readCustomHeader();
- if (customHeader instanceof RpcRequestHeader) {
- RpcRequestHeader requestHeader = (RpcRequestHeader) customHeader;
- if (StringUtils.isNotEmpty(clientConfig.getNamespaceV2())) {
- requestHeader.setNamespaced(true);
- requestHeader.setNamespace(clientConfig.getNamespaceV2());
- }
+ if (StringUtils.isNotEmpty(clientConfig.getNamespaceV2())) {
+ request.addExtField(MixAll.RPC_REQUEST_HEADER_NAMESPACED_FIELD,
"true");
+ request.addExtField(MixAll.RPC_REQUEST_HEADER_NAMESPACE_FIELD,
clientConfig.getNamespaceV2());
}
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java b/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java
index 1551ce0935..385c2ceec0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/rpchook/NamespaceRpcHookTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.client.rpchook;
import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
@@ -39,8 +40,8 @@ public class NamespaceRpcHookTest {
PullMessageRequestHeader pullMessageRequestHeader = new
PullMessageRequestHeader();
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,
pullMessageRequestHeader);
namespaceRpcHook.doBeforeRequest("", request);
- assertThat(pullMessageRequestHeader.getNamespaced()).isTrue();
-
assertThat(pullMessageRequestHeader.getNamespace()).isEqualTo(namespace);
+
assertThat(request.getExtFields().get(MixAll.RPC_REQUEST_HEADER_NAMESPACED_FIELD)).isEqualTo("true");
+
assertThat(request.getExtFields().get(MixAll.RPC_REQUEST_HEADER_NAMESPACE_FIELD)).isEqualTo(namespace);
}
@Test
@@ -50,7 +51,6 @@ public class NamespaceRpcHookTest {
PullMessageRequestHeader pullMessageRequestHeader = new
PullMessageRequestHeader();
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,
pullMessageRequestHeader);
namespaceRpcHook.doBeforeRequest("", request);
- assertThat(pullMessageRequestHeader.getNamespaced()).isNull();
- assertThat(pullMessageRequestHeader.getNamespace()).isNull();
+ assertThat(request.getExtFields()).isNull();
}
}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index c11eb377b9..cdcc54cd92 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -108,6 +108,8 @@ public class MixAll {
public static final String ROCKETMQ_ZONE_MODE_PROPERTY =
"rocketmq.zone.mode";
public static final String ZONE_NAME = "__ZONE_NAME";
public static final String ZONE_MODE = "__ZONE_MODE";
+ public final static String RPC_REQUEST_HEADER_NAMESPACED_FIELD = "nsd";
+ public final static String RPC_REQUEST_HEADER_NAMESPACE_FIELD = "ns";
private static final Logger log =
LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__";
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/LockBatchMqRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/LockBatchMqRequestHeader.java
new file mode 100644
index 0000000000..3484fa7d3e
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/LockBatchMqRequestHeader.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.header;
+
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;
+
+public class LockBatchMqRequestHeader extends RpcRequestHeader {
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnlockBatchMqRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnlockBatchMqRequestHeader.java
new file mode 100644
index 0000000000..e7a44f2f8b
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/UnlockBatchMqRequestHeader.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.header;
+
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;
+
+public class UnlockBatchMqRequestHeader extends RpcRequestHeader {
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+}