This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new f37502c005 triple support upper header key (#10761)
f37502c005 is described below

commit f37502c0058e62bc994805088ee4564b56ce1d03
Author: earthchen <[email protected]>
AuthorDate: Fri Oct 21 13:59:33 2022 +0800

    triple support upper header key (#10761)
    
    * tri support un lower header
    
    * change config only one
    
    * fix ut
    
    * revert format
    
    * revert format
    
    * refactor
    
    * ci
---
 .../main/java/org/apache/dubbo/rpc/Constants.java  |  2 ++
 .../dubbo/rpc/protocol/tri/RequestMetadata.java    |  4 +--
 .../dubbo/rpc/protocol/tri/TripleHeaderEnum.java   |  6 ++++-
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      |  1 +
 .../dubbo/rpc/protocol/tri/TripleProtocol.java     | 12 +++++++++
 .../dubbo/rpc/protocol/tri/stream/StreamUtils.java | 31 +++++++++++++++++++---
 .../protocol/tri/stream/TripleClientStream.java    | 28 ++++++++++++++++++-
 .../protocol/tri/stream/TripleServerStream.java    |  3 ++-
 8 files changed, 79 insertions(+), 8 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index 8bb27cbde3..2d1e125cbb 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -103,4 +103,6 @@ public interface Constants {
     String H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY = 
"dubbo.rpc.tri.initial-window-size";
     String H2_SETTINGS_MAX_FRAME_SIZE_KEY = "dubbo.rpc.tri.max-frame-size";
     String H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY = 
"dubbo.rpc.tri.max-header-list-size";
+
+    String H2_SUPPORT_NO_LOWER_HEADER_KEY = 
"dubbo.rpc.tri.support-no-lower-header";
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/RequestMetadata.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/RequestMetadata.java
index 0097b78fe4..b1ea629aa9 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/RequestMetadata.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/RequestMetadata.java
@@ -47,7 +47,7 @@ public class RequestMetadata {
     public MethodDescriptor method;
     public PackableMethod packableMethod;
     public Map<String, Object> attachments;
-
+    public boolean convertNoLowerHeader;
 
     public DefaultHttp2Headers toHeaders() {
         DefaultHttp2Headers header = new DefaultHttp2Headers(false);
@@ -70,7 +70,7 @@ public class RequestMetadata {
             setIfNotNull(header, TripleHeaderEnum.GRPC_ENCODING.getHeader(),
                 compressor.getMessageEncoding());
         }
-        StreamUtils.convertAttachment(header, attachments);
+        StreamUtils.convertAttachment(header, attachments, 
convertNoLowerHeader);
         return header;
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
index 4927b3e6d1..b01e89cbbe 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
@@ -40,7 +40,11 @@ public enum TripleHeaderEnum {
     GRPC_ACCEPT_ENCODING("grpc-accept-encoding"),
     CONSUMER_APP_NAME_KEY("tri-consumer-appname"),
     SERVICE_VERSION("tri-service-version"),
-    SERVICE_GROUP("tri-service-group");
+    SERVICE_GROUP("tri-service-group"),
+
+    TRI_HEADER_CONVERT("tri-header-convert"),
+
+    ;
 
     static final Map<String, TripleHeaderEnum> enumMap = new HashMap<>();
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index c9ea8c375c..63df5735a6 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -239,6 +239,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         } else {
             meta.packableMethod = 
ReflectionPackableMethod.init(methodDescriptor, url);
         }
+        meta.convertNoLowerHeader = TripleProtocol.CONVERT_NO_LOWER_HEADER;
         meta.method = methodDescriptor;
         meta.scheme = getSchemeFromUrl(url);
         // TODO read compressor from config
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index a7b0683d9d..5244e9de32 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -18,6 +18,7 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
@@ -48,6 +49,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.dubbo.rpc.Constants.H2_SUPPORT_NO_LOWER_HEADER_KEY;
+
 public class TripleProtocol extends AbstractProtocol {
 
 
@@ -57,13 +60,22 @@ public class TripleProtocol extends AbstractProtocol {
     private final TriBuiltinService triBuiltinService;
     private final ConnectionManager connectionManager;
     private final String acceptEncodings;
+
+    /**
+     * There is only one
+     */
+    public static boolean CONVERT_NO_LOWER_HEADER = false;
+
     private boolean versionChecked = false;
 
+
     public TripleProtocol(FrameworkModel frameworkModel) {
         this.frameworkModel = frameworkModel;
         this.triBuiltinService = new TriBuiltinService(frameworkModel);
         this.pathResolver = 
frameworkModel.getExtensionLoader(PathResolver.class)
             .getDefaultExtension();
+        CONVERT_NO_LOWER_HEADER = 
ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel())
+            .getBoolean(H2_SUPPORT_NO_LOWER_HEADER_KEY, false);
         Set<String> supported = 
frameworkModel.getExtensionLoader(DeCompressor.class)
             .getSupportedExtensions();
         this.acceptEncodings = String.join(",", supported);
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/StreamUtils.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/StreamUtils.java
index 228c651e91..e0c6f61400 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/StreamUtils.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/StreamUtils.java
@@ -19,6 +19,8 @@ package org.apache.dubbo.rpc.protocol.tri.stream;
 
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.JsonUtils;
+import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
 import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
 
@@ -31,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class StreamUtils {
 
@@ -73,14 +76,17 @@ public class StreamUtils {
      * Parse and put the KV pairs into metadata. Ignore Http2 PseudoHeaderName 
and internal name.
      * Only raw byte array or string value will be put.
      *
-     * @param headers     the metadata holder
-     * @param attachments KV pairs
+     * @param headers              the metadata holder
+     * @param attachments          KV pairs
+     * @param needConvertHeaderKey convert flag
      */
     public static void convertAttachment(DefaultHttp2Headers headers,
-        Map<String, Object> attachments) {
+                                         Map<String, Object> attachments,
+                                         boolean needConvertHeaderKey) {
         if (attachments == null) {
             return;
         }
+
         for (Map.Entry<String, Object> entry : attachments.entrySet()) {
             final String key = entry.getKey().toLowerCase(Locale.ROOT);
             if (Http2Headers.PseudoHeaderName.isPseudoHeader(key)) {
@@ -92,6 +98,22 @@ public class StreamUtils {
             final Object v = entry.getValue();
             convertSingleAttachment(headers, key, v);
         }
+        if (needConvertHeaderKey) {
+            Map<String, String> needConvertKey = attachments.entrySet()
+                .stream()
+                .filter(it -> !headers.contains(it.getKey()))
+                .collect(Collectors.toMap(Map.Entry::getKey, it -> 
it.getKey().toLowerCase(Locale.ROOT)));
+            if (!needConvertKey.isEmpty()) {
+                String needConvertJson = 
JsonUtils.getJson().toJson(needConvertKey);
+                headers.add(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader(), 
TriRpcStatus.encodeMessage(needConvertJson));
+            }
+        }
+    }
+
+
+    public static void convertAttachment(DefaultHttp2Headers headers,
+                                         Map<String, Object> attachments) {
+        convertAttachment(headers, attachments, false);
     }
 
     /**
@@ -109,6 +131,8 @@ public class StreamUtils {
             } else if (v instanceof byte[]) {
                 String str = encodeBase64ASCII((byte[]) v);
                 headers.set(key + TripleConstant.HEADER_BIN_SUFFIX, str);
+            } else {
+                LOGGER.warn("Unsupported attachment k: " + key + " class: " + 
v.getClass().getName());
             }
         } catch (Throwable t) {
             LOGGER.warn("Meet exception when convert single attachment key:" + 
key + " value=" + v,
@@ -117,4 +141,5 @@ public class StreamUtils {
     }
 
 
+
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
index a3d19abb56..41815f6fab 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
@@ -17,6 +17,9 @@
 
 package org.apache.dubbo.rpc.protocol.tri.stream;
 
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.JsonUtils;
 import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
@@ -57,6 +60,8 @@ import java.util.concurrent.Executor;
  */
 public class TripleClientStream extends AbstractStream implements ClientStream 
{
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TripleClientStream.class);
+
     public final ClientStream.Listener listener;
     private final WriteQueue writeQueue;
     private Deframer deframer;
@@ -189,7 +194,28 @@ public class TripleClientStream extends AbstractStream 
implements ClientStream {
 
             final Map<String, String> reserved = 
filterReservedHeaders(trailers);
             final Map<String, Object> attachments = headersToMap(trailers);
-            listener.onComplete(status, attachments, reserved);
+            final Map<String, Object> finalAttachments = 
convertNoLowerCaseHeader(attachments);
+            listener.onComplete(status, finalAttachments, reserved);
+        }
+
+        private Map<String, Object> convertNoLowerCaseHeader(Map<String, 
Object> attachments) {
+            Object obj = 
attachments.remove(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader());
+            if (obj == null) {
+                return attachments;
+            }
+            if (obj instanceof String) {
+                String json = TriRpcStatus.decodeMessage((String) obj);
+                Map<String, String> map = 
JsonUtils.getJson().toJavaObject(json, Map.class);
+                map.forEach((originalKey, lowerCaseKey) -> {
+                    Object val = attachments.remove(lowerCaseKey);
+                    if (val != null) {
+                        attachments.put(originalKey, val);
+                    }
+                });
+            } else {
+                LOGGER.error("Triple convertNoLowerCaseHeader error, obj is 
not String");
+            }
+            return attachments;
         }
 
         private TriRpcStatus validateHeaderStatus(Http2Headers headers) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
index a9aa76c2aa..f289bb1706 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
@@ -29,6 +29,7 @@ import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
 import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
 import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
+import org.apache.dubbo.rpc.protocol.tri.TripleProtocol;
 import org.apache.dubbo.rpc.protocol.tri.call.ReflectionAbstractServerCall;
 import org.apache.dubbo.rpc.protocol.tri.call.StubAbstractServerCall;
 import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
@@ -173,7 +174,7 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
             headers.status(HttpResponseStatus.OK.codeAsText());
             headers.set(HttpHeaderNames.CONTENT_TYPE, 
TripleConstant.CONTENT_PROTO);
         }
-        StreamUtils.convertAttachment(headers, attachments);
+        StreamUtils.convertAttachment(headers, attachments, 
TripleProtocol.CONVERT_NO_LOWER_HEADER);
         headers.set(TripleHeaderEnum.STATUS_KEY.getHeader(), 
String.valueOf(rpcStatus.code.code));
         if (rpcStatus.isOk()) {
             return headers;

Reply via email to