This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 95b1c51f86 Add Utils for put header to Metadata to avoid duplicate data. (#8792)
95b1c51f86 is described below
commit 95b1c51f86592cbdb04b3827f95d571e883af804
Author: dingshuangxi888 <[email protected]>
AuthorDate: Wed Oct 9 10:37:19 2024 +0800
Add Utils for put header to Metadata to avoid duplicate data. (#8792)
---
.../rocketmq/proxy/common/utils/GrpcUtils.java | 45 ++++++++++++++++++++++
.../interceptor/AuthenticationInterceptor.java | 7 ++--
.../proxy/grpc/interceptor/HeaderInterceptor.java | 9 +++--
.../grpc/pipeline/AuthenticationPipeline.java | 3 +-
4 files changed, 56 insertions(+), 8 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java
new file mode 100644
index 0000000000..5c50de4426
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common.utils;
+
+import io.grpc.Attributes;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+
+public class GrpcUtils {
+
+ private GrpcUtils() {
+ }
+
+ public static <T> void putHeaderIfNotExist(Metadata headers, Metadata.Key<T> key, T value) {
+ if (headers == null) {
+ return;
+ }
+ if (!headers.containsKey(key) && value != null) {
+ headers.put(key, value);
+ }
+ }
+
+ public static <R, W, T> T getAttribute(ServerCall<R, W> call, Attributes.Key<T> key) {
+ Attributes attributes = call.getAttributes();
+ if (attributes == null) {
+ return null;
+ }
+ return attributes.get(key);
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java
index 28ee019fae..e082ba6e28 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AuthenticationHeader;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
import org.apache.rocketmq.common.constant.GrpcConstants;
+import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
public class AuthenticationInterceptor implements ServerInterceptor {
@@ -49,8 +50,8 @@ public class AuthenticationInterceptor implements ServerInterceptor {
@Override
public void onMessage(R message) {
GeneratedMessageV3 messageV3 = (GeneratedMessageV3) message;
- headers.put(GrpcConstants.RPC_NAME, messageV3.getDescriptorForType().getFullName());
- headers.put(GrpcConstants.SIMPLE_RPC_NAME, messageV3.getDescriptorForType().getName());
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.RPC_NAME, messageV3.getDescriptorForType().getFullName());
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.SIMPLE_RPC_NAME, messageV3.getDescriptorForType().getName());
if (ConfigurationManager.getProxyConfig().isEnableACL()) {
try {
AuthenticationHeader authenticationHeader =
AuthenticationHeader.builder()
@@ -85,7 +86,7 @@ public class AuthenticationInterceptor implements ServerInterceptor {
if (accessResource instanceof PlainAccessResource) {
PlainAccessResource plainAccessResource =
(PlainAccessResource) accessResource;
- headers.put(GrpcConstants.AUTHORIZATION_AK, plainAccessResource.getAccessKey());
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.AUTHORIZATION_AK, plainAccessResource.getAccessKey());
}
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
index 1de2ce4f98..e3e7884155 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
@@ -27,6 +27,7 @@ import io.grpc.ServerInterceptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.GrpcConstants;
+import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
import java.net.InetSocketAddress;
@@ -44,11 +45,11 @@ public class HeaderInterceptor implements ServerInterceptor {
SocketAddress remoteSocketAddress =
call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
remoteAddress = parseSocketAddress(remoteSocketAddress);
}
- headers.put(GrpcConstants.REMOTE_ADDRESS, remoteAddress);
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.REMOTE_ADDRESS, remoteAddress);
SocketAddress localSocketAddress =
call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
String localAddress = parseSocketAddress(localSocketAddress);
- headers.put(GrpcConstants.LOCAL_ADDRESS, localAddress);
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.LOCAL_ADDRESS, localAddress);
for (Attributes.Key<?> key : call.getAttributes().keys()) {
if (!StringUtils.startsWith(key.toString(),
HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
@@ -57,12 +58,12 @@ public class HeaderInterceptor implements ServerInterceptor {
Metadata.Key<String> headerKey
= Metadata.Key.of(key.toString(),
Metadata.ASCII_STRING_MARSHALLER);
String headerValue = String.valueOf(call.getAttributes().get(key));
- headers.put(headerKey, headerValue);
+ GrpcUtils.putHeaderIfNotExist(headers, headerKey, headerValue);
}
String channelId = call.getAttributes().get(AttributeKeys.CHANNEL_ID);
if (StringUtils.isNotBlank(channelId)) {
- headers.put(GrpcConstants.CHANNEL_ID, channelId);
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.CHANNEL_ID, channelId);
}
return next.startCall(call, headers);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java
index 58eed91c9f..e317b48f1e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
public class AuthenticationPipeline implements RequestPipeline {
@@ -73,7 +74,7 @@ public class AuthenticationPipeline implements RequestPipeline {
if (result instanceof DefaultAuthenticationContext) {
DefaultAuthenticationContext defaultAuthenticationContext =
(DefaultAuthenticationContext) result;
if (StringUtils.isNotBlank(defaultAuthenticationContext.getUsername())) {
- headers.put(GrpcConstants.AUTHORIZATION_AK, defaultAuthenticationContext.getUsername());
+ GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.AUTHORIZATION_AK, defaultAuthenticationContext.getUsername());
}
}
return result;