Repository: reef Updated Branches: refs/heads/master 10678052c -> d3503ec1a
[REEF-1761] Race condition in NetworkMessagingTestService Summary of changes: * Fix the race condition in `NetworkMessagingTestService.MessageHandler.onNext()` * Disable benchmarking in `NetworkConnectionServiceTest` if logging is too detailed * Remove some excessive logging in `NetworkMessagingTestService` * Make output of `NetworkConnectionServiceMessage.toString()` shorter * Use proper assertions in `NetworkMessagingTestService` * Other minor refactoring JIRA: [REEF-1761](https://issues.apache.org/jira/browse/REEF-1761) Pull request: This closes #1280 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d3503ec1 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d3503ec1 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d3503ec1 Branch: refs/heads/master Commit: d3503ec1a98385269c5e243a719e21dc32dde61d Parents: 1067805 Author: Sergiy Matusevych <[email protected]> Authored: Thu Mar 30 17:50:42 2017 -0700 Committer: taegeonum <[email protected]> Committed: Sat Apr 1 13:55:52 2017 +0900 ---------------------------------------------------------------------- .../impl/NetworkConnectionServiceMessage.java | 12 ++------- .../network/NetworkConnectionServiceTest.java | 16 +++++++++++ .../reef/io/network/NetworkServiceTest.java | 28 +++++++++++++------- .../util/NetworkMessagingTestService.java | 27 +++++++++---------- 4 files changed, 49 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/d3503ec1/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java index c0fea17..5e29e08 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java @@ -101,15 +101,7 @@ final class NetworkConnectionServiceMessage<T> implements Message<T> { * @return a string representation of this object */ public String toString() { - final StringBuilder builder = new StringBuilder(); - builder.append("NSMessage"); - builder.append(" remoteID="); - builder.append(destId); - builder.append(" message=[| "); - for (final T message : messages) { - builder.append(message).append(" |"); - } - builder.append("]"); - return builder.toString(); + return String.format("%s[%s -> %s]: size %d", + this.getClass().getSimpleName(), this.srcId, this.destId, this.messages.size()); } } http://git-wip-us.apache.org/repos/asf/reef/blob/d3503ec1/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java index 75e2c5e..e65d904 100644 --- a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java @@ -28,6 +28,7 @@ import org.apache.reef.wake.IdentifierFactory; import org.apache.reef.wake.remote.Codec; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; +import org.junit.Assume; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -179,7 +180,11 @@ public class NetworkConnectionServiceTest { */ @Test public void testMessagingNetworkConnServiceRate() throws Exception { + + Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST)); + LOG.log(Level.FINEST, name.getMethodName()); + final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024}; for (final int size : messageSizes) { @@ -220,7 +225,11 @@ public class NetworkConnectionServiceTest { */ @Test public void testMessagingNetworkConnServiceRateDisjoint() throws Exception { + + Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST)); + LOG.log(Level.FINEST, name.getMethodName()); + final BlockingQueue<Object> barrier = new LinkedBlockingQueue<>(); final int numThreads = 4; @@ -277,7 +286,11 @@ public class NetworkConnectionServiceTest { @Test public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception { + + Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST)); + LOG.log(Level.FINEST, name.getMethodName()); + final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024}; for (final int size : messageSizes) { @@ -331,6 +344,9 @@ public class NetworkConnectionServiceTest { */ @Test public void testMessagingNetworkConnServiceBatchingRate() throws Exception { + + Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST)); + LOG.log(Level.FINEST, name.getMethodName()); final int batchSize = 1024 * 1024; http://git-wip-us.apache.org/repos/asf/reef/blob/d3503ec1/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java index 86acc86..5f16af0 100644 --- a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java @@ -38,6 +38,8 @@ import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; +import org.junit.Assert; +import org.junit.Assume; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -147,6 +149,9 @@ public class NetworkServiceTest { */ @Test public void testMessagingNetworkServiceRate() throws Exception { + + Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST)); + LOG.log(Level.FINEST, name.getMethodName()); final IdentifierFactory factory = new StringIdentifierFactory(); @@ -233,6 +238,9 @@ public class NetworkServiceTest { */ @Test public void testMessagingNetworkServiceRateDisjoint() throws Exception { + + Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST)); + LOG.log(Level.FINEST, name.getMethodName()); final IdentifierFactory factory = new StringIdentifierFactory(); @@ -342,6 +350,9 @@ public class NetworkServiceTest { @Test public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception { + + Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST)); + LOG.log(Level.FINEST, name.getMethodName()); final IdentifierFactory factory = new StringIdentifierFactory(); @@ -444,6 +455,9 @@ public class NetworkServiceTest { */ @Test public void testMessagingNetworkServiceBatchingRate() throws Exception { + + Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST)); + LOG.log(Level.FINEST, name.getMethodName()); final IdentifierFactory factory = new StringIdentifierFactory(); @@ -535,7 +549,7 @@ public class NetworkServiceTest { private final String name; private final int expected; private final Monitor monitor; - private AtomicInteger count = new AtomicInteger(0); + private final AtomicInteger count = new AtomicInteger(0); MessageHandler(final String name, final Monitor monitor, final int expected) { this.name = name; @@ -546,17 +560,13 @@ public class NetworkServiceTest { @Override public void onNext(final Message<T> value) { - count.incrementAndGet(); + final int currentCount = count.incrementAndGet(); - LOG.log(Level.FINEST, - "OUT: {0} received {1} from {2} to {3}", - new Object[]{name, value.getData(), value.getSrcId(), value.getDestId()}); + LOG.log(Level.FINER, "{0} Message {1}/{2} :: {3}", new Object[] {name, currentCount, expected, value}); - for (final T obj : value.getData()) { - LOG.log(Level.FINEST, "OUT: data: {0}", obj); - } + Assert.assertTrue(currentCount <= expected); - if (count.get() == expected) { + if (currentCount >= expected) { monitor.mnotify(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/d3503ec1/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java index 59fd0c8..7308bb0 100644 --- a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java @@ -34,6 +34,7 @@ import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; import org.apache.reef.wake.remote.Codec; import org.apache.reef.wake.remote.transport.LinkListener; +import org.junit.Assert; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicInteger; @@ -99,11 +100,12 @@ public final class NetworkMessagingTestService implements AutoCloseable { } public static final class MessageHandler<T> implements EventHandler<Message<T>> { + private final int expected; private final Monitor monitor; private final Identifier expectedSrcId; private final Identifier expectedDestId; - private AtomicInteger count = new AtomicInteger(0); + private final AtomicInteger count = new AtomicInteger(0); public MessageHandler(final Monitor monitor, final int expected, @@ -117,20 +119,15 @@ public final class NetworkMessagingTestService implements AutoCloseable { @Override public void onNext(final Message<T> value) { - count.incrementAndGet(); - LOG.log(Level.FINE, "Count: {0}", count.get()); - LOG.log(Level.FINE, - "OUT: {0} received {1} from {2} to {3}", - new Object[]{value, value.getSrcId(), value.getDestId()}); - - for (final T obj : value.getData()) { - LOG.log(Level.FINE, "OUT: data: {0}", obj); - } - assert value.getSrcId().equals(expectedSrcId); - assert value.getDestId().equals(expectedDestId); + final int currentCount = count.incrementAndGet(); + LOG.log(Level.FINER, "Message {0}/{1} :: {2}", new Object[] {currentCount, expected, value}); + + Assert.assertEquals(expectedSrcId, value.getSrcId()); + Assert.assertEquals(expectedDestId, value.getDestId()); + Assert.assertTrue(currentCount <= expected); - if (count.get() == expected) { + if (currentCount >= expected) { monitor.mnotify(); } } @@ -139,11 +136,11 @@ public final class NetworkMessagingTestService implements AutoCloseable { public static final class TestListener<T> implements LinkListener<Message<T>> { @Override public void onSuccess(final Message<T> message) { - LOG.log(Level.FINE, "success: " + message); + LOG.log(Level.FINER, "Success: message {0}", message); } @Override public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) { - LOG.log(Level.WARNING, "exception: " + cause + message); + LOG.log(Level.WARNING, "Exception: message " + message, cause); throw new RuntimeException(cause); } }
