Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 fdc91fa3f -> fd670e6d4
- MLHR-1864 #resolve #comment Made adding a query to a queue atomic - Fixed race condition where a query could be accessed from the queue and then have its countdown decremented. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/fd670e6d Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/fd670e6d Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/fd670e6d Branch: refs/heads/devel-3 Commit: fd670e6d400d816d26d0de1ca08a3c12f8278ae5 Parents: fdc91fa Author: Timothy Farkas <[email protected]> Authored: Thu Oct 1 17:55:42 2015 -0700 Committer: Timothy Farkas <[email protected]> Committed: Thu Oct 1 22:39:57 2015 -0700 ---------------------------------------------------------------------- .../query/AbstractWindowEndQueueManager.java | 18 ++++++++++----- .../appdata/query/QueryManagerAsynchronous.java | 23 ++++++++++++++------ 2 files changed, 29 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fd670e6d/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java b/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java index 8b6301e..951f591 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java @@ -93,12 +93,14 @@ public abstract class AbstractWindowEndQueueManager<QUERY_TYPE, META_QUERY, QUEU QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> node = new QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>>(queryQueueable); - if(addingFilter(queryQueueable)) { - queryQueue.enqueue(node); - numLeft.getAndIncrement(); - semaphore.release(); + synchronized (numLeft) { + if (addingFilter(queryQueueable)) { + queryQueue.enqueue(node); + numLeft.getAndIncrement(); + semaphore.release(); - addedNode(node); + addedNode(node); + } } return true; @@ -241,6 +243,12 @@ public abstract class AbstractWindowEndQueueManager<QUERY_TYPE, META_QUERY, QUEU return qq; } + //Dirty hack TODO fix QueueManager interface + public boolean isEmptyAndBlocked() + { + return numLeft.get() == 0 && semaphore.availablePermits() == 0; + } + private void acquire() { try { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fd670e6d/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java b/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java index c6b9a5f..4c33183 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java @@ -47,6 +47,7 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RES { private DefaultOutputPort<String> resultPort = null; + //TODO I believe this semaphore is no longer necessary and can just be straight up deleted. private transient final Semaphore inWindowSemaphore = new Semaphore(0); private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); private QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager; @@ -123,11 +124,10 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RES { queueManager.haltEnqueue(); - while(queueManager.getNumLeft() > 0) { - if(queue.isEmpty()) { + while (!isProcessingDone()) { + if (queue.isEmpty()) { Thread.yield(); - } - else { + } else { emptyQueue(); } } @@ -142,6 +142,16 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RES } } + //Dirty hack TODO fix QueManager interface + private boolean isProcessingDone() + { + if (queueManager instanceof AbstractWindowEndQueueManager) { + return ((AbstractWindowEndQueueManager) queueManager).isEmptyAndBlocked(); + } + + return queueManager.getNumLeft() == 0; + } + private void emptyQueue() { while(!queue.isEmpty()) { @@ -201,8 +211,7 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RES try { inWindowSemaphore.acquire(); - } - catch(InterruptedException ex) { + } catch (InterruptedException ex) { throw new RuntimeException(ex); } @@ -210,7 +219,7 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RES Result result = queryExecutor.executeQuery(queryBundle.getQuery(), queryBundle.getMetaQuery(), queryBundle.getQueueContext()); - if(result != null) { + if (result != null) { String serializedMessage = messageSerializerFactory.serialize(result); queue.add(serializedMessage); }
