This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cdb2f0  Merge pull request #3141, optimize outbound event and some 
code formatting.
9cdb2f0 is described below

commit 9cdb2f0882c95272ab8e76698acdd0a6ae9d35be
Author: 时无两丶 <[email protected]>
AuthorDate: Thu Jan 24 20:15:14 2019 +0800

    Merge pull request #3141, optimize outbound event and some code formatting.
---
 .../org/apache/dubbo/common/bytecode/Proxy.java    | 19 ++-----
 .../dubbo/registry/support/AbstractRegistry.java   | 63 ++++++++++++----------
 .../registry/multicast/MulticastRegistry.java      |  2 +-
 .../apache/dubbo/registry/redis/RedisRegistry.java | 56 +++++++------------
 .../transport/netty4/NettyClientHandler.java       | 62 +++++++++++++--------
 5 files changed, 98 insertions(+), 104 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java 
b/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java
index d29b9f7..e72faae 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java
@@ -39,12 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 
 public abstract class Proxy {
-    public static final InvocationHandler RETURN_NULL_INVOKER = new 
InvocationHandler() {
-        @Override
-        public Object invoke(Object proxy, Method method, Object[] args) {
-            return null;
-        }
-    };
+    public static final InvocationHandler RETURN_NULL_INVOKER = (proxy, 
method, args) -> null;
     public static final InvocationHandler THROW_UNSUPPORTED_INVOKER = new 
InvocationHandler() {
         @Override
         public Object invoke(Object proxy, Method method, Object[] args) {
@@ -108,11 +103,7 @@ public abstract class Proxy {
         // get cache by class loader.
         Map<String, Object> cache;
         synchronized (ProxyCacheMap) {
-            cache = ProxyCacheMap.get(cl);
-            if (cache == null) {
-                cache = new HashMap<String, Object>();
-                ProxyCacheMap.put(cl, cache);
-            }
+            cache = ProxyCacheMap.computeIfAbsent(cl, k -> new HashMap<>());
         }
 
         Proxy proxy = null;
@@ -145,8 +136,8 @@ public abstract class Proxy {
         try {
             ccp = ClassGenerator.newInstance(cl);
 
-            Set<String> worked = new HashSet<String>();
-            List<Method> methods = new ArrayList<Method>();
+            Set<String> worked = new HashSet<>();
+            List<Method> methods = new ArrayList<>();
 
             for (int i = 0; i < ics.length; i++) {
                 if (!Modifier.isPublic(ics[i].getModifiers())) {
@@ -176,7 +167,7 @@ public abstract class Proxy {
                     for (int j = 0; j < pts.length; j++) {
                         code.append(" args[").append(j).append("] = 
($w)$").append(j + 1).append(";");
                     }
-                    code.append(" Object ret = handler.invoke(this, methods[" 
+ ix + "], args);");
+                    code.append(" Object ret = handler.invoke(this, 
methods[").append(ix).append("], args);");
                     if (!Void.TYPE.equals(rt)) {
                         code.append(" return ").append(asArgument(rt, 
"ret")).append(";");
                     }
diff --git 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
index 4ea6108..de58981 100644
--- 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
+++ 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.ConcurrentHashSet;
 import org.apache.dubbo.common.utils.ConfigUtils;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
@@ -52,7 +53,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * AbstractRegistry. (SPI, Prototype, ThreadSafe)
- *
  */
 public abstract class AbstractRegistry implements Registry {
 
@@ -62,16 +62,16 @@ public abstract class AbstractRegistry implements Registry {
     private static final String URL_SPLIT = "\\s+";
     // Log output
     protected final Logger logger = LoggerFactory.getLogger(getClass());
-    // Local disk cache, where the special key value.registies records the 
list of registry centers, and the others are the list of notified service 
providers
+    // Local disk cache, where the special key value.registries records the 
list of registry centers, and the others are the list of notified service 
providers
     private final Properties properties = new Properties();
     // File cache timing writing
     private final ExecutorService registryCacheExecutor = 
Executors.newFixedThreadPool(1, new 
NamedThreadFactory("DubboSaveRegistryCache", true));
     // Is it synchronized to save the file
     private final boolean syncSaveFile;
     private final AtomicLong lastCacheChanged = new AtomicLong();
-    private final Set<URL> registered = new ConcurrentHashSet<URL>();
-    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new 
ConcurrentHashMap<URL, Set<NotifyListener>>();
-    private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new 
ConcurrentHashMap<URL, Map<String, List<URL>>>();
+    private final Set<URL> registered = new ConcurrentHashSet<>();
+    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new 
ConcurrentHashMap<>();
     private URL registryUrl;
     // Local disk cache file
     private File file;
@@ -91,13 +91,15 @@ public abstract class AbstractRegistry implements Registry {
             }
         }
         this.file = file;
+        // When starting the subscription center,
+        // we need to read the local cache file for future Registry fault 
tolerance processing.
         loadProperties();
         notify(url.getBackupUrls());
     }
 
     protected static List<URL> filterEmpty(URL url, List<URL> urls) {
-        if (urls == null || urls.isEmpty()) {
-            List<URL> result = new ArrayList<URL>(1);
+        if (CollectionUtils.isEmpty(urls)) {
+            List<URL> result = new ArrayList<>(1);
             result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
             return result;
         }
@@ -222,7 +224,7 @@ public abstract class AbstractRegistry implements Registry {
                     && (Character.isLetter(key.charAt(0)) || key.charAt(0) == 
'_')
                     && value != null && value.length() > 0) {
                 String[] arr = value.trim().split(URL_SPLIT);
-                List<URL> urls = new ArrayList<URL>();
+                List<URL> urls = new ArrayList<>();
                 for (String u : arr) {
                     urls.add(URL.valueOf(u));
                 }
@@ -234,7 +236,7 @@ public abstract class AbstractRegistry implements Registry {
 
     @Override
     public List<URL> lookup(URL url) {
-        List<URL> result = new ArrayList<URL>();
+        List<URL> result = new ArrayList<>();
         Map<String, List<URL>> notifiedUrls = getNotified().get(url);
         if (notifiedUrls != null && notifiedUrls.size() > 0) {
             for (List<URL> urls : notifiedUrls.values()) {
@@ -245,7 +247,7 @@ public abstract class AbstractRegistry implements Registry {
                 }
             }
         } else {
-            final AtomicReference<List<URL>> reference = new 
AtomicReference<List<URL>>();
+            final AtomicReference<List<URL>> reference = new 
AtomicReference<>();
             NotifyListener listener = reference::set;
             subscribe(url, listener); // Subscribe logic guarantees the first 
notify to return
             List<URL> urls = reference.get();
@@ -293,10 +295,7 @@ public abstract class AbstractRegistry implements Registry 
{
         if (logger.isInfoEnabled()) {
             logger.info("Subscribe: " + url);
         }
-
-        Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, k -> {
-            return new ConcurrentHashSet<>();
-        });
+        Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> 
new ConcurrentHashSet<>());
         listeners.add(listener);
     }
 
@@ -319,7 +318,7 @@ public abstract class AbstractRegistry implements Registry {
 
     protected void recover() throws Exception {
         // register
-        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
+        Set<URL> recoverRegistered = new HashSet<>(getRegistered());
         if (!recoverRegistered.isEmpty()) {
             if (logger.isInfoEnabled()) {
                 logger.info("Recover register url " + recoverRegistered);
@@ -329,7 +328,7 @@ public abstract class AbstractRegistry implements Registry {
             }
         }
         // subscribe
-        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, 
Set<NotifyListener>>(getSubscribed());
+        Map<URL, Set<NotifyListener>> recoverSubscribed = new 
HashMap<>(getSubscribed());
         if (!recoverSubscribed.isEmpty()) {
             if (logger.isInfoEnabled()) {
                 logger.info("Recover subscribe url " + 
recoverSubscribed.keySet());
@@ -344,7 +343,7 @@ public abstract class AbstractRegistry implements Registry {
     }
 
     protected void notify(List<URL> urls) {
-        if (urls == null || urls.isEmpty()) {
+        if (CollectionUtils.isEmpty(urls)) {
             return;
         }
 
@@ -368,6 +367,13 @@ public abstract class AbstractRegistry implements Registry 
{
         }
     }
 
+    /**
+     * Notify changes from the Provider side.
+     *
+     * @param url      consumer side url
+     * @param listener listener
+     * @param urls     provider latest urls
+     */
     protected void notify(URL url, NotifyListener listener, List<URL> urls) {
         if (url == null) {
             throw new IllegalArgumentException("notify url == null");
@@ -375,7 +381,7 @@ public abstract class AbstractRegistry implements Registry {
         if (listener == null) {
             throw new IllegalArgumentException("notify listener == null");
         }
-        if ((urls == null || urls.isEmpty())
+        if ((CollectionUtils.isEmpty(urls))
                 && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
             logger.warn("Ignore empty notify urls for subscribe url " + url);
             return;
@@ -383,28 +389,27 @@ public abstract class AbstractRegistry implements 
Registry {
         if (logger.isInfoEnabled()) {
             logger.info("Notify urls for subscribe url " + url + ", urls: " + 
urls);
         }
-        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
+        // keep every provider's category.
+        Map<String, List<URL>> result = new HashMap<>();
         for (URL u : urls) {
             if (UrlUtils.isMatch(url, u)) {
                 String category = u.getParameter(Constants.CATEGORY_KEY, 
Constants.DEFAULT_CATEGORY);
-                List<URL> categoryList = result.computeIfAbsent(category, k -> 
{
-                    return new ArrayList<>();
-                });
+                List<URL> categoryList = result.computeIfAbsent(category, k -> 
new ArrayList<>());
                 categoryList.add(u);
             }
         }
         if (result.size() == 0) {
             return;
         }
-        Map<String, List<URL>> categoryNotified = 
notified.computeIfAbsent(url, k -> {
-            return new ConcurrentHashMap<>();
-        });
+        Map<String, List<URL>> categoryNotified = 
notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
         for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
             String category = entry.getKey();
             List<URL> categoryList = entry.getValue();
             categoryNotified.put(category, categoryList);
-            saveProperties(url);
             listener.notify(categoryList);
+            // We will update our cache file after each notification.
+            // When our Registry has a subscribe failure due to network 
jitter, we can return at least the existing cache URL.
+            saveProperties(url);
         }
     }
 
@@ -443,9 +448,9 @@ public abstract class AbstractRegistry implements Registry {
         if (logger.isInfoEnabled()) {
             logger.info("Destroy registry:" + getUrl());
         }
-        Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
+        Set<URL> destroyRegistered = new HashSet<>(getRegistered());
         if (!destroyRegistered.isEmpty()) {
-            for (URL url : new HashSet<URL>(getRegistered())) {
+            for (URL url : new HashSet<>(getRegistered())) {
                 if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                     try {
                         unregister(url);
@@ -458,7 +463,7 @@ public abstract class AbstractRegistry implements Registry {
                 }
             }
         }
-        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, 
Set<NotifyListener>>(getSubscribed());
+        Map<URL, Set<NotifyListener>> destroySubscribed = new 
HashMap<>(getSubscribed());
         if (!destroySubscribed.isEmpty()) {
             for (Map.Entry<URL, Set<NotifyListener>> entry : 
destroySubscribed.entrySet()) {
                 URL url = entry.getKey();
diff --git 
a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
 
b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
index 5498e9e..de15e25 100644
--- 
a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
+++ 
b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
@@ -396,7 +396,7 @@ public class MulticastRegistry extends FailbackRegistry {
 
     @Override
     public List<URL> lookup(URL url) {
-        List<URL> urls = new ArrayList<URL>();
+        List<URL> urls = new ArrayList<>();
         Map<String, List<URL>> notifiedUrls = getNotified().get(url);
         if (notifiedUrls != null && notifiedUrls.size() > 0) {
             for (List<URL> values : notifiedUrls.values()) {
diff --git 
a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
 
b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
index 589e92f..e9dde9b 100644
--- 
a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
+++ 
b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.common.utils.StringUtils;
@@ -38,6 +39,7 @@ import redis.clients.jedis.JedisPubSub;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -156,14 +158,11 @@ public class RedisRegistry extends FailbackRegistry {
         this.root = group;
 
         this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, 
Constants.DEFAULT_SESSION_TIMEOUT);
-        this.expireFuture = expireExecutor.scheduleWithFixedDelay(new 
Runnable() {
-            @Override
-            public void run() {
-                try {
-                    deferExpired(); // Extend the expiration time
-                } catch (Throwable t) { // Defensive fault tolerance
-                    logger.error("Unexpected exception occur at defer expire 
time, cause: " + t.getMessage(), t);
-                }
+        this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
+            try {
+                deferExpired(); // Extend the expiration time
+            } catch (Throwable t) { // Defensive fault tolerance
+                logger.error("Unexpected exception occur at defer expire time, 
cause: " + t.getMessage(), t);
             }
         }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
     }
@@ -172,9 +171,8 @@ public class RedisRegistry extends FailbackRegistry {
         for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
             JedisPool jedisPool = entry.getValue();
             try {
-                Jedis jedis = jedisPool.getResource();
-                try {
-                    for (URL url : new HashSet<URL>(getRegistered())) {
+                try (Jedis jedis = jedisPool.getResource()) {
+                    for (URL url : new HashSet<>(getRegistered())) {
                         if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                             String key = toCategoryPath(url);
                             if (jedis.hset(key, url.toFullString(), 
String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
@@ -188,8 +186,6 @@ public class RedisRegistry extends FailbackRegistry {
                     if (!replicate) {
                         break;//  If the server side has synchronized data, 
just write a single machine
                     }
-                } finally {
-                    jedis.close();
                 }
             } catch (Throwable t) {
                 logger.warn("Failed to write provider heartbeat to redis 
registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
@@ -231,13 +227,10 @@ public class RedisRegistry extends FailbackRegistry {
     public boolean isAvailable() {
         for (JedisPool jedisPool : jedisPools.values()) {
             try {
-                Jedis jedis = jedisPool.getResource();
-                try {
+                try (Jedis jedis = jedisPool.getResource()) {
                     if (jedis.isConnected()) {
                         return true; // At least one single machine is 
available.
                     }
-                } finally {
-                    jedis.close();
                 }
             } catch (Throwable t) {
             }
@@ -281,16 +274,13 @@ public class RedisRegistry extends FailbackRegistry {
         for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
             JedisPool jedisPool = entry.getValue();
             try {
-                Jedis jedis = jedisPool.getResource();
-                try {
+                try (Jedis jedis = jedisPool.getResource()) {
                     jedis.hset(key, value, expire);
                     jedis.publish(key, Constants.REGISTER);
                     success = true;
                     if (!replicate) {
                         break; //  If the server side has synchronized data, 
just write a single machine
                     }
-                } finally {
-                    jedis.close();
                 }
             } catch (Throwable t) {
                 exception = new RpcException("Failed to register service to 
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: 
" + t.getMessage(), t);
@@ -314,16 +304,13 @@ public class RedisRegistry extends FailbackRegistry {
         for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
             JedisPool jedisPool = entry.getValue();
             try {
-                Jedis jedis = jedisPool.getResource();
-                try {
+                try (Jedis jedis = jedisPool.getResource()) {
                     jedis.hdel(key, value);
                     jedis.publish(key, Constants.UNREGISTER);
                     success = true;
                     if (!replicate) {
                         break; //  If the server side has synchronized data, 
just write a single machine
                     }
-                } finally {
-                    jedis.close();
                 }
             } catch (Throwable t) {
                 exception = new RpcException("Failed to unregister service to 
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: 
" + t.getMessage(), t);
@@ -355,8 +342,7 @@ public class RedisRegistry extends FailbackRegistry {
         for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
             JedisPool jedisPool = entry.getValue();
             try {
-                Jedis jedis = jedisPool.getResource();
-                try {
+                try (Jedis jedis = jedisPool.getResource()) {
                     if (service.endsWith(Constants.ANY_VALUE)) {
                         admin = true;
                         Set<String> keys = jedis.keys(service);
@@ -364,24 +350,18 @@ public class RedisRegistry extends FailbackRegistry {
                             Map<String, Set<String>> serviceKeys = new 
HashMap<>();
                             for (String key : keys) {
                                 String serviceKey = toServicePath(key);
-                                Set<String> sk = serviceKeys.get(serviceKey);
-                                if (sk == null) {
-                                    sk = new HashSet<>();
-                                    serviceKeys.put(serviceKey, sk);
-                                }
+                                Set<String> sk = 
serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
                                 sk.add(key);
                             }
                             for (Set<String> sk : serviceKeys.values()) {
-                                doNotify(jedis, sk, url, 
Arrays.asList(listener));
+                                doNotify(jedis, sk, url, 
Collections.singletonList(listener));
                             }
                         }
                     } else {
-                        doNotify(jedis, jedis.keys(service + 
Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Arrays.asList(listener));
+                        doNotify(jedis, jedis.keys(service + 
Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, 
Collections.singletonList(listener));
                     }
                     success = true;
                     break; // Just read one server's data
-                } finally {
-                    jedis.close();
                 }
             } catch (Throwable t) { // Try the next server
                 exception = new RpcException("Failed to subscribe service from 
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: 
" + t.getMessage(), t);
@@ -402,7 +382,7 @@ public class RedisRegistry extends FailbackRegistry {
 
     private void doNotify(Jedis jedis, String key) {
         for (Map.Entry<URL, Set<NotifyListener>> entry : new 
HashMap<>(getSubscribed()).entrySet()) {
-            doNotify(jedis, Arrays.asList(key), entry.getKey(), new 
HashSet<>(entry.getValue()));
+            doNotify(jedis, Collections.singletonList(key), entry.getKey(), 
new HashSet<>(entry.getValue()));
         }
     }
 
@@ -450,7 +430,7 @@ public class RedisRegistry extends FailbackRegistry {
                 logger.info("redis notify: " + key + " = " + urls);
             }
         }
-        if (result == null || result.isEmpty()) {
+        if (CollectionUtils.isEmpty(result)) {
             return;
         }
         for (NotifyListener listener : listeners) {
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
index 2d89811..265d6ef 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
@@ -16,16 +16,15 @@
  */
 package org.apache.dubbo.remoting.transport.netty4;
 
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
 
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-
 /**
  * NettyClientHandler
  */
@@ -47,7 +46,6 @@ public class NettyClientHandler extends ChannelDuplexHandler {
         this.handler = handler;
     }
 
-
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), 
url, handler);
@@ -78,27 +76,33 @@ public class NettyClientHandler extends 
ChannelDuplexHandler {
         }
     }
 
-
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
         super.write(ctx, msg, promise);
-        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), 
url, handler);
-        try {
-            // if error happens from write, mock a BAD_REQUEST response so 
that invoker can return immediately without
-            // waiting until timeout. FIXME: not sure if this is the right 
approach, but exceptionCaught doesn't work
-            // as expected.
-            if (promise.cause() != null && msg instanceof Request) {
-                Request request = (Request) msg;
-                Response response = new Response(request.getId(), 
request.getVersion());
-                response.setStatus(Response.BAD_REQUEST);
-                
response.setErrorMessage(StringUtils.toString(promise.cause()));
-                handler.received(channel, response);
-            } else {
-                handler.sent(channel, msg);
+        final NettyChannel channel = 
NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+        final boolean isRequest = msg instanceof Request;
+
+        // We add listeners to make sure our out bound event is correct.
+        // If our out bound event has an error (in most cases the encoder 
fails),
+        // we need to have the request return directly instead of blocking the 
invoke process.
+        promise.addListener(future -> {
+            try {
+                if (future.isSuccess()) {
+                    // if our future is success, mark the future to sent.
+                    handler.sent(channel, msg);
+                    return;
+                }
+
+                Throwable t = future.cause();
+                if (t != null && isRequest) {
+                    Request request = (Request) msg;
+                    Response response = buildErrorResponse(request, t);
+                    handler.received(channel, response);
+                }
+            } finally {
+                NettyChannel.removeChannelIfDisconnected(ctx.channel());
             }
-        } finally {
-            NettyChannel.removeChannelIfDisconnected(ctx.channel());
-        }
+        });
     }
 
     @Override
@@ -111,4 +115,18 @@ public class NettyClientHandler extends 
ChannelDuplexHandler {
             NettyChannel.removeChannelIfDisconnected(ctx.channel());
         }
     }
+
+    /**
+     * build a bad request's response
+     *
+     * @param request the request
+     * @param t       the throwable. In most cases, serialization fails.
+     * @return the response
+     */
+    private static Response buildErrorResponse(Request request, Throwable t) {
+        Response response = new Response(request.getId(), 
request.getVersion());
+        response.setStatus(Response.BAD_REQUEST);
+        response.setErrorMessage(StringUtils.toString(t));
+        return response;
+    }
 }
\ No newline at end of file

Reply via email to