Repository: incubator-reef
Updated Branches:
refs/heads/master 25735bfa5 -> bc9f5b54d
[REEF-480] Close resources in NetworkServiceTest
* Fixed NetworkMessagingTest.close() to close receiverResolver and
senderResolver
* Used try-with-resources
* Renamed NetworkMessagingTest to NetworkMessagingTestService
* Changed NetworkConnectionServiceTest to catch NetworkException and
throw RuntimeException
JIRA:
[REEF-480](https://issues.apache.org/jira/browse/REEF-480)
Pull Request:
This closes #301
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/bc9f5b54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/bc9f5b54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/bc9f5b54
Branch: refs/heads/master
Commit: bc9f5b54db4ab2dc39d189556133593a66d625c3
Parents: 25735bf
Author: Gyeongin Yu <[email protected]>
Authored: Fri Jul 17 19:22:42 2015 +0900
Committer: Markus Weimer <[email protected]>
Committed: Wed Jul 22 10:52:47 2015 -0700
----------------------------------------------------------------------
.../reef/services/network/NamingTest.java | 350 +++++-----
.../network/NetworkConnectionServiceTest.java | 329 +++++-----
.../services/network/NetworkServiceTest.java | 649 +++++++++----------
.../network/util/NetworkMessagingTest.java | 139 ----
.../util/NetworkMessagingTestService.java | 146 +++++
5 files changed, 803 insertions(+), 810 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bc9f5b54/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
index 462390e..b726b78 100644
---
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
+++
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
@@ -96,33 +96,32 @@ public class NamingTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
this.factory);
injector.bindVolatileInstance(LocalAddressProvider.class,
this.localAddressProvider);
- final NameServer server = injector.getInstance(NameServer.class);
- this.port = server.getPort();
- for (final Identifier id : idToAddrMap.keySet()) {
- server.register(id, idToAddrMap.get(id));
- }
-
- // run a client
- final NameLookupClient client = new NameLookupClient(localAddress,
this.port,
- 10000, this.factory, retryCount, retryTimeout, new
NameCache(this.TTL), this.localAddressProvider);
+ try (final NameServer server = injector.getInstance(NameServer.class)) {
+ this.port = server.getPort();
+ for (final Identifier id : idToAddrMap.keySet()) {
+ server.register(id, idToAddrMap.get(id));
+ }
- final Identifier id1 = this.factory.getNewInstance("task1");
- final Identifier id2 = this.factory.getNewInstance("task2");
+ // run a client
+ try (final NameLookupClient client = new NameLookupClient(localAddress,
this.port,
+ 10000, this.factory, retryCount, retryTimeout, new
NameCache(this.TTL), this.localAddressProvider)) {
- final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier,
InetSocketAddress>();
- InetSocketAddress addr1 = client.lookup(id1);
- respMap.put(id1, addr1);
- InetSocketAddress addr2 = client.lookup(id2);
- respMap.put(id2, addr2);
+ final Identifier id1 = this.factory.getNewInstance("task1");
+ final Identifier id2 = this.factory.getNewInstance("task2");
- for (final Identifier id : respMap.keySet()) {
- LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id,
respMap.get(id)});
- }
+ final Map<Identifier, InetSocketAddress> respMap = new
HashMap<Identifier, InetSocketAddress>();
+ InetSocketAddress addr1 = client.lookup(id1);
+ respMap.put(id1, addr1);
+ InetSocketAddress addr2 = client.lookup(id2);
+ respMap.put(id2, addr2);
- Assert.assertTrue(isEqual(idToAddrMap, respMap));
+ for (final Identifier id : respMap.keySet()) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id,
respMap.get(id)});
+ }
- client.close();
- server.close();
+ Assert.assertTrue(isEqual(idToAddrMap, respMap));
+ }
+ }
}
/**
@@ -151,76 +150,75 @@ public class NamingTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
this.factory);
injector.bindVolatileInstance(LocalAddressProvider.class,
this.localAddressProvider);
- final NameServer server = injector.getInstance(NameServer.class);
- this.port = server.getPort();
- for (final Identifier id : idToAddrMap.keySet()) {
- server.register(id, idToAddrMap.get(id));
- }
-
- // run a client
- final NameLookupClient client = new NameLookupClient(localAddress,
this.port,
- 10000, this.factory, retryCount, retryTimeout, new
NameCache(this.TTL), this.localAddressProvider);
-
- final Identifier id1 = this.factory.getNewInstance("task1");
- final Identifier id2 = this.factory.getNewInstance("task2");
- final Identifier id3 = this.factory.getNewInstance("task3");
-
- final ExecutorService e = Executors.newCachedThreadPool();
-
- final ConcurrentMap<Identifier, InetSocketAddress> respMap = new
ConcurrentHashMap<Identifier, InetSocketAddress>();
-
- final Future<?> f1 = e.submit(new Runnable() {
- @Override
- public void run() {
- InetSocketAddress addr = null;
- try {
- addr = client.lookup(id1);
- } catch (final Exception e) {
- LOG.log(Level.SEVERE, "Lookup failed", e);
- Assert.fail(e.toString());
- }
- respMap.put(id1, addr);
- }
- });
- final Future<?> f2 = e.submit(new Runnable() {
- @Override
- public void run() {
- InetSocketAddress addr = null;
- try {
- addr = client.lookup(id2);
- } catch (final Exception e) {
- LOG.log(Level.SEVERE, "Lookup failed", e);
- Assert.fail(e.toString());
- }
- respMap.put(id2, addr);
- }
- });
- final Future<?> f3 = e.submit(new Runnable() {
- @Override
- public void run() {
- InetSocketAddress addr = null;
- try {
- addr = client.lookup(id3);
- } catch (final Exception e) {
- LOG.log(Level.SEVERE, "Lookup failed", e);
- Assert.fail(e.toString());
- }
- respMap.put(id3, addr);
+ try (final NameServer server = injector.getInstance(NameServer.class)) {
+ this.port = server.getPort();
+ for (final Identifier id : idToAddrMap.keySet()) {
+ server.register(id, idToAddrMap.get(id));
}
- });
- f1.get();
- f2.get();
- f3.get();
+ // run a client
+ try (final NameLookupClient client = new
NameLookupClient(localAddress, this.port,
+ 10000, this.factory, retryCount, retryTimeout, new
NameCache(this.TTL), this.localAddressProvider)) {
+
+ final Identifier id1 = this.factory.getNewInstance("task1");
+ final Identifier id2 = this.factory.getNewInstance("task2");
+ final Identifier id3 = this.factory.getNewInstance("task3");
+
+ final ExecutorService e = Executors.newCachedThreadPool();
+
+ final ConcurrentMap<Identifier, InetSocketAddress> respMap = new
ConcurrentHashMap<Identifier, InetSocketAddress>();
+
+ final Future<?> f1 = e.submit(new Runnable() {
+ @Override
+ public void run() {
+ InetSocketAddress addr = null;
+ try {
+ addr = client.lookup(id1);
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Lookup failed", e);
+ Assert.fail(e.toString());
+ }
+ respMap.put(id1, addr);
+ }
+ });
+ final Future<?> f2 = e.submit(new Runnable() {
+ @Override
+ public void run() {
+ InetSocketAddress addr = null;
+ try {
+ addr = client.lookup(id2);
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Lookup failed", e);
+ Assert.fail(e.toString());
+ }
+ respMap.put(id2, addr);
+ }
+ });
+ final Future<?> f3 = e.submit(new Runnable() {
+ @Override
+ public void run() {
+ InetSocketAddress addr = null;
+ try {
+ addr = client.lookup(id3);
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Lookup failed", e);
+ Assert.fail(e.toString());
+ }
+ respMap.put(id3, addr);
+ }
+ });
+
+ f1.get();
+ f2.get();
+ f3.get();
+
+ for (final Identifier id : respMap.keySet()) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id,
respMap.get(id)});
+ }
- for (final Identifier id : respMap.keySet()) {
- LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id,
respMap.get(id)});
+ Assert.assertTrue(isEqual(idToAddrMap, respMap));
+ }
}
-
- Assert.assertTrue(isEqual(idToAddrMap, respMap));
-
- client.close();
- server.close();
}
}
@@ -237,55 +235,55 @@ public class NamingTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
this.factory);
injector.bindVolatileInstance(LocalAddressProvider.class,
this.localAddressProvider);
- final NameServer server = injector.getInstance(NameServer.class);
- this.port = server.getPort();
- final String localAddress = localAddressProvider.getLocalAddress();
-
- // names to start with
- final Map<Identifier, InetSocketAddress> idToAddrMap = new
HashMap<Identifier, InetSocketAddress>();
- idToAddrMap.put(this.factory.getNewInstance("task1"), new
InetSocketAddress(localAddress, 7001));
- idToAddrMap.put(this.factory.getNewInstance("task2"), new
InetSocketAddress(localAddress, 7002));
+ try (final NameServer server = injector.getInstance(NameServer.class)) {
+ this.port = server.getPort();
+ final String localAddress = localAddressProvider.getLocalAddress();
- // registration
- // invoke registration from the client side
- final NameRegistryClient client = new NameRegistryClient(localAddress,
this.port, this.factory, this.localAddressProvider);
- for (final Identifier id : idToAddrMap.keySet()) {
- client.register(id, idToAddrMap.get(id));
- }
+ // names to start with
+ final Map<Identifier, InetSocketAddress> idToAddrMap = new
HashMap<Identifier, InetSocketAddress>();
+ idToAddrMap.put(this.factory.getNewInstance("task1"), new
InetSocketAddress(localAddress, 7001));
+ idToAddrMap.put(this.factory.getNewInstance("task2"), new
InetSocketAddress(localAddress, 7002));
- // wait
- final Set<Identifier> ids = idToAddrMap.keySet();
- busyWait(server, ids.size(), ids);
+ // registration
+ // invoke registration from the client side
+ try (final NameRegistryClient client = new
NameRegistryClient(localAddress,
+ this.port, this.factory, this.localAddressProvider)) {
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.register(id, idToAddrMap.get(id));
+ }
- // check the server side
- Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier,
InetSocketAddress>();
- Iterable<NameAssignment> nas = server.lookup(ids);
+ // wait
+ final Set<Identifier> ids = idToAddrMap.keySet();
+ busyWait(server, ids.size(), ids);
- for (final NameAssignment na : nas) {
- LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
- new Object[]{na.getIdentifier(), na.getAddress()});
- serverMap.put(na.getIdentifier(), na.getAddress());
- }
+ // check the server side
+ Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier,
InetSocketAddress>();
+ Iterable<NameAssignment> nas = server.lookup(ids);
- Assert.assertTrue(isEqual(idToAddrMap, serverMap));
+ for (final NameAssignment na : nas) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
+ new Object[]{na.getIdentifier(), na.getAddress()});
+ serverMap.put(na.getIdentifier(), na.getAddress());
+ }
- // un-registration
- for (final Identifier id : idToAddrMap.keySet()) {
- client.unregister(id);
- }
+ Assert.assertTrue(isEqual(idToAddrMap, serverMap));
- // wait
- busyWait(server, 0, ids);
+ // un-registration
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.unregister(id);
+ }
- serverMap = new HashMap<Identifier, InetSocketAddress>();
- nas = server.lookup(ids);
- for (final NameAssignment na : nas)
- serverMap.put(na.getIdentifier(), na.getAddress());
+ // wait
+ busyWait(server, 0, ids);
- Assert.assertEquals(0, serverMap.size());
+ serverMap = new HashMap<Identifier, InetSocketAddress>();
+ nas = server.lookup(ids);
+ for (final NameAssignment na : nas)
+ serverMap.put(na.getIdentifier(), na.getAddress());
- client.close();
- server.close();
+ Assert.assertEquals(0, serverMap.size());
+ }
+ }
}
/**
@@ -302,66 +300,66 @@ public class NamingTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
this.factory);
injector.bindVolatileInstance(LocalAddressProvider.class,
this.localAddressProvider);
- final NameServer server = injector.getInstance(NameServer.class);
- this.port = server.getPort();
-
- final Map<Identifier, InetSocketAddress> idToAddrMap = new
HashMap<Identifier, InetSocketAddress>();
- idToAddrMap.put(this.factory.getNewInstance("task1"), new
InetSocketAddress(localAddress, 7001));
- idToAddrMap.put(this.factory.getNewInstance("task2"), new
InetSocketAddress(localAddress, 7002));
+ try (final NameServer server = injector.getInstance(NameServer.class)) {
+ this.port = server.getPort();
- // registration
- // invoke registration from the client side
- Configuration nameResolverConf = NameResolverConfiguration.CONF
- .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
- .set(NameResolverConfiguration.NAME_SERVICE_PORT, this.port)
- .set(NameResolverConfiguration.CACHE_TIMEOUT, this.TTL)
- .set(NameResolverConfiguration.RETRY_TIMEOUT, retryTimeout)
- .set(NameResolverConfiguration.RETRY_COUNT, retryCount)
- .build();
-
- final NameResolver client =
Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class);
- for (final Identifier id : idToAddrMap.keySet()) {
- client.register(id, idToAddrMap.get(id));
- }
+ final Map<Identifier, InetSocketAddress> idToAddrMap = new
HashMap<Identifier, InetSocketAddress>();
+ idToAddrMap.put(this.factory.getNewInstance("task1"), new
InetSocketAddress(localAddress, 7001));
+ idToAddrMap.put(this.factory.getNewInstance("task2"), new
InetSocketAddress(localAddress, 7002));
- // wait
- final Set<Identifier> ids = idToAddrMap.keySet();
- busyWait(server, ids.size(), ids);
+ // registration
+ // invoke registration from the client side
+ Configuration nameResolverConf = NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, this.port)
+ .set(NameResolverConfiguration.CACHE_TIMEOUT, this.TTL)
+ .set(NameResolverConfiguration.RETRY_TIMEOUT, retryTimeout)
+ .set(NameResolverConfiguration.RETRY_COUNT, retryCount)
+ .build();
+
+ try (final NameResolver client
+ =
Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class))
{
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.register(id, idToAddrMap.get(id));
+ }
- // lookup
- final Identifier id1 = this.factory.getNewInstance("task1");
- final Identifier id2 = this.factory.getNewInstance("task2");
+ // wait
+ final Set<Identifier> ids = idToAddrMap.keySet();
+ busyWait(server, ids.size(), ids);
- final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier,
InetSocketAddress>();
- InetSocketAddress addr1 = client.lookup(id1);
- respMap.put(id1, addr1);
- InetSocketAddress addr2 = client.lookup(id2);
- respMap.put(id2, addr2);
+ // lookup
+ final Identifier id1 = this.factory.getNewInstance("task1");
+ final Identifier id2 = this.factory.getNewInstance("task2");
- for (final Identifier id : respMap.keySet()) {
- LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id,
respMap.get(id)});
- }
+ final Map<Identifier, InetSocketAddress> respMap = new
HashMap<Identifier, InetSocketAddress>();
+ InetSocketAddress addr1 = client.lookup(id1);
+ respMap.put(id1, addr1);
+ InetSocketAddress addr2 = client.lookup(id2);
+ respMap.put(id2, addr2);
- Assert.assertTrue(isEqual(idToAddrMap, respMap));
+ for (final Identifier id : respMap.keySet()) {
+ LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id,
respMap.get(id)});
+ }
- // un-registration
- for (final Identifier id : idToAddrMap.keySet()) {
- client.unregister(id);
- }
+ Assert.assertTrue(isEqual(idToAddrMap, respMap));
- // wait
- busyWait(server, 0, ids);
+ // un-registration
+ for (final Identifier id : idToAddrMap.keySet()) {
+ client.unregister(id);
+ }
- final Map<Identifier, InetSocketAddress> serverMap = new
HashMap<Identifier, InetSocketAddress>();
- addr1 = server.lookup(id1);
- if (addr1 != null) serverMap.put(id1, addr1);
- addr2 = server.lookup(id1);
- if (addr2 != null) serverMap.put(id2, addr2);
+ // wait
+ busyWait(server, 0, ids);
- Assert.assertEquals(0, serverMap.size());
+ final Map<Identifier, InetSocketAddress> serverMap = new
HashMap<Identifier, InetSocketAddress>();
+ addr1 = server.lookup(id1);
+ if (addr1 != null) serverMap.put(id1, addr1);
+ addr2 = server.lookup(id1);
+ if (addr2 != null) serverMap.put(id2, addr2);
- client.close();
- server.close();
+ Assert.assertEquals(0, serverMap.size());
+ }
+ }
}
private boolean isEqual(final Map<Identifier, InetSocketAddress> map1,
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bc9f5b54/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
index ddf4bfd..3cb1310 100644
---
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
+++
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
@@ -64,23 +64,23 @@ public class NetworkConnectionServiceTest {
private void runMessagingNetworkConnectionService(Codec<String> codec)
throws Exception {
final int numMessages = 2000;
final Monitor monitor = new Monitor();
- final NetworkMessagingTest messagingTest = new
NetworkMessagingTest(localAddress);
- messagingTest.registerTestConnectionFactory(groupCommClientId,
numMessages, monitor, codec);
-
- final Connection<String> conn =
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
- try {
- conn.open();
- for (int count = 0; count < numMessages; ++count) {
- // send messages to the receiver.
- conn.write("hello" + count);
+ try (final NetworkMessagingTestService messagingTestService = new
NetworkMessagingTestService(localAddress)) {
+ messagingTestService.registerTestConnectionFactory(groupCommClientId,
numMessages, monitor, codec);
+
+ try (final Connection<String> conn =
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+ try {
+ conn.open();
+ for (int count = 0; count < numMessages; ++count) {
+ // send messages to the receiver.
+ conn.write("hello" + count);
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
}
- monitor.mwait();
- } catch (NetworkException e) {
- e.printStackTrace();
}
-
- conn.close();
- messagingTest.close();
}
/**
@@ -106,56 +106,52 @@ public class NetworkConnectionServiceTest {
final int groupcommMessages = 1000;
final Monitor monitor = new Monitor();
- final NetworkMessagingTest messagingTest = new
NetworkMessagingTest(localAddress);
+ try (final NetworkMessagingTestService messagingTestService = new
NetworkMessagingTestService(localAddress)) {
- messagingTest.registerTestConnectionFactory(groupCommClientId,
groupcommMessages, monitor, stringCodec);
+ messagingTestService.registerTestConnectionFactory(groupCommClientId,
groupcommMessages, monitor, stringCodec);
- final int shuffleMessges = 2000;
- final Monitor monitor2 = new Monitor();
- messagingTest.registerTestConnectionFactory(shuffleClientId,
shuffleMessges, monitor2, integerCodec);
+ final int shuffleMessages = 2000;
+ final Monitor monitor2 = new Monitor();
+ messagingTestService.registerTestConnectionFactory(shuffleClientId,
shuffleMessages, monitor2, integerCodec);
- executor.submit(new Runnable() {
- @Override
- public void run() {
- final Connection<String> conn =
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
- try {
- conn.open();
- for (int count = 0; count < groupcommMessages; ++count) {
- // send messages to the receiver.
- conn.write("hello" + count);
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try (final Connection<String> conn =
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+ conn.open();
+ for (int count = 0; count < groupcommMessages; ++count) {
+ // send messages to the receiver.
+ conn.write("hello" + count);
+ }
+ monitor.mwait();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
}
- monitor.mwait();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
}
- }
- });
+ });
- executor.submit(new Runnable() {
- @Override
- public void run() {
- final Connection<Integer> conn =
messagingTest.getConnectionFromSenderToReceiver(shuffleClientId);
- try {
- conn.open();
- for (int count = 0; count < shuffleMessges; ++count) {
- // send messages to the receiver.
- conn.write(count);
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try (final Connection<Integer> conn =
messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) {
+ conn.open();
+ for (int count = 0; count < shuffleMessages; ++count) {
+ // send messages to the receiver.
+ conn.write(count);
+ }
+ monitor2.mwait();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
}
- monitor2.mwait();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
}
- }
- });
+ });
- monitor.mwait();
- monitor2.mwait();
- executor.shutdown();
- messagingTest.close();
+ monitor.mwait();
+ monitor2.mwait();
+ executor.shutdown();
+ }
}
/**
@@ -188,33 +184,36 @@ public class NetworkConnectionServiceTest {
final int numMessages = 300000 / (Math.max(1, size / 512));
final Monitor monitor = new Monitor();
final Codec<String> codec = new StringCodec();
- final NetworkMessagingTest messagingTest = new
NetworkMessagingTest(localAddress);
- messagingTest.registerTestConnectionFactory(groupCommClientId,
numMessages, monitor, codec);
- final Connection<String> conn =
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
-
- // build the message
- final StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- final String message = msb.toString();
-
- long start = System.currentTimeMillis();
- try {
- conn.open();
- for (int count = 0; count < numMessages; ++count) {
- // send messages to the receiver.
- conn.write(message);
+ try (final NetworkMessagingTestService messagingTestService = new
NetworkMessagingTestService(localAddress)) {
+ messagingTestService.registerTestConnectionFactory(groupCommClientId,
numMessages, monitor, codec);
+
+ // build the message
+ final StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
}
- monitor.mwait();
- } catch (NetworkException e) {
- e.printStackTrace();
- }
- final long end = System.currentTimeMillis();
+ final String message = msb.toString();
+
+ try (final Connection<String> conn =
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+ long start = System.currentTimeMillis();
+ try {
+ conn.open();
+ for (int count = 0; count < numMessages; ++count) {
+ // send messages to the receiver.
+ conn.write(message);
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ final long end = System.currentTimeMillis();
- final double runtime = ((double) end - start) / 1000;
- LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages /
runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) /
runtime);// x2 for unicode chars
- messagingTest.close();
+ final double runtime = ((double) end - start) / 1000;
+ LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages
/ runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) /
runtime);// x2 for unicode chars
+ }
+ }
}
}
@@ -237,31 +236,32 @@ public class NetworkConnectionServiceTest {
e.submit(new Runnable() {
public void run() {
- try {
+ try (final NetworkMessagingTestService messagingTestService = new
NetworkMessagingTestService(localAddress)) {
final Monitor monitor = new Monitor();
final Codec<String> codec = new StringCodec();
- final NetworkMessagingTest messagingTest = new
NetworkMessagingTest(localAddress);
- messagingTest.registerTestConnectionFactory(groupCommClientId,
numMessages, monitor, codec);
- final Connection<String> conn =
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
-
- // build the message
- final StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- final String message = msb.toString();
- try {
- conn.open();
- for (int count = 0; count < numMessages; ++count) {
- // send messages to the receiver.
- conn.write(message);
+
messagingTestService.registerTestConnectionFactory(groupCommClientId,
numMessages, monitor, codec);
+ try (final Connection<String> conn =
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+ // build the message
+ final StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ final String message = msb.toString();
+
+ try {
+ conn.open();
+ for (int count = 0; count < numMessages; ++count) {
+ // send messages to the receiver.
+ conn.write(message);
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
}
- monitor.mwait();
- } catch (NetworkException e) {
- e.printStackTrace();
}
- messagingTest.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -291,46 +291,46 @@ public class NetworkConnectionServiceTest {
final int totalNumMessages = numMessages * numThreads;
final Monitor monitor = new Monitor();
final Codec<String> codec = new StringCodec();
- final NetworkMessagingTest messagingTest = new
NetworkMessagingTest(localAddress);
- messagingTest.registerTestConnectionFactory(groupCommClientId,
totalNumMessages, monitor, codec);
-
- final ExecutorService e = Executors.newCachedThreadPool();
+ try (final NetworkMessagingTestService messagingTestService = new
NetworkMessagingTestService(localAddress)) {
+ messagingTestService.registerTestConnectionFactory(groupCommClientId,
totalNumMessages, monitor, codec);
- // build the message
- final StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- final String message = msb.toString();
- final Connection<String> conn =
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
+ final ExecutorService e = Executors.newCachedThreadPool();
- final long start = System.currentTimeMillis();
- for (int i = 0; i < numThreads; i++) {
- e.submit(new Runnable() {
- @Override
- public void run() {
-
- try {
- conn.open();
- for (int count = 0; count < numMessages; ++count) {
- // send messages to the receiver.
- conn.write(message);
+ // build the message
+ final StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ final String message = msb.toString();
+ try (final Connection<String> conn =
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+ final long start = System.currentTimeMillis();
+ for (int i = 0; i < numThreads; i++) {
+ e.submit(new Runnable() {
+ @Override
+ public void run() {
+
+ try {
+ conn.open();
+ for (int count = 0; count < numMessages; ++count) {
+ // send messages to the receiver.
+ conn.write(message);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- } catch (Exception e) {
- throw new RuntimeException(e);
}
+ });
}
- });
- }
- e.shutdown();
- e.awaitTermination(30, TimeUnit.SECONDS);
- monitor.mwait();
- final long end = System.currentTimeMillis();
- final double runtime = ((double) end - start) / 1000;
- LOG.log(Level.INFO, "size: " + size + "; messages/s: " +
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double)
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
- conn.close();
- messagingTest.close();
+ e.shutdown();
+ e.awaitTermination(30, TimeUnit.SECONDS);
+ monitor.mwait();
+ final long end = System.currentTimeMillis();
+ final double runtime = ((double) end - start) / 1000;
+ LOG.log(Level.INFO, "size: " + size + "; messages/s: " +
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double)
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+ }
+ }
}
}
@@ -348,38 +348,39 @@ public class NetworkConnectionServiceTest {
final int numMessages = 300 / (Math.max(1, size / 512));
final Monitor monitor = new Monitor();
final Codec<String> codec = new StringCodec();
- final NetworkMessagingTest messagingTest = new
NetworkMessagingTest(localAddress);
- messagingTest.registerTestConnectionFactory(groupCommClientId,
numMessages, monitor, codec);
- final Connection<String> conn =
messagingTest.getConnectionFromSenderToReceiver(groupCommClientId);
-
- // build the message
- final StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- final String message = msb.toString();
-
- final long start = System.currentTimeMillis();
- try {
- for (int i = 0; i < numMessages; i++) {
- final StringBuilder sb = new StringBuilder();
- for (int j = 0; j < batchSize / size; j++) {
- sb.append(message);
+ try (final NetworkMessagingTestService messagingTestService = new
NetworkMessagingTestService(localAddress)) {
+ messagingTestService.registerTestConnectionFactory(groupCommClientId,
numMessages, monitor, codec);
+ try (final Connection<String> conn =
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+ // build the message
+ final StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
}
- conn.open();
- conn.write(sb.toString());
+ final String message = msb.toString();
+
+ final long start = System.currentTimeMillis();
+ try {
+ for (int i = 0; i < numMessages; i++) {
+ final StringBuilder sb = new StringBuilder();
+ for (int j = 0; j < batchSize / size; j++) {
+ sb.append(message);
+ }
+ conn.open();
+ conn.write(sb.toString());
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
+ final long end = System.currentTimeMillis();
+ final double runtime = ((double) end - start) / 1000;
+ final long numAppMessages = numMessages * batchSize / size;
+ LOG.log(Level.INFO, "size: " + size + "; messages/s: " +
numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages *
2 * size) / runtime);// x2 for unicode chars
}
- monitor.mwait();
- } catch (NetworkException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
}
-
- final long end = System.currentTimeMillis();
- final double runtime = ((double) end - start) / 1000;
- final long numAppMessages = numMessages * batchSize / size;
- LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages
/ runtime + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) /
runtime);// x2 for unicode chars
- messagingTest.close();
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bc9f5b54/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
index 2bb75ae..2b96c11 100644
---
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
+++
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
@@ -75,59 +75,57 @@ public class NetworkServiceTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
factory);
injector.bindVolatileInstance(LocalAddressProvider.class,
this.localAddressProvider);
- final NameServer server = injector.getInstance(NameServer.class);
- int nameServerPort = server.getPort();
-
- final int numMessages = 10;
- final Monitor monitor = new Monitor();
-
- LOG.log(Level.FINEST, "=== Test network service receiver start");
- // network service
- final String name2 = "task2";
- final Configuration nameResolverConf =
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
- .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
- .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
- .build())
- .build();
-
- final Injector injector2 =
Tang.Factory.getTang().newInjector(nameResolverConf);
- final NameResolver nameResolver =
injector2.getInstance(NameResolver.class);
-
- NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, nameResolver,
- new StringCodec(), new MessagingTransportFactory(localAddressProvider),
- new MessageHandler<String>(name2, monitor, numMessages), new
ExceptionHandler(), localAddressProvider);
- ns2.registerId(factory.getNewInstance(name2));
- final int port2 = ns2.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task2"), new
InetSocketAddress(this.localAddress, port2));
-
- LOG.log(Level.FINEST, "=== Test network service sender start");
- final String name1 = "task1";
- final NetworkService<String> ns1 = new NetworkService<String>(factory, 0,
nameResolver,
- new StringCodec(), new MessagingTransportFactory(localAddressProvider),
- new MessageHandler<String>(name1, null, 0), new ExceptionHandler(),
localAddressProvider);
- ns1.registerId(factory.getNewInstance(name1));
- final int port1 = ns1.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task1"), new
InetSocketAddress(this.localAddress, port1));
-
- final Identifier destId = factory.getNewInstance(name2);
- final Connection<String> conn = ns1.newConnection(destId);
- try {
- conn.open();
- for (int count = 0; count < numMessages; ++count) {
- conn.write("hello! " + count);
- }
- monitor.mwait();
- } catch (NetworkException e) {
- e.printStackTrace();
- }
- conn.close();
+ try (final NameServer server = injector.getInstance(NameServer.class)) {
+ int nameServerPort = server.getPort();
+
+ final int numMessages = 10;
+ final Monitor monitor = new Monitor();
+
+ // network service
+ final String name2 = "task2";
+ final String name1 = "task1";
+ final Configuration nameResolverConf =
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME,
this.localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+ .build())
+ .build();
+
+ final Injector injector2 =
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+ LOG.log(Level.FINEST, "=== Test network service receiver start");
+ LOG.log(Level.FINEST, "=== Test network service sender start");
+ try (final NameResolver nameResolver =
injector2.getInstance(NameResolver.class);
+ NetworkService<String> ns2 = new NetworkService<String>(factory, 0,
nameResolver,
+ new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
+ new MessageHandler<String>(name2, monitor, numMessages), new
ExceptionHandler(), localAddressProvider);
+ final NetworkService<String> ns1 = new
NetworkService<String>(factory, 0, nameResolver,
+ new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
+ new MessageHandler<String>(name1, null, 0), new
ExceptionHandler(), localAddressProvider)) {
+
+ ns2.registerId(factory.getNewInstance(name2));
+ final int port2 = ns2.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task2"), new
InetSocketAddress(this.localAddress, port2));
+
+ ns1.registerId(factory.getNewInstance(name1));
+ final int port1 = ns1.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task1"), new
InetSocketAddress(this.localAddress, port1));
- ns1.close();
- ns2.close();
+ final Identifier destId = factory.getNewInstance(name2);
- server.close();
+ try (final Connection<String> conn = ns1.newConnection(destId)) {
+ conn.open();
+ for (int count = 0; count < numMessages; ++count) {
+ conn.write("hello! " + count);
+ }
+ monitor.mwait();
+
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
/**
@@ -142,75 +140,71 @@ public class NetworkServiceTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
factory);
injector.bindVolatileInstance(LocalAddressProvider.class,
this.localAddressProvider);
- final NameServer server = injector.getInstance(NameServer.class);
- int nameServerPort = server.getPort();
-
- final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
-
- for (int size : messageSizes) {
- final int numMessages = 300000 / (Math.max(1, size / 512));
- final Monitor monitor = new Monitor();
-
- LOG.log(Level.FINEST, "=== Test network service receiver start");
- // network service
- final String name2 = "task2";
- final Configuration nameResolverConf =
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
- .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME,
this.localAddress)
- .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
- .build())
- .build();
- final Injector injector2 =
Tang.Factory.getTang().newInjector(nameResolverConf);
- final NameResolver nameResolver =
injector2.getInstance(NameResolver.class);
-
- NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, nameResolver,
- new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
- new MessageHandler<String>(name2, monitor, numMessages), new
ExceptionHandler(), localAddressProvider);
- ns2.registerId(factory.getNewInstance(name2));
- final int port2 = ns2.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task2"), new
InetSocketAddress(this.localAddress, port2));
-
- LOG.log(Level.FINEST, "=== Test network service sender start");
- final String name1 = "task1";
- NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, nameResolver,
- new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
- new MessageHandler<String>(name1, null, 0), new ExceptionHandler(),
localAddressProvider);
- ns1.registerId(factory.getNewInstance(name1));
- final int port1 = ns1.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task1"), new
InetSocketAddress(this.localAddress, port1));
-
- Identifier destId = factory.getNewInstance(name2);
- Connection<String> conn = ns1.newConnection(destId);
-
- // build the message
- StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- String message = msb.toString();
+ try (final NameServer server = injector.getInstance(NameServer.class)) {
+ int nameServerPort = server.getPort();
+
+ final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
+
+ for (int size : messageSizes) {
+ final int numMessages = 300000 / (Math.max(1, size / 512));
+ final Monitor monitor = new Monitor();
+
+ // network service
+ final String name2 = "task2";
+ final String name1 = "task1";
+ final Configuration nameResolverConf =
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME,
this.localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+ .build())
+ .build();
+
+ final Injector injector2 =
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+ LOG.log(Level.FINEST, "=== Test network service receiver start");
+ LOG.log(Level.FINEST, "=== Test network service sender start");
+ try (final NameResolver nameResolver =
injector2.getInstance(NameResolver.class);
+ NetworkService<String> ns2 = new NetworkService<String>(factory,
0, nameResolver,
+ new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
+ new MessageHandler<String>(name2, monitor, numMessages), new
ExceptionHandler(), localAddressProvider);
+ NetworkService<String> ns1 = new NetworkService<String>(factory,
0, nameResolver,
+ new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
+ new MessageHandler<String>(name1, null, 0), new
ExceptionHandler(), localAddressProvider)) {
+
+ ns2.registerId(factory.getNewInstance(name2));
+ final int port2 = ns2.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task2"), new
InetSocketAddress(this.localAddress, port2));
+
+ ns1.registerId(factory.getNewInstance(name1));
+ final int port1 = ns1.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task1"), new
InetSocketAddress(this.localAddress, port1));
+
+ Identifier destId = factory.getNewInstance(name2);
+
+ // build the message
+ StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ String message = msb.toString();
- long start = System.currentTimeMillis();
- try {
- for (int i = 0; i < numMessages; i++) {
- conn.open();
- conn.write(message);
+ long start = System.currentTimeMillis();
+ try (Connection<String> conn = ns1.newConnection(destId)) {
+ for (int i = 0; i < numMessages; i++) {
+ conn.open();
+ conn.write(message);
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ long end = System.currentTimeMillis();
+ double runtime = ((double) end - start) / 1000;
+ LOG.log(Level.FINEST, "size: " + size + "; messages/s: " +
numMessages / runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 *
size) / runtime);// x2 for unicode chars
}
- monitor.mwait();
- } catch (NetworkException e) {
- e.printStackTrace();
}
- long end = System.currentTimeMillis();
- double runtime = ((double) end - start) / 1000;
- LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numMessages /
runtime + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) /
runtime);// x2 for unicode chars
- conn.close();
-
- ns1.close();
- ns2.close();
}
-
- server.close();
}
/**
@@ -225,98 +219,95 @@ public class NetworkServiceTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
factory);
injector.bindVolatileInstance(LocalAddressProvider.class,
this.localAddressProvider);
- final NameServer server = injector.getInstance(NameServer.class);
- final int nameServerPort = server.getPort();
-
- BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
-
- int numThreads = 4;
- final int size = 2000;
- final int numMessages = 300000 / (Math.max(1, size / 512));
- final int totalNumMessages = numMessages * numThreads;
-
- ExecutorService e = Executors.newCachedThreadPool();
- for (int t = 0; t < numThreads; t++) {
- final int tt = t;
-
- e.submit(new Runnable() {
- public void run() {
- try {
- Monitor monitor = new Monitor();
-
- LOG.log(Level.FINEST, "=== Test network service receiver start");
- // network service
- final String name2 = "task2-" + tt;
- final Configuration nameResolverConf =
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
- .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME,
localAddress)
- .set(NameResolverConfiguration.NAME_SERVICE_PORT,
nameServerPort)
- .build())
- .build();
-
- final Injector injector =
Tang.Factory.getTang().newInjector(nameResolverConf);
- final NameResolver nameResolver =
injector.getInstance(NameResolver.class);
- NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, nameResolver,
- new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
- new MessageHandler<String>(name2, monitor, numMessages), new
ExceptionHandler(), localAddressProvider);
- ns2.registerId(factory.getNewInstance(name2));
- final int port2 = ns2.getTransport().getListeningPort();
- server.register(factory.getNewInstance(name2), new
InetSocketAddress(localAddress, port2));
-
- LOG.log(Level.FINEST, "=== Test network service sender start");
- final String name1 = "task1-" + tt;
- NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, nameResolver,
- new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
- new MessageHandler<String>(name1, null, 0), new
ExceptionHandler(), localAddressProvider);
- ns1.registerId(factory.getNewInstance(name1));
- final int port1 = ns1.getTransport().getListeningPort();
- server.register(factory.getNewInstance(name1), new
InetSocketAddress(localAddress, port1));
-
- Identifier destId = factory.getNewInstance(name2);
- Connection<String> conn = ns1.newConnection(destId);
- // build the message
- StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- String message = msb.toString();
+ try (final NameServer server = injector.getInstance(NameServer.class)) {
+ final int nameServerPort = server.getPort();
+
+ BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
+ int numThreads = 4;
+ final int size = 2000;
+ final int numMessages = 300000 / (Math.max(1, size / 512));
+ final int totalNumMessages = numMessages * numThreads;
+
+ ExecutorService e = Executors.newCachedThreadPool();
+ for (int t = 0; t < numThreads; t++) {
+ final int tt = t;
+ e.submit(new Runnable() {
+ @Override
+ public void run() {
try {
- for (int i = 0; i < numMessages; i++) {
- conn.open();
- conn.write(message);
+ Monitor monitor = new Monitor();
+
+ // network service
+ final String name2 = "task2-" + tt;
+ final String name1 = "task1-" + tt;
+ final Configuration nameResolverConf =
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME,
localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT,
nameServerPort)
+ .build())
+ .build();
+
+ final Injector injector =
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+ LOG.log(Level.FINEST, "=== Test network service receiver start");
+ LOG.log(Level.FINEST, "=== Test network service sender start");
+ try (final NameResolver nameResolver =
injector.getInstance(NameResolver.class);
+ NetworkService<String> ns2 = new
NetworkService<String>(factory, 0, nameResolver,
+ new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
+ new MessageHandler<String>(name2, monitor,
numMessages), new ExceptionHandler(), localAddressProvider);
+ NetworkService<String> ns1 = new
NetworkService<String>(factory, 0, nameResolver,
+ new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
+ new MessageHandler<String>(name1, null, 0), new
ExceptionHandler(), localAddressProvider)) {
+
+ ns2.registerId(factory.getNewInstance(name2));
+ final int port2 = ns2.getTransport().getListeningPort();
+ server.register(factory.getNewInstance(name2), new
InetSocketAddress(localAddress, port2));
+
+ ns1.registerId(factory.getNewInstance(name1));
+ final int port1 = ns1.getTransport().getListeningPort();
+ server.register(factory.getNewInstance(name1), new
InetSocketAddress(localAddress, port1));
+
+ Identifier destId = factory.getNewInstance(name2);
+
+ // build the message
+ StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ String message = msb.toString();
+
+ try (Connection<String> conn = ns1.newConnection(destId)) {
+ for (int i = 0; i < numMessages; i++) {
+ conn.open();
+ conn.write(message);
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
}
- monitor.mwait();
- } catch (NetworkException e) {
+ } catch (Exception e) {
e.printStackTrace();
+ throw new RuntimeException(e);
}
- conn.close();
-
- ns1.close();
- ns2.close();
- } catch (Exception e) {
- e.printStackTrace();
-
}
- }
- });
- }
-
- // start and time
- long start = System.currentTimeMillis();
- Object ignore = new Object();
- for (int i = 0; i < numThreads; i++) barrier.add(ignore);
- e.shutdown();
- e.awaitTermination(100, TimeUnit.SECONDS);
- long end = System.currentTimeMillis();
+ });
+ }
- double runtime = ((double) end - start) / 1000;
- LOG.log(Level.FINEST, "size: " + size + "; messages/s: " +
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double)
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+ // start and time
+ long start = System.currentTimeMillis();
+ Object ignore = new Object();
+ for (int i = 0; i < numThreads; i++) barrier.add(ignore);
+ e.shutdown();
+ e.awaitTermination(100, TimeUnit.SECONDS);
+ long end = System.currentTimeMillis();
- server.close();
+ double runtime = ((double) end - start) / 1000;
+ LOG.log(Level.FINEST, "size: " + size + "; messages/s: " +
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double)
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+ }
}
@Test
@@ -328,88 +319,87 @@ public class NetworkServiceTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
factory);
injector.bindVolatileInstance(LocalAddressProvider.class,
this.localAddressProvider);
- final NameServer server = injector.getInstance(NameServer.class);
- int nameServerPort = server.getPort();
+ try (final NameServer server = injector.getInstance(NameServer.class)) {
+ int nameServerPort = server.getPort();
- final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
+ final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024};
- for (int size : messageSizes) {
- final int numMessages = 300000 / (Math.max(1, size / 512));
- int numThreads = 2;
- int totalNumMessages = numMessages * numThreads;
- final Monitor monitor = new Monitor();
+ for (int size : messageSizes) {
+ final int numMessages = 300000 / (Math.max(1, size / 512));
+ int numThreads = 2;
+ int totalNumMessages = numMessages * numThreads;
+ final Monitor monitor = new Monitor();
- LOG.log(Level.FINEST, "=== Test network service receiver start");
- // network service
- final String name2 = "task2";
- final Configuration nameResolverConf =
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
- .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME,
this.localAddress)
- .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
- .build())
- .build();
- final Injector injector2 =
Tang.Factory.getTang().newInjector(nameResolverConf);
- final NameResolver nameResolver =
injector2.getInstance(NameResolver.class);
- NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, nameResolver,
- new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
- new MessageHandler<String>(name2, monitor, totalNumMessages), new
ExceptionHandler(), localAddressProvider);
- ns2.registerId(factory.getNewInstance(name2));
- final int port2 = ns2.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task2"), new
InetSocketAddress(this.localAddress, port2));
+ // network service
+ final String name2 = "task2";
+ final String name1 = "task1";
+ final Configuration nameResolverConf =
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME,
this.localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+ .build())
+ .build();
- LOG.log(Level.FINEST, "=== Test network service sender start");
- final String name1 = "task1";
- NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, nameResolver,
- new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
- new MessageHandler<String>(name1, null, 0), new ExceptionHandler(),
localAddressProvider);
- ns1.registerId(factory.getNewInstance(name1));
- final int port1 = ns1.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task1"), new
InetSocketAddress(this.localAddress, port1));
-
- Identifier destId = factory.getNewInstance(name2);
- final Connection<String> conn = ns1.newConnection(destId);
- conn.open();
-
- // build the message
- StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- final String message = msb.toString();
+ final Injector injector2 =
Tang.Factory.getTang().newInjector(nameResolverConf);
- ExecutorService e = Executors.newCachedThreadPool();
+ LOG.log(Level.FINEST, "=== Test network service receiver start");
+ LOG.log(Level.FINEST, "=== Test network service sender start");
+ try (final NameResolver nameResolver =
injector2.getInstance(NameResolver.class);
+ NetworkService<String> ns2 = new NetworkService<String>(factory,
0, nameResolver,
+ new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
+ new MessageHandler<String>(name2, monitor, totalNumMessages),
new ExceptionHandler(), localAddressProvider);
+ NetworkService<String> ns1 = new NetworkService<String>(factory,
0, nameResolver,
+ new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
+ new MessageHandler<String>(name1, null, 0), new
ExceptionHandler(), localAddressProvider)) {
- long start = System.currentTimeMillis();
- for (int i = 0; i < numThreads; i++) {
- e.submit(new Runnable() {
+ ns2.registerId(factory.getNewInstance(name2));
+ final int port2 = ns2.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task2"), new
InetSocketAddress(this.localAddress, port2));
- @Override
- public void run() {
- for (int i = 0; i < numMessages; i++) {
- conn.write(message);
+ ns1.registerId(factory.getNewInstance(name1));
+ final int port1 = ns1.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task1"), new
InetSocketAddress(this.localAddress, port1));
+
+ Identifier destId = factory.getNewInstance(name2);
+
+ try (final Connection<String> conn = ns1.newConnection(destId)) {
+ conn.open();
+
+ // build the message
+ StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
}
- }
- });
- }
+ final String message = msb.toString();
+ ExecutorService e = Executors.newCachedThreadPool();
- e.shutdown();
- e.awaitTermination(30, TimeUnit.SECONDS);
- monitor.mwait();
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < numThreads; i++) {
+ e.submit(new Runnable() {
- long end = System.currentTimeMillis();
- double runtime = ((double) end - start) / 1000;
+ @Override
+ public void run() {
+ for (int i = 0; i < numMessages; i++) {
+ conn.write(message);
+ }
+ }
+ });
+ }
- LOG.log(Level.FINEST, "size: " + size + "; messages/s: " +
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double)
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
- conn.close();
- ns1.close();
- ns2.close();
- }
+ e.shutdown();
+ e.awaitTermination(30, TimeUnit.SECONDS);
+ monitor.mwait();
- server.close();
+ long end = System.currentTimeMillis();
+ double runtime = ((double) end - start) / 1000;
+
+ LOG.log(Level.FINEST, "size: " + size + "; messages/s: " +
totalNumMessages / runtime + " bandwidth(bytes/s): " + ((double)
totalNumMessages * 2 * size) / runtime);// x2 for unicode chars
+ }
+ }
+ }
+ }
}
/**
@@ -424,80 +414,77 @@ public class NetworkServiceTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
factory);
injector.bindVolatileInstance(LocalAddressProvider.class,
this.localAddressProvider);
- final NameServer server = injector.getInstance(NameServer.class);
- int nameServerPort = server.getPort();
-
- final int batchSize = 1024 * 1024;
- final int[] messageSizes = {32, 64, 512};
-
- for (int size : messageSizes) {
- final int numMessages = 300 / (Math.max(1, size / 512));
- final Monitor monitor = new Monitor();
-
- LOG.log(Level.FINEST, "=== Test network service receiver start");
- // network service
- final String name2 = "task2";
- final Configuration nameResolverConf =
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
- .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME,
this.localAddress)
- .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
- .build())
- .build();
-
- final Injector injector2 =
Tang.Factory.getTang().newInjector(nameResolverConf);
- final NameResolver nameResolver =
injector2.getInstance(NameResolver.class);
- NetworkService<String> ns2 = new NetworkService<String>(
- factory, 0, nameResolver,
- new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
- new MessageHandler<String>(name2, monitor, numMessages), new
ExceptionHandler(), localAddressProvider);
- ns2.registerId(factory.getNewInstance(name2));
- final int port2 = ns2.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task2"), new
InetSocketAddress(this.localAddress, port2));
- LOG.log(Level.FINEST, "=== Test network service sender start");
- final String name1 = "task1";
- NetworkService<String> ns1 = new NetworkService<String>(
- factory, 0, nameResolver,
- new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
- new MessageHandler<String>(name1, null, 0), new ExceptionHandler(),
localAddressProvider);
- ns1.registerId(factory.getNewInstance(name1));
- final int port1 = ns1.getTransport().getListeningPort();
- server.register(factory.getNewInstance("task1"), new
InetSocketAddress(this.localAddress, port1));
-
- Identifier destId = factory.getNewInstance(name2);
- Connection<String> conn = ns1.newConnection(destId);
-
- // build the message
- StringBuilder msb = new StringBuilder();
- for (int i = 0; i < size; i++) {
- msb.append("1");
- }
- String message = msb.toString();
+ try (final NameServer server = injector.getInstance(NameServer.class)) {
+ int nameServerPort = server.getPort();
+
+ final int batchSize = 1024 * 1024;
+ final int[] messageSizes = {32, 64, 512};
+
+ for (int size : messageSizes) {
+ final int numMessages = 300 / (Math.max(1, size / 512));
+ final Monitor monitor = new Monitor();
+
+ // network service
+ final String name2 = "task2";
+ final String name1 = "task1";
+ final Configuration nameResolverConf =
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME,
this.localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+ .build())
+ .build();
+
+ final Injector injector2 =
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+ LOG.log(Level.FINEST, "=== Test network service receiver start");
+ LOG.log(Level.FINEST, "=== Test network service sender start");
+ try (final NameResolver nameResolver =
injector2.getInstance(NameResolver.class);
+ NetworkService<String> ns2 = new NetworkService<String>(factory,
0, nameResolver,
+ new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
+ new MessageHandler<String>(name2, monitor, numMessages), new
ExceptionHandler(), localAddressProvider);
+ NetworkService<String> ns1 = new NetworkService<String>(factory,
0, nameResolver,
+ new StringCodec(), new
MessagingTransportFactory(localAddressProvider),
+ new MessageHandler<String>(name1, null, 0), new
ExceptionHandler(), localAddressProvider)) {
+
+ ns2.registerId(factory.getNewInstance(name2));
+ final int port2 = ns2.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task2"), new
InetSocketAddress(this.localAddress, port2));
+
+ ns1.registerId(factory.getNewInstance(name1));
+ final int port1 = ns1.getTransport().getListeningPort();
+ server.register(factory.getNewInstance("task1"), new
InetSocketAddress(this.localAddress, port1));
+
+ Identifier destId = factory.getNewInstance(name2);
+
+ // build the message
+ StringBuilder msb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ msb.append("1");
+ }
+ String message = msb.toString();
- long start = System.currentTimeMillis();
- try {
- for (int i = 0; i < numMessages; i++) {
- StringBuilder sb = new StringBuilder();
- for (int j = 0; j < batchSize / size; j++) {
- sb.append(message);
+ long start = System.currentTimeMillis();
+ try (Connection<String> conn = ns1.newConnection(destId)) {
+ for (int i = 0; i < numMessages; i++) {
+ StringBuilder sb = new StringBuilder();
+ for (int j = 0; j < batchSize / size; j++) {
+ sb.append(message);
+ }
+ conn.open();
+ conn.write(sb.toString());
+ }
+ monitor.mwait();
+ } catch (NetworkException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
}
- conn.open();
- conn.write(sb.toString());
+ long end = System.currentTimeMillis();
+ double runtime = ((double) end - start) / 1000;
+ long numAppMessages = numMessages * batchSize / size;
+ LOG.log(Level.FINEST, "size: " + size + "; messages/s: " +
numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages *
2 * size) / runtime);// x2 for unicode chars
}
- monitor.mwait();
- } catch (NetworkException e) {
- e.printStackTrace();
}
- long end = System.currentTimeMillis();
- double runtime = ((double) end - start) / 1000;
- long numAppMessages = numMessages * batchSize / size;
- LOG.log(Level.FINEST, "size: " + size + "; messages/s: " +
numAppMessages / runtime + " bandwidth(bytes/s): " + ((double) numAppMessages *
2 * size) / runtime);// x2 for unicode chars
- conn.close();
-
- ns1.close();
- ns2.close();
}
-
- server.close();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bc9f5b54/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
deleted file mode 100644
index ba0cfe4..0000000
---
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network.util;
-
-import org.apache.reef.exception.evaluator.NetworkException;
-import org.apache.reef.io.network.Connection;
-import org.apache.reef.io.network.Message;
-import org.apache.reef.io.network.NetworkConnectionService;
-import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
-import org.apache.reef.io.network.naming.NameResolverConfiguration;
-import org.apache.reef.io.network.naming.NameServer;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.remote.Codec;
-import org.apache.reef.wake.remote.transport.LinkListener;
-
-import java.net.SocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Helper class for NetworkConnectionService test.
- */
-public final class NetworkMessagingTest {
- private static final Logger LOG = Logger.getLogger(NetworkMessagingTest.class.getName());
-
- private final IdentifierFactory factory;
- private final NetworkConnectionService receiverNetworkConnService;
- private final NetworkConnectionService senderNetworkConnService;
- private final String receiver;
- private final String sender;
- private final NameServer nameServer;
-
- public NetworkMessagingTest(final String localAddress) throws InjectionException {
- // name server
- final Injector injector = Tang.Factory.getTang().newInjector();
- this.nameServer = injector.getInstance(NameServer.class);
- final Configuration netConf = NameResolverConfiguration.CONF
- .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
- .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
- .build();
-
- LOG.log(Level.FINEST, "=== Test network connection service receiver start");
- // network service for receiver
- this.receiver = "receiver";
- final Injector injectorReceiver = injector.forkInjector(netConf);
- this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class);
- this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
- this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver));
-
- // network service for sender
- this.sender = "sender";
- LOG.log(Level.FINEST, "=== Test network connection service sender start");
- final Injector injectorSender = injector.forkInjector(netConf);
- senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class);
- senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender));
- }
-
- public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
- final int numMessages, final Monitor monitor,
- final Codec<T> codec) throws NetworkException {
- receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
- senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
- }
-
- public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) {
- final Identifier destId = factory.getNewInstance(receiver);
- return (Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId);
- }
-
- public void close() throws Exception {
- senderNetworkConnService.close();
- receiverNetworkConnService.close();
- nameServer.close();
- }
-
- public static final class MessageHandler<T> implements EventHandler<Message<T>> {
- private final int expected;
- private final Monitor monitor;
- private AtomicInteger count = new AtomicInteger(0);
-
- public MessageHandler(final Monitor monitor,
- final int expected) {
- this.monitor = monitor;
- this.expected = expected;
- }
-
- @Override
- public void onNext(Message<T> value) {
- count.incrementAndGet();
- LOG.log(Level.FINE, "Count: {0}", count.get());
- LOG.log(Level.FINE,
- "OUT: {0} received {1} from {2} to {3}",
- new Object[]{value, value.getSrcId(), value.getDestId()});
-
- for (final T obj : value.getData()) {
- LOG.log(Level.FINE, "OUT: data: {0}", obj);
- }
-
- if (count.get() == expected) {
- monitor.mnotify();
- }
- }
- }
-
- public static final class TestListener<T> implements LinkListener<Message<T>> {
- @Override
- public void onSuccess(Message<T> message) {
- LOG.log(Level.FINE, "success: " + message);
- }
- @Override
- public void onException(Throwable cause, SocketAddress remoteAddress, Message<T> message) {
- LOG.log(Level.WARNING, "exception: " + cause + message);
- throw new RuntimeException(cause);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/bc9f5b54/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
new file mode 100644
index 0000000..0b188c5
--- /dev/null
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.NetworkConnectionService;
+import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.io.network.naming.NameResolverConfiguration;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper class for NetworkConnectionService test.
+ */
+public final class NetworkMessagingTestService implements AutoCloseable {
+ private static final Logger LOG = Logger.getLogger(NetworkMessagingTestService.class.getName());
+
+ private final IdentifierFactory factory;
+ private final NetworkConnectionService receiverNetworkConnService;
+ private final NetworkConnectionService senderNetworkConnService;
+ private final String receiver;
+ private final String sender;
+ private final NameServer nameServer;
+ private final NameResolver receiverResolver;
+ private final NameResolver senderResolver;
+
+ public NetworkMessagingTestService(final String localAddress) throws InjectionException {
+ // name server
+ final Injector injector = Tang.Factory.getTang().newInjector();
+ this.nameServer = injector.getInstance(NameServer.class);
+ final Configuration netConf = NameResolverConfiguration.CONF
+ .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+ .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
+ .build();
+
+ LOG.log(Level.FINEST, "=== Test network connection service receiver start");
+ // network service for receiver
+ this.receiver = "receiver";
+ final Injector injectorReceiver = injector.forkInjector(netConf);
+ this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class);
+ this.receiverResolver = injectorReceiver.getInstance(NameResolver.class);
+ this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
+ this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver));
+
+ // network service for sender
+ this.sender = "sender";
+ LOG.log(Level.FINEST, "=== Test network connection service sender start");
+ final Injector injectorSender = injector.forkInjector(netConf);
+ senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class);
+ senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender));
+ this.senderResolver = injectorSender.getInstance(NameResolver.class);
+ }
+
+ public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
+ final int numMessages, final Monitor monitor,
+ final Codec<T> codec) throws NetworkException {
+ receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+ senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+ }
+
+ public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) {
+ final Identifier destId = factory.getNewInstance(receiver);
+ return (Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId);
+ }
+
+ public void close() throws Exception {
+ senderNetworkConnService.close();
+ receiverNetworkConnService.close();
+ nameServer.close();
+ receiverResolver.close();
+ senderResolver.close();
+ }
+
+ public static final class MessageHandler<T> implements EventHandler<Message<T>> {
+ private final int expected;
+ private final Monitor monitor;
+ private AtomicInteger count = new AtomicInteger(0);
+
+ public MessageHandler(final Monitor monitor,
+ final int expected) {
+ this.monitor = monitor;
+ this.expected = expected;
+ }
+
+ @Override
+ public void onNext(Message<T> value) {
+ count.incrementAndGet();
+ LOG.log(Level.FINE, "Count: {0}", count.get());
+ LOG.log(Level.FINE,
+ "OUT: {0} received {1} from {2} to {3}",
+ new Object[]{value, value.getSrcId(), value.getDestId()});
+
+ for (final T obj : value.getData()) {
+ LOG.log(Level.FINE, "OUT: data: {0}", obj);
+ }
+
+ if (count.get() == expected) {
+ monitor.mnotify();
+ }
+ }
+ }
+
+ public static final class TestListener<T> implements LinkListener<Message<T>> {
+ @Override
+ public void onSuccess(Message<T> message) {
+ LOG.log(Level.FINE, "success: " + message);
+ }
+ @Override
+ public void onException(Throwable cause, SocketAddress remoteAddress, Message<T> message) {
+ LOG.log(Level.WARNING, "exception: " + cause + message);
+ throw new RuntimeException(cause);
+ }
+ }
+}
\ No newline at end of file