Repository: reef Updated Branches: refs/heads/master c4710a886 -> 4729037be
[REEF-1801] Improve logging when sending remote events in Wake Summary of changes: * Replace `ex.printStackTrace()` calls with proper log messages * Minor refactoring for readability and better performance in Wake identifier factory and around * Better logging in Wake * Other minor refactoring in Wake: implement some `.toString()` etc. JIRA: [REEF-1801](https://issues.apache.org/jira/browse/REEF-1801) Pull Request: This closes #1306 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4729037b Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4729037b Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4729037b Branch: refs/heads/master Commit: 4729037be79fc7d5f2f02194cda433497022798c Parents: c4710a8 Author: Sergiy Matusevych <[email protected]> Authored: Tue May 16 14:03:45 2017 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed May 17 09:06:27 2017 -0700 ---------------------------------------------------------------------- .../common/driver/client/ClientConnection.java | 2 +- .../wake/impl/DefaultIdentifierFactory.java | 36 ++++++++----- .../remote/impl/RemoteSenderEventHandler.java | 54 ++++++++++---------- .../transport/netty/LoggingLinkListener.java | 7 +-- .../wake/remote/transport/netty/NettyLink.java | 18 +++---- 5 files changed, 61 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java index 7be8e07..bce2fbe 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java @@ -54,7 +54,7 @@ public final class ClientConnection { * @param status */ public synchronized void send(final ReefServiceProtos.JobStatusProto status) { - LOG.log(Level.FINEST, "Sending:\n" + status); + LOG.log(Level.FINEST, "Sending to client: status={0}", status.getState()); this.jobStatusHandler.onNext(status); } http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java index 951fe5a..47e1ffd 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java @@ -28,10 +28,12 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Default remote identifier factory that creates a specific remote identifier - * from a string representation + * from a string representation. * <p> * A string representation is broken into two parts type and type-specific details separated by "://" * A remote identifier implementation should implement a constructor that accepts a string. @@ -39,7 +41,9 @@ import java.util.Map; */ public class DefaultIdentifierFactory implements IdentifierFactory { - // map between type and remote identifier class + private static final Logger LOG = Logger.getLogger(DefaultIdentifierFactory.class.getName()); + + /** Map between type and remote identifier class. */ private final Map<String, Class<? extends Identifier>> typeToClazzMap; /** @@ -60,6 +64,8 @@ public class DefaultIdentifierFactory implements IdentifierFactory { this.typeToClazzMap = typeToClazzMap; } + private static final Class<?>[] CONSTRUCTOR_ARG_TYPES = {String.class}; + /** * Creates a new remote identifier instance. * @@ -69,24 +75,28 @@ public class DefaultIdentifierFactory implements IdentifierFactory { */ @Override public Identifier getNewInstance(final String str) { + final int index = str.indexOf("://"); if (index < 0) { - throw new RemoteRuntimeException("Invalid name " + str); + throw new RemoteRuntimeException("Invalid remote identifier name: " + str); } + final String type = str.substring(0, index); final Class<? extends Identifier> clazz = typeToClazzMap.get(type); - final Class<?>[] argTypes = {String.class}; - final Constructor<? extends Identifier> constructor; + try { - constructor = clazz.getDeclaredConstructor(argTypes); - final Object[] args = new Object[1]; - args[0] = str.substring(index + 3); - return constructor.newInstance(args); - } catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | - IllegalArgumentException | InvocationTargetException e) { - e.printStackTrace(); + + final Constructor<? extends Identifier> constructor = clazz.getDeclaredConstructor(CONSTRUCTOR_ARG_TYPES); + final Object[] args = new Object[] {str.substring(index + 3)}; + + final Identifier instance = constructor.newInstance(args); + LOG.log(Level.FINER, "Created new identifier: {0} for {1}", new Object[] {instance, str}); + return instance; + + } catch (final NoSuchMethodException | SecurityException | InstantiationException + | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + LOG.log(Level.SEVERE, "Cannot create new identifier for: " + str, e); throw new RemoteRuntimeException(e); } } - } http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java index 8e158f9..bdd0879 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java @@ -40,10 +40,11 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { private static final Logger LOG = Logger.getLogger(RemoteSenderEventHandler.class.getName()); + private final BlockingQueue<RemoteEvent<T>> queue = new LinkedBlockingQueue<>(); + private final AtomicReference<Link<byte[]>> linkRef = new AtomicReference<>(); + private final RemoteEventEncoder<T> encoder; private final Transport transport; - private final BlockingQueue<RemoteEvent<T>> queue; - private final AtomicReference<Link<byte[]>> linkRef; private final ExecutorService executor; /** @@ -57,12 +58,15 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { this.encoder = new RemoteEventEncoder<>(encoder); this.transport = transport; this.executor = executor; - this.linkRef = new AtomicReference<>(); - this.queue = new LinkedBlockingQueue<>(); + } + + @Override + public String toString() { + return String.format("RemoteSenderEventHandler: { transport: %s encoder: %s}", this.transport, this.encoder); } void setLink(final Link<byte[]> link) { - LOG.log(Level.FINEST, "thread {0} link {1}", new Object[]{Thread.currentThread(), link}); + LOG.log(Level.FINEST, "thread {0} set link {1}", new Object[] {Thread.currentThread(), link}); linkRef.compareAndSet(null, link); consumeQueue(); } @@ -71,12 +75,12 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { try { RemoteEvent<T> event; while ((event = queue.poll(0, TimeUnit.MICROSECONDS)) != null) { - LOG.log(Level.FINEST, "{0}", event); + LOG.log(Level.FINEST, "Event: {0}", event); linkRef.get().write(encoder.encode(event)); } - } catch (final InterruptedException e) { - e.printStackTrace(); - throw new RemoteRuntimeException(e); + } catch (final InterruptedException ex) { + LOG.log(Level.SEVERE, "Interrupted", ex); + throw new RemoteRuntimeException(ex); } } @@ -89,6 +93,9 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { @Override public void onNext(final RemoteEvent<T> value) { try { + + LOG.log(Level.FINEST, "Link: {0} event: {1}", new Object[] {linkRef, value}); + if (linkRef.get() == null) { queue.add(value); @@ -101,36 +108,30 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { final ConnectFutureTask<Link<byte[]>> cf = new ConnectFutureTask<>( new ConnectCallable(transport, value.localAddress(), value.remoteAddress()), - new ConnectEventHandler<T>(this)); + new ConnectEventHandler<>(this)); executor.submit(cf); + } else { // encode and write bytes // consumeQueue(); - - if (LOG.isLoggable(Level.FINEST)) { - LOG.log(Level.FINEST, "Send an event from " + linkRef.get().getLocalAddress() + " to " + - linkRef.get().getRemoteAddress() + " value " + value); - } + LOG.log(Level.FINEST, "Send: {0} event: {1}", new Object[] {linkRef, value}); linkRef.get().write(encoder.encode(value)); } - } catch (final RemoteRuntimeException ex2) { - ex2.printStackTrace(); - throw ex2; + + } catch (final RemoteRuntimeException ex) { + LOG.log(Level.SEVERE, "Remote Exception", ex); + throw ex; } } - - } class ConnectCallable implements Callable<Link<byte[]>> { private final Transport transport; - private final SocketAddress localAddress; private final SocketAddress remoteAddress; ConnectCallable(final Transport transport, final SocketAddress localAddress, final SocketAddress remoteAddress) { this.transport = transport; - this.localAddress = localAddress; this.remoteAddress = remoteAddress; } @@ -144,6 +145,8 @@ class ConnectCallable implements Callable<Link<byte[]>> { class ConnectEventHandler<T> implements EventHandler<ConnectFutureTask<Link<byte[]>>> { + private static final Logger LOG = Logger.getLogger(ConnectEventHandler.class.getName()); + private final RemoteSenderEventHandler<T> handler; ConnectEventHandler(final RemoteSenderEventHandler<T> handler) { @@ -154,10 +157,9 @@ class ConnectEventHandler<T> implements EventHandler<ConnectFutureTask<Link<byte public void onNext(final ConnectFutureTask<Link<byte[]>> value) { try { handler.setLink(value.get()); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - throw new RemoteRuntimeException(e); + } catch (final InterruptedException | ExecutionException ex) { + LOG.log(Level.SEVERE, "Execution Exception", ex); + throw new RemoteRuntimeException(ex); } } - } http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java index 8cd8daf..72ae838 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java @@ -38,9 +38,7 @@ public class LoggingLinkListener<T> implements LinkListener<T> { */ @Override public void onSuccess(final T message) { - if (LOG.isLoggable(Level.FINEST)) { - LOG.log(Level.FINEST, "The message is successfully sent : {0}", new Object[]{message}); - } + LOG.log(Level.FINEST, "Message successfully sent: {0}", message); } /** @@ -49,8 +47,7 @@ public class LoggingLinkListener<T> implements LinkListener<T> { @Override public void onException(final Throwable cause, final SocketAddress remoteAddress, final T message) { if (LOG.isLoggable(Level.FINEST)) { - LOG.log(Level.FINEST, "The message to {0} is failed to be sent. message : {1}, cause : {2}", - new Object[]{remoteAddress, message, cause}); + LOG.log(Level.FINEST, "Error sending message " + message + " to " + remoteAddress, cause); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/4729037b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java index bde515b..3c51f2a 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java @@ -39,7 +39,9 @@ import java.util.logging.Logger; public class NettyLink<T> implements Link<T> { public static final int INT_SIZE = Integer.SIZE / Byte.SIZE; + private static final Logger LOG = Logger.getLogger(NettyLink.class.getName()); + private final Channel channel; private final Encoder<? super T> encoder; private final LinkListener<? super T> listener; @@ -61,14 +63,12 @@ public class NettyLink<T> implements Link<T> { * @param encoder the encoder * @param listener the link listener */ - public NettyLink(final Channel channel, - final Encoder<? super T> encoder, final LinkListener<? super T> listener) { + public NettyLink(final Channel channel, final Encoder<? super T> encoder, final LinkListener<? super T> listener) { this.channel = channel; this.encoder = encoder; this.listener = listener; } - /** * Writes the message to this link. * @@ -76,14 +76,10 @@ public class NettyLink<T> implements Link<T> { */ @Override public void write(final T message) { - LOG.log(Level.FINEST, "write {0} {1}", new Object[]{channel, message}); - final byte[] allData = encoder.encode(message); - // byte[] -> ByteBuf + LOG.log(Level.FINEST, "write {0} :: {1}", new Object[] {channel, message}); + final ChannelFuture future = channel.writeAndFlush(Unpooled.wrappedBuffer(encoder.encode(message))); if (listener != null) { - channel.writeAndFlush(Unpooled.wrappedBuffer(allData)) - .addListener(new NettyChannelFutureListener<>(message, listener)); - } else { - channel.writeAndFlush(Unpooled.wrappedBuffer(allData)); + future.addListener(new NettyChannelFutureListener<>(message, listener)); } } @@ -109,7 +105,7 @@ public class NettyLink<T> implements Link<T> { @Override public String toString() { - return "localAddr: " + getLocalAddress() + " remoteAddr: " + getRemoteAddress(); + return "NettyLink: " + channel; // Channel has good .toString() implementation } }
