Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 fdc91fa3f -> fd670e6d4


 - MLHR-1864 #resolve #comment Made adding a query to a queue atomic
 - Fixed a race condition where a query could be accessed from the queue and
   then have its countdown decremented.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/fd670e6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/fd670e6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/fd670e6d

Branch: refs/heads/devel-3
Commit: fd670e6d400d816d26d0de1ca08a3c12f8278ae5
Parents: fdc91fa
Author: Timothy Farkas <[email protected]>
Authored: Thu Oct 1 17:55:42 2015 -0700
Committer: Timothy Farkas <[email protected]>
Committed: Thu Oct 1 22:39:57 2015 -0700

----------------------------------------------------------------------
 .../query/AbstractWindowEndQueueManager.java    | 18 ++++++++++-----
 .../appdata/query/QueryManagerAsynchronous.java | 23 ++++++++++++++------
 2 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fd670e6d/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java
 
b/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java
index 8b6301e..951f591 100644
--- 
a/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java
+++ 
b/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java
@@ -93,12 +93,14 @@ public abstract class 
AbstractWindowEndQueueManager<QUERY_TYPE, META_QUERY, QUEU
 
     QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> node = 
new QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, 
QUEUE_CONTEXT>>(queryQueueable);
 
-    if(addingFilter(queryQueueable)) {
-      queryQueue.enqueue(node);
-      numLeft.getAndIncrement();
-      semaphore.release();
+    synchronized (numLeft) {
+      if (addingFilter(queryQueueable)) {
+        queryQueue.enqueue(node);
+        numLeft.getAndIncrement();
+        semaphore.release();
 
-      addedNode(node);
+        addedNode(node);
+      }
     }
 
     return true;
@@ -241,6 +243,12 @@ public abstract class 
AbstractWindowEndQueueManager<QUERY_TYPE, META_QUERY, QUEU
     return qq;
   }
 
+  //Dirty hack TODO fix QueueManager interface
+  public boolean isEmptyAndBlocked()
+  {
+    return numLeft.get() == 0 && semaphore.availablePermits() == 0;
+  }
+
   private void acquire()
   {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fd670e6d/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java
 
b/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java
index c6b9a5f..4c33183 100644
--- 
a/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java
+++ 
b/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java
@@ -47,6 +47,7 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, 
QUEUE_CONTEXT, RES
 {
   private DefaultOutputPort<String> resultPort = null;
 
+  //TODO I believe this semaphore is no longer necessary and can just be straight up deleted.
   private transient final Semaphore inWindowSemaphore = new Semaphore(0);
   private final ConcurrentLinkedQueue<String> queue = new 
ConcurrentLinkedQueue<String>();
   private QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager;
@@ -123,11 +124,10 @@ public class QueryManagerAsynchronous<QUERY_TYPE, 
META_QUERY, QUEUE_CONTEXT, RES
   {
     queueManager.haltEnqueue();
 
-    while(queueManager.getNumLeft() > 0) {
-      if(queue.isEmpty()) {
+    while (!isProcessingDone()) {
+      if (queue.isEmpty()) {
         Thread.yield();
-      }
-      else {
+      } else {
         emptyQueue();
       }
     }
@@ -142,6 +142,16 @@ public class QueryManagerAsynchronous<QUERY_TYPE, 
META_QUERY, QUEUE_CONTEXT, RES
     }
   }
 
+  //Dirty hack TODO fix QueueManager interface
+  private boolean isProcessingDone()
+  {
+    if (queueManager instanceof AbstractWindowEndQueueManager) {
+      return ((AbstractWindowEndQueueManager) 
queueManager).isEmptyAndBlocked();
+    }
+
+    return queueManager.getNumLeft() == 0;
+  }
+
   private void emptyQueue()
   {
     while(!queue.isEmpty()) {
@@ -201,8 +211,7 @@ public class QueryManagerAsynchronous<QUERY_TYPE, 
META_QUERY, QUEUE_CONTEXT, RES
 
         try {
           inWindowSemaphore.acquire();
-        }
-        catch(InterruptedException ex) {
+        } catch (InterruptedException ex) {
           throw new RuntimeException(ex);
         }
 
@@ -210,7 +219,7 @@ public class QueryManagerAsynchronous<QUERY_TYPE, 
META_QUERY, QUEUE_CONTEXT, RES
         Result result = queryExecutor.executeQuery(queryBundle.getQuery(),
                                                    queryBundle.getMetaQuery(),
                                                    
queryBundle.getQueueContext());
-        if(result != null) {
+        if (result != null) {
           String serializedMessage = 
messageSerializerFactory.serialize(result);
           queue.add(serializedMessage);
         }

Reply via email to