This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 9cdb2f0 Merge pull request #3141, optimize outbound event and some
code formatting.
9cdb2f0 is described below
commit 9cdb2f0882c95272ab8e76698acdd0a6ae9d35be
Author: 时无两丶 <[email protected]>
AuthorDate: Thu Jan 24 20:15:14 2019 +0800
Merge pull request #3141, optimize outbound event and some code formatting.
---
.../org/apache/dubbo/common/bytecode/Proxy.java | 19 ++-----
.../dubbo/registry/support/AbstractRegistry.java | 63 ++++++++++++----------
.../registry/multicast/MulticastRegistry.java | 2 +-
.../apache/dubbo/registry/redis/RedisRegistry.java | 56 +++++++------------
.../transport/netty4/NettyClientHandler.java | 62 +++++++++++++--------
5 files changed, 98 insertions(+), 104 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java
index d29b9f7..e72faae 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/bytecode/Proxy.java
@@ -39,12 +39,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public abstract class Proxy {
- public static final InvocationHandler RETURN_NULL_INVOKER = new
InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) {
- return null;
- }
- };
+ public static final InvocationHandler RETURN_NULL_INVOKER = (proxy,
method, args) -> null;
public static final InvocationHandler THROW_UNSUPPORTED_INVOKER = new
InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
@@ -108,11 +103,7 @@ public abstract class Proxy {
// get cache by class loader.
Map<String, Object> cache;
synchronized (ProxyCacheMap) {
- cache = ProxyCacheMap.get(cl);
- if (cache == null) {
- cache = new HashMap<String, Object>();
- ProxyCacheMap.put(cl, cache);
- }
+ cache = ProxyCacheMap.computeIfAbsent(cl, k -> new HashMap<>());
}
Proxy proxy = null;
@@ -145,8 +136,8 @@ public abstract class Proxy {
try {
ccp = ClassGenerator.newInstance(cl);
- Set<String> worked = new HashSet<String>();
- List<Method> methods = new ArrayList<Method>();
+ Set<String> worked = new HashSet<>();
+ List<Method> methods = new ArrayList<>();
for (int i = 0; i < ics.length; i++) {
if (!Modifier.isPublic(ics[i].getModifiers())) {
@@ -176,7 +167,7 @@ public abstract class Proxy {
for (int j = 0; j < pts.length; j++) {
code.append(" args[").append(j).append("] =
($w)$").append(j + 1).append(";");
}
- code.append(" Object ret = handler.invoke(this, methods["
+ ix + "], args);");
+ code.append(" Object ret = handler.invoke(this,
methods[").append(ix).append("], args);");
if (!Void.TYPE.equals(rt)) {
code.append(" return ").append(asArgument(rt,
"ret")).append(";");
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
index 4ea6108..de58981 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
@@ -52,7 +53,6 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* AbstractRegistry. (SPI, Prototype, ThreadSafe)
- *
*/
public abstract class AbstractRegistry implements Registry {
@@ -62,16 +62,16 @@ public abstract class AbstractRegistry implements Registry {
private static final String URL_SPLIT = "\\s+";
// Log output
protected final Logger logger = LoggerFactory.getLogger(getClass());
- // Local disk cache, where the special key value.registies records the
list of registry centers, and the others are the list of notified service
providers
+ // Local disk cache, where the special key value.registries records the
list of registry centers, and the others are the list of notified service
providers
private final Properties properties = new Properties();
// File cache timing writing
private final ExecutorService registryCacheExecutor =
Executors.newFixedThreadPool(1, new
NamedThreadFactory("DubboSaveRegistryCache", true));
// Is it synchronized to save the file
private final boolean syncSaveFile;
private final AtomicLong lastCacheChanged = new AtomicLong();
- private final Set<URL> registered = new ConcurrentHashSet<URL>();
- private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new
ConcurrentHashMap<URL, Set<NotifyListener>>();
- private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new
ConcurrentHashMap<URL, Map<String, List<URL>>>();
+ private final Set<URL> registered = new ConcurrentHashSet<>();
+ private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new
ConcurrentHashMap<>();
private URL registryUrl;
// Local disk cache file
private File file;
@@ -91,13 +91,15 @@ public abstract class AbstractRegistry implements Registry {
}
}
this.file = file;
+ // When starting the subscription center,
+ // we need to read the local cache file for future Registry fault
tolerance processing.
loadProperties();
notify(url.getBackupUrls());
}
protected static List<URL> filterEmpty(URL url, List<URL> urls) {
- if (urls == null || urls.isEmpty()) {
- List<URL> result = new ArrayList<URL>(1);
+ if (CollectionUtils.isEmpty(urls)) {
+ List<URL> result = new ArrayList<>(1);
result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
return result;
}
@@ -222,7 +224,7 @@ public abstract class AbstractRegistry implements Registry {
&& (Character.isLetter(key.charAt(0)) || key.charAt(0) ==
'_')
&& value != null && value.length() > 0) {
String[] arr = value.trim().split(URL_SPLIT);
- List<URL> urls = new ArrayList<URL>();
+ List<URL> urls = new ArrayList<>();
for (String u : arr) {
urls.add(URL.valueOf(u));
}
@@ -234,7 +236,7 @@ public abstract class AbstractRegistry implements Registry {
@Override
public List<URL> lookup(URL url) {
- List<URL> result = new ArrayList<URL>();
+ List<URL> result = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> urls : notifiedUrls.values()) {
@@ -245,7 +247,7 @@ public abstract class AbstractRegistry implements Registry {
}
}
} else {
- final AtomicReference<List<URL>> reference = new
AtomicReference<List<URL>>();
+ final AtomicReference<List<URL>> reference = new
AtomicReference<>();
NotifyListener listener = reference::set;
subscribe(url, listener); // Subscribe logic guarantees the first
notify to return
List<URL> urls = reference.get();
@@ -293,10 +295,7 @@ public abstract class AbstractRegistry implements Registry
{
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
-
- Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, k -> {
- return new ConcurrentHashSet<>();
- });
+ Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n ->
new ConcurrentHashSet<>());
listeners.add(listener);
}
@@ -319,7 +318,7 @@ public abstract class AbstractRegistry implements Registry {
protected void recover() throws Exception {
// register
- Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
+ Set<URL> recoverRegistered = new HashSet<>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
@@ -329,7 +328,7 @@ public abstract class AbstractRegistry implements Registry {
}
}
// subscribe
- Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL,
Set<NotifyListener>>(getSubscribed());
+ Map<URL, Set<NotifyListener>> recoverSubscribed = new
HashMap<>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " +
recoverSubscribed.keySet());
@@ -344,7 +343,7 @@ public abstract class AbstractRegistry implements Registry {
}
protected void notify(List<URL> urls) {
- if (urls == null || urls.isEmpty()) {
+ if (CollectionUtils.isEmpty(urls)) {
return;
}
@@ -368,6 +367,13 @@ public abstract class AbstractRegistry implements Registry
{
}
}
+ /**
+ * Notify changes from the Provider side.
+ *
+ * @param url consumer side url
+ * @param listener listener
+ * @param urls provider latest urls
+ */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
@@ -375,7 +381,7 @@ public abstract class AbstractRegistry implements Registry {
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
- if ((urls == null || urls.isEmpty())
+ if ((CollectionUtils.isEmpty(urls))
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
@@ -383,28 +389,27 @@ public abstract class AbstractRegistry implements
Registry {
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " +
urls);
}
- Map<String, List<URL>> result = new HashMap<String, List<URL>>();
+ // keep every provider's category.
+ Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY,
Constants.DEFAULT_CATEGORY);
- List<URL> categoryList = result.computeIfAbsent(category, k ->
{
- return new ArrayList<>();
- });
+ List<URL> categoryList = result.computeIfAbsent(category, k ->
new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
- Map<String, List<URL>> categoryNotified =
notified.computeIfAbsent(url, k -> {
- return new ConcurrentHashMap<>();
- });
+ Map<String, List<URL>> categoryNotified =
notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
- saveProperties(url);
listener.notify(categoryList);
+ // We will update our cache file after each notification.
+ // When our Registry has a subscribe failure due to network
jitter, we can return at least the existing cache URL.
+ saveProperties(url);
}
}
@@ -443,9 +448,9 @@ public abstract class AbstractRegistry implements Registry {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
- Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
+ Set<URL> destroyRegistered = new HashSet<>(getRegistered());
if (!destroyRegistered.isEmpty()) {
- for (URL url : new HashSet<URL>(getRegistered())) {
+ for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
unregister(url);
@@ -458,7 +463,7 @@ public abstract class AbstractRegistry implements Registry {
}
}
}
- Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL,
Set<NotifyListener>>(getSubscribed());
+ Map<URL, Set<NotifyListener>> destroySubscribed = new
HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry :
destroySubscribed.entrySet()) {
URL url = entry.getKey();
diff --git
a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
index 5498e9e..de15e25 100644
---
a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
+++
b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastRegistry.java
@@ -396,7 +396,7 @@ public class MulticastRegistry extends FailbackRegistry {
@Override
public List<URL> lookup(URL url) {
- List<URL> urls = new ArrayList<URL>();
+ List<URL> urls = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> values : notifiedUrls.values()) {
diff --git
a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
index 589e92f..e9dde9b 100644
---
a/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
+++
b/dubbo-registry/dubbo-registry-redis/src/main/java/org/apache/dubbo/registry/redis/RedisRegistry.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
@@ -38,6 +39,7 @@ import redis.clients.jedis.JedisPubSub;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -156,14 +158,11 @@ public class RedisRegistry extends FailbackRegistry {
this.root = group;
this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY,
Constants.DEFAULT_SESSION_TIMEOUT);
- this.expireFuture = expireExecutor.scheduleWithFixedDelay(new
Runnable() {
- @Override
- public void run() {
- try {
- deferExpired(); // Extend the expiration time
- } catch (Throwable t) { // Defensive fault tolerance
- logger.error("Unexpected exception occur at defer expire
time, cause: " + t.getMessage(), t);
- }
+ this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
+ try {
+ deferExpired(); // Extend the expiration time
+ } catch (Throwable t) { // Defensive fault tolerance
+ logger.error("Unexpected exception occur at defer expire time,
cause: " + t.getMessage(), t);
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}
@@ -172,9 +171,8 @@ public class RedisRegistry extends FailbackRegistry {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
- Jedis jedis = jedisPool.getResource();
- try {
- for (URL url : new HashSet<URL>(getRegistered())) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
String key = toCategoryPath(url);
if (jedis.hset(key, url.toFullString(),
String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
@@ -188,8 +186,6 @@ public class RedisRegistry extends FailbackRegistry {
if (!replicate) {
break;// If the server side has synchronized data,
just write a single machine
}
- } finally {
- jedis.close();
}
} catch (Throwable t) {
logger.warn("Failed to write provider heartbeat to redis
registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
@@ -231,13 +227,10 @@ public class RedisRegistry extends FailbackRegistry {
public boolean isAvailable() {
for (JedisPool jedisPool : jedisPools.values()) {
try {
- Jedis jedis = jedisPool.getResource();
- try {
+ try (Jedis jedis = jedisPool.getResource()) {
if (jedis.isConnected()) {
return true; // At least one single machine is
available.
}
- } finally {
- jedis.close();
}
} catch (Throwable t) {
}
@@ -281,16 +274,13 @@ public class RedisRegistry extends FailbackRegistry {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
- Jedis jedis = jedisPool.getResource();
- try {
+ try (Jedis jedis = jedisPool.getResource()) {
jedis.hset(key, value, expire);
jedis.publish(key, Constants.REGISTER);
success = true;
if (!replicate) {
break; // If the server side has synchronized data,
just write a single machine
}
- } finally {
- jedis.close();
}
} catch (Throwable t) {
exception = new RpcException("Failed to register service to
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause:
" + t.getMessage(), t);
@@ -314,16 +304,13 @@ public class RedisRegistry extends FailbackRegistry {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
- Jedis jedis = jedisPool.getResource();
- try {
+ try (Jedis jedis = jedisPool.getResource()) {
jedis.hdel(key, value);
jedis.publish(key, Constants.UNREGISTER);
success = true;
if (!replicate) {
break; // If the server side has synchronized data,
just write a single machine
}
- } finally {
- jedis.close();
}
} catch (Throwable t) {
exception = new RpcException("Failed to unregister service to
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause:
" + t.getMessage(), t);
@@ -355,8 +342,7 @@ public class RedisRegistry extends FailbackRegistry {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
- Jedis jedis = jedisPool.getResource();
- try {
+ try (Jedis jedis = jedisPool.getResource()) {
if (service.endsWith(Constants.ANY_VALUE)) {
admin = true;
Set<String> keys = jedis.keys(service);
@@ -364,24 +350,18 @@ public class RedisRegistry extends FailbackRegistry {
Map<String, Set<String>> serviceKeys = new
HashMap<>();
for (String key : keys) {
String serviceKey = toServicePath(key);
- Set<String> sk = serviceKeys.get(serviceKey);
- if (sk == null) {
- sk = new HashSet<>();
- serviceKeys.put(serviceKey, sk);
- }
+ Set<String> sk =
serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
sk.add(key);
}
for (Set<String> sk : serviceKeys.values()) {
- doNotify(jedis, sk, url,
Arrays.asList(listener));
+ doNotify(jedis, sk, url,
Collections.singletonList(listener));
}
}
} else {
- doNotify(jedis, jedis.keys(service +
Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Arrays.asList(listener));
+ doNotify(jedis, jedis.keys(service +
Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url,
Collections.singletonList(listener));
}
success = true;
break; // Just read one server's data
- } finally {
- jedis.close();
}
} catch (Throwable t) { // Try the next server
exception = new RpcException("Failed to subscribe service from
redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause:
" + t.getMessage(), t);
@@ -402,7 +382,7 @@ public class RedisRegistry extends FailbackRegistry {
private void doNotify(Jedis jedis, String key) {
for (Map.Entry<URL, Set<NotifyListener>> entry : new
HashMap<>(getSubscribed()).entrySet()) {
- doNotify(jedis, Arrays.asList(key), entry.getKey(), new
HashSet<>(entry.getValue()));
+ doNotify(jedis, Collections.singletonList(key), entry.getKey(),
new HashSet<>(entry.getValue()));
}
}
@@ -450,7 +430,7 @@ public class RedisRegistry extends FailbackRegistry {
logger.info("redis notify: " + key + " = " + urls);
}
}
- if (result == null || result.isEmpty()) {
+ if (CollectionUtils.isEmpty(result)) {
return;
}
for (NotifyListener listener : listeners) {
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
index 2d89811..265d6ef 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
@@ -16,16 +16,15 @@
*/
package org.apache.dubbo.remoting.transport.netty4;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-
/**
* NettyClientHandler
*/
@@ -47,7 +46,6 @@ public class NettyClientHandler extends ChannelDuplexHandler {
this.handler = handler;
}
-
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(),
url, handler);
@@ -78,27 +76,33 @@ public class NettyClientHandler extends
ChannelDuplexHandler {
}
}
-
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
super.write(ctx, msg, promise);
- NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(),
url, handler);
- try {
- // if error happens from write, mock a BAD_REQUEST response so
that invoker can return immediately without
- // waiting until timeout. FIXME: not sure if this is the right
approach, but exceptionCaught doesn't work
- // as expected.
- if (promise.cause() != null && msg instanceof Request) {
- Request request = (Request) msg;
- Response response = new Response(request.getId(),
request.getVersion());
- response.setStatus(Response.BAD_REQUEST);
-
response.setErrorMessage(StringUtils.toString(promise.cause()));
- handler.received(channel, response);
- } else {
- handler.sent(channel, msg);
+ final NettyChannel channel =
NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ final boolean isRequest = msg instanceof Request;
+
+ // We add listeners to make sure our out bound event is correct.
+ // If our out bound event has an error (in most cases the encoder
fails),
+ // we need to have the request return directly instead of blocking the
invoke process.
+ promise.addListener(future -> {
+ try {
+ if (future.isSuccess()) {
+ // if our future is success, mark the future to sent.
+ handler.sent(channel, msg);
+ return;
+ }
+
+ Throwable t = future.cause();
+ if (t != null && isRequest) {
+ Request request = (Request) msg;
+ Response response = buildErrorResponse(request, t);
+ handler.received(channel, response);
+ }
+ } finally {
+ NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
- } finally {
- NettyChannel.removeChannelIfDisconnected(ctx.channel());
- }
+ });
}
@Override
@@ -111,4 +115,18 @@ public class NettyClientHandler extends
ChannelDuplexHandler {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
+
+ /**
+ * build a bad request's response
+ *
+ * @param request the request
+ * @param t the throwable. In most cases, serialization fails.
+ * @return the response
+ */
+ private static Response buildErrorResponse(Request request, Throwable t) {
+ Response response = new Response(request.getId(),
request.getVersion());
+ response.setStatus(Response.BAD_REQUEST);
+ response.setErrorMessage(StringUtils.toString(t));
+ return response;
+ }
}
\ No newline at end of file