Repository: reef
Updated Branches:
refs/heads/master 5a1c60f5a -> 5da7cb457
[REEF-1651] Improve logging in REEF Remote Manager and related classes
Summary of changes:
* Improve logging in the `RemoteManager` and around
* Implement `.toString()` methods for some remote messages
* Log unhandled exceptions in `DefaultErrorHandler`
* Multiple code readability improvements
JIRA:
[REEF-1651](https://issues.apache.org/jira/browse/REEF-1651)
Pull request:
This closes #1166
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/5da7cb45
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/5da7cb45
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/5da7cb45
Branch: refs/heads/master
Commit: 5da7cb457ccbd4614ea80345f57585299b51cabe
Parents: 5a1c60f
Author: Sergiy Matusevych <[email protected]>
Authored: Thu Oct 20 17:44:36 2016 -0700
Committer: Mariia Mykhailova <[email protected]>
Committed: Tue Oct 25 17:27:22 2016 -0700
----------------------------------------------------------------------
.../common/driver/DriverStartHandler.java | 49 ++++++------
.../reef/wake/remote/DefaultErrorHandler.java | 7 +-
.../apache/reef/wake/remote/RemoteManager.java | 4 -
.../DefaultRemoteManagerImplementation.java | 80 +++++++++++---------
.../reef/wake/remote/impl/HandlerContainer.java | 46 ++++++-----
.../reef/wake/remote/impl/RemoteEvent.java | 17 ++---
.../reef/wake/remote/impl/TransportEvent.java | 10 ++-
7 files changed, 118 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/5da7cb45/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index abdecd7..c090dec 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -38,29 +38,33 @@ import java.util.logging.Logger;
* This is bound to the start event of the clock and dispatches it to the
appropriate application code.
*/
public final class DriverStartHandler implements EventHandler<StartTime> {
+
private static final Logger LOG =
Logger.getLogger(DriverStartHandler.class.getName());
private final Set<EventHandler<StartTime>> startHandlers;
private final Set<EventHandler<DriverRestarted>> restartHandlers;
private final Set<EventHandler<DriverRestarted>> serviceRestartHandlers;
+
private final DriverRestartManager driverRestartManager;
@Inject
-
DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
- final Set<EventHandler<StartTime>> startHandlers,
- @Parameter(DriverRestartHandler.class)
- final Set<EventHandler<DriverRestarted>> restartHandlers,
- @Parameter(ServiceDriverRestartedHandlers.class)
- final Set<EventHandler<DriverRestarted>>
serviceRestartHandlers,
- final DriverRestartManager driverRestartManager) {
+ private DriverStartHandler(
+ @Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
+ final Set<EventHandler<StartTime>> startHandlers,
+ @Parameter(DriverRestartHandler.class)
+ final Set<EventHandler<DriverRestarted>> restartHandlers,
+ @Parameter(ServiceDriverRestartedHandlers.class)
+ final Set<EventHandler<DriverRestarted>> serviceRestartHandlers,
+ final DriverRestartManager driverRestartManager) {
+
this.startHandlers = startHandlers;
this.restartHandlers = restartHandlers;
this.serviceRestartHandlers = serviceRestartHandlers;
this.driverRestartManager = driverRestartManager;
- LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers
[{0}], RestartHandlers [{1}]," +
- "and ServiceRestartHandlers [{2}].",
- new String[] {this.startHandlers.toString(),
this.restartHandlers.toString(),
- this.serviceRestartHandlers.toString()});
+
+ LOG.log(Level.FINE,
+ "Instantiated DriverStartHandler: StartHandlers:{0}
RestartHandlers:{1} ServiceRestartHandlers:{2}",
+ new Object[] {this.startHandlers, this.restartHandlers,
this.serviceRestartHandlers});
}
@Override
@@ -73,19 +77,20 @@ public final class DriverStartHandler implements
EventHandler<StartTime> {
}
private void onRestart(final StartTime startTime) {
- if (this.restartHandlers.size() > 0) {
- final List<EventHandler<DriverRestarted>> orderedRestartHandlers =
- new ArrayList<>(this.serviceRestartHandlers.size() +
this.restartHandlers.size());
-
- orderedRestartHandlers.addAll(this.serviceRestartHandlers);
- orderedRestartHandlers.addAll(this.restartHandlers);
- // This can only be called after calling client restart handlers because
REEF.NET
- // JobDriver requires making this call to set up the InterOp handlers.
- this.driverRestartManager.onRestart(startTime, orderedRestartHandlers);
- } else {
- throw new DriverFatalRuntimeException("Driver restart happened, but no
ON_DRIVER_RESTART handler is bound.");
+ if (this.restartHandlers.isEmpty()) {
+ throw new DriverFatalRuntimeException("Driver restarted, but no
ON_DRIVER_RESTART handler is bound.");
}
+
+ final List<EventHandler<DriverRestarted>> orderedRestartHandlers =
+ new ArrayList<>(this.serviceRestartHandlers.size() +
this.restartHandlers.size());
+
+ orderedRestartHandlers.addAll(this.serviceRestartHandlers);
+ orderedRestartHandlers.addAll(this.restartHandlers);
+
+ // This can only be called after calling client restart handlers because
REEF.NET
+ // JobDriver requires making this call to set up the InterOp handlers.
+ this.driverRestartManager.onRestart(startTime, orderedRestartHandlers);
}
private void onStart(final StartTime startTime) {
http://git-wip-us.apache.org/repos/asf/reef/blob/5da7cb45/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
index 476cc75..4b234a7 100644
---
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
+++
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
@@ -21,18 +21,23 @@ package org.apache.reef.wake.remote;
import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* The default RemoteConfiguration.ErrorHandler.
*/
final class DefaultErrorHandler implements EventHandler<Throwable> {
+ private static final Logger LOG =
Logger.getLogger(DefaultErrorHandler.class.getName());
+
@Inject
- DefaultErrorHandler() {
+ private DefaultErrorHandler() {
}
@Override
public void onNext(final Throwable value) {
+ LOG.log(Level.SEVERE, "No error handler in RemoteManager", value);
throw new RuntimeException("No error handler bound for RemoteManager.",
value);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/5da7cb45/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
index 17b9b13..310de6d 100644
---
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
+++
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.reef.wake.remote;
-
import org.apache.reef.tang.annotations.DefaultImplementation;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Stage;
@@ -29,9 +28,6 @@ import
org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
*/
@DefaultImplementation(DefaultRemoteManagerImplementation.class)
public interface RemoteManager extends Stage {
- /**
- * Constructor that takes a Codec<T>
- */
/**
* Returns an event handler that can be used to send messages of type T to
the
http://git-wip-us.apache.org/repos/asf/reef/blob/5da7cb45/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index d525c51..6012ba1 100644
---
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -52,60 +52,61 @@ public final class DefaultRemoteManagerImplementation
implements RemoteManager {
* The timeout used for the execute running in close().
*/
private static final long CLOSE_EXECUTOR_TIMEOUT = 10000; //ms
+
+ /**
+ * Indicates a hostname that isn't set or known.
+ */
+ public static final String UNKNOWN_HOST_NAME =
NettyMessagingTransport.UNKNOWN_HOST_NAME;
+
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
+
private final String name;
private final Transport transport;
private final RemoteSenderStage reSendStage;
private final EStage<TransportEvent> reRecvStage;
private final HandlerContainer handlerContainer;
- private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
+
private RemoteIdentifier myIdentifier;
- /**
- * Indicates a hostname that isn't set or known.
- */
- public static final String UNKNOWN_HOST_NAME =
NettyMessagingTransport.UNKNOWN_HOST_NAME;
@Inject
private <T> DefaultRemoteManagerImplementation(
- @Parameter(RemoteConfiguration.ManagerName.class) final String
name,
- @Parameter(RemoteConfiguration.HostAddress.class) final String
hostAddress,
- @Parameter(RemoteConfiguration.Port.class) final int listeningPort,
- @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T>
codec,
- @Parameter(RemoteConfiguration.ErrorHandler.class) final
EventHandler<Throwable> errorHandler,
- @Parameter(RemoteConfiguration.OrderingGuarantee.class) final
boolean orderingGuarantee,
- @Parameter(RemoteConfiguration.NumberOfTries.class) final int
numberOfTries,
- @Parameter(RemoteConfiguration.RetryTimeout.class) final int
retryTimeout,
- final LocalAddressProvider localAddressProvider,
- final TransportFactory tpFactory,
- final TcpPortProvider tcpPortProvider) {
+ @Parameter(RemoteConfiguration.ManagerName.class) final String name,
+ @Parameter(RemoteConfiguration.HostAddress.class) final String
hostAddress,
+ @Parameter(RemoteConfiguration.Port.class) final int listeningPort,
+ @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T>
codec,
+ @Parameter(RemoteConfiguration.ErrorHandler.class) final
EventHandler<Throwable> errorHandler,
+ @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean
orderingGuarantee,
+ @Parameter(RemoteConfiguration.NumberOfTries.class) final int
numberOfTries,
+ @Parameter(RemoteConfiguration.RetryTimeout.class) final int
retryTimeout,
+ final LocalAddressProvider localAddressProvider,
+ final TransportFactory tpFactory,
+ final TcpPortProvider tcpPortProvider) {
this.name = name;
this.handlerContainer = new HandlerContainer<>(name, codec);
this.reRecvStage = orderingGuarantee ?
- new OrderedRemoteReceiverStage(this.handlerContainer,
errorHandler) :
- new RemoteReceiverStage(this.handlerContainer, errorHandler,
10);
+ new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
+ new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);
- this.transport = tpFactory.newInstance(
- hostAddress, listeningPort, this.reRecvStage,
this.reRecvStage, numberOfTries, retryTimeout,
- tcpPortProvider);
+ this.transport = tpFactory.newInstance(hostAddress, listeningPort,
+ this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout,
tcpPortProvider);
this.handlerContainer.setTransport(this.transport);
- this.myIdentifier = new SocketRemoteIdentifier(
- (InetSocketAddress) this.transport.getLocalAddress());
+ this.myIdentifier = new
SocketRemoteIdentifier((InetSocketAddress)this.transport.getLocalAddress());
this.reSendStage = new RemoteSenderStage(codec, this.transport, 10);
StageManager.instance().register(this);
- LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2}
listening on {3}:{4}. " +
- "Binding address provided by {5}",
- new Object[]{this.name, this.myIdentifier,
COUNTER.incrementAndGet(),
- this.transport.getLocalAddress().toString(),
- this.transport.getListeningPort(),
localAddressProvider}
- );
- }
+ final int counter = COUNTER.incrementAndGet();
+
+ LOG.log(Level.FINEST,
+ "RemoteManager {0} instantiated id {1} counter {2} listening on {3}
Binding address provided by {4}",
+ new Object[] {this.name, this.myIdentifier, counter,
this.transport.getLocalAddress(), localAddressProvider});
+ }
/**
* Returns a proxy event handler for a remote identifier and a message type.
@@ -116,7 +117,7 @@ public final class DefaultRemoteManagerImplementation
implements RemoteManager {
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "RemoteManager: {0} destinationIdentifier: {1}
messageType: {2}",
- new Object[]{this.name, destinationIdentifier,
messageType.getName()});
+ new Object[] {this.name, destinationIdentifier,
messageType.getName()});
}
return new ProxyEventHandler<>(this.myIdentifier, destinationIdentifier,
@@ -131,11 +132,12 @@ public final class DefaultRemoteManagerImplementation
implements RemoteManager {
public <T, U extends T> AutoCloseable registerHandler(
final RemoteIdentifier sourceIdentifier,
final Class<U> messageType, final EventHandler<T> theHandler) {
+
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "RemoteManager: {0} remoteid: {1} messageType: {2}
handler: {3}",
- new Object[]{this.name, sourceIdentifier, messageType.getName(),
- theHandler.getClass().getName()});
+ new Object[] {this.name, sourceIdentifier, messageType.getName(),
theHandler.getClass().getName()});
}
+
return this.handlerContainer.registerHandler(sourceIdentifier,
messageType, theHandler);
}
@@ -145,10 +147,12 @@ public final class DefaultRemoteManagerImplementation
implements RemoteManager {
@Override
public <T, U extends T> AutoCloseable registerHandler(
final Class<U> messageType, final EventHandler<RemoteMessage<T>>
theHandler) {
+
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "RemoteManager: {0} messageType: {1} handler: {2}",
- new Object[]{this.name, messageType.getName(),
theHandler.getClass().getName()});
+ new Object[] {this.name, messageType.getName(),
theHandler.getClass().getName()});
}
+
return this.handlerContainer.registerHandler(messageType, theHandler);
}
@@ -162,10 +166,11 @@ public final class DefaultRemoteManagerImplementation
implements RemoteManager {
@Override
public void close() {
- if (closed.compareAndSet(false, true)) {
+
+ if (this.closed.compareAndSet(false, true)) {
LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}",
- new Object[]{this.name, this.myIdentifier});
+ new Object[] {this.name, this.myIdentifier});
final Runnable closeRunnable = new Runnable() {
@Override
@@ -194,12 +199,13 @@ public final class DefaultRemoteManagerImplementation
implements RemoteManager {
LOG.log(Level.SEVERE, "Unable to close the remote receiver stage",
e);
}
}
-
};
final ExecutorService closeExecutor =
Executors.newSingleThreadExecutor();
+
closeExecutor.submit(closeRunnable);
closeExecutor.shutdown();
+
if (!closeExecutor.isShutdown()) {
LOG.log(Level.SEVERE, "close executor did not shutdown properly.");
}
http://git-wip-us.apache.org/repos/asf/reef/blob/5da7cb45/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
index d972d03..26a7747 100644
---
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
+++
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
@@ -52,8 +52,12 @@ final class HandlerContainer<T> implements
EventHandler<RemoteEvent<byte[]>> {
private Transport transport;
HandlerContainer(final String name, final Codec<T> codec) {
+
this.name = name;
this.codec = codec;
+
+ LOG.log(Level.FINER, "Instantiated HandlerContainer {0} with codec {1}",
+ new String[] {this.name, this.codec.getClass().getCanonicalName()});
}
void setTransport(final Transport transport) {
@@ -141,7 +145,7 @@ final class HandlerContainer<T> implements
EventHandler<RemoteEvent<byte[]>> {
unsubscribeClass = new SubscriptionHandler.Unsubscriber<Class<? extends
T>>() {
@Override
public void unsubscribe(final Class<? extends T> token) {
- LOG.log(Level.FINER, "Unsubscribe: {0} class {1}", new Object[]
{name, token.getName()});
+ LOG.log(Level.FINER, "Unsubscribe: {0} class {1}", new Object[]
{name, token.getCanonicalName()});
msgTypeToHandlerMap.remove(token);
}
};
@@ -152,7 +156,7 @@ final class HandlerContainer<T> implements
EventHandler<RemoteEvent<byte[]>> {
@Override
public void unsubscribe(final Tuple2<RemoteIdentifier, Class<? extends
T>> token) {
LOG.log(Level.FINER, "Unsubscribe: {0} tuple {1},{2}",
- new Object[] {name, token.getT1(), token.getT2().getName()});
+ new Object[] {name, token.getT1(),
token.getT2().getCanonicalName()});
tupleToHandlerMap.remove(token);
}
};
@@ -168,41 +172,49 @@ final class HandlerContainer<T> implements
EventHandler<RemoteEvent<byte[]>> {
};
/**
- * Dispatches a message.
- *
- * @param value
+ * Dispatch message received from the remote to proper event handler.
+ * @param value Remote message, encoded as byte[].
*/
- @SuppressWarnings("checkstyle:diamondoperatorforvariabledefinition")
@Override
+ @SuppressWarnings("checkstyle:diamondoperatorforvariabledefinition")
public synchronized void onNext(final RemoteEvent<byte[]> value) {
- LOG.log(Level.FINER, "RemoteManager: {0} value: {1}", new
Object[]{this.name, value});
+ LOG.log(Level.FINER, "RemoteManager: {0} value: {1}", new Object[]
{this.name, value});
final T decodedEvent = this.codec.decode(value.getEvent());
final Class<?> clazz = decodedEvent.getClass();
+ LOG.log(Level.FINEST, "RemoteManager: {0} decoded event {1} :: {2}",
+ new Object[] {this.name, clazz.getCanonicalName(), decodedEvent});
+
// check remote identifier and message type
- final SocketRemoteIdentifier id =
- new SocketRemoteIdentifier((InetSocketAddress) value.remoteAddress());
+ final SocketRemoteIdentifier id = new
SocketRemoteIdentifier((InetSocketAddress)value.remoteAddress());
final Tuple2<RemoteIdentifier, Class<?>> tuple = new
Tuple2<RemoteIdentifier, Class<?>>(id, clazz);
final EventHandler<T> tupleHandler = (EventHandler<T>)
this.tupleToHandlerMap.get(tuple);
+
if (tupleHandler != null) {
- LOG.log(Level.FINER, "Tuple handler: {0}", tuple);
+
+ LOG.log(Level.FINER, "Tuple handler: {0},{1}",
+ new Object[] {tuple.getT1(), tuple.getT2().getCanonicalName()});
+
tupleHandler.onNext(decodedEvent);
+
} else {
- final EventHandler<RemoteMessage<? extends T>> messageHandler =
- this.msgTypeToHandlerMap.get(clazz);
- if (messageHandler != null) {
- LOG.log(Level.FINER, "Message handler: {0}", clazz);
- messageHandler.onNext(new DefaultRemoteMessage(id, decodedEvent));
- } else {
+
+ final EventHandler<RemoteMessage<? extends T>> messageHandler =
this.msgTypeToHandlerMap.get(clazz);
+
+ if (messageHandler == null) {
final RuntimeException ex = new RemoteRuntimeException(
- "Unknown message type in dispatch: " + clazz.getName() + " from "
+ id);
+ "Unknown message type in dispatch: " + clazz.getCanonicalName() +
" from " + id);
LOG.log(Level.WARNING, "Unknown message type in dispatch.", ex);
throw ex;
}
+
+ LOG.log(Level.FINER, "Message handler: {0}", clazz.getCanonicalName());
+
+ messageHandler.onNext(new DefaultRemoteMessage(id, decodedEvent));
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/5da7cb45/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
index 55adbc6..3ece567 100644
---
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
+++
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
@@ -29,7 +29,9 @@ public class RemoteEvent<T> {
private final T event;
private final long seq;
+
//private static final AtomicLong curSeq = new AtomicLong(0);
+
private SocketAddress localAddr;
private SocketAddress remoteAddr;
@@ -108,17 +110,8 @@ public class RemoteEvent<T> {
* @return a string representation of this object
*/
public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("RemoteEvent");
- builder.append(" localAddr=");
- builder.append(localAddr);
- builder.append(" remoteAddr=");
- builder.append(remoteAddr);
- builder.append(" seq=");
- builder.append(seq);
- builder.append(" event=");
- builder.append(event);
- return builder.toString();
+ return String.format(
+ "RemoteEvent localAddr=%s remoteAddr=%s seq=%d event=%s:%s",
+ this.localAddr, this.remoteAddr, this.seq,
this.event.getClass().getCanonicalName(), this.event);
}
-
}
http://git-wip-us.apache.org/repos/asf/reef/blob/5da7cb45/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
index 5f6b15e..8565944 100644
---
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
+++
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
@@ -22,11 +22,11 @@ import org.apache.reef.wake.remote.transport.Link;
import java.net.SocketAddress;
-
/**
* Event sent from a remote node.
*/
public class TransportEvent {
+
private final byte[] data;
private final SocketAddress localAddr;
private final SocketAddress remoteAddr;
@@ -65,6 +65,13 @@ public class TransportEvent {
}
}
+ @Override
+ public String toString() {
+ return String.format(
+ "TransportEvent: {local: %s remote: %s size: %d bytes}",
+ this.localAddr, this.remoteAddr, this.data.length);
+ }
+
/**
* Gets the data.
*
@@ -102,5 +109,4 @@ public class TransportEvent {
public SocketAddress getRemoteAddress() {
return remoteAddr;
}
-
}