This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 95b1c51f86 Add Utils for put header to Metadata to avoid duplicate 
data. (#8792)
95b1c51f86 is described below

commit 95b1c51f86592cbdb04b3827f95d571e883af804
Author: dingshuangxi888 <[email protected]>
AuthorDate: Wed Oct 9 10:37:19 2024 +0800

    Add Utils for put header to Metadata to avoid duplicate data. (#8792)
---
 .../rocketmq/proxy/common/utils/GrpcUtils.java     | 45 ++++++++++++++++++++++
 .../interceptor/AuthenticationInterceptor.java     |  7 ++--
 .../proxy/grpc/interceptor/HeaderInterceptor.java  |  9 +++--
 .../grpc/pipeline/AuthenticationPipeline.java      |  3 +-
 4 files changed, 56 insertions(+), 8 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java
new file mode 100644
index 0000000000..5c50de4426
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/utils/GrpcUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.common.utils;
+
+import io.grpc.Attributes;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+
+public class GrpcUtils {
+
+    private GrpcUtils() {
+    }
+
+    public static <T> void putHeaderIfNotExist(Metadata headers, 
Metadata.Key<T> key, T value) {
+        if (headers == null) {
+            return;
+        }
+        if (!headers.containsKey(key) && value != null) {
+            headers.put(key, value);
+        }
+    }
+
+    public static <R, W, T> T getAttribute(ServerCall<R, W> call, 
Attributes.Key<T> key) {
+        Attributes attributes = call.getAttributes();
+        if (attributes == null) {
+            return null;
+        }
+        return attributes.get(key);
+    }
+}
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java
index 28ee019fae..e082ba6e28 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/AuthenticationInterceptor.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.acl.common.AclException;
 import org.apache.rocketmq.acl.common.AuthenticationHeader;
 import org.apache.rocketmq.acl.plain.PlainAccessResource;
 import org.apache.rocketmq.common.constant.GrpcConstants;
+import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 
 public class AuthenticationInterceptor implements ServerInterceptor {
@@ -49,8 +50,8 @@ public class AuthenticationInterceptor implements 
ServerInterceptor {
             @Override
             public void onMessage(R message) {
                 GeneratedMessageV3 messageV3 = (GeneratedMessageV3) message;
-                headers.put(GrpcConstants.RPC_NAME, 
messageV3.getDescriptorForType().getFullName());
-                headers.put(GrpcConstants.SIMPLE_RPC_NAME, 
messageV3.getDescriptorForType().getName());
+                GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.RPC_NAME, 
messageV3.getDescriptorForType().getFullName());
+                GrpcUtils.putHeaderIfNotExist(headers, 
GrpcConstants.SIMPLE_RPC_NAME, messageV3.getDescriptorForType().getName());
                 if (ConfigurationManager.getProxyConfig().isEnableACL()) {
                     try {
                         AuthenticationHeader authenticationHeader = 
AuthenticationHeader.builder()
@@ -85,7 +86,7 @@ public class AuthenticationInterceptor implements 
ServerInterceptor {
 
             if (accessResource instanceof PlainAccessResource) {
                 PlainAccessResource plainAccessResource = 
(PlainAccessResource) accessResource;
-                headers.put(GrpcConstants.AUTHORIZATION_AK, 
plainAccessResource.getAccessKey());
+                GrpcUtils.putHeaderIfNotExist(headers, 
GrpcConstants.AUTHORIZATION_AK, plainAccessResource.getAccessKey());
             }
         }
     }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
index 1de2ce4f98..e3e7884155 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/interceptor/HeaderInterceptor.java
@@ -27,6 +27,7 @@ import io.grpc.ServerInterceptor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.constant.HAProxyConstants;
 import org.apache.rocketmq.common.constant.GrpcConstants;
+import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
 import org.apache.rocketmq.proxy.grpc.constant.AttributeKeys;
 
 import java.net.InetSocketAddress;
@@ -44,11 +45,11 @@ public class HeaderInterceptor implements ServerInterceptor 
{
             SocketAddress remoteSocketAddress = 
call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
             remoteAddress = parseSocketAddress(remoteSocketAddress);
         }
-        headers.put(GrpcConstants.REMOTE_ADDRESS, remoteAddress);
+        GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.REMOTE_ADDRESS, 
remoteAddress);
 
         SocketAddress localSocketAddress = 
call.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
         String localAddress = parseSocketAddress(localSocketAddress);
-        headers.put(GrpcConstants.LOCAL_ADDRESS, localAddress);
+        GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.LOCAL_ADDRESS, 
localAddress);
 
         for (Attributes.Key<?> key : call.getAttributes().keys()) {
             if (!StringUtils.startsWith(key.toString(), 
HAProxyConstants.PROXY_PROTOCOL_PREFIX)) {
@@ -57,12 +58,12 @@ public class HeaderInterceptor implements ServerInterceptor 
{
             Metadata.Key<String> headerKey
                     = Metadata.Key.of(key.toString(), 
Metadata.ASCII_STRING_MARSHALLER);
             String headerValue = String.valueOf(call.getAttributes().get(key));
-            headers.put(headerKey, headerValue);
+            GrpcUtils.putHeaderIfNotExist(headers, headerKey, headerValue);
         }
 
         String channelId = call.getAttributes().get(AttributeKeys.CHANNEL_ID);
         if (StringUtils.isNotBlank(channelId)) {
-            headers.put(GrpcConstants.CHANNEL_ID, channelId);
+            GrpcUtils.putHeaderIfNotExist(headers, GrpcConstants.CHANNEL_ID, 
channelId);
         }
 
         return next.startCall(call, headers);
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java
index 58eed91c9f..e317b48f1e 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/pipeline/AuthenticationPipeline.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.utils.GrpcUtils;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
 
 public class AuthenticationPipeline implements RequestPipeline {
@@ -73,7 +74,7 @@ public class AuthenticationPipeline implements 
RequestPipeline {
         if (result instanceof DefaultAuthenticationContext) {
             DefaultAuthenticationContext defaultAuthenticationContext = 
(DefaultAuthenticationContext) result;
             if 
(StringUtils.isNotBlank(defaultAuthenticationContext.getUsername())) {
-                headers.put(GrpcConstants.AUTHORIZATION_AK, 
defaultAuthenticationContext.getUsername());
+                GrpcUtils.putHeaderIfNotExist(headers, 
GrpcConstants.AUTHORIZATION_AK, defaultAuthenticationContext.getUsername());
             }
         }
         return result;

Reply via email to