This is an automated email from the ASF dual-hosted git repository.
iluo pushed a commit to branch 2.7.0-release
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/2.7.0-release by this push:
new 347f154 Async enhancement (#3184)
347f154 is described below
commit 347f154b7ffe83c3d652c4bf7af6df66cd935864
Author: ken.lj <[email protected]>
AuthorDate: Fri Jan 11 23:28:14 2019 +0800
Async enhancement (#3184)
* Provider async enhancement: users no longer have to set 'async=true' on
the provider side.
* only keep zero-args constructor
* stop router tag from being transferred in the RPC chain
* Code review: use 'this.future = new Future()'; remove setAsyncContext()
from RpcContext
* revert getInternalFuture()
* Fix UT
---
.../src/main/java/org/apache/dubbo/rpc/AsyncContext.java | 10 ----------
.../main/java/org/apache/dubbo/rpc/AsyncContextImpl.java | 11 ++++-------
.../src/main/java/org/apache/dubbo/rpc/RpcContext.java | 13 ++++---------
.../java/org/apache/dubbo/rpc/filter/ContextFilter.java | 5 ++++-
.../org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java | 3 ++-
.../src/test/java/org/apache/dubbo/rpc/RpcContextTest.java | 12 +++---------
.../org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java | 6 ------
7 files changed, 17 insertions(+), 43 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
index cc9ff1a..1f51a54 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
@@ -16,8 +16,6 @@
*/
package org.apache.dubbo.rpc;
-import java.util.concurrent.CompletableFuture;
-
/**
* AsyncContext works like {@see javax.servlet.AsyncContext} in the Servlet
3.0.
* An AsyncContext is stated by a call to {@link RpcContext#startAsync()}.
@@ -28,14 +26,6 @@ import java.util.concurrent.CompletableFuture;
public interface AsyncContext {
/**
- * get the internal future which is binding to this async context
- *
- * @return the internal future
- */
- // FIXME
- CompletableFuture getInternalFuture();
-
- /**
* write value and complete the async context.
*
* @param value invoke result
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
index 77cf0bd..f8f6743 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
@@ -34,10 +34,6 @@ public class AsyncContextImpl implements AsyncContext {
private RpcContext storedServerContext;
public AsyncContextImpl() {
- }
-
- public AsyncContextImpl(CompletableFuture<Object> future) {
- this.future = future;
this.storedContext = RpcContext.getContext();
this.storedServerContext = RpcContext.getServerContext();
}
@@ -68,7 +64,9 @@ public class AsyncContextImpl implements AsyncContext {
@Override
public void start() {
- this.started.set(true);
+ if (this.started.compareAndSet(false, true)) {
+ this.future = new CompletableFuture<>();
+ }
}
@Override
@@ -78,8 +76,7 @@ public class AsyncContextImpl implements AsyncContext {
// Restore any other contexts in here if necessary.
}
- @Override
- public CompletableFuture getInternalFuture() {
+ public CompletableFuture<Object> getInternalFuture() {
return future;
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index a9000ec..aaa92c8 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -731,12 +731,11 @@ public class RpcContext {
@SuppressWarnings("unchecked")
public static AsyncContext startAsync() throws IllegalStateException {
RpcContext currentContext = getContext();
- if (currentContext.asyncContext != null) {
- currentContext.asyncContext.start();
- return currentContext.asyncContext;
- } else {
- throw new IllegalStateException("This service does not support
asynchronous operations, you should open async explicitly before use.");
+ if (currentContext.asyncContext == null) {
+ currentContext.asyncContext = new AsyncContextImpl();
}
+ currentContext.asyncContext.start();
+ return currentContext.asyncContext;
}
public boolean isAsyncStarted() {
@@ -750,10 +749,6 @@ public class RpcContext {
return asyncContext.stop();
}
- public void setAsyncContext(AsyncContext asyncContext) {
- this.asyncContext = asyncContext;
- }
-
public AsyncContext getAsyncContext() {
return asyncContext;
}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
index 7dd5522..db68ea1 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
@@ -49,7 +49,10 @@ public class ContextFilter implements Filter {
attachments.remove(Constants.DUBBO_VERSION_KEY);
attachments.remove(Constants.TOKEN_KEY);
attachments.remove(Constants.TIMEOUT_KEY);
- attachments.remove(Constants.ASYNC_KEY);// Remove async property
to avoid being passed to the following invoke chain.
+ // Remove async property to avoid being passed to the following
invoke chain.
+ attachments.remove(Constants.ASYNC_KEY);
+ attachments.remove(Constants.TAG_KEY);
+ attachments.remove(Constants.FORCE_USE_TAG);
}
RpcContext.getContext()
.setInvoker(invoker)
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index 0f5a5df..6a562d6 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.proxy;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.AsyncContextImpl;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
@@ -86,7 +87,7 @@ public abstract class AbstractProxyInvoker<T> implements
Invoker<T> {
if (RpcUtils.isReturnTypeFuture(invocation)) {
return new AsyncRpcResult((CompletableFuture<Object>) obj);
} else if (rpcContext.isAsyncStarted()) { // ignore obj in case of
RpcContext.startAsync()? always rely on user to write back.
- return new
AsyncRpcResult(rpcContext.getAsyncContext().getInternalFuture());
+ return new
AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
} else {
return new RpcResult(obj);
}
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
index 281f96d..0d4d95f 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
@@ -17,12 +17,12 @@
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.URL;
+
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
public class RpcContextTest {
@@ -142,20 +142,14 @@ public class RpcContextTest {
@Test
public void testAsync() {
- CompletableFuture<Object> future = new CompletableFuture<>();
- AsyncContext asyncContext = new AsyncContextImpl(future);
-
RpcContext rpcContext = RpcContext.getContext();
Assert.assertFalse(rpcContext.isAsyncStarted());
- rpcContext.setAsyncContext(asyncContext);
- Assert.assertFalse(rpcContext.isAsyncStarted());
-
- RpcContext.startAsync();
+ AsyncContext asyncContext = RpcContext.startAsync();
Assert.assertTrue(rpcContext.isAsyncStarted());
asyncContext.write(new Object());
- Assert.assertTrue(future.isDone());
+
Assert.assertTrue(((AsyncContextImpl)asyncContext).getInternalFuture().isDone());
rpcContext.stopAsync();
Assert.assertTrue(rpcContext.isAsyncStarted());
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index d3ac220..86eb013 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -34,7 +34,6 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
-import org.apache.dubbo.rpc.AsyncContextImpl;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
@@ -105,11 +104,6 @@ public class DubboProtocol extends AbstractProtocol {
}
}
RpcContext rpcContext = RpcContext.getContext();
- boolean supportServerAsync =
invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY,
false);
- if (supportServerAsync) {
- CompletableFuture<Object> future = new
CompletableFuture<>();
- rpcContext.setAsyncContext(new AsyncContextImpl(future));
- }
rpcContext.setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);