Repository: incubator-reef Updated Branches: refs/heads/master 4ce2ee141 -> 6ce4f41e2
[REEF-76] Throw ConnectException after the configured number of connection retries JIRA: [REEF-76] https://issues.apache.org/jira/browse/REEF-76 Pull Request: Closes #37 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/6ce4f41e Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/6ce4f41e Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/6ce4f41e Branch: refs/heads/master Commit: 6ce4f41e215db11a753effbea2848b32753fc4af Parents: 4ce2ee1 Author: taegeonum <[email protected]> Authored: Wed Dec 17 20:59:18 2014 +0900 Committer: Byung-Gon Chun <[email protected]> Committed: Sun Feb 22 20:40:38 2015 +0900 ---------------------------------------------------------------------- .../netty/NettyMessagingTransport.java | 30 +++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6ce4f41e/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java index ba76501..f9999ae 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java @@ -30,19 +30,10 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; -import org.apache.reef.wake.EStage; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.impl.DefaultThreadFactory; -import org.apache.reef.wake.remote.Encoder; -import org.apache.reef.wake.remote.exception.RemoteRuntimeException; -import org.apache.reef.wake.remote.impl.TransportEvent; -import org.apache.reef.wake.remote.transport.Link; -import org.apache.reef.wake.remote.transport.LinkListener; -import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.exception.TransportRuntimeException; import java.io.IOException; import java.net.BindException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Random; @@ -52,6 +43,17 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.impl.DefaultThreadFactory; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.exception.RemoteRuntimeException; +import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; +import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.exception.TransportRuntimeException; + /** * Messaging transport implementation with Netty */ @@ -207,7 +209,7 @@ public class NettyMessagingTransport implements Transport { Link<T> link = null; - for (int i = 0; i < this.numberOfTries; ++i) { + for (int i = 0; i <= this.numberOfTries; ++i) { LinkReference linkRef = this.addrToLinkRefMap.get(remoteAddr); if (linkRef != null) { @@ -219,6 +221,11 @@ public class NettyMessagingTransport implements Transport { return link; } } + + if (i == this.numberOfTries) { + // Connection failure + throw new ConnectException("Connection to " + remoteAddr + " refused"); + } LOG.log(Level.FINE, "No cached link for {0} thread {1}", new Object[]{remoteAddr, Thread.currentThread()}); @@ -282,6 +289,7 @@ public class NettyMessagingTransport implements Transport { } } } + return link; }
