This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new ce73b2d Add Header Filter for Triple Protocol (#8585)
ce73b2d is described below
commit ce73b2d06e47196a397af8ab62b8821e079347fe
Author: Albumen Kevin <[email protected]>
AuthorDate: Mon Aug 30 10:38:30 2021 +0800
Add Header Filter for Triple Protocol (#8585)
* Add Header Filter for Triple Protocol
use to check token
* exact header filter to buildInvocation
* change exception to FORBIDDEN_EXCEPTION
---
.../dubbo/common/constants/CommonConstants.java | 2 +
.../java/org/apache/dubbo/rpc/HeaderFilter.java | 25 +++++++++++
.../apache/dubbo/rpc/filter/TokenHeaderFilter.java | 49 ++++++++++++++++++++++
.../internal/org.apache.dubbo.rpc.HeaderFilter | 1 +
.../rpc/protocol/tri/AbstractServerStream.java | 12 ++++++
.../dubbo/rpc/protocol/tri/ServerStream.java | 26 +++++++-----
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 4 +-
7 files changed, 106 insertions(+), 13 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 2f93414..7c59e88 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -298,6 +298,8 @@ public interface CommonConstants {
String REFERENCE_FILTER_KEY = "reference.filter";
+ String HEADER_FILTER_KEY = "header.filter";
+
String INVOCATION_INTERCEPTOR_KEY = "invocation.interceptor";
String INVOKER_LISTENER_KEY = "invoker.listener";
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/HeaderFilter.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/HeaderFilter.java
new file mode 100644
index 0000000..e82b697
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/HeaderFilter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.extension.SPI;
+
+@SPI
+public interface HeaderFilter {
+
+ RpcInvocation invoke(Invoker<?> invoker, RpcInvocation invocation) throws
RpcException;
+}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TokenHeaderFilter.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TokenHeaderFilter.java
new file mode 100644
index 0000000..63b0051
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TokenHeaderFilter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.filter;
+
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.utils.ConfigUtils;
+import org.apache.dubbo.rpc.HeaderFilter;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+
+import java.util.Map;
+
+import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
+import static org.apache.dubbo.rpc.RpcException.FORBIDDEN_EXCEPTION;
+
+@Activate
+public class TokenHeaderFilter implements HeaderFilter {
+ @Override
+ public RpcInvocation invoke(Invoker<?> invoker, RpcInvocation invocation)
throws RpcException {
+ String token = invoker.getUrl().getParameter(TOKEN_KEY);
+ if (ConfigUtils.isNotEmpty(token)) {
+ Class<?> serviceType = invoker.getInterface();
+ Map<String, Object> attachments =
invocation.getObjectAttachments();
+ String remoteToken = (attachments == null ? null : (String)
attachments.get(TOKEN_KEY));
+ if (!token.equals(remoteToken)) {
+ throw new RpcException(FORBIDDEN_EXCEPTION, "Forbid invoke
remote service " + serviceType + " method " + invocation.getMethodName() +
+ "() from consumer " +
RpcContext.getServiceContext().getRemoteHost() + " to provider " +
+ RpcContext.getServiceContext().getLocalHost() + ",
consumer incorrect token is " + remoteToken);
+ }
+ }
+ return invocation;
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.HeaderFilter
b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.HeaderFilter
new file mode 100644
index 0000000..0512e68
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.HeaderFilter
@@ -0,0 +1 @@
+token=org.apache.dubbo.rpc.filter.TokenHeaderFilter
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index ce155ba..c1df0ef 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.rpc.HeaderFilter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -40,6 +41,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import static
org.apache.dubbo.common.constants.CommonConstants.HEADER_FILTER_KEY;
+
public abstract class AbstractServerStream extends AbstractStream implements
Stream {
protected static final ExecutorRepository EXECUTOR_REPOSITORY =
@@ -47,6 +50,7 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
private final ProviderModel providerModel;
private List<MethodDescriptor> methodDescriptors;
private Invoker<?> invoker;
+ private List<HeaderFilter> headerFilters;
protected AbstractServerStream(URL url) {
this(url, lookupProviderModel(url));
@@ -60,6 +64,7 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
super(url, executor);
this.providerModel = providerModel;
this.serialize(getUrl().getParameter(Constants.SERIALIZATION_KEY,
Constants.DEFAULT_REMOTING_SERIALIZATION));
+ this.headerFilters =
ExtensionLoader.getExtensionLoader(HeaderFilter.class).getActivateExtension(url,
HEADER_FILTER_KEY);
}
private static Executor lookupExecutor(URL url, ProviderModel
providerModel) {
@@ -107,6 +112,10 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
return invoker;
}
+ public List<HeaderFilter> getHeaderFilters() {
+ return headerFilters;
+ }
+
public ProviderModel getProviderModel() {
return providerModel;
}
@@ -122,6 +131,9 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
final Map<String, Object> attachments =
parseMetadataToAttachmentMap(metadata);
inv.setObjectAttachments(attachments);
+ for (HeaderFilter headerFilter : getHeaderFilters()) {
+ inv = headerFilter.invoke(getInvoker(), inv);
+ }
return inv;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 98f3ec3..75e01d9 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -54,8 +54,8 @@ public class ServerStream extends AbstractServerStream
implements Stream {
@Override
public void onError(Throwable throwable) {
final GrpcStatus status =
GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withCause(throwable)
- .withDescription("Biz exception");
+ .withCause(throwable)
+ .withDescription("Biz exception");
transportError(status);
}
@@ -83,27 +83,31 @@ public class ServerStream extends AbstractServerStream
implements Stream {
subscribe((StreamObserver<Object>) result.getValue());
} catch (Throwable t) {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Failed to create server's
observer"));
+ .withDescription("Failed to create server's observer"));
}
}
@Override
public void onData(byte[] in, boolean endStream, OperationHandler
handler) {
try {
- final Object[] arguments = deserializeRequest(in);
- if (arguments != null) {
- if (getMethodDescriptor().getRpcType() ==
MethodDescriptor.RpcType.SERVER_STREAM) {
- final RpcInvocation inv =
buildInvocation(getHeaders());
+ if (getMethodDescriptor().getRpcType() ==
MethodDescriptor.RpcType.SERVER_STREAM) {
+ RpcInvocation inv = buildInvocation(getHeaders());
+ final Object[] arguments = deserializeRequest(in);
+ if (arguments != null) {
inv.setArguments(new Object[]{arguments[0],
asStreamObserver()});
getInvoker().invoke(inv);
- } else {
+ }
+ } else {
+ final Object[] arguments = deserializeRequest(in);
+ if (arguments != null) {
getStreamSubscriber().onNext(arguments[0]);
}
}
- } catch (Throwable t) {
+ } catch (
+ Throwable t) {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Deserialize request failed")
- .withCause(t));
+ .withDescription("Deserialize request failed")
+ .withCause(t));
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index e7cb9fb..c9de0cd 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -66,11 +66,11 @@ public class UnaryServerStream extends AbstractServerStream
implements Stream {
public void invoke() {
- final RpcInvocation invocation;
+ RpcInvocation invocation;
try {
+ invocation = buildInvocation(getHeaders());
final Object[] arguments = deserializeRequest(getData());
if (arguments != null) {
- invocation = buildInvocation(getHeaders());
invocation.setArguments(arguments);
} else {
return;