This is an automated email from the ASF dual-hosted git repository. victory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push: new 92cafe9 register callback once for the full filter chain. (#4127) 92cafe9 is described below commit 92cafe962d8fd3df034c16629954425f4640f343 Author: ken.lj <ken.lj...@gmail.com> AuthorDate: Fri May 24 14:49:25 2019 +0800 register callback once for the full filter chain. (#4127) --- .../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 18 ++---- .../dubbo/rpc/protocol/ProtocolFilterWrapper.java | 73 +++++++++++++++++----- .../dubbo/internal/org.apache.dubbo.rpc.Filter | 3 +- 3 files changed, 65 insertions(+), 29 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java index 9546e2a..fc26247 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java @@ -137,20 +137,10 @@ public class AsyncRpcResult extends AbstractResult { } public Result thenApplyWithContext(Function<Result, Result> fn) { - CompletableFuture<Result> future = this.thenApply(fn.compose(beforeContext).andThen(afterContext)); - AsyncRpcResult nextAsyncRpcResult = new AsyncRpcResult(this); - nextAsyncRpcResult.subscribeTo(future); - return nextAsyncRpcResult; - } - - public void subscribeTo(CompletableFuture<?> future) { - future.whenComplete((obj, t) -> { - if (t != null) { - this.completeExceptionally(t); - } else { - this.complete((Result) obj); - } - }); + this.thenApply(fn.compose(beforeContext).andThen(afterContext)); + // You may need to return a new Result instance representing the next async stage, + // like thenApply will return a new CompletableFuture. + return this; } @Override diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java index 9ee7d3e..f54d076 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java @@ -31,7 +31,6 @@ import org.apache.dubbo.rpc.RpcException; import java.util.List; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL; - import static org.apache.dubbo.rpc.Constants.REFERENCE_FILTER_KEY; import static org.apache.dubbo.rpc.Constants.SERVICE_FILTER_KEY; @@ -49,9 +48,12 @@ public class ProtocolFilterWrapper implements Protocol { this.protocol = protocol; } + + private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); + if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); @@ -88,18 +90,7 @@ public class ProtocolFilterWrapper implements Protocol { } throw e; } - return asyncResult.thenApplyWithContext(r -> { - // onResponse callback - if (filter instanceof ListenableFilter) { - Filter.Listener listener = ((ListenableFilter) filter).listener(); - if (listener != null) { - listener.onResponse(r, invoker, invocation); - } - } else { - filter.onResponse(r, invoker, invocation); - } - return r; - }); + return asyncResult; } @Override @@ -114,7 +105,8 @@ public class ProtocolFilterWrapper implements Protocol { }; } } - return last; + + return new CallbackRegistrationInvoker<>(last, filters); } @Override @@ -143,4 +135,57 @@ public class ProtocolFilterWrapper implements Protocol { protocol.destroy(); } + static class CallbackRegistrationInvoker<T> implements Invoker<T> { + + private final Invoker<T> filterInvoker; + private final List<Filter> filters; + + public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) { + this.filterInvoker = filterInvoker; + this.filters = filters; + } + + @Override + public Result invoke(Invocation invocation) throws RpcException { + Result asyncResult = filterInvoker.invoke(invocation); + + asyncResult.thenApplyWithContext(r -> { + for (int i = filters.size() - 1; i >= 0; i--) { + Filter filter = filters.get(i); + // onResponse callback + if (filter instanceof ListenableFilter) { + Filter.Listener listener = ((ListenableFilter) filter).listener(); + if (listener != null) { + listener.onResponse(r, filterInvoker, invocation); + } + } else { + filter.onResponse(r, filterInvoker, invocation); + } + } + return r; + }); + + return asyncResult; + } + + @Override + public Class<T> getInterface() { + return filterInvoker.getInterface(); + } + + @Override + public URL getUrl() { + return filterInvoker.getUrl(); + } + + @Override + public boolean isAvailable() { + return filterInvoker.isAvailable(); + } + + @Override + public void destroy() { + filterInvoker.destroy(); + } + } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter index 376f966..2406521 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter +++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter @@ -11,4 +11,5 @@ exception=org.apache.dubbo.rpc.filter.ExceptionFilter executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter compatible=org.apache.dubbo.rpc.filter.CompatibleFilter -timeout=org.apache.dubbo.rpc.filter.TimeoutFilter \ No newline at end of file +timeout=org.apache.dubbo.rpc.filter.TimeoutFilter +callback-registration=org.apache.dubbo.rpc.filter.CallbackRegistrationFilter \ No newline at end of file