APEX-29 #resolve Use DefaultEventLoop.createEventLoop factory
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/8ae64ab6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/8ae64ab6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/8ae64ab6 Branch: refs/heads/master Commit: 8ae64ab64337c8c259b80661bfe125b64e417c93 Parents: 66a75e0 Author: Vlad Rozov <[email protected]> Authored: Sun Aug 2 11:32:09 2015 -0700 Committer: Vlad Rozov <[email protected]> Committed: Tue Aug 4 14:07:04 2015 -0700 ---------------------------------------------------------------------- api/pom.xml | 2 +- .../java/com/datatorrent/bufferserver/server/Server.java | 2 +- .../main/java/com/datatorrent/bufferserver/util/System.java | 2 +- .../com/datatorrent/bufferserver/client/SubscriberTest.java | 4 ++-- .../com/datatorrent/bufferserver/server/ServerTest.java | 4 ++-- .../datatorrent/bufferserver/storage/DiskStorageTest.java | 9 +++++---- .../com/datatorrent/stram/engine/StreamingContainer.java | 2 +- .../java/com/datatorrent/stram/stream/FastPublisher.java | 2 ++ .../java/com/datatorrent/stram/stream/FastStreamTest.java | 6 ++++-- .../java/com/datatorrent/stram/stream/SocketStreamTest.java | 8 ++++---- 10 files changed, 23 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/api/pom.xml ---------------------------------------------------------------------- diff --git a/api/pom.xml b/api/pom.xml index ff3f441..f04f622 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -74,7 +74,7 @@ <dependency> <groupId>${project.groupId}</groupId> <artifactId>netlet</artifactId> - <version>1.1.0</version> + <version>1.2.0-SNAPSHOT</version> </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index a8adf08..7fb4823 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -144,7 +144,7 @@ public class Server implements ServerListener port = 0; } - DefaultEventLoop eventloop = new DefaultEventLoop("alone"); + DefaultEventLoop eventloop = DefaultEventLoop.createEventLoop("alone"); eventloop.start(null, port, new Server(port)); new Thread(eventloop).start(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java index ff126cb..e9d6528 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java @@ -36,7 +36,7 @@ public class System DefaultEventLoop el = eventloops.get(identifier); if (el == null) { try { - eventloops.put(identifier, el = new DefaultEventLoop(identifier)); + eventloops.put(identifier, el = DefaultEventLoop.createEventLoop(identifier)); } catch (IOException io) { throw new RuntimeException(io); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java index 3b6b57a..cde4f69 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java @@ -52,8 +52,8 @@ public class SubscriberTest public static void setupServerAndClients() throws Exception { try { - eventloopServer = new DefaultEventLoop("server"); - eventloopClient = new DefaultEventLoop("client"); + eventloopServer = DefaultEventLoop.createEventLoop("server"); + eventloopClient = DefaultEventLoop.createEventLoop("client"); } catch (IOException ioe) { throw new RuntimeException(ioe); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java index 600f18c..de26da8 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java @@ -58,8 +58,8 @@ public class ServerTest public static void setupServerAndClients() throws Exception { try { - eventloopServer = new DefaultEventLoop("server"); - eventloopClient = new DefaultEventLoop("client"); + eventloopServer = DefaultEventLoop.createEventLoop("server"); + eventloopClient = DefaultEventLoop.createEventLoop("client"); } catch (IOException ioe) { throw new RuntimeException(ioe); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java index 08dc5b8..dac996a 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java @@ -16,12 +16,10 @@ package com.datatorrent.bufferserver.storage; import java.net.InetSocketAddress; -import static java.lang.Thread.sleep; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; import com.datatorrent.bufferserver.packet.BeginWindowTuple; import com.datatorrent.bufferserver.packet.EndWindowTuple; @@ -32,6 +30,9 @@ import com.datatorrent.bufferserver.support.Publisher; import com.datatorrent.bufferserver.support.Subscriber; import com.datatorrent.netlet.DefaultEventLoop; +import static java.lang.Thread.sleep; +import static org.testng.Assert.assertEquals; + /** * */ @@ -49,10 +50,10 @@ public class DiskStorageTest @BeforeClass public static void setupServerAndClients() throws Exception { - eventloopServer = new DefaultEventLoop("server"); + eventloopServer = DefaultEventLoop.createEventLoop("server"); eventloopServer.start(); - eventloopClient = new DefaultEventLoop("client"); + eventloopClient = DefaultEventLoop.createEventLoop("client"); eventloopClient.start(); instance = new Server(0, 1024,8); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 35861f1..9db88ee 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -125,7 +125,7 @@ public class StreamingContainer extends YarnContainerMain static { try { - eventloop = new DefaultEventLoop("ProcessWideEventLoop"); + eventloop = DefaultEventLoop.createEventLoop("ProcessWideEventLoop"); } catch (IOException io) { throw new RuntimeException(io); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java index 188fb7a..887c363 100644 --- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java +++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java @@ -189,6 +189,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream if (!write) { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); write = true; + key.selector().wakeup(); } } } @@ -484,6 +485,7 @@ public class FastPublisher extends Kryo implements ClientListener, Stream if (!write) { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); write = true; + key.selector().wakeup(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java index e23358b..c7ed83c 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/FastStreamTest.java @@ -16,7 +16,7 @@ package com.datatorrent.stram.stream; import java.io.IOException; -import static java.lang.Thread.sleep; + import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicInteger; @@ -36,6 +36,8 @@ import com.datatorrent.stram.support.StramTestSupport; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; +import static java.lang.Thread.sleep; + /** * */ @@ -52,7 +54,7 @@ public class FastStreamTest static { try { - eventloop = new DefaultEventLoop("StreamTestEventLoop"); + eventloop = DefaultEventLoop.createEventLoop("StreamTestEventLoop"); } catch (IOException ex) { throw new RuntimeException(ex); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8ae64ab6/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java index 7702b85..2cdddc5 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java @@ -19,8 +19,6 @@ import com.datatorrent.stram.codec.DefaultStatefulStreamCodec; import com.datatorrent.stram.engine.StreamContext; import com.datatorrent.stram.engine.SweepableReservoir; import com.datatorrent.stram.support.StramTestSupport; -import com.datatorrent.stram.stream.BufferServerPublisher; -import com.datatorrent.stram.stream.BufferServerSubscriber; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; import com.datatorrent.api.Sink; @@ -29,13 +27,15 @@ import com.datatorrent.bufferserver.server.Server; import com.datatorrent.netlet.DefaultEventLoop; import com.datatorrent.netlet.EventLoop; import java.io.IOException; -import static java.lang.Thread.sleep; + import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicInteger; import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.lang.Thread.sleep; + /** * */ @@ -49,7 +49,7 @@ public class SocketStreamTest static { try { - eventloop = new DefaultEventLoop("StreamTestEventLoop"); + eventloop = DefaultEventLoop.createEventLoop("StreamTestEventLoop"); } catch (IOException ex) { throw new RuntimeException(ex);
