This is an automated email from the ASF dual-hosted git repository. liujieqin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push: new 85ad18f bugfix:fix alibaba RpcContext setAttachment(String,String) (#6052) 85ad18f is described below commit 85ad18fe22c447568f1c9bfb1c58d645c3dd7b9a Author: ken.lj <ken.lj...@gmail.com> AuthorDate: Mon Apr 27 14:24:08 2020 +0800 bugfix:fix alibaba RpcContext setAttachment(String,String) (#6052) --- .../java/com/alibaba/dubbo/rpc/RpcContext.java | 371 +++++++++++++++++++-- 1 file changed, 348 insertions(+), 23 deletions(-) diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java index db3c7cb..08cc647 100644 --- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java +++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java @@ -17,45 +17,371 @@ package com.alibaba.dubbo.rpc; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.rpc.FutureContext; +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @Deprecated -public class RpcContext extends org.apache.dubbo.rpc.RpcContext { +public class RpcContext { public static RpcContext getContext() { - return newInstance(org.apache.dubbo.rpc.RpcContext.getContext()); + return new RpcContext(org.apache.dubbo.rpc.RpcContext.getContext()); } - private static RpcContext newInstance(org.apache.dubbo.rpc.RpcContext rpcContext) { - RpcContext copy = new RpcContext(); - copy.getAttachments().putAll(rpcContext.getAttachments()); - copy.get().putAll(rpcContext.get()); + public static RpcContext getServerContext() { + return new RpcContext(org.apache.dubbo.rpc.RpcContext.getServerContext()); + } + + public static void removeServerContext() { + org.apache.dubbo.rpc.RpcContext.removeServerContext(); + } + + public static void removeContext() { + org.apache.dubbo.rpc.RpcContext.removeContext(); + } + + private org.apache.dubbo.rpc.RpcContext newRpcContext; + + public RpcContext(org.apache.dubbo.rpc.RpcContext newRpcContext) { + this.newRpcContext = newRpcContext; + } + + public Object getRequest() { + return newRpcContext.getRequest(); + } + + public <T> T getRequest(Class<T> clazz) { + return newRpcContext.getRequest(clazz); + } + + + public void setRequest(Object request) { + newRpcContext.setRequest(request); + } + + /** + * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse + * + * @return null if the underlying protocol doesn't provide support for getting response + */ + public Object getResponse() { + return newRpcContext.getResponse(); + } + + /** + * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse + * + * @return null if the underlying protocol doesn't provide support for getting response or the response is not of the specified type + */ + @SuppressWarnings("unchecked") + public <T> T getResponse(Class<T> clazz) { + return newRpcContext.getResponse(clazz); + } + + public void setResponse(Object response) { + newRpcContext.setResponse(response); + } + + /** + * is provider side. + * + * @return provider side. + */ + public boolean isProviderSide() { + return newRpcContext.isProviderSide(); + } + + /** + * is consumer side. + * + * @return consumer side. + */ + public boolean isConsumerSide() { + return newRpcContext.isConsumerSide(); + } + + public List<URL> getUrls() { + List<org.apache.dubbo.common.URL> newUrls = newRpcContext.getUrls(); + if (CollectionUtils.isNotEmpty(newUrls)) { + List<URL> urls = new ArrayList<>(newUrls.size()); + for (org.apache.dubbo.common.URL newUrl : newUrls) { + urls.add(new URL(newUrl)); + } + return urls; + } + return Collections.emptyList(); + } + + public void setUrls(List<URL> urls) { + if (CollectionUtils.isNotEmpty(urls)) { + List<org.apache.dubbo.common.URL> newUrls = new ArrayList<>(urls.size()); + for (URL url : urls) { + newUrls.add(url.getOriginalURL()); + } + newRpcContext.setUrls(newUrls); + } + } + + public URL getUrl() { + return new URL(newRpcContext.getUrl()); + } + + public void setUrl(URL url) { + newRpcContext.setUrl(url.getOriginalURL()); + } + + public String getMethodName() { + return newRpcContext.getMethodName(); + } + + public void setMethodName(String methodName) { + newRpcContext.setMethodName(methodName); + } + + public Class<?>[] getParameterTypes() { + return newRpcContext.getParameterTypes(); + } + + public void setParameterTypes(Class<?>[] parameterTypes) { + newRpcContext.setParameterTypes(parameterTypes); + } + + public Object[] getArguments() { + return newRpcContext.getArguments(); + } + + public void setArguments(Object[] arguments) { + newRpcContext.setArguments(arguments); + } + + public RpcContext setLocalAddress(String host, int port) { + newRpcContext.setLocalAddress(host, port); + return this; + } + + /** + * get local address. + * + * @return local address + */ + public InetSocketAddress getLocalAddress() { + return newRpcContext.getLocalAddress(); + } + + public RpcContext setLocalAddress(InetSocketAddress address) { + newRpcContext.setLocalAddress(address); + return this; + } - copy.setUrls(rpcContext.getUrls()); - copy.setUrl(rpcContext.getUrl()); - copy.setMethodName(rpcContext.getMethodName()); - copy.setParameterTypes(rpcContext.getParameterTypes()); - copy.setArguments(rpcContext.getArguments()); - copy.setLocalAddress(rpcContext.getLocalAddress()); - copy.setRemoteAddress(rpcContext.getRemoteAddress()); - copy.setRemoteApplicationName(rpcContext.getRemoteApplicationName()); - copy.setInvokers(rpcContext.getInvokers()); - copy.setInvoker(rpcContext.getInvoker()); - copy.setInvocation(rpcContext.getInvocation()); + public String getLocalAddressString() { + return newRpcContext.getLocalAddressString(); + } + + public String getLocalHostName() { + return newRpcContext.getLocalHostName(); + } + + public RpcContext setRemoteAddress(String host, int port) { + newRpcContext.setRemoteAddress(host, port); + return this; + } + + public InetSocketAddress getRemoteAddress() { + return newRpcContext.getRemoteAddress(); + } - copy.setRequest(rpcContext.getRequest()); - copy.setResponse(rpcContext.getResponse()); - copy.setAsyncContext(rpcContext.getAsyncContext()); + public RpcContext setRemoteAddress(InetSocketAddress address) { + newRpcContext.setRemoteAddress(address); + return this; + } + + public String getRemoteAddressString() { + return newRpcContext.getRemoteAddressString(); + } + + public String getRemoteHostName() { + return newRpcContext.getRemoteHostName(); + } + + public String getLocalHost() { + return newRpcContext.getLocalHost(); + } - return copy; + public int getLocalPort() { + return newRpcContext.getLocalPort(); + } + + public String getRemoteHost() { + return newRpcContext.getRemoteHost(); + } + + public int getRemotePort() { + return newRpcContext.getRemotePort(); + } + + public String getAttachment(String key) { + return newRpcContext.getAttachment(key); + } + + public RpcContext setAttachment(String key, String value) { + newRpcContext.setAttachment(key, value); + return this; + } + + public RpcContext removeAttachment(String key) { + newRpcContext.removeAttachment(key); + return this; + } + + public Map<String, String> getAttachments() { + return newRpcContext.getAttachments(); + } + + public RpcContext setAttachments(Map<String, String> attachment) { + newRpcContext.setAttachments(attachment); + return this; + } + + public void clearAttachments() { + newRpcContext.clearAttachments(); + } + + /** + * get values. + * + * @return values + */ + public Map<String, Object> get() { + return newRpcContext.get(); + } + + /** + * set value. + * + * @param key + * @param value + * @return context + */ + public RpcContext set(String key, Object value) { + newRpcContext.set(key, value); + return this; + } + + public RpcContext remove(String key) { + newRpcContext.remove(key); + return this; + } + + public Object get(String key) { + return newRpcContext.get(key); + } + + @Deprecated + public boolean isServerSide() { + return isProviderSide(); + } + + @Deprecated + public boolean isClientSide() { + return isConsumerSide(); + } + + /** + * Async invocation. Timeout will be handled even if <code>Future.get()</code> is not called. + * + * @param callable + * @return get the return result from <code>future.get()</code> + */ + @SuppressWarnings("unchecked") + public <T> Future<T> asyncCall(Callable<T> callable) { + try { + try { + setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); + final T o = callable.call(); + //local invoke will return directly + if (o != null) { + FutureTask<T> f = new FutureTask<T>(new Callable<T>() { + @Override + public T call() throws Exception { + return o; + } + }); + f.run(); + return f; + } else { + + } + } catch (Exception e) { + throw new RpcException(e); + } finally { + removeAttachment(Constants.ASYNC_KEY); + } + } catch (final RpcException e) { + return new Future<T>() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + throw new ExecutionException(e.getCause()); + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, + TimeoutException { + return get(); + } + }; + } + return ((Future<T>) getContext().getFuture()); + } + + /** + * one way async call, send request only, and result is not required + * + * @param runnable + */ + public void asyncCall(Runnable runnable) { + try { + setAttachment(Constants.RETURN_KEY, Boolean.FALSE.toString()); + runnable.run(); + } catch (Throwable e) { + // FIXME should put exception in future? + throw new RpcException("oneway call error ." + e.getMessage(), e); + } finally { + removeAttachment(Constants.RETURN_KEY); + } } - @Override public <T> Future<T> getFuture() { CompletableFuture completableFuture = FutureContext.getContext().getCompatibleCompletableFuture(); if (completableFuture == null) { @@ -64,7 +390,6 @@ public class RpcContext extends org.apache.dubbo.rpc.RpcContext { return new FutureAdapter(completableFuture); } - @Override public void setFuture(CompletableFuture<?> future) { FutureContext.getContext().setCompatibleFuture(future); }