This is an automated email from the ASF dual-hosted git repository. dionysios pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/giraph.git
The following commit(s) were added to refs/heads/trunk by this push: new f8d017e GIRAPH-1230 f8d017e is described below commit f8d017e61d66ec56b17ecf796743d6851c2f0988 Author: Dionysios Logothetis <dlogothe...@gmail.com> AuthorDate: Mon Dec 16 12:40:54 2019 -0800 GIRAPH-1230 closes #118 --- checkstyle.xml | 2 +- .../apache/giraph/comm/netty/ChannelRotater.java | 9 ++++-- .../org/apache/giraph/comm/netty/NettyClient.java | 37 ++++++++++++---------- .../org/apache/giraph/graph/GraphTaskManager.java | 28 ++++++++++++++++ .../org/apache/giraph/worker/BspServiceWorker.java | 12 ++++++- 5 files changed, 66 insertions(+), 22 deletions(-) diff --git a/checkstyle.xml b/checkstyle.xml index e0e604c..f820d74 100644 --- a/checkstyle.xml +++ b/checkstyle.xml @@ -227,7 +227,7 @@ </module> <!-- Over time, we will revised this down --> <module name="MethodLength"> - <property name="max" value="200"/> + <property name="max" value="210"/> </module> <module name="ParameterNumber"> <property name="max" value="8"/> diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java index 54c0b50..53af9c0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java @@ -25,12 +25,15 @@ import com.google.common.collect.Lists; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import org.apache.log4j.Logger; /** * Maintains multiple channels and rotates between them. This is thread-safe. */ public class ChannelRotater { + /** Logger */ + private static final Logger LOG = Logger.getLogger(ChannelRotater.class); /** Index of last used channel */ private int index = 0; /** Channel list */ @@ -73,9 +76,9 @@ public class ChannelRotater { */ public synchronized Channel nextChannel() { if (channelList.isEmpty()) { - throw new IllegalArgumentException( - "nextChannel: No channels exist for hostname " + - address.getHostName()); + LOG.warn("nextChannel: No channels exist for hostname " + + address.getHostName()); + return null; } ++index; diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index c8ccea2..103a8ec 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -242,6 +242,11 @@ public class NettyClient { /** How many network requests were resent because connection failed */ private final GiraphHadoopCounter networkRequestsResentForConnectionFailure; + /** + * Keeps track of the number of reconnect failures. Once this exceeds the + * value of {@link #maxConnectionFailures}, the job will fail. + */ + private int reconnectFailures = 0; /** * Only constructor @@ -764,26 +769,25 @@ public class NettyClient { private Channel getNextChannel(InetSocketAddress remoteServer) { Channel channel = addressChannelMap.get(remoteServer).nextChannel(); if (channel == null) { - throw new IllegalStateException( - "getNextChannel: No channel exists for " + remoteServer); - } - - // Return this channel if it is connected - if (channel.isActive()) { - return channel; - } + LOG.warn("getNextChannel: No channel exists for " + remoteServer); + } else { + // Return this channel if it is connected + if (channel.isActive()) { + return channel; + } - // Get rid of the failed channel - if (addressChannelMap.get(remoteServer).removeChannel(channel)) { - LOG.warn("getNextChannel: Unlikely event that the channel " + + // Get rid of the failed channel + if (addressChannelMap.get(remoteServer).removeChannel(channel)) { + LOG.warn("getNextChannel: Unlikely event that the channel " + channel + " was already removed!"); - } - if (LOG.isInfoEnabled()) { - LOG.info("getNextChannel: Fixing disconnected channel to " + + } + if (LOG.isInfoEnabled()) { + LOG.info("getNextChannel: Fixing disconnected channel to " + remoteServer + ", open = " + channel.isOpen() + ", " + "bound = " + channel.isRegistered()); + } } - int reconnectFailures = 0; + while (reconnectFailures < maxConnectionFailures) { ChannelFuture connectionFuture = bootstrap.connect(remoteServer); try { @@ -1205,7 +1209,7 @@ public class NettyClient { * This listener class just dumps exception stack traces if * something happens. */ - private class LogOnErrorChannelFutureListener + private static class LogOnErrorChannelFutureListener implements ChannelFutureListener { @Override @@ -1213,7 +1217,6 @@ public class NettyClient { if (future.isDone() && !future.isSuccess()) { LOG.error("Channel failed channelId=" + future.channel().hashCode(), future.cause()); - checkRequestsAfterChannelFailure(future.channel()); } } } diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 54d3084..6db1934 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -1075,6 +1075,18 @@ end[PURE_YARN]*/ getConf()), getJobProgressTracker()); } + /** + * Creates exception handler with the passed implementation of + * {@link CheckerIfWorkerShouldFailAfterException}. + * + * @param checker Instance that checks whether the job should fail. + * @return Exception handler. + */ + public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler( + CheckerIfWorkerShouldFailAfterException checker) { + return new OverrideExceptionHandler(checker, getJobProgressTracker()); + } + public ImmutableClassesGiraphConfiguration<I, V, E> getConf() { return conf; } @@ -1128,6 +1140,9 @@ end[PURE_YARN]*/ @Override public void uncaughtException(final Thread t, final Throwable e) { if (!checker.checkIfWorkerShouldFail(t, e)) { + LOG.error( + "uncaughtException: OverrideExceptionHandler on thread " + + t.getName() + ", msg = " + e.getMessage(), e); return; } try { @@ -1169,4 +1184,17 @@ end[PURE_YARN]*/ return true; } } + + /** + * Checks the message of a throwable, and checks whether it is a + * "connection reset by peer" type of exception. + * + * @param throwable Throwable + * @return True if the throwable is a "connection reset by peer", + * false otherwise. + */ + public static boolean isConnectionResetByPeer(Throwable throwable) { + return throwable.getMessage().startsWith( + "Connection reset by peer") ? true : false; + } } diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index b6756c9..a745b1e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -121,6 +121,8 @@ import org.json.JSONObject; import com.google.common.collect.Lists; +import static org.apache.giraph.graph.GraphTaskManager.isConnectionResetByPeer; + /** * ZooKeeper-based implementation of {@link CentralizedServiceWorker}. * @@ -217,7 +219,15 @@ public class BspServiceWorker<I extends WritableComparable, getGraphPartitionerFactory().createWorkerGraphPartitioner(); workerInfo = new WorkerInfo(); workerServer = new NettyWorkerServer<I, V, E>(conf, this, context, - graphTaskManager.createUncaughtExceptionHandler()); + graphTaskManager.createUncaughtExceptionHandler( + (thread, throwable) -> { + // If the connection was closed by the client, then we just log + // the error, we do not fail the job, since the client will + // attempt to reconnect. + return !isConnectionResetByPeer(throwable); + } + ) + ); workerInfo.setInetSocketAddress(workerServer.getMyAddress(), workerServer.getLocalHostOrIp()); workerInfo.setTaskId(getTaskId());