This is an automated email from the ASF dual-hosted git repository.
victory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 92cafe9 register callback once for the full filter chain. (#4127)
92cafe9 is described below
commit 92cafe962d8fd3df034c16629954425f4640f343
Author: ken.lj <[email protected]>
AuthorDate: Fri May 24 14:49:25 2019 +0800
register callback once for the full filter chain. (#4127)
---
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 18 ++----
.../dubbo/rpc/protocol/ProtocolFilterWrapper.java | 73 +++++++++++++++++-----
.../dubbo/internal/org.apache.dubbo.rpc.Filter | 3 +-
3 files changed, 65 insertions(+), 29 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 9546e2a..fc26247 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -137,20 +137,10 @@ public class AsyncRpcResult extends AbstractResult {
}
public Result thenApplyWithContext(Function<Result, Result> fn) {
- CompletableFuture<Result> future =
this.thenApply(fn.compose(beforeContext).andThen(afterContext));
- AsyncRpcResult nextAsyncRpcResult = new AsyncRpcResult(this);
- nextAsyncRpcResult.subscribeTo(future);
- return nextAsyncRpcResult;
- }
-
- public void subscribeTo(CompletableFuture<?> future) {
- future.whenComplete((obj, t) -> {
- if (t != null) {
- this.completeExceptionally(t);
- } else {
- this.complete((Result) obj);
- }
- });
+ this.thenApply(fn.compose(beforeContext).andThen(afterContext));
+ // You may need to return a new Result instance representing the next
async stage,
+ // like thenApply will return a new CompletableFuture.
+ return this;
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
index 9ee7d3e..f54d076 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
@@ -31,7 +31,6 @@ import org.apache.dubbo.rpc.RpcException;
import java.util.List;
import static
org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL;
-
import static org.apache.dubbo.rpc.Constants.REFERENCE_FILTER_KEY;
import static org.apache.dubbo.rpc.Constants.SERVICE_FILTER_KEY;
@@ -49,9 +48,12 @@ public class ProtocolFilterWrapper implements Protocol {
this.protocol = protocol;
}
+
+
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker,
String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters =
ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(),
key, group);
+
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
@@ -88,18 +90,7 @@ public class ProtocolFilterWrapper implements Protocol {
}
throw e;
}
- return asyncResult.thenApplyWithContext(r -> {
- // onResponse callback
- if (filter instanceof ListenableFilter) {
- Filter.Listener listener = ((ListenableFilter)
filter).listener();
- if (listener != null) {
- listener.onResponse(r, invoker,
invocation);
- }
- } else {
- filter.onResponse(r, invoker, invocation);
- }
- return r;
- });
+ return asyncResult;
}
@Override
@@ -114,7 +105,8 @@ public class ProtocolFilterWrapper implements Protocol {
};
}
}
- return last;
+
+ return new CallbackRegistrationInvoker<>(last, filters);
}
@Override
@@ -143,4 +135,57 @@ public class ProtocolFilterWrapper implements Protocol {
protocol.destroy();
}
+ static class CallbackRegistrationInvoker<T> implements Invoker<T> {
+
+ private final Invoker<T> filterInvoker;
+ private final List<Filter> filters;
+
+ public CallbackRegistrationInvoker(Invoker<T> filterInvoker,
List<Filter> filters) {
+ this.filterInvoker = filterInvoker;
+ this.filters = filters;
+ }
+
+ @Override
+ public Result invoke(Invocation invocation) throws RpcException {
+ Result asyncResult = filterInvoker.invoke(invocation);
+
+ asyncResult.thenApplyWithContext(r -> {
+ for (int i = filters.size() - 1; i >= 0; i--) {
+ Filter filter = filters.get(i);
+ // onResponse callback
+ if (filter instanceof ListenableFilter) {
+ Filter.Listener listener = ((ListenableFilter)
filter).listener();
+ if (listener != null) {
+ listener.onResponse(r, filterInvoker, invocation);
+ }
+ } else {
+ filter.onResponse(r, filterInvoker, invocation);
+ }
+ }
+ return r;
+ });
+
+ return asyncResult;
+ }
+
+ @Override
+ public Class<T> getInterface() {
+ return filterInvoker.getInterface();
+ }
+
+ @Override
+ public URL getUrl() {
+ return filterInvoker.getUrl();
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return filterInvoker.isAvailable();
+ }
+
+ @Override
+ public void destroy() {
+ filterInvoker.destroy();
+ }
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
index 376f966..2406521 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
@@ -11,4 +11,5 @@ exception=org.apache.dubbo.rpc.filter.ExceptionFilter
executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
-timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
\ No newline at end of file
+timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
+callback-registration=org.apache.dubbo.rpc.filter.CallbackRegistrationFilter
\ No newline at end of file