- MLHR-1876 #resolve Cleanly terminated the window bounded service thread and removed the unnecessary interrupt of the main operator thread. - Fixed schemas being emitted on the output port from a different thread. - Closed connections in WebSocketInputOperator on teardown. - Removed obsolete unit test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/79e7eadf Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/79e7eadf Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/79e7eadf Branch: refs/heads/master Commit: 79e7eadf27879e4866ac399e31a41eb1a669eb0c Parents: 0bb9599 Author: Timothy Farkas <[email protected]> Authored: Wed Oct 21 21:33:53 2015 -0700 Committer: Timothy Farkas <[email protected]> Committed: Thu Oct 22 15:19:32 2015 -0700 ---------------------------------------------------------------------- .../lib/appdata/query/WindowBoundedService.java | 41 ++++++++++++++++---- .../snapshot/AbstractAppDataSnapshotServer.java | 21 +++++++--- .../lib/io/WebSocketInputOperator.java | 9 +++++ .../appdata/query/WindowBoundedServiceTest.java | 27 ------------- 4 files changed, 59 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java index 6d229fd..4f653a3 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java @@ -22,7 +22,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; - +import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; @@ -59,6 +59,7 @@ public class WindowBoundedService implements 
Component<OperatorContext> protected transient ExecutorService executorThread; private final transient Semaphore mutex = new Semaphore(0); + private volatile boolean terminated = false; public WindowBoundedService(Runnable runnable) { @@ -78,8 +79,8 @@ public class WindowBoundedService implements Component<OperatorContext> @Override public void setup(OperatorContext context) { - executorThread = Executors.newSingleThreadScheduledExecutor(new NameableThreadFactory("Query Executor Thread")); - executorThread.submit(new AsynchExecutorThread(Thread.currentThread())); + executorThread = Executors.newSingleThreadExecutor(new NameableThreadFactory("Query Executor Thread")); + executorThread.submit(new AsynchExecutorThread()); } public void beginWindow(long windowId) @@ -99,17 +100,31 @@ public class WindowBoundedService implements Component<OperatorContext> @Override public void teardown() { - executorThread.shutdownNow(); + LOG.info("Shutting down"); + terminated = true; + mutex.release(); + + executorThread.shutdown(); + + try { + executorThread.awaitTermination(10000L + executeIntervalMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + //Do nothing + } } public class AsynchExecutorThread implements Callable<Void> { - private final Thread mainThread; private long lastExecuteTime = 0; + public AsynchExecutorThread() + { + } + + @Deprecated public AsynchExecutorThread(Thread mainThread) { - this.mainThread = mainThread; + //Do nothing } @Override @@ -121,7 +136,6 @@ public class WindowBoundedService implements Component<OperatorContext> } catch (Exception e) { LOG.error("Exception thrown while processing:", e); mutex.release(); - mainThread.interrupt(); } return null; @@ -133,12 +147,25 @@ public class WindowBoundedService implements Component<OperatorContext> while (true) { long currentTime = System.currentTimeMillis(); long diff = currentTime - lastExecuteTime; + if (diff > executeIntervalMillis) { lastExecuteTime = currentTime; 
mutex.acquireUninterruptibly(); + + if (terminated) { + LOG.info("Terminated"); + return; + } + runnable.run(); mutex.release(); } else { + + if (terminated) { + LOG.info("Terminated"); + return; + } + Thread.sleep(executeIntervalMillis - diff); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java index e10bf9e..a309746 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java @@ -21,6 +21,7 @@ package com.datatorrent.lib.appdata.snapshot; import java.io.IOException; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.validation.constraints.NotNull; @@ -92,9 +93,10 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper */ private List<GPOMutable> currentData = Lists.newArrayList(); private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider; + private final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>(); @AppData.ResultPort - public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<String>(); + public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<>(); @AppData.QueryPort @InputPortFieldAnnotation(optional=true) @@ -118,9 +120,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query); if (schemaResult != null) { - String schemaResultJSON = 
resultSerializerFactory.serialize(schemaResult); - LOG.debug("emitting {}", schemaResultJSON); - queryResult.emit(schemaResultJSON); + LOG.debug("queueing {}", schemaResult); + schemaQueue.add(schemaResult); } } else if (query instanceof DataQuerySnapshot) { queryProcessor.enqueue((DataQuerySnapshot)query, null, null); @@ -208,7 +209,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper } { - Result result = null; + Result result; while((result = queryProcessor.process()) != null) { String resultJSON = resultSerializerFactory.serialize(result); @@ -217,6 +218,16 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper } } + { + SchemaResult schemaResult; + + while ((schemaResult = schemaQueue.poll()) != null) { + String schemaResultJSON = resultSerializerFactory.serialize(schemaResult); + LOG.debug("emitting {}", schemaResultJSON); + queryResult.emit(schemaResultJSON); + } + } + queryProcessor.endWindow(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java index a51dc6f..3d7bc7a 100644 --- a/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/WebSocketInputOperator.java @@ -137,6 +137,15 @@ public class WebSocketInputOperator<T> extends SimpleSinglePortInputOperator<T> catch (Exception ex) { LOG.error("Error joining monitor", ex); } + + if (connection != null) { + connection.close(); + } + + if (client != null) { + client.close(); + } + super.teardown(); } 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/79e7eadf/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java index 9da82e9..3fb3780 100644 --- a/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java +++ b/library/src/test/java/com/datatorrent/lib/appdata/query/WindowBoundedServiceTest.java @@ -64,33 +64,6 @@ public class WindowBoundedServiceTest Assert.assertTrue(counterRunnable.getCounter() > 0); } - @Test - public void exceptionTest() throws Exception - { - WindowBoundedService wbs = new WindowBoundedService(1, - new ExceptionRunnable()); - - wbs.setup(null); - wbs.beginWindow(0); - - boolean caughtException = false; - - try { - Thread.sleep(500); - } catch (InterruptedException e) { - caughtException = true; - } - - try { - wbs.endWindow(); - } catch(Exception e) { - caughtException = true; - } - - wbs.teardown(); - Assert.assertEquals(true, caughtException); - } - public static class CounterRunnable implements Runnable { private int counter = 0;
