This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new ce73b2d  Add Header Filter for Triple Protocol (#8585)
ce73b2d is described below

commit ce73b2d06e47196a397af8ab62b8821e079347fe
Author: Albumen Kevin <[email protected]>
AuthorDate: Mon Aug 30 10:38:30 2021 +0800

    Add Header Filter for Triple Protocol (#8585)
    
    * Add Header Filter for Triple Protocol
    used to check the token
    
    * extract header filter to buildInvocation
    
    * change exception to FORBIDDEN_EXCEPTION
---
 .../dubbo/common/constants/CommonConstants.java    |  2 +
 .../java/org/apache/dubbo/rpc/HeaderFilter.java    | 25 +++++++++++
 .../apache/dubbo/rpc/filter/TokenHeaderFilter.java | 49 ++++++++++++++++++++++
 .../internal/org.apache.dubbo.rpc.HeaderFilter     |  1 +
 .../rpc/protocol/tri/AbstractServerStream.java     | 12 ++++++
 .../dubbo/rpc/protocol/tri/ServerStream.java       | 26 +++++++-----
 .../dubbo/rpc/protocol/tri/UnaryServerStream.java  |  4 +-
 7 files changed, 106 insertions(+), 13 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 2f93414..7c59e88 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -298,6 +298,8 @@ public interface CommonConstants {
 
     String REFERENCE_FILTER_KEY = "reference.filter";
 
+    String HEADER_FILTER_KEY = "header.filter";
+
     String INVOCATION_INTERCEPTOR_KEY = "invocation.interceptor";
 
     String INVOKER_LISTENER_KEY = "invoker.listener";
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/HeaderFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/HeaderFilter.java
new file mode 100644
index 0000000..e82b697
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/HeaderFilter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.extension.SPI;
+
+@SPI
+public interface HeaderFilter {
+
+    RpcInvocation invoke(Invoker<?> invoker, RpcInvocation invocation) throws RpcException;
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TokenHeaderFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TokenHeaderFilter.java
new file mode 100644
index 0000000..63b0051
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TokenHeaderFilter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.filter;
+
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.utils.ConfigUtils;
+import org.apache.dubbo.rpc.HeaderFilter;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+
+import java.util.Map;
+
+import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
+import static org.apache.dubbo.rpc.RpcException.FORBIDDEN_EXCEPTION;
+
+@Activate
+public class TokenHeaderFilter implements HeaderFilter {
+    @Override
+    public RpcInvocation invoke(Invoker<?> invoker, RpcInvocation invocation) throws RpcException {
+        String token = invoker.getUrl().getParameter(TOKEN_KEY);
+        if (ConfigUtils.isNotEmpty(token)) {
+            Class<?> serviceType = invoker.getInterface();
+            Map<String, Object> attachments = invocation.getObjectAttachments();
+            String remoteToken = (attachments == null ? null : (String) attachments.get(TOKEN_KEY));
+            if (!token.equals(remoteToken)) {
+                throw new RpcException(FORBIDDEN_EXCEPTION, "Forbid invoke remote service " + serviceType + " method " + invocation.getMethodName() +
+                    "() from consumer " + RpcContext.getServiceContext().getRemoteHost() + " to provider " +
+                    RpcContext.getServiceContext().getLocalHost() + ", consumer incorrect token is " + remoteToken);
+            }
+        }
+        return invocation;
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.HeaderFilter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.HeaderFilter
new file mode 100644
index 0000000..0512e68
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.HeaderFilter
@@ -0,0 +1 @@
+token=org.apache.dubbo.rpc.filter.TokenHeaderFilter
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index ce155ba..c1df0ef 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.remoting.Constants;
+import org.apache.dubbo.rpc.HeaderFilter;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -40,6 +41,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 
+import static org.apache.dubbo.common.constants.CommonConstants.HEADER_FILTER_KEY;
+
 public abstract class AbstractServerStream extends AbstractStream implements Stream {
 
     protected static final ExecutorRepository EXECUTOR_REPOSITORY =
@@ -47,6 +50,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
     private final ProviderModel providerModel;
     private List<MethodDescriptor> methodDescriptors;
     private Invoker<?> invoker;
+    private List<HeaderFilter> headerFilters;
 
     protected AbstractServerStream(URL url) {
         this(url, lookupProviderModel(url));
@@ -60,6 +64,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
         super(url, executor);
         this.providerModel = providerModel;
         this.serialize(getUrl().getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
+        this.headerFilters = ExtensionLoader.getExtensionLoader(HeaderFilter.class).getActivateExtension(url, HEADER_FILTER_KEY);
     }
 
     private static Executor lookupExecutor(URL url, ProviderModel providerModel) {
@@ -107,6 +112,10 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
         return invoker;
     }
 
+    public List<HeaderFilter> getHeaderFilters() {
+        return headerFilters;
+    }
+
     public ProviderModel getProviderModel() {
         return providerModel;
     }
@@ -122,6 +131,9 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
         final Map<String, Object> attachments = parseMetadataToAttachmentMap(metadata);
         inv.setObjectAttachments(attachments);
 
+        for (HeaderFilter headerFilter : getHeaderFilters()) {
+            inv = headerFilter.invoke(getInvoker(), inv);
+        }
         return inv;
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 98f3ec3..75e01d9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -54,8 +54,8 @@ public class ServerStream extends AbstractServerStream implements Stream {
         @Override
         public void onError(Throwable throwable) {
             final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                    .withCause(throwable)
-                    .withDescription("Biz exception");
+                .withCause(throwable)
+                .withDescription("Biz exception");
             transportError(status);
         }
 
@@ -83,27 +83,31 @@ public class ServerStream extends AbstractServerStream implements Stream {
                 subscribe((StreamObserver<Object>) result.getValue());
             } catch (Throwable t) {
                 transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                        .withDescription("Failed to create server's observer"));
+                    .withDescription("Failed to create server's observer"));
             }
         }
 
         @Override
         public void onData(byte[] in, boolean endStream, OperationHandler handler) {
             try {
-                final Object[] arguments = deserializeRequest(in);
-                if (arguments != null) {
-                    if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
-                        final RpcInvocation inv = buildInvocation(getHeaders());
+                if (getMethodDescriptor().getRpcType() == MethodDescriptor.RpcType.SERVER_STREAM) {
+                    RpcInvocation inv = buildInvocation(getHeaders());
+                    final Object[] arguments = deserializeRequest(in);
+                    if (arguments != null) {
                         inv.setArguments(new Object[]{arguments[0], asStreamObserver()});
                         getInvoker().invoke(inv);
-                    } else {
+                    }
+                } else {
+                    final Object[] arguments = deserializeRequest(in);
+                    if (arguments != null) {
                         getStreamSubscriber().onNext(arguments[0]);
                     }
                 }
-            } catch (Throwable t) {
+            } catch (
+                Throwable t) {
                 transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                        .withDescription("Deserialize request failed")
-                        .withCause(t));
+                    .withDescription("Deserialize request failed")
+                    .withCause(t));
             }
         }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index e7cb9fb..c9de0cd 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -66,11 +66,11 @@ public class UnaryServerStream extends AbstractServerStream implements Stream {
 
         public void invoke() {
 
-            final RpcInvocation invocation;
+            RpcInvocation invocation;
             try {
+                invocation = buildInvocation(getHeaders());
                 final Object[] arguments = deserializeRequest(getData());
                 if (arguments != null) {
-                    invocation = buildInvocation(getHeaders());
                     invocation.setArguments(arguments);
                 } else {
                     return;

Reply via email to