Repository: incubator-reef
Updated Branches:
  refs/heads/master 25735bfa5 -> bc9f5b54d


[REEF-480] Close resources in NetworkServiceTest

  * Fixed NetworkMessagingTest.close() to close receiverResolver and
    senderResolver
  * Used try-with-resoureces
  * Renamed NetworkMessagingTest NetworkMessagingTestService
  * Changed NetworkConnectionServiceTest to catch NetworkException and
    throw RuntimeException

JIRA:
  [REEF-480](https://issues.apache.org/jira/browse/REEF-480)

Pull Request:
  This closes #301


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/bc9f5b54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/bc9f5b54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/bc9f5b54

Branch: refs/heads/master
Commit: bc9f5b54db4ab2dc39d189556133593a66d625c3
Parents: 25735bf
Author: Gyeongin Yu <[email protected]>
Authored: Fri Jul 17 19:22:42 2015 +0900
Committer: Markus Weimer <[email protected]>
Committed: Wed Jul 22 10:52:47 2015 -0700

----------------------------------------------------------------------
 .../reef/services/network/NamingTest.java       | 350 +++++-----
 .../network/NetworkConnectionServiceTest.java   | 329 +++++-----
 .../services/network/NetworkServiceTest.java    | 649 +++++++++----------
 .../network/util/NetworkMessagingTest.java      | 139 ----
 .../util/NetworkMessagingTestService.java       | 146 +++++
 5 files changed, 803 insertions(+), 810 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bc9f5b54/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
index 462390e..b726b78 100644
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
@@ -96,33 +96,32 @@ public class NamingTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 this.factory);
     injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-    final NameServer server = injector.getInstance(NameServer.class);
-    this.port = server.getPort();
-    for (final Identifier id : idToAddrMap.keySet()) {
-      server.register(id, idToAddrMap.get(id));
-    }
-
-    // run a client
-    final NameLookupClient client = new NameLookupClient(localAddress, 
this.port,
-        10000, this.factory, retryCount, retryTimeout, new 
NameCache(this.TTL), this.localAddressProvider);
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      this.port = server.getPort();
+      for (final Identifier id : idToAddrMap.keySet()) {
+        server.register(id, idToAddrMap.get(id));
+      }
 
-    final Identifier id1 = this.factory.getNewInstance("task1");
-    final Identifier id2 = this.factory.getNewInstance("task2");
+      // run a client
+      try (final NameLookupClient client = new NameLookupClient(localAddress, 
this.port,
+          10000, this.factory, retryCount, retryTimeout, new 
NameCache(this.TTL), this.localAddressProvider)) {
 
-    final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, 
InetSocketAddress>();
-    InetSocketAddress addr1 = client.lookup(id1);
-    respMap.put(id1, addr1);
-    InetSocketAddress addr2 = client.lookup(id2);
-    respMap.put(id2, addr2);
+        final Identifier id1 = this.factory.getNewInstance("task1");
+        final Identifier id2 = this.factory.getNewInstance("task2");
 
-    for (final Identifier id : respMap.keySet()) {
-      LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, 
respMap.get(id)});
-    }
+        final Map<Identifier, InetSocketAddress> respMap = new 
HashMap<Identifier, InetSocketAddress>();
+        InetSocketAddress addr1 = client.lookup(id1);
+        respMap.put(id1, addr1);
+        InetSocketAddress addr2 = client.lookup(id2);
+        respMap.put(id2, addr2);
 
-    Assert.assertTrue(isEqual(idToAddrMap, respMap));
+        for (final Identifier id : respMap.keySet()) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, 
respMap.get(id)});
+        }
 
-    client.close();
-    server.close();
+        Assert.assertTrue(isEqual(idToAddrMap, respMap));
+      }
+    }
   }
 
   /**
@@ -151,76 +150,75 @@ public class NamingTest {
       final Injector injector = Tang.Factory.getTang().newInjector();
       
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 this.factory);
       injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-      final NameServer server = injector.getInstance(NameServer.class);
-      this.port = server.getPort();
-      for (final Identifier id : idToAddrMap.keySet()) {
-        server.register(id, idToAddrMap.get(id));
-      }
-
-      // run a client
-      final NameLookupClient client = new NameLookupClient(localAddress, 
this.port,
-          10000, this.factory, retryCount, retryTimeout, new 
NameCache(this.TTL), this.localAddressProvider);
-
-      final Identifier id1 = this.factory.getNewInstance("task1");
-      final Identifier id2 = this.factory.getNewInstance("task2");
-      final Identifier id3 = this.factory.getNewInstance("task3");
-
-      final ExecutorService e = Executors.newCachedThreadPool();
-
-      final ConcurrentMap<Identifier, InetSocketAddress> respMap = new 
ConcurrentHashMap<Identifier, InetSocketAddress>();
-
-      final Future<?> f1 = e.submit(new Runnable() {
-        @Override
-        public void run() {
-          InetSocketAddress addr = null;
-          try {
-            addr = client.lookup(id1);
-          } catch (final Exception e) {
-            LOG.log(Level.SEVERE, "Lookup failed", e);
-            Assert.fail(e.toString());
-          }
-          respMap.put(id1, addr);
-        }
-      });
-      final Future<?> f2 = e.submit(new Runnable() {
-        @Override
-        public void run() {
-          InetSocketAddress addr = null;
-          try {
-            addr = client.lookup(id2);
-          } catch (final Exception e) {
-            LOG.log(Level.SEVERE, "Lookup failed", e);
-            Assert.fail(e.toString());
-          }
-          respMap.put(id2, addr);
-        }
-      });
-      final Future<?> f3 = e.submit(new Runnable() {
-        @Override
-        public void run() {
-          InetSocketAddress addr = null;
-          try {
-            addr = client.lookup(id3);
-          } catch (final Exception e) {
-            LOG.log(Level.SEVERE, "Lookup failed", e);
-            Assert.fail(e.toString());
-          }
-          respMap.put(id3, addr);
+      try (final NameServer server = injector.getInstance(NameServer.class)) {
+        this.port = server.getPort();
+        for (final Identifier id : idToAddrMap.keySet()) {
+          server.register(id, idToAddrMap.get(id));
         }
-      });
 
-      f1.get();
-      f2.get();
-      f3.get();
+        // run a client
+        try (final NameLookupClient client = new 
NameLookupClient(localAddress, this.port,
+            10000, this.factory, retryCount, retryTimeout, new 
NameCache(this.TTL), this.localAddressProvider)) {
+
+          final Identifier id1 = this.factory.getNewInstance("task1");
+          final Identifier id2 = this.factory.getNewInstance("task2");
+          final Identifier id3 = this.factory.getNewInstance("task3");
+
+          final ExecutorService e = Executors.newCachedThreadPool();
+
+          final ConcurrentMap<Identifier, InetSocketAddress> respMap = new 
ConcurrentHashMap<Identifier, InetSocketAddress>();
+
+          final Future<?> f1 = e.submit(new Runnable() {
+            @Override
+            public void run() {
+              InetSocketAddress addr = null;
+              try {
+                addr = client.lookup(id1);
+              } catch (final Exception e) {
+                LOG.log(Level.SEVERE, "Lookup failed", e);
+                Assert.fail(e.toString());
+              }
+              respMap.put(id1, addr);
+            }
+          });
+          final Future<?> f2 = e.submit(new Runnable() {
+            @Override
+            public void run() {
+              InetSocketAddress addr = null;
+              try {
+                addr = client.lookup(id2);
+              } catch (final Exception e) {
+                LOG.log(Level.SEVERE, "Lookup failed", e);
+                Assert.fail(e.toString());
+              }
+              respMap.put(id2, addr);
+            }
+          });
+          final Future<?> f3 = e.submit(new Runnable() {
+            @Override
+            public void run() {
+              InetSocketAddress addr = null;
+              try {
+                addr = client.lookup(id3);
+              } catch (final Exception e) {
+                LOG.log(Level.SEVERE, "Lookup failed", e);
+                Assert.fail(e.toString());
+              }
+              respMap.put(id3, addr);
+            }
+          });
+
+          f1.get();
+          f2.get();
+          f3.get();
+
+          for (final Identifier id : respMap.keySet()) {
+            LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, 
respMap.get(id)});
+          }
 
-      for (final Identifier id : respMap.keySet()) {
-        LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, 
respMap.get(id)});
+          Assert.assertTrue(isEqual(idToAddrMap, respMap));
+        }
       }
-
-      Assert.assertTrue(isEqual(idToAddrMap, respMap));
-
-      client.close();
-      server.close();
     }
   }
 
@@ -237,55 +235,55 @@ public class NamingTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 this.factory);
     injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-    final NameServer server = injector.getInstance(NameServer.class);
-    this.port = server.getPort();
-    final String localAddress = localAddressProvider.getLocalAddress();
-
-    // names to start with
-    final Map<Identifier, InetSocketAddress> idToAddrMap = new 
HashMap<Identifier, InetSocketAddress>();
-    idToAddrMap.put(this.factory.getNewInstance("task1"), new 
InetSocketAddress(localAddress, 7001));
-    idToAddrMap.put(this.factory.getNewInstance("task2"), new 
InetSocketAddress(localAddress, 7002));
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      this.port = server.getPort();
+      final String localAddress = localAddressProvider.getLocalAddress();
 
-    // registration
-    // invoke registration from the client side
-    final NameRegistryClient client = new NameRegistryClient(localAddress, 
this.port, this.factory, this.localAddressProvider);
-    for (final Identifier id : idToAddrMap.keySet()) {
-      client.register(id, idToAddrMap.get(id));
-    }
+      // names to start with
+      final Map<Identifier, InetSocketAddress> idToAddrMap = new 
HashMap<Identifier, InetSocketAddress>();
+      idToAddrMap.put(this.factory.getNewInstance("task1"), new 
InetSocketAddress(localAddress, 7001));
+      idToAddrMap.put(this.factory.getNewInstance("task2"), new 
InetSocketAddress(localAddress, 7002));
 
-    // wait
-    final Set<Identifier> ids = idToAddrMap.keySet();
-    busyWait(server, ids.size(), ids);
+      // registration
+      // invoke registration from the client side
+      try (final NameRegistryClient client = new 
NameRegistryClient(localAddress,
+          this.port, this.factory, this.localAddressProvider)) {
+        for (final Identifier id : idToAddrMap.keySet()) {
+          client.register(id, idToAddrMap.get(id));
+        }
 
-    // check the server side 
-    Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, 
InetSocketAddress>();
-    Iterable<NameAssignment> nas = server.lookup(ids);
+        // wait
+        final Set<Identifier> ids = idToAddrMap.keySet();
+        busyWait(server, ids.size(), ids);
 
-    for (final NameAssignment na : nas) {
-      LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
-          new Object[]{na.getIdentifier(), na.getAddress()});
-      serverMap.put(na.getIdentifier(), na.getAddress());
-    }
+        // check the server side
+        Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, 
InetSocketAddress>();
+        Iterable<NameAssignment> nas = server.lookup(ids);
 
-    Assert.assertTrue(isEqual(idToAddrMap, serverMap));
+        for (final NameAssignment na : nas) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
+              new Object[]{na.getIdentifier(), na.getAddress()});
+          serverMap.put(na.getIdentifier(), na.getAddress());
+        }
 
-    // un-registration
-    for (final Identifier id : idToAddrMap.keySet()) {
-      client.unregister(id);
-    }
+        Assert.assertTrue(isEqual(idToAddrMap, serverMap));
 
-    // wait
-    busyWait(server, 0, ids);
+        // un-registration
+        for (final Identifier id : idToAddrMap.keySet()) {
+          client.unregister(id);
+        }
 
-    serverMap = new HashMap<Identifier, InetSocketAddress>();
-    nas = server.lookup(ids);
-    for (final NameAssignment na : nas)
-      serverMap.put(na.getIdentifier(), na.getAddress());
+        // wait
+        busyWait(server, 0, ids);
 
-    Assert.assertEquals(0, serverMap.size());
+        serverMap = new HashMap<Identifier, InetSocketAddress>();
+        nas = server.lookup(ids);
+        for (final NameAssignment na : nas)
+          serverMap.put(na.getIdentifier(), na.getAddress());
 
-    client.close();
-    server.close();
+        Assert.assertEquals(0, serverMap.size());
+      }
+    }
   }
 
   /**
@@ -302,66 +300,66 @@ public class NamingTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 this.factory);
     injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-    final NameServer server = injector.getInstance(NameServer.class);
-    this.port = server.getPort();
-
-    final Map<Identifier, InetSocketAddress> idToAddrMap = new 
HashMap<Identifier, InetSocketAddress>();
-    idToAddrMap.put(this.factory.getNewInstance("task1"), new 
InetSocketAddress(localAddress, 7001));
-    idToAddrMap.put(this.factory.getNewInstance("task2"), new 
InetSocketAddress(localAddress, 7002));
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      this.port = server.getPort();
 
-    // registration
-    // invoke registration from the client side
-    Configuration nameResolverConf = NameResolverConfiguration.CONF
-        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
-        .set(NameResolverConfiguration.NAME_SERVICE_PORT, this.port)
-        .set(NameResolverConfiguration.CACHE_TIMEOUT, this.TTL)
-        .set(NameResolverConfiguration.RETRY_TIMEOUT, retryTimeout)
-        .set(NameResolverConfiguration.RETRY_COUNT, retryCount)
-        .build();
-
-    final NameResolver client = 
Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class);
-    for (final Identifier id : idToAddrMap.keySet()) {
-      client.register(id, idToAddrMap.get(id));
-    }
+      final Map<Identifier, InetSocketAddress> idToAddrMap = new 
HashMap<Identifier, InetSocketAddress>();
+      idToAddrMap.put(this.factory.getNewInstance("task1"), new 
InetSocketAddress(localAddress, 7001));
+      idToAddrMap.put(this.factory.getNewInstance("task2"), new 
InetSocketAddress(localAddress, 7002));
 
-    // wait
-    final Set<Identifier> ids = idToAddrMap.keySet();
-    busyWait(server, ids.size(), ids);
+      // registration
+      // invoke registration from the client side
+      Configuration nameResolverConf = NameResolverConfiguration.CONF
+          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+          .set(NameResolverConfiguration.NAME_SERVICE_PORT, this.port)
+          .set(NameResolverConfiguration.CACHE_TIMEOUT, this.TTL)
+          .set(NameResolverConfiguration.RETRY_TIMEOUT, retryTimeout)
+          .set(NameResolverConfiguration.RETRY_COUNT, retryCount)
+          .build();
+
+      try (final NameResolver client
+               = 
Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class))
 {
+        for (final Identifier id : idToAddrMap.keySet()) {
+          client.register(id, idToAddrMap.get(id));
+        }
 
-    // lookup
-    final Identifier id1 = this.factory.getNewInstance("task1");
-    final Identifier id2 = this.factory.getNewInstance("task2");
+        // wait
+        final Set<Identifier> ids = idToAddrMap.keySet();
+        busyWait(server, ids.size(), ids);
 
-    final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, 
InetSocketAddress>();
-    InetSocketAddress addr1 = client.lookup(id1);
-    respMap.put(id1, addr1);
-    InetSocketAddress addr2 = client.lookup(id2);
-    respMap.put(id2, addr2);
+        // lookup
+        final Identifier id1 = this.factory.getNewInstance("task1");
+        final Identifier id2 = this.factory.getNewInstance("task2");
 
-    for (final Identifier id : respMap.keySet()) {
-      LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, 
respMap.get(id)});
-    }
+        final Map<Identifier, InetSocketAddress> respMap = new 
HashMap<Identifier, InetSocketAddress>();
+        InetSocketAddress addr1 = client.lookup(id1);
+        respMap.put(id1, addr1);
+        InetSocketAddress addr2 = client.lookup(id2);
+        respMap.put(id2, addr2);
 
-    Assert.assertTrue(isEqual(idToAddrMap, respMap));
+        for (final Identifier id : respMap.keySet()) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, 
respMap.get(id)});
+        }
 
-    // un-registration
-    for (final Identifier id : idToAddrMap.keySet()) {
-      client.unregister(id);
-    }
+        Assert.assertTrue(isEqual(idToAddrMap, respMap));
 
-    // wait
-    busyWait(server, 0, ids);
+        // un-registration
+        for (final Identifier id : idToAddrMap.keySet()) {
+          client.unregister(id);
+        }
 
-    final Map<Identifier, InetSocketAddress> serverMap = new 
HashMap<Identifier, InetSocketAddress>();
-    addr1 = server.lookup(id1);
-    if (addr1 != null) serverMap.put(id1, addr1);
-    addr2 = server.lookup(id1);
-    if (addr2 != null) serverMap.put(id2, addr2);
+        // wait
+        busyWait(server, 0, ids);
 
-    Assert.assertEquals(0, serverMap.size());
+        final Map<Identifier, InetSocketAddress> serverMap = new 
HashMap<Identifier, InetSocketAddress>();
+        addr1 = server.lookup(id1);
+        if (addr1 != null) serverMap.put(id1, addr1);
+        addr2 = server.lookup(id1);
+        if (addr2 != null) serverMap.put(id2, addr2);
 
-    client.close();
-    server.close();
+        Assert.assertEquals(0, serverMap.size());
+      }
+    }
   }
 
   private boolean isEqual(final Map<Identifier, InetSocketAddress> map1,

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bc9f5b54/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
index ddf4bfd..3cb1310 100644
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
@@ -64,23 +64,23 @@ public class NetworkConnectionServiceTest {
   private void runMessagingNetworkConnectionService(Codec<String> codec) 
throws Exception {
     final int numMessages = 2000;
     final Monitor monitor = new Monitor();
-    final NetworkMessagingTest messagingTest = new 
NetworkMessagingTest(localAddress);
-    messagingTest.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
-
-    final Connection<String> conn = 
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
-    try {
-      conn.open();
-      for (int count = 0; count < numMessages; ++count) {
-        // send messages to the receiver.
-        conn.write("hello" + count);
+    try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
+      messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+
+      try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+        try {
+          conn.open();
+          for (int count = 0; count < numMessages; ++count) {
+            // send messages to the receiver.
+            conn.write("hello" + count);
+          }
+          monitor.mwait();
+        } catch (NetworkException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
       }
-      monitor.mwait();
-    } catch (NetworkException e) {
-      e.printStackTrace();
     }
-
-    conn.close();
-    messagingTest.close();
   }
 
   /**
@@ -106,56 +106,52 @@ public class NetworkConnectionServiceTest {
 
     final int groupcommMessages = 1000;
     final Monitor monitor = new Monitor();
-    final NetworkMessagingTest messagingTest = new 
NetworkMessagingTest(localAddress);
+    try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
 
-    messagingTest.registerTestConnectionFactory(groupCommClientId, 
groupcommMessages, monitor, stringCodec);
+      messagingTestService.registerTestConnectionFactory(groupCommClientId, 
groupcommMessages, monitor, stringCodec);
 
-    final int shuffleMessges = 2000;
-    final Monitor monitor2 = new Monitor();
-    messagingTest.registerTestConnectionFactory(shuffleClientId, 
shuffleMessges, monitor2, integerCodec);
+      final int shuffleMessages = 2000;
+      final Monitor monitor2 = new Monitor();
+      messagingTestService.registerTestConnectionFactory(shuffleClientId, 
shuffleMessages, monitor2, integerCodec);
 
-    executor.submit(new Runnable() {
-      @Override
-      public void run() {
-        final Connection<String> conn = 
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
-        try {
-          conn.open();
-          for (int count = 0; count < groupcommMessages; ++count) {
-            // send messages to the receiver.
-            conn.write("hello" + count);
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+            conn.open();
+            for (int count = 0; count < groupcommMessages; ++count) {
+              // send messages to the receiver.
+              conn.write("hello" + count);
+            }
+            monitor.mwait();
+          } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
           }
-          monitor.mwait();
-          conn.close();
-        } catch (Exception e) {
-          e.printStackTrace();
-          throw new RuntimeException(e);
         }
-      }
-    });
+      });
 
-    executor.submit(new Runnable() {
-      @Override
-      public void run() {
-        final Connection<Integer> conn = 
messagingTest.getConnectionFromSenderToReceiver(shuffleClientId);
-        try {
-          conn.open();
-          for (int count = 0; count < shuffleMessges; ++count) {
-            // send messages to the receiver.
-            conn.write(count);
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try (final Connection<Integer> conn = 
messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) {
+            conn.open();
+            for (int count = 0; count < shuffleMessages; ++count) {
+              // send messages to the receiver.
+              conn.write(count);
+            }
+            monitor2.mwait();
+          } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
           }
-          monitor2.mwait();
-          conn.close();
-        } catch (Exception e) {
-          e.printStackTrace();
-          throw new RuntimeException(e);
         }
-      }
-    });
+      });
 
-    monitor.mwait();
-    monitor2.mwait();
-    executor.shutdown();
-    messagingTest.close();
+      monitor.mwait();
+      monitor2.mwait();
+      executor.shutdown();
+    }
   }
 
   /**
@@ -188,33 +184,36 @@ public class NetworkConnectionServiceTest {
       final int numMessages = 300000 / (Math.max(1, size / 512));
       final Monitor monitor = new Monitor();
       final Codec<String> codec = new StringCodec();
-      final NetworkMessagingTest messagingTest = new 
NetworkMessagingTest(localAddress);
-      messagingTest.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
-      final Connection<String> conn = 
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
-
-      // build the message
-      final StringBuilder msb = new StringBuilder();
-      for (int i = 0; i < size; i++) {
-        msb.append("1");
-      }
-      final String message = msb.toString();
-
-      long start = System.currentTimeMillis();
-      try {
-        conn.open();
-        for (int count = 0; count < numMessages; ++count) {
-          // send messages to the receiver.
-          conn.write(message);
+      try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+
+        // build the message
+        final StringBuilder msb = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+          msb.append("1");
         }
-        monitor.mwait();
-      } catch (NetworkException e) {
-        e.printStackTrace();
-      }
-      final long end = System.currentTimeMillis();
+        final String message = msb.toString();
+
+        try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          long start = System.currentTimeMillis();
+          try {
+            conn.open();
+            for (int count = 0; count < numMessages; ++count) {
+              // send messages to the receiver.
+              conn.write(message);
+            }
+            monitor.mwait();
+          } catch (NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+          final long end = System.currentTimeMillis();
 
-      final double runtime = ((double) end - start) / 1000;
-      LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages / 
runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / 
runtime);// x2 for unicode chars
-      messagingTest.close();
+          final double runtime = ((double) end - start) / 1000;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages 
/ runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / 
runtime);// x2 for unicode chars
+        }
+      }
     }
   }
 
@@ -237,31 +236,32 @@ public class NetworkConnectionServiceTest {
 
       e.submit(new Runnable() {
         public void run() {
-          try {
+          try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
             final Monitor monitor = new Monitor();
             final Codec<String> codec = new StringCodec();
-            final NetworkMessagingTest messagingTest = new 
NetworkMessagingTest(localAddress);
-            messagingTest.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
-            final Connection<String> conn = 
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
-
-            // build the message
-            final StringBuilder msb = new StringBuilder();
-            for (int i = 0; i < size; i++) {
-              msb.append("1");
-            }
-            final String message = msb.toString();
 
-            try {
-              conn.open();
-              for (int count = 0; count < numMessages; ++count) {
-                // send messages to the receiver.
-                conn.write(message);
+            
messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+            try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+              // build the message
+              final StringBuilder msb = new StringBuilder();
+              for (int i = 0; i < size; i++) {
+                msb.append("1");
+              }
+              final String message = msb.toString();
+
+              try {
+                conn.open();
+                for (int count = 0; count < numMessages; ++count) {
+                  // send messages to the receiver.
+                  conn.write(message);
+                }
+                monitor.mwait();
+              } catch (NetworkException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
               }
-              monitor.mwait();
-            } catch (NetworkException e) {
-              e.printStackTrace();
             }
-            messagingTest.close();
           } catch (Exception e) {
             throw new RuntimeException(e);
           }
@@ -291,46 +291,46 @@ public class NetworkConnectionServiceTest {
       final int totalNumMessages = numMessages * numThreads;
       final Monitor monitor = new Monitor();
       final Codec<String> codec = new StringCodec();
-      final NetworkMessagingTest messagingTest = new 
NetworkMessagingTest(localAddress);
-      messagingTest.registerTestConnectionFactory(groupCommClientId, 
totalNumMessages, monitor, codec);
-
-      final ExecutorService e = Executors.newCachedThreadPool();
+      try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
totalNumMessages, monitor, codec);
 
-      // build the message
-      final StringBuilder msb = new StringBuilder();
-      for (int i = 0; i < size; i++) {
-        msb.append("1");
-      }
-      final String message = msb.toString();
-      final Connection<String> conn = 
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+        final ExecutorService e = Executors.newCachedThreadPool();
 
-      final long start = System.currentTimeMillis();
-      for (int i = 0; i < numThreads; i++) {
-        e.submit(new Runnable() {
-          @Override
-          public void run() {
-
-              try {
-                conn.open();
-                for (int count = 0; count < numMessages; ++count) {
-                  // send messages to the receiver.
-                  conn.write(message);
+        // build the message
+        final StringBuilder msb = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+          msb.append("1");
+        }
+        final String message = msb.toString();
+        try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          final long start = System.currentTimeMillis();
+          for (int i = 0; i < numThreads; i++) {
+            e.submit(new Runnable() {
+              @Override
+              public void run() {
+
+                try {
+                  conn.open();
+                  for (int count = 0; count < numMessages; ++count) {
+                    // send messages to the receiver.
+                    conn.write(message);
+                  }
+                } catch (Exception e) {
+                  throw new RuntimeException(e);
                 }
-              } catch (Exception e) {
-                throw new RuntimeException(e);
               }
+            });
           }
-        });
-      }
 
-      e.shutdown();
-      e.awaitTermination(30, TimeUnit.SECONDS);
-      monitor.mwait();
-      final long end = System.currentTimeMillis();
-      final double runtime = ((double) end - start) / 1000;
-      LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) 
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
-      conn.close();
-      messagingTest.close();
+          e.shutdown();
+          e.awaitTermination(30, TimeUnit.SECONDS);
+          monitor.mwait();
+          final long end = System.currentTimeMillis();
+          final double runtime = ((double) end - start) / 1000;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) 
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+        }
+      }
     }
   }
 
@@ -348,38 +348,39 @@ public class NetworkConnectionServiceTest {
       final int numMessages = 300 / (Math.max(1, size / 512));
       final Monitor monitor = new Monitor();
       final Codec<String> codec = new StringCodec();
-      final NetworkMessagingTest messagingTest = new 
NetworkMessagingTest(localAddress);
-      messagingTest.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
-      final Connection<String> conn = 
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
-
-      // build the message
-      final StringBuilder msb = new StringBuilder();
-      for (int i = 0; i < size; i++) {
-        msb.append("1");
-      }
-      final String message = msb.toString();
-
-      final long start = System.currentTimeMillis();
-      try {
-        for (int i = 0; i < numMessages; i++) {
-          final StringBuilder sb = new StringBuilder();
-          for (int j = 0; j < batchSize / size; j++) {
-            sb.append(message);
+      try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+        try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          // build the message
+          final StringBuilder msb = new StringBuilder();
+          for (int i = 0; i < size; i++) {
+            msb.append("1");
           }
-          conn.open();
-          conn.write(sb.toString());
+          final String message = msb.toString();
+
+          final long start = System.currentTimeMillis();
+          try {
+            for (int i = 0; i < numMessages; i++) {
+              final StringBuilder sb = new StringBuilder();
+              for (int j = 0; j < batchSize / size; j++) {
+                sb.append(message);
+              }
+              conn.open();
+              conn.write(sb.toString());
+            }
+            monitor.mwait();
+          } catch (NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+
+          final long end = System.currentTimeMillis();
+          final double runtime = ((double) end - start) / 1000;
+          final long numAppMessages = numMessages * batchSize / size;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 
2 * size) / runtime);// x2 for unicode chars
         }
-        monitor.mwait();
-      } catch (NetworkException e) {
-        e.printStackTrace();
-        throw new RuntimeException(e);
       }
-
-      final long end = System.currentTimeMillis();
-      final double runtime = ((double) end - start) / 1000;
-      final long numAppMessages = numMessages * batchSize / size;
-      LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages 
/ runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / 
runtime);// x2 for unicode chars
-      messagingTest.close();
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bc9f5b54/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
index 2bb75ae..2b96c11 100644
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
@@ -75,59 +75,57 @@ public class NetworkServiceTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
     injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-    final NameServer server = injector.getInstance(NameServer.class);
-    int nameServerPort = server.getPort();
-
-    final int numMessages = 10;
-    final Monitor monitor = new Monitor();
-
-    LOG.log(Level.FINEST, "=== Test network service receiver start");
-    // network service
-    final String name2 = "task2";
-    final Configuration nameResolverConf = 
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
-        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
-        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
-        .build())
-        .build();
-
-    final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
-    final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
-
-    NetworkService<String> ns2 = new NetworkService<String>(
-        factory, 0, nameResolver,
-        new StringCodec(), new MessagingTransportFactory(localAddressProvider),
-        new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
-    ns2.registerId(factory.getNewInstance(name2));
-    final int port2 = ns2.getTransport().getListeningPort();
-    server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
-
-    LOG.log(Level.FINEST, "=== Test network service sender start");
-    final String name1 = "task1";
-    final NetworkService<String> ns1 = new NetworkService<String>(factory, 0, 
nameResolver,
-        new StringCodec(), new MessagingTransportFactory(localAddressProvider),
-        new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), 
localAddressProvider);
-    ns1.registerId(factory.getNewInstance(name1));
-    final int port1 = ns1.getTransport().getListeningPort();
-    server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
-
-    final Identifier destId = factory.getNewInstance(name2);
-    final Connection<String> conn = ns1.newConnection(destId);
-    try {
-      conn.open();
-      for (int count = 0; count < numMessages; ++count) {
-        conn.write("hello! " + count);
-      }
-      monitor.mwait();
 
-    } catch (NetworkException e) {
-      e.printStackTrace();
-    }
-    conn.close();
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      int nameServerPort = server.getPort();
+
+      final int numMessages = 10;
+      final Monitor monitor = new Monitor();
+
+      // network service
+      final String name2 = "task2";
+      final String name1 = "task1";
+      final Configuration nameResolverConf = 
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
+          .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+          .build())
+          .build();
+
+      final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+      LOG.log(Level.FINEST, "=== Test network service receiver start");
+      LOG.log(Level.FINEST, "=== Test network service sender start");
+      try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
+           NetworkService<String> ns2 = new NetworkService<String>(factory, 0, 
nameResolver,
+               new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+               new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
+           final NetworkService<String> ns1 = new 
NetworkService<String>(factory, 0, nameResolver,
+               new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+               new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
+
+        ns2.registerId(factory.getNewInstance(name2));
+        final int port2 = ns2.getTransport().getListeningPort();
+        server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
+
+        ns1.registerId(factory.getNewInstance(name1));
+        final int port1 = ns1.getTransport().getListeningPort();
+        server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
 
-    ns1.close();
-    ns2.close();
+        final Identifier destId = factory.getNewInstance(name2);
 
-    server.close();
+        try (final Connection<String> conn = ns1.newConnection(destId)) {
+          conn.open();
+          for (int count = 0; count < numMessages; ++count) {
+            conn.write("hello! " + count);
+          }
+          monitor.mwait();
+
+        } catch (NetworkException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+    }
   }
 
   /**
@@ -142,75 +140,71 @@ public class NetworkServiceTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
     injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-    final NameServer server = injector.getInstance(NameServer.class);
-    int nameServerPort = server.getPort();
-
-    final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
-
-    for (int size : messageSizes) {
-      final int numMessages = 300000 / (Math.max(1, size / 512));
-      final Monitor monitor = new Monitor();
-
-      LOG.log(Level.FINEST, "=== Test network service receiver start");
-      // network service
-      final String name2 = "task2";
-      final Configuration nameResolverConf = 
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
-          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
-          .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
-          .build())
-          .build();
 
-      final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
-      final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
-
-      NetworkService<String> ns2 = new NetworkService<String>(
-          factory, 0, nameResolver,
-          new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-          new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
-      ns2.registerId(factory.getNewInstance(name2));
-      final int port2 = ns2.getTransport().getListeningPort();
-      server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
-
-      LOG.log(Level.FINEST, "=== Test network service sender start");
-      final String name1 = "task1";
-      NetworkService<String> ns1 = new NetworkService<String>(
-          factory, 0, nameResolver,
-          new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-          new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), 
localAddressProvider);
-      ns1.registerId(factory.getNewInstance(name1));
-      final int port1 = ns1.getTransport().getListeningPort();
-      server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
-
-      Identifier destId = factory.getNewInstance(name2);
-      Connection<String> conn = ns1.newConnection(destId);
-
-      // build the message
-      StringBuilder msb = new StringBuilder();
-      for (int i = 0; i < size; i++) {
-        msb.append("1");
-      }
-      String message = msb.toString();
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      int nameServerPort = server.getPort();
+
+      final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
+
+      for (int size : messageSizes) {
+        final int numMessages = 300000 / (Math.max(1, size / 512));
+        final Monitor monitor = new Monitor();
+
+        // network service
+        final String name2 = "task2";
+        final String name1 = "task1";
+        final Configuration nameResolverConf = 
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+            .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
+            .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+            .build())
+            .build();
+
+        final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+        LOG.log(Level.FINEST, "=== Test network service receiver start");
+        LOG.log(Level.FINEST, "=== Test network service sender start");
+        try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
+             NetworkService<String> ns2 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
+             NetworkService<String> ns1 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
+
+          ns2.registerId(factory.getNewInstance(name2));
+          final int port2 = ns2.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
+
+          ns1.registerId(factory.getNewInstance(name1));
+          final int port1 = ns1.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
+
+          Identifier destId = factory.getNewInstance(name2);
+
+          // build the message
+          StringBuilder msb = new StringBuilder();
+          for (int i = 0; i < size; i++) {
+            msb.append("1");
+          }
+          String message = msb.toString();
 
-      long start = System.currentTimeMillis();
-      try {
-        for (int i = 0; i < numMessages; i++) {
-          conn.open();
-          conn.write(message);
+          long start = System.currentTimeMillis();
+          try (Connection<String> conn = ns1.newConnection(destId)) {
+            for (int i = 0; i < numMessages; i++) {
+              conn.open();
+              conn.write(message);
+            }
+            monitor.mwait();
+          } catch (NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+          long end = System.currentTimeMillis();
+          double runtime = ((double) end - start) / 1000;
+          LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
numMessages / runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * 
size) / runtime);// x2 for unicode chars
         }
-        monitor.mwait();
-      } catch (NetworkException e) {
-        e.printStackTrace();
       }
-      long end = System.currentTimeMillis();
-      double runtime = ((double) end - start) / 1000;
-      LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numMessages / 
runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / 
runtime);// x2 for unicode chars
-      conn.close();
-
-      ns1.close();
-      ns2.close();
     }
-
-    server.close();
   }
 
   /**
@@ -225,98 +219,95 @@ public class NetworkServiceTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
     injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-    final NameServer server = injector.getInstance(NameServer.class);
-    final int nameServerPort = server.getPort();
-
-    BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
-
-    int numThreads = 4;
-    final int size = 2000;
-    final int numMessages = 300000 / (Math.max(1, size / 512));
-    final int totalNumMessages = numMessages * numThreads;
-
-    ExecutorService e = Executors.newCachedThreadPool();
-    for (int t = 0; t < numThreads; t++) {
-      final int tt = t;
-
-      e.submit(new Runnable() {
-        public void run() {
-          try {
-            Monitor monitor = new Monitor();
-
-            LOG.log(Level.FINEST, "=== Test network service receiver start");
-            // network service
-            final String name2 = "task2-" + tt;
-            final Configuration nameResolverConf = 
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
-                .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
localAddress)
-                .set(NameResolverConfiguration.NAME_SERVICE_PORT, 
nameServerPort)
-                .build())
-                .build();
-
-            final Injector injector = 
Tang.Factory.getTang().newInjector(nameResolverConf);
-            final NameResolver nameResolver = 
injector.getInstance(NameResolver.class);
-            NetworkService<String> ns2 = new NetworkService<String>(
-                factory, 0, nameResolver,
-                new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-                new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
-            ns2.registerId(factory.getNewInstance(name2));
-            final int port2 = ns2.getTransport().getListeningPort();
-            server.register(factory.getNewInstance(name2), new 
InetSocketAddress(localAddress, port2));
-
-            LOG.log(Level.FINEST, "=== Test network service sender start");
-            final String name1 = "task1-" + tt;
-            NetworkService<String> ns1 = new NetworkService<String>(
-                factory, 0, nameResolver,
-                new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-                new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider);
-            ns1.registerId(factory.getNewInstance(name1));
-            final int port1 = ns1.getTransport().getListeningPort();
-            server.register(factory.getNewInstance(name1), new 
InetSocketAddress(localAddress, port1));
-
-            Identifier destId = factory.getNewInstance(name2);
-            Connection<String> conn = ns1.newConnection(destId);
 
-            // build the message
-            StringBuilder msb = new StringBuilder();
-            for (int i = 0; i < size; i++) {
-              msb.append("1");
-            }
-            String message = msb.toString();
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      final int nameServerPort = server.getPort();
+
+      BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
 
+      int numThreads = 4;
+      final int size = 2000;
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final int totalNumMessages = numMessages * numThreads;
+
+      ExecutorService e = Executors.newCachedThreadPool();
+      for (int t = 0; t < numThreads; t++) {
+        final int tt = t;
 
+        e.submit(new Runnable() {
+          @Override
+          public void run() {
             try {
-              for (int i = 0; i < numMessages; i++) {
-                conn.open();
-                conn.write(message);
+              Monitor monitor = new Monitor();
+
+              // network service
+              final String name2 = "task2-" + tt;
+              final String name1 = "task1-" + tt;
+              final Configuration nameResolverConf = 
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+                  .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
localAddress)
+                  .set(NameResolverConfiguration.NAME_SERVICE_PORT, 
nameServerPort)
+                  .build())
+                  .build();
+
+              final Injector injector = 
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+              LOG.log(Level.FINEST, "=== Test network service receiver start");
+              LOG.log(Level.FINEST, "=== Test network service sender start");
+              try (final NameResolver nameResolver = 
injector.getInstance(NameResolver.class);
+                   NetworkService<String> ns2 = new 
NetworkService<String>(factory, 0, nameResolver,
+                       new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                       new MessageHandler<String>(name2, monitor, 
numMessages), new ExceptionHandler(), localAddressProvider);
+                   NetworkService<String> ns1 = new 
NetworkService<String>(factory, 0, nameResolver,
+                       new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                       new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
+
+                ns2.registerId(factory.getNewInstance(name2));
+                final int port2 = ns2.getTransport().getListeningPort();
+                server.register(factory.getNewInstance(name2), new 
InetSocketAddress(localAddress, port2));
+
+                ns1.registerId(factory.getNewInstance(name1));
+                final int port1 = ns1.getTransport().getListeningPort();
+                server.register(factory.getNewInstance(name1), new 
InetSocketAddress(localAddress, port1));
+
+                Identifier destId = factory.getNewInstance(name2);
+
+                // build the message
+                StringBuilder msb = new StringBuilder();
+                for (int i = 0; i < size; i++) {
+                  msb.append("1");
+                }
+                String message = msb.toString();
+
+                try (Connection<String> conn = ns1.newConnection(destId)) {
+                  for (int i = 0; i < numMessages; i++) {
+                    conn.open();
+                    conn.write(message);
+                  }
+                  monitor.mwait();
+                } catch (NetworkException e) {
+                  e.printStackTrace();
+                  throw new RuntimeException(e);
+                }
               }
-              monitor.mwait();
-            } catch (NetworkException e) {
+            } catch (Exception e) {
               e.printStackTrace();
+              throw new RuntimeException(e);
             }
-            conn.close();
-
-            ns1.close();
-            ns2.close();
-          } catch (Exception e) {
-            e.printStackTrace();
-
           }
-        }
-      });
-    }
-
-    // start and time
-    long start = System.currentTimeMillis();
-    Object ignore = new Object();
-    for (int i = 0; i < numThreads; i++) barrier.add(ignore);
-    e.shutdown();
-    e.awaitTermination(100, TimeUnit.SECONDS);
-    long end = System.currentTimeMillis();
+        });
+      }
 
-    double runtime = ((double) end - start) / 1000;
-    LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) 
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+      // start and time
+      long start = System.currentTimeMillis();
+      Object ignore = new Object();
+      for (int i = 0; i < numThreads; i++) barrier.add(ignore);
+      e.shutdown();
+      e.awaitTermination(100, TimeUnit.SECONDS);
+      long end = System.currentTimeMillis();
 
-    server.close();
+      double runtime = ((double) end - start) / 1000;
+      LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) 
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+    }
   }
 
   @Test
@@ -328,88 +319,87 @@ public class NetworkServiceTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
     injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-    final NameServer server = injector.getInstance(NameServer.class);
-    int nameServerPort = server.getPort();
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      int nameServerPort = server.getPort();
 
-    final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
+      final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
 
-    for (int size : messageSizes) {
-      final int numMessages = 300000 / (Math.max(1, size / 512));
-      int numThreads = 2;
-      int totalNumMessages = numMessages * numThreads;
-      final Monitor monitor = new Monitor();
+      for (int size : messageSizes) {
+        final int numMessages = 300000 / (Math.max(1, size / 512));
+        int numThreads = 2;
+        int totalNumMessages = numMessages * numThreads;
+        final Monitor monitor = new Monitor();
 
-      LOG.log(Level.FINEST, "=== Test network service receiver start");
-      // network service
-      final String name2 = "task2";
-      final Configuration nameResolverConf = 
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
-          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
-          .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
-          .build())
-          .build();
 
-      final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
-      final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
-      NetworkService<String> ns2 = new NetworkService<String>(
-          factory, 0, nameResolver,
-          new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-          new MessageHandler<String>(name2, monitor, totalNumMessages), new 
ExceptionHandler(), localAddressProvider);
-      ns2.registerId(factory.getNewInstance(name2));
-      final int port2 = ns2.getTransport().getListeningPort();
-      server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
+        // network service
+        final String name2 = "task2";
+        final String name1 = "task1";
+        final Configuration nameResolverConf = 
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+            .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
+            .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+            .build())
+            .build();
 
-      LOG.log(Level.FINEST, "=== Test network service sender start");
-      final String name1 = "task1";
-      NetworkService<String> ns1 = new NetworkService<String>(
-          factory, 0, nameResolver,
-          new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-          new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), 
localAddressProvider);
-      ns1.registerId(factory.getNewInstance(name1));
-      final int port1 = ns1.getTransport().getListeningPort();
-      server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
-
-      Identifier destId = factory.getNewInstance(name2);
-      final Connection<String> conn = ns1.newConnection(destId);
-      conn.open();
-
-      // build the message
-      StringBuilder msb = new StringBuilder();
-      for (int i = 0; i < size; i++) {
-        msb.append("1");
-      }
-      final String message = msb.toString();
+        final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
 
-      ExecutorService e = Executors.newCachedThreadPool();
+        LOG.log(Level.FINEST, "=== Test network service receiver start");
+        LOG.log(Level.FINEST, "=== Test network service sender start");
+        try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
+             NetworkService<String> ns2 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name2, monitor, totalNumMessages), 
new ExceptionHandler(), localAddressProvider);
+             NetworkService<String> ns1 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
 
-      long start = System.currentTimeMillis();
-      for (int i = 0; i < numThreads; i++) {
-        e.submit(new Runnable() {
+          ns2.registerId(factory.getNewInstance(name2));
+          final int port2 = ns2.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
 
-          @Override
-          public void run() {
-            for (int i = 0; i < numMessages; i++) {
-              conn.write(message);
+          ns1.registerId(factory.getNewInstance(name1));
+          final int port1 = ns1.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
+
+          Identifier destId = factory.getNewInstance(name2);
+
+          try (final Connection<String> conn = ns1.newConnection(destId)) {
+            conn.open();
+
+            // build the message
+            StringBuilder msb = new StringBuilder();
+            for (int i = 0; i < size; i++) {
+              msb.append("1");
             }
-          }
-        });
-      }
+            final String message = msb.toString();
 
+            ExecutorService e = Executors.newCachedThreadPool();
 
-      e.shutdown();
-      e.awaitTermination(30, TimeUnit.SECONDS);
-      monitor.mwait();
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < numThreads; i++) {
+              e.submit(new Runnable() {
 
-      long end = System.currentTimeMillis();
-      double runtime = ((double) end - start) / 1000;
+                @Override
+                public void run() {
+                  for (int i = 0; i < numMessages; i++) {
+                    conn.write(message);
+                  }
+                }
+              });
+            }
 
-      LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) 
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
-      conn.close();
 
-      ns1.close();
-      ns2.close();
-    }
+            e.shutdown();
+            e.awaitTermination(30, TimeUnit.SECONDS);
+            monitor.mwait();
 
-    server.close();
+            long end = System.currentTimeMillis();
+            double runtime = ((double) end - start) / 1000;
+
+            LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double) 
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+          }
+        }
+      }
+    }
   }
 
   /**
@@ -424,80 +414,77 @@ public class NetworkServiceTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
     injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-    final NameServer server = injector.getInstance(NameServer.class);
-    int nameServerPort = server.getPort();
-
-    final int batchSize = 1024 * 1024;
-    final int[] messageSizes = {32, 64, 512};
-
-    for (int size : messageSizes) {
-      final int numMessages = 300 / (Math.max(1, size / 512));
-      final Monitor monitor = new Monitor();
-
-      LOG.log(Level.FINEST, "=== Test network service receiver start");
-      // network service
-      final String name2 = "task2";
-      final Configuration nameResolverConf = 
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
-          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
-          .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
-          .build())
-          .build();
-
-      final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
-      final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
-      NetworkService<String> ns2 = new NetworkService<String>(
-          factory, 0, nameResolver,
-          new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-          new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
-      ns2.registerId(factory.getNewInstance(name2));
-      final int port2 = ns2.getTransport().getListeningPort();
-      server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
 
-      LOG.log(Level.FINEST, "=== Test network service sender start");
-      final String name1 = "task1";
-      NetworkService<String> ns1 = new NetworkService<String>(
-          factory, 0, nameResolver,
-          new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-          new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), 
localAddressProvider);
-      ns1.registerId(factory.getNewInstance(name1));
-      final int port1 = ns1.getTransport().getListeningPort();
-      server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
-
-      Identifier destId = factory.getNewInstance(name2);
-      Connection<String> conn = ns1.newConnection(destId);
-
-      // build the message
-      StringBuilder msb = new StringBuilder();
-      for (int i = 0; i < size; i++) {
-        msb.append("1");
-      }
-      String message = msb.toString();
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      int nameServerPort = server.getPort();
+
+      final int batchSize = 1024 * 1024;
+      final int[] messageSizes = {32, 64, 512};
+
+      for (int size : messageSizes) {
+        final int numMessages = 300 / (Math.max(1, size / 512));
+        final Monitor monitor = new Monitor();
+
+        // network service
+        final String name2 = "task2";
+        final String name1 = "task1";
+        final Configuration nameResolverConf = 
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+            .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
+            .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+            .build())
+            .build();
+
+        final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+        LOG.log(Level.FINEST, "=== Test network service receiver start");
+        LOG.log(Level.FINEST, "=== Test network service sender start");
+        try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
+             NetworkService<String> ns2 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
+             NetworkService<String> ns1 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
+
+          ns2.registerId(factory.getNewInstance(name2));
+          final int port2 = ns2.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
+
+          ns1.registerId(factory.getNewInstance(name1));
+          final int port1 = ns1.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
+
+          Identifier destId = factory.getNewInstance(name2);
+
+          // build the message
+          StringBuilder msb = new StringBuilder();
+          for (int i = 0; i < size; i++) {
+            msb.append("1");
+          }
+          String message = msb.toString();
 
-      long start = System.currentTimeMillis();
-      try {
-        for (int i = 0; i < numMessages; i++) {
-          StringBuilder sb = new StringBuilder();
-          for (int j = 0; j < batchSize / size; j++) {
-            sb.append(message);
+          long start = System.currentTimeMillis();
+          try (Connection<String> conn = ns1.newConnection(destId)) {
+            for (int i = 0; i < numMessages; i++) {
+              StringBuilder sb = new StringBuilder();
+              for (int j = 0; j < batchSize / size; j++) {
+                sb.append(message);
+              }
+              conn.open();
+              conn.write(sb.toString());
+            }
+            monitor.mwait();
+          } catch (NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
           }
-          conn.open();
-          conn.write(sb.toString());
+          long end = System.currentTimeMillis();
+          double runtime = ((double) end - start) / 1000;
+          long numAppMessages = numMessages * batchSize / size;
+          LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 
2 * size) / runtime);// x2 for unicode chars
         }
-        monitor.mwait();
-      } catch (NetworkException e) {
-        e.printStackTrace();
       }
-      long end = System.currentTimeMillis();
-      double runtime = ((double) end - start) / 1000;
-      long numAppMessages = numMessages * batchSize / size;
-      LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 
2 * size) / runtime);// x2 for unicode chars
-      conn.close();
-
-      ns1.close();
-      ns2.close();
     }
-
-    server.close();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bc9f5b54/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
deleted file mode 100644
index ba0cfe4..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network.util;
-
-import org.apache.reef.exception.evaluator.NetworkException;
-import org.apache.reef.io.network.Connection;
-import org.apache.reef.io.network.Message;
-import org.apache.reef.io.network.NetworkConnectionService;
-import 
org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
-import org.apache.reef.io.network.naming.NameResolverConfiguration;
-import org.apache.reef.io.network.naming.NameServer;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.remote.Codec;
-import org.apache.reef.wake.remote.transport.LinkListener;
-
-import java.net.SocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Helper class for NetworkConnectionService test.
- */
-public final class NetworkMessagingTest {
-  private static final Logger LOG = 
Logger.getLogger(NetworkMessagingTest.class.getName());
-
-  private final IdentifierFactory factory;
-  private final NetworkConnectionService receiverNetworkConnService;
-  private final NetworkConnectionService senderNetworkConnService;
-  private final String receiver;
-  private final String sender;
-  private final NameServer nameServer;
-
-  public NetworkMessagingTest(final String localAddress) throws 
InjectionException {
-    // name server
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    this.nameServer = injector.getInstance(NameServer.class);
-    final Configuration netConf = NameResolverConfiguration.CONF
-        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
-        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
-        .build();
-
-    LOG.log(Level.FINEST, "=== Test network connection service receiver 
start");
-    // network service for receiver
-    this.receiver = "receiver";
-    final Injector injectorReceiver = injector.forkInjector(netConf);
-    this.receiverNetworkConnService = 
injectorReceiver.getInstance(NetworkConnectionService.class);
-    this.factory = 
injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
-    
this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver));
-
-    // network service for sender
-    this.sender = "sender";
-    LOG.log(Level.FINEST, "=== Test network connection service sender start");
-    final Injector injectorSender = injector.forkInjector(netConf);
-    senderNetworkConnService = 
injectorSender.getInstance(NetworkConnectionService.class);
-    
senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender));
-  }
-
-  public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
-                                                final int numMessages, final 
Monitor monitor,
-                                                final Codec<T> codec) throws 
NetworkException {
-    receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, 
new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
-    senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, 
new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
-  }
-
-  public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier 
connFactoryId) {
-    final Identifier destId = factory.getNewInstance(receiver);
-    return 
(Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId);
-  }
-
-  public void close() throws Exception {
-    senderNetworkConnService.close();
-    receiverNetworkConnService.close();
-    nameServer.close();
-  }
-
-  public static final class MessageHandler<T> implements 
EventHandler<Message<T>> {
-    private final int expected;
-    private final Monitor monitor;
-    private AtomicInteger count = new AtomicInteger(0);
-
-    public MessageHandler(final Monitor monitor,
-                          final int expected) {
-      this.monitor = monitor;
-      this.expected = expected;
-    }
-
-    @Override
-    public void onNext(Message<T> value) {
-      count.incrementAndGet();
-      LOG.log(Level.FINE, "Count: {0}", count.get());
-      LOG.log(Level.FINE,
-          "OUT: {0} received {1} from {2} to {3}",
-          new Object[]{value, value.getSrcId(), value.getDestId()});
-
-      for (final T obj : value.getData()) {
-        LOG.log(Level.FINE, "OUT: data: {0}", obj);
-      }
-
-      if (count.get() == expected) {
-        monitor.mnotify();
-      }
-    }
-  }
-
-  public static final class TestListener<T> implements 
LinkListener<Message<T>> {
-    @Override
-    public void onSuccess(Message<T> message) {
-      LOG.log(Level.FINE, "success: " + message);
-    }
-    @Override
-    public void onException(Throwable cause, SocketAddress remoteAddress, 
Message<T> message) {
-      LOG.log(Level.WARNING, "exception: " + cause + message);
-      throw new RuntimeException(cause);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bc9f5b54/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
new file mode 100644
index 0000000..0b188c5
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.NetworkConnectionService;
+import 
org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.io.network.naming.NameResolverConfiguration;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper class for NetworkConnectionService test.
+ */
+public final class NetworkMessagingTestService implements AutoCloseable {
+  private static final Logger LOG = 
Logger.getLogger(NetworkMessagingTestService.class.getName());
+
+  private final IdentifierFactory factory;
+  private final NetworkConnectionService receiverNetworkConnService;
+  private final NetworkConnectionService senderNetworkConnService;
+  private final String receiver;
+  private final String sender;
+  private final NameServer nameServer;
+  private final NameResolver receiverResolver;
+  private final NameResolver senderResolver;
+
+  public NetworkMessagingTestService(final String localAddress) throws 
InjectionException {
+    // name server
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.nameServer = injector.getInstance(NameServer.class);
+    final Configuration netConf = NameResolverConfiguration.CONF
+        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
+        .build();
+
+    LOG.log(Level.FINEST, "=== Test network connection service receiver 
start");
+    // network service for receiver
+    this.receiver = "receiver";
+    final Injector injectorReceiver = injector.forkInjector(netConf);
+    this.receiverNetworkConnService = 
injectorReceiver.getInstance(NetworkConnectionService.class);
+    this.receiverResolver = injectorReceiver.getInstance(NameResolver.class);
+    this.factory = 
injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
+    
this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver));
+
+    // network service for sender
+    this.sender = "sender";
+    LOG.log(Level.FINEST, "=== Test network connection service sender start");
+    final Injector injectorSender = injector.forkInjector(netConf);
+    senderNetworkConnService = 
injectorSender.getInstance(NetworkConnectionService.class);
+    
senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender));
+    this.senderResolver = injectorSender.getInstance(NameResolver.class);
+  }
+
+  public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
+                                                final int numMessages, final 
Monitor monitor,
+                                                final Codec<T> codec) throws 
NetworkException {
+    receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, 
new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+    senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, 
new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+  }
+
+  public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier 
connFactoryId) {
+    final Identifier destId = factory.getNewInstance(receiver);
+    return 
(Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId);
+  }
+
+  public void close() throws Exception {
+    senderNetworkConnService.close();
+    receiverNetworkConnService.close();
+    nameServer.close();
+    receiverResolver.close();
+    senderResolver.close();
+  }
+
+  public static final class MessageHandler<T> implements 
EventHandler<Message<T>> {
+    private final int expected;
+    private final Monitor monitor;
+    private AtomicInteger count = new AtomicInteger(0);
+
+    public MessageHandler(final Monitor monitor,
+                          final int expected) {
+      this.monitor = monitor;
+      this.expected = expected;
+    }
+
+    @Override
+    public void onNext(Message<T> value) {
+      count.incrementAndGet();
+      LOG.log(Level.FINE, "Count: {0}", count.get());
+      LOG.log(Level.FINE,
+          "OUT: {0} received {1} from {2} to {3}",
+          new Object[]{value, value.getSrcId(), value.getDestId()});
+
+      for (final T obj : value.getData()) {
+        LOG.log(Level.FINE, "OUT: data: {0}", obj);
+      }
+
+      if (count.get() == expected) {
+        monitor.mnotify();
+      }
+    }
+  }
+
+  public static final class TestListener<T> implements 
LinkListener<Message<T>> {
+    @Override
+    public void onSuccess(Message<T> message) {
+      LOG.log(Level.FINE, "success: " + message);
+    }
+    @Override
+    public void onException(Throwable cause, SocketAddress remoteAddress, 
Message<T> message) {
+      LOG.log(Level.WARNING, "exception: " + cause + message);
+      throw new RuntimeException(cause);
+    }
+  }
+}
\ No newline at end of file

Reply via email to