This is an automated email from the ASF dual-hosted git repository.

iluo pushed a commit to branch 2.7.0-release
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/2.7.0-release by this push:
     new 347f154  Async enhancement (#3184)
347f154 is described below

commit 347f154b7ffe83c3d652c4bf7af6df66cd935864
Author: ken.lj <[email protected]>
AuthorDate: Fri Jan 11 23:28:14 2019 +0800

    Async enhancement (#3184)
    
    * Provider async enhancement, user doesn't have to use 'async=true' on 
provider side.
    
    * only keep zero-args constructor
    
    * stop router tag from being transferred in the RPC chain
    
    * Code review: use 'this.future= new Future()''; remove setAsyncContext() 
from RpcContext
    
    * revert getInternalFuture()
    
    * Fix UT
---
 .../src/main/java/org/apache/dubbo/rpc/AsyncContext.java    | 10 ----------
 .../main/java/org/apache/dubbo/rpc/AsyncContextImpl.java    | 11 ++++-------
 .../src/main/java/org/apache/dubbo/rpc/RpcContext.java      | 13 ++++---------
 .../java/org/apache/dubbo/rpc/filter/ContextFilter.java     |  5 ++++-
 .../org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java    |  3 ++-
 .../src/test/java/org/apache/dubbo/rpc/RpcContextTest.java  | 12 +++---------
 .../org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java  |  6 ------
 7 files changed, 17 insertions(+), 43 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
index cc9ff1a..1f51a54 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
@@ -16,8 +16,6 @@
  */
 package org.apache.dubbo.rpc;
 
-import java.util.concurrent.CompletableFuture;
-
 /**
  * AsyncContext works like {@see javax.servlet.AsyncContext} in the Servlet 
3.0.
  * An AsyncContext is stated by a call to {@link RpcContext#startAsync()}.
@@ -28,14 +26,6 @@ import java.util.concurrent.CompletableFuture;
 public interface AsyncContext {
 
     /**
-     * get the internal future which is binding to this async context
-     *
-     * @return the internal future
-     */
-    // FIXME
-    CompletableFuture getInternalFuture();
-
-    /**
      * write value and complete the async context.
      *
      * @param value invoke result
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
index 77cf0bd..f8f6743 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
@@ -34,10 +34,6 @@ public class AsyncContextImpl implements AsyncContext {
     private RpcContext storedServerContext;
 
     public AsyncContextImpl() {
-    }
-
-    public AsyncContextImpl(CompletableFuture<Object> future) {
-        this.future = future;
         this.storedContext = RpcContext.getContext();
         this.storedServerContext = RpcContext.getServerContext();
     }
@@ -68,7 +64,9 @@ public class AsyncContextImpl implements AsyncContext {
 
     @Override
     public void start() {
-        this.started.set(true);
+        if (this.started.compareAndSet(false, true)) {
+            this.future = new CompletableFuture<>();
+        }
     }
 
     @Override
@@ -78,8 +76,7 @@ public class AsyncContextImpl implements AsyncContext {
         // Restore any other contexts in here if necessary.
     }
 
-    @Override
-    public CompletableFuture getInternalFuture() {
+    public CompletableFuture<Object> getInternalFuture() {
         return future;
     }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index a9000ec..aaa92c8 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -731,12 +731,11 @@ public class RpcContext {
     @SuppressWarnings("unchecked")
     public static AsyncContext startAsync() throws IllegalStateException {
         RpcContext currentContext = getContext();
-        if (currentContext.asyncContext != null) {
-            currentContext.asyncContext.start();
-            return currentContext.asyncContext;
-        } else {
-            throw new IllegalStateException("This service does not support 
asynchronous operations, you should open async explicitly before use.");
+        if (currentContext.asyncContext == null) {
+            currentContext.asyncContext = new AsyncContextImpl();
         }
+        currentContext.asyncContext.start();
+        return currentContext.asyncContext;
     }
 
     public boolean isAsyncStarted() {
@@ -750,10 +749,6 @@ public class RpcContext {
         return asyncContext.stop();
     }
 
-    public void setAsyncContext(AsyncContext asyncContext) {
-        this.asyncContext = asyncContext;
-    }
-
     public AsyncContext getAsyncContext() {
         return asyncContext;
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
index 7dd5522..db68ea1 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
@@ -49,7 +49,10 @@ public class ContextFilter implements Filter {
             attachments.remove(Constants.DUBBO_VERSION_KEY);
             attachments.remove(Constants.TOKEN_KEY);
             attachments.remove(Constants.TIMEOUT_KEY);
-            attachments.remove(Constants.ASYNC_KEY);// Remove async property 
to avoid being passed to the following invoke chain.
+            // Remove async property to avoid being passed to the following 
invoke chain.
+            attachments.remove(Constants.ASYNC_KEY);
+            attachments.remove(Constants.TAG_KEY);
+            attachments.remove(Constants.FORCE_USE_TAG);
         }
         RpcContext.getContext()
                 .setInvoker(invoker)
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index 0f5a5df..6a562d6 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.proxy;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.AsyncContextImpl;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
@@ -86,7 +87,7 @@ public abstract class AbstractProxyInvoker<T> implements 
Invoker<T> {
             if (RpcUtils.isReturnTypeFuture(invocation)) {
                 return new AsyncRpcResult((CompletableFuture<Object>) obj);
             } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of 
RpcContext.startAsync()? always rely on user to write back.
-                return new 
AsyncRpcResult(rpcContext.getAsyncContext().getInternalFuture());
+                return new 
AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
             } else {
                 return new RpcResult(obj);
             }
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
 
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
index 281f96d..0d4d95f 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
@@ -17,12 +17,12 @@
 package org.apache.dubbo.rpc;
 
 import org.apache.dubbo.common.URL;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 public class RpcContextTest {
 
@@ -142,20 +142,14 @@ public class RpcContextTest {
     @Test
     public void testAsync() {
 
-        CompletableFuture<Object> future = new CompletableFuture<>();
-        AsyncContext asyncContext = new AsyncContextImpl(future);
-
         RpcContext rpcContext = RpcContext.getContext();
         Assert.assertFalse(rpcContext.isAsyncStarted());
 
-        rpcContext.setAsyncContext(asyncContext);
-        Assert.assertFalse(rpcContext.isAsyncStarted());
-
-        RpcContext.startAsync();
+        AsyncContext asyncContext = RpcContext.startAsync();
         Assert.assertTrue(rpcContext.isAsyncStarted());
 
         asyncContext.write(new Object());
-        Assert.assertTrue(future.isDone());
+        
Assert.assertTrue(((AsyncContextImpl)asyncContext).getInternalFuture().isDone());
 
         rpcContext.stopAsync();
         Assert.assertTrue(rpcContext.isAsyncStarted());
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index d3ac220..86eb013 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -34,7 +34,6 @@ import org.apache.dubbo.remoting.exchange.ExchangeHandler;
 import org.apache.dubbo.remoting.exchange.ExchangeServer;
 import org.apache.dubbo.remoting.exchange.Exchangers;
 import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
-import org.apache.dubbo.rpc.AsyncContextImpl;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Exporter;
 import org.apache.dubbo.rpc.Invocation;
@@ -105,11 +104,6 @@ public class DubboProtocol extends AbstractProtocol {
                     }
                 }
                 RpcContext rpcContext = RpcContext.getContext();
-                boolean supportServerAsync = 
invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, 
false);
-                if (supportServerAsync) {
-                    CompletableFuture<Object> future = new 
CompletableFuture<>();
-                    rpcContext.setAsyncContext(new AsyncContextImpl(future));
-                }
                 rpcContext.setRemoteAddress(channel.getRemoteAddress());
                 Result result = invoker.invoke(inv);
 

Reply via email to