Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 38a6f4e3b -> c33694866
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/PublishRequestTupleTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/PublishRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/PublishRequestTupleTest.java index c84a127..0bfcf9b 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/PublishRequestTupleTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/PublishRequestTupleTest.java @@ -18,11 +18,9 @@ */ package com.datatorrent.bufferserver.packet; -import org.testng.Assert; import org.testng.annotations.Test; -import com.datatorrent.bufferserver.packet.PublishRequestTuple; -import com.datatorrent.bufferserver.packet.Tuple; +import static org.testng.Assert.assertEquals; /** * @@ -41,8 +39,8 @@ public class PublishRequestTupleTest byte[] serial = PublishRequestTuple.getSerializedRequest(null, pubId, windowId); PublishRequestTuple request = (PublishRequestTuple)Tuple.getTuple(serial, 0, serial.length); - Assert.assertEquals(request.identifier, pubId, "Identifier"); - Assert.assertEquals(Long.toHexString((long)request.baseSeconds << 32 | request.windowId), Long.toHexString(windowId), "Window"); + assertEquals(request.identifier, pubId, "Identifier"); + assertEquals((long)request.baseSeconds << 32 | request.windowId, windowId, "Window"); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/ResetWindowTupleTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/ResetWindowTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/ResetWindowTupleTest.java index f5176ec..fcdf04b 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/ResetWindowTupleTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/ResetWindowTupleTest.java @@ -18,11 +18,9 @@ */ package com.datatorrent.bufferserver.packet; -import org.testng.Assert; import org.testng.annotations.Test; -import com.datatorrent.bufferserver.packet.ResetWindowTuple; -import com.datatorrent.bufferserver.packet.Tuple; +import static org.testng.Assert.assertEquals; /** * @@ -39,7 +37,7 @@ public class ResetWindowTupleTest byte[] serial = ResetWindowTuple.getSerializedTuple(0x7afebabe, 500); ResetWindowTuple tuple = (ResetWindowTuple)Tuple.getTuple(serial, 0, serial.length); - Assert.assertEquals(tuple.getBaseSeconds(), 0x7afebabe, "base seconds"); - Assert.assertEquals(tuple.getWindowWidth(), 500, "window width"); + assertEquals(tuple.getBaseSeconds(), 0x7afebabe, "base seconds"); + assertEquals(tuple.getWindowWidth(), 500, "window width"); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java index df29222..20c658a 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java @@ -19,9 +19,11 @@ package com.datatorrent.bufferserver.packet; import java.util.ArrayList; -import org.testng.Assert; import org.testng.annotations.Test; +import static com.datatorrent.bufferserver.packet.SubscribeRequestTuple.getSerializedRequest; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; /** * @@ -42,17 +44,17 @@ public class SubscribeRequestTupleTest ArrayList<Integer> partitions = new ArrayList<Integer>(); partitions.add(5); long startingWindowId = 0xcafebabe00000078L; - byte[] serial = SubscribeRequestTuple.getSerializedRequest(null, id, down_type, upstream_id, mask, partitions, startingWindowId, 0); + byte[] serial = getSerializedRequest(null, id, down_type, upstream_id, mask, partitions, startingWindowId, 0); SubscribeRequestTuple tuple = (SubscribeRequestTuple)Tuple.getTuple(serial, 0, serial.length); - Assert.assertEquals(tuple.getIdentifier(), id, "Identifier"); - Assert.assertEquals(tuple.getStreamType(), down_type, "UpstreamType"); - Assert.assertEquals(tuple.getUpstreamIdentifier(), upstream_id, "UpstreamId"); - Assert.assertEquals(tuple.getMask(), mask, "Mask"); + assertEquals(tuple.getIdentifier(), id, "Identifier"); + assertEquals(tuple.getStreamType(), down_type, "UpstreamType"); + assertEquals(tuple.getUpstreamIdentifier(), upstream_id, "UpstreamId"); + assertEquals(tuple.getMask(), mask, "Mask"); int[] parts = tuple.getPartitions(); - Assert.assertTrue(parts != null && parts.length == 1 && parts[0] == 5); + assertTrue(parts != null && parts.length == 1 && parts[0] == 5); - Assert.assertEquals(Long.toHexString((long)tuple.getBaseSeconds() << 32 | tuple.getWindowId()), Long.toHexString(startingWindowId), "Window"); + assertEquals((long)tuple.getBaseSeconds() << 32 | tuple.getWindowId(), startingWindowId, "Window"); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java index 5ae3d6d..b7d8de1 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java @@ -24,7 +24,6 @@ import java.security.SecureRandom; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -39,7 +38,9 @@ import com.datatorrent.bufferserver.support.Subscriber; import com.datatorrent.netlet.DefaultEventLoop; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; /** * @@ -63,8 +64,7 @@ public class ServerTest try { eventloopServer = DefaultEventLoop.createEventLoop("server"); eventloopClient = DefaultEventLoop.createEventLoop("client"); - } - catch (IOException ioe) { + } catch (IOException ioe) { throw new RuntimeException(ioe); } eventloopServer.start(); @@ -72,7 +72,8 @@ public class ServerTest instance = new Server(0, 4096,8); address = instance.run(eventloopServer); - assert (address instanceof InetSocketAddress); + assertTrue(address instanceof InetSocketAddress); + assertFalse(address.isUnresolved()); SecureRandom random = new SecureRandom(); authToken = new byte[20]; @@ -90,10 +91,10 @@ public class ServerTest public void testNoPublishNoSubscribe() throws InterruptedException { bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bss = new Subscriber("MySubscriber"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bsp.activate(null, 0L); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); @@ -113,10 +114,10 @@ public class ServerTest public void test1Window() throws InterruptedException { bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bsp.activate(null, 0L); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); @@ -139,7 +140,7 @@ public class ServerTest eventloopClient.disconnect(bss); eventloopClient.disconnect(bsp); - Assert.assertFalse(bss.resetPayloads.isEmpty()); + assertFalse(bss.resetPayloads.isEmpty()); } @Test(dependsOnMethods = {"test1Window"}) @@ -147,7 +148,7 @@ public class ServerTest public void testLateSubscriber() throws InterruptedException { bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); @@ -162,7 +163,7 @@ public class ServerTest eventloopClient.disconnect(bss); assertEquals(bss.tupleCount.get(), 1); - Assert.assertFalse(bss.resetPayloads.isEmpty()); + assertFalse(bss.resetPayloads.isEmpty()); } @Test(dependsOnMethods = {"testLateSubscriber"}) @@ -170,11 +171,11 @@ public class ServerTest public void testATonOfData() throws InterruptedException { bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bsp.activate(null, 0x7afebabe, 0); long windowId = 0x7afebabe00000000L; @@ -221,7 +222,7 @@ public class ServerTest { bsc = new Controller("MyController"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc); + eventloopClient.connect(address, bsc); bsc.purge(null, "MyPublisher", 0); for (int i = 0; i < spinCount; i++) { @@ -235,7 +236,7 @@ public class ServerTest assertNotNull(bsc.data); bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); for (int i = 0; i < spinCount; i++) { Thread.sleep(10); @@ -253,7 +254,7 @@ public class ServerTest public void testPurgeSome() throws InterruptedException { bsc = new Controller("MyController"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc); + eventloopClient.connect(address, bsc); bsc.purge(null, "MyPublisher", 0x7afebabe00000000L); for (int i = 0; i < spinCount; i++) { @@ -267,7 +268,7 @@ public class ServerTest assertNotNull(bsc.data); bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); for (int i = 0; i < spinCount; i++) { Thread.sleep(10); @@ -284,7 +285,7 @@ public class ServerTest public void testPurgeAll() throws InterruptedException { bsc = new Controller("MyController"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc); + eventloopClient.connect(address, bsc); bsc.purge(null, "MyPublisher", 0x7afebabe00000001L); for (int i = 0; i < spinCount; i++) { @@ -298,7 +299,7 @@ public class ServerTest assertNotNull(bsc.data); bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); for (int i = 0; i < spinCount; i++) { @@ -323,7 +324,7 @@ public class ServerTest public void testRepublishLowerWindow() throws InterruptedException { bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bsp.activate(null, 10, 0); @@ -354,7 +355,7 @@ public class ServerTest eventloopClient.disconnect(bsp); bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); for (int i = 0; i < spinCount; i++) { @@ -375,7 +376,7 @@ public class ServerTest public void testReset() throws InterruptedException { bsc = new Controller("MyController"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc); + eventloopClient.connect(address, bsc); bsc.reset(null, "MyPublisher", 0x7afebabe00000001L); for (int i = 0; i < spinCount * 2; i++) { @@ -389,7 +390,7 @@ public class ServerTest assertNotNull(bsc.data); bss = new Subscriber("MySubscriber"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); for (int i = 0; i < spinCount; i++) { @@ -421,13 +422,13 @@ public class ServerTest public void testEarlySubscriberForLaterWindow() throws InterruptedException { bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 49L, 0); /* wait in a hope that the subscriber is able to reach the server */ Thread.sleep(100); bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bsp.activate(null, 0, 0); @@ -465,18 +466,18 @@ public class ServerTest bsp = new Publisher("MyPublisher"); bsp.setToken(authToken); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bss = new Subscriber("MySubscriber"); bss.setToken(authToken); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bsp.activate(null, 0L); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); long resetInfo = 0x7afebabe000000faL; - bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int) (resetInfo >> 32), 500)); + bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int)(resetInfo >> 32), 500)); for (int i = 0; i < spinCount; i++) { Thread.sleep(10); @@ -490,7 +491,7 @@ public class ServerTest eventloopClient.disconnect(bsp); assertEquals(bss.tupleCount.get(), 1); - Assert.assertFalse(bss.resetPayloads.isEmpty()); + assertFalse(bss.resetPayloads.isEmpty()); } @Test(dependsOnMethods = {"testAuth"}) @@ -501,18 +502,18 @@ public class ServerTest bsp = new Publisher("MyPublisher"); bsp.setToken(authToken); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bss = new Subscriber("MySubscriber"); bss.setToken(authToken); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bsp.activate(null, 0L); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); long resetInfo = 0x7afebabe000000faL; - bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int) (resetInfo >> 32), 500)); + bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int)(resetInfo >> 32), 500)); for (int i = 0; i < spinCount; i++) { Thread.sleep(10); @@ -526,7 +527,7 @@ public class ServerTest eventloopClient.disconnect(bsp); assertEquals(bss.tupleCount.get(), 0); - Assert.assertTrue(bss.resetPayloads.isEmpty()); + assertTrue(bss.resetPayloads.isEmpty()); } private static final Logger logger = LoggerFactory.getLogger(ServerTest.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java index 8a4297c..755298a 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java @@ -35,6 +35,7 @@ import com.datatorrent.netlet.DefaultEventLoop; import static java.lang.Thread.sleep; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; /** * @@ -63,16 +64,16 @@ public class DiskStorageTest instance.setSpoolStorage(new DiskStorage()); address = instance.run(eventloopServer); - assert (address instanceof InetSocketAddress); + assertFalse(address.isUnresolved()); bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bss = new Subscriber("MySubscriber"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bsc = new Controller("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc); + eventloopClient.connect(address, bsc); } @AfterClass @@ -128,7 +129,7 @@ public class DiskStorageTest assertEquals(bss.tupleCount.get(), 2004); bss = new Subscriber("MySubscriber"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java index 2e2ba18..3c0cb0e 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java @@ -34,7 +34,8 @@ public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber { public final ArrayList<Object> resetPayloads = new ArrayList<Object>(); public AtomicInteger tupleCount = new AtomicInteger(0); - public WindowIdHolder firstPayload, lastPayload; + public WindowIdHolder firstPayload; + public WindowIdHolder lastPayload; public Subscriber(String id) { @@ -42,7 +43,8 @@ public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber } @Override - public void activate(String version, String type, String sourceId, int mask, Collection<Integer> partitions, long windowId, int bufferSize) + public void activate(final String version, final String type, final String sourceId, final int mask, + final Collection<Integer> partitions, final long windowId, final int bufferSize) { tupleCount.set(0); firstPayload = lastPayload = null; @@ -67,6 +69,9 @@ public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber case RESET_WINDOW: resetWindow(tuple.getBaseSeconds(), tuple.getWindowWidth()); break; + + default: + break; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/util/CodecTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/util/CodecTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/util/CodecTest.java index d3aebe7..0b2ab6d 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/util/CodecTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/util/CodecTest.java @@ -22,8 +22,8 @@ package com.datatorrent.bufferserver.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; /** * @@ -33,7 +33,7 @@ public class CodecTest @Test public void testSomeMethod() { - byte buffer[] = new byte[10]; + byte[] buffer = new byte[10]; int value = 127; VarInt.write(value, buffer, 0);
