This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new f37502c005 triple support upper header key (#10761)
f37502c005 is described below
commit f37502c0058e62bc994805088ee4564b56ce1d03
Author: earthchen <[email protected]>
AuthorDate: Fri Oct 21 13:59:33 2022 +0800
triple support upper header key (#10761)
* tri support un lower header
* change config only one
* fix ut
* revert format
* revert format
* refactor
* ci
---
.../main/java/org/apache/dubbo/rpc/Constants.java | 2 ++
.../dubbo/rpc/protocol/tri/RequestMetadata.java | 4 +--
.../dubbo/rpc/protocol/tri/TripleHeaderEnum.java | 6 ++++-
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 1 +
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 12 +++++++++
.../dubbo/rpc/protocol/tri/stream/StreamUtils.java | 31 +++++++++++++++++++---
.../protocol/tri/stream/TripleClientStream.java | 28 ++++++++++++++++++-
.../protocol/tri/stream/TripleServerStream.java | 3 ++-
8 files changed, 79 insertions(+), 8 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index 8bb27cbde3..2d1e125cbb 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -103,4 +103,6 @@ public interface Constants {
String H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY =
"dubbo.rpc.tri.initial-window-size";
String H2_SETTINGS_MAX_FRAME_SIZE_KEY = "dubbo.rpc.tri.max-frame-size";
String H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY =
"dubbo.rpc.tri.max-header-list-size";
+
+ String H2_SUPPORT_NO_LOWER_HEADER_KEY =
"dubbo.rpc.tri.support-no-lower-header";
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/RequestMetadata.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/RequestMetadata.java
index 0097b78fe4..b1ea629aa9 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/RequestMetadata.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/RequestMetadata.java
@@ -47,7 +47,7 @@ public class RequestMetadata {
public MethodDescriptor method;
public PackableMethod packableMethod;
public Map<String, Object> attachments;
-
+ public boolean convertNoLowerHeader;
public DefaultHttp2Headers toHeaders() {
DefaultHttp2Headers header = new DefaultHttp2Headers(false);
@@ -70,7 +70,7 @@ public class RequestMetadata {
setIfNotNull(header, TripleHeaderEnum.GRPC_ENCODING.getHeader(),
compressor.getMessageEncoding());
}
- StreamUtils.convertAttachment(header, attachments);
+ StreamUtils.convertAttachment(header, attachments,
convertNoLowerHeader);
return header;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
index 4927b3e6d1..b01e89cbbe 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
@@ -40,7 +40,11 @@ public enum TripleHeaderEnum {
GRPC_ACCEPT_ENCODING("grpc-accept-encoding"),
CONSUMER_APP_NAME_KEY("tri-consumer-appname"),
SERVICE_VERSION("tri-service-version"),
- SERVICE_GROUP("tri-service-group");
+ SERVICE_GROUP("tri-service-group"),
+
+ TRI_HEADER_CONVERT("tri-header-convert"),
+
+ ;
static final Map<String, TripleHeaderEnum> enumMap = new HashMap<>();
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index c9ea8c375c..63df5735a6 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -239,6 +239,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
} else {
meta.packableMethod =
ReflectionPackableMethod.init(methodDescriptor, url);
}
+ meta.convertNoLowerHeader = TripleProtocol.CONVERT_NO_LOWER_HEADER;
meta.method = methodDescriptor;
meta.scheme = getSchemeFromUrl(url);
// TODO read compressor from config
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index a7b0683d9d..5244e9de32 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -18,6 +18,7 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
@@ -48,6 +49,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import static org.apache.dubbo.rpc.Constants.H2_SUPPORT_NO_LOWER_HEADER_KEY;
+
public class TripleProtocol extends AbstractProtocol {
@@ -57,13 +60,22 @@ public class TripleProtocol extends AbstractProtocol {
private final TriBuiltinService triBuiltinService;
private final ConnectionManager connectionManager;
private final String acceptEncodings;
+
+ /**
+ * There is only one
+ */
+ public static boolean CONVERT_NO_LOWER_HEADER = false;
+
private boolean versionChecked = false;
+
public TripleProtocol(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
this.triBuiltinService = new TriBuiltinService(frameworkModel);
this.pathResolver =
frameworkModel.getExtensionLoader(PathResolver.class)
.getDefaultExtension();
+ CONVERT_NO_LOWER_HEADER =
ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel())
+ .getBoolean(H2_SUPPORT_NO_LOWER_HEADER_KEY, false);
Set<String> supported =
frameworkModel.getExtensionLoader(DeCompressor.class)
.getSupportedExtensions();
this.acceptEncodings = String.join(",", supported);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/StreamUtils.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/StreamUtils.java
index 228c651e91..e0c6f61400 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/StreamUtils.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/StreamUtils.java
@@ -19,6 +19,8 @@ package org.apache.dubbo.rpc.protocol.tri.stream;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.JsonUtils;
+import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
@@ -31,6 +33,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+import java.util.stream.Collectors;
public class StreamUtils {
@@ -73,14 +76,17 @@ public class StreamUtils {
* Parse and put the KV pairs into metadata. Ignore Http2 PseudoHeaderName
and internal name.
* Only raw byte array or string value will be put.
*
- * @param headers the metadata holder
- * @param attachments KV pairs
+ * @param headers the metadata holder
+ * @param attachments KV pairs
+ * @param needConvertHeaderKey convert flag
*/
public static void convertAttachment(DefaultHttp2Headers headers,
- Map<String, Object> attachments) {
+ Map<String, Object> attachments,
+ boolean needConvertHeaderKey) {
if (attachments == null) {
return;
}
+
for (Map.Entry<String, Object> entry : attachments.entrySet()) {
final String key = entry.getKey().toLowerCase(Locale.ROOT);
if (Http2Headers.PseudoHeaderName.isPseudoHeader(key)) {
@@ -92,6 +98,22 @@ public class StreamUtils {
final Object v = entry.getValue();
convertSingleAttachment(headers, key, v);
}
+ if (needConvertHeaderKey) {
+ Map<String, String> needConvertKey = attachments.entrySet()
+ .stream()
+ .filter(it -> !headers.contains(it.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, it ->
it.getKey().toLowerCase(Locale.ROOT)));
+ if (!needConvertKey.isEmpty()) {
+ String needConvertJson =
JsonUtils.getJson().toJson(needConvertKey);
+ headers.add(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader(),
TriRpcStatus.encodeMessage(needConvertJson));
+ }
+ }
+ }
+
+
+ public static void convertAttachment(DefaultHttp2Headers headers,
+ Map<String, Object> attachments) {
+ convertAttachment(headers, attachments, false);
}
/**
@@ -109,6 +131,8 @@ public class StreamUtils {
} else if (v instanceof byte[]) {
String str = encodeBase64ASCII((byte[]) v);
headers.set(key + TripleConstant.HEADER_BIN_SUFFIX, str);
+ } else {
+ LOGGER.warn("Unsupported attachment k: " + key + " class: " +
v.getClass().getName());
}
} catch (Throwable t) {
LOGGER.warn("Meet exception when convert single attachment key:" +
key + " value=" + v,
@@ -117,4 +141,5 @@ public class StreamUtils {
}
+
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
index a3d19abb56..41815f6fab 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
@@ -17,6 +17,9 @@
package org.apache.dubbo.rpc.protocol.tri.stream;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
@@ -57,6 +60,8 @@ import java.util.concurrent.Executor;
*/
public class TripleClientStream extends AbstractStream implements ClientStream
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TripleClientStream.class);
+
public final ClientStream.Listener listener;
private final WriteQueue writeQueue;
private Deframer deframer;
@@ -189,7 +194,28 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
final Map<String, String> reserved =
filterReservedHeaders(trailers);
final Map<String, Object> attachments = headersToMap(trailers);
- listener.onComplete(status, attachments, reserved);
+ final Map<String, Object> finalAttachments =
convertNoLowerCaseHeader(attachments);
+ listener.onComplete(status, finalAttachments, reserved);
+ }
+
+ private Map<String, Object> convertNoLowerCaseHeader(Map<String,
Object> attachments) {
+ Object obj =
attachments.remove(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader());
+ if (obj == null) {
+ return attachments;
+ }
+ if (obj instanceof String) {
+ String json = TriRpcStatus.decodeMessage((String) obj);
+ Map<String, String> map =
JsonUtils.getJson().toJavaObject(json, Map.class);
+ map.forEach((originalKey, lowerCaseKey) -> {
+ Object val = attachments.remove(lowerCaseKey);
+ if (val != null) {
+ attachments.put(originalKey, val);
+ }
+ });
+ } else {
+ LOGGER.error("Triple convertNoLowerCaseHeader error, obj is
not String");
+ }
+ return attachments;
}
private TriRpcStatus validateHeaderStatus(Http2Headers headers) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
index a9aa76c2aa..f289bb1706 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
@@ -29,6 +29,7 @@ import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
+import org.apache.dubbo.rpc.protocol.tri.TripleProtocol;
import org.apache.dubbo.rpc.protocol.tri.call.ReflectionAbstractServerCall;
import org.apache.dubbo.rpc.protocol.tri.call.StubAbstractServerCall;
import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
@@ -173,7 +174,7 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
headers.status(HttpResponseStatus.OK.codeAsText());
headers.set(HttpHeaderNames.CONTENT_TYPE,
TripleConstant.CONTENT_PROTO);
}
- StreamUtils.convertAttachment(headers, attachments);
+ StreamUtils.convertAttachment(headers, attachments,
TripleProtocol.CONVERT_NO_LOWER_HEADER);
headers.set(TripleHeaderEnum.STATUS_KEY.getHeader(),
String.valueOf(rpcStatus.code.code));
if (rpcStatus.isOk()) {
return headers;