Author: cwiklik
Date: Wed Aug 14 20:54:08 2013
New Revision: 1514040

URL: http://svn.apache.org/r1514040
Log:
UIMA-3160 added support for concurrent CAS processing in the async aggregate

Added:
    
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateNoOpWithLongDelay.xml
Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1514040&r1=1514039&r2=1514040&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
 Wed Aug 14 20:54:08 2013
@@ -723,7 +723,7 @@ public abstract class BaseTestSupport ex
               System.out.println("runTest: Incrementing ProcessTimeout Counter");
               timeoutCounter++;
             }
-          } else if (engine != null && e instanceof UimaASPingTimeout) {
+          } else if (engine != null && (e instanceof UimaASPingTimeout || (e.getCause() != null && e.getCause() instanceof UimaASPingTimeout) )) {
             System.out.println("runTest: Ping Timeout - service Not Responding To Ping");
             if (cpcLatch != null) {
               cpcLatch.countDown();

Added: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateNoOpWithLongDelay.xml
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateNoOpWithLongDelay.xml?rev=1514040&view=auto
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateNoOpWithLongDelay.xml
 (added)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateNoOpWithLongDelay.xml
 Wed Aug 14 20:54:08 2013
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+       <!--
+        ***************************************************************
+        * Licensed to the Apache Software Foundation (ASF) under one
+        * or more contributor license agreements.  See the NOTICE file
+        * distributed with this work for additional information
+        * regarding copyright ownership.  The ASF licenses this file
+        * to you under the Apache License, Version 2.0 (the
+        * "License"); you may not use this file except in compliance
+        * with the License.  You may obtain a copy of the License at
+         *
+        *   http://www.apache.org/licenses/LICENSE-2.0
+        * 
+        * Unless required by applicable law or agreed to in writing,
+        * software distributed under the License is distributed on an
+        * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+        * KIND, either express or implied.  See the License for the
+        * specific language governing permissions and limitations
+        * under the License.
+        ***************************************************************
+   -->
+   
+<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier">
+  <frameworkImplementation>org.apache.uima.java</frameworkImplementation>
+  <primitive>false</primitive>
+  <delegateAnalysisEngineSpecifiers>
+    
+    <delegateAnalysisEngine key="TestMultiplier">
+      <import location="../multiplier/SimpleCasGenerator.xml"/>
+    </delegateAnalysisEngine>
+
+
+      <delegateAnalysisEngine key="NoOp">
+      <import location="NoOpAnnotatorWithLongDelay.xml"/>
+    </delegateAnalysisEngine>
+  
+  </delegateAnalysisEngineSpecifiers>
+  <analysisEngineMetaData>
+    <name>Test Aggregate TAE</name>
+    <description>Detects Nothing</description>
+    <configurationParameters/>
+    <configurationParameterSettings/>
+    <flowConstraints>
+      <fixedFlow>
+      
+        <node>TestMultiplier</node>
+        <node>NoOp</node> 
+      </fixedFlow>
+    </flowConstraints>
+    <capabilities>
+      <capability>
+        <inputs/>
+        <outputs>
+        </outputs>
+        <languagesSupported>
+          <language>en</language>
+        </languagesSupported>
+      </capability>
+    </capabilities>
+       <operationalProperties>
+               <modifiesCas>true</modifiesCas>
+               <multipleDeploymentAllowed>true</multipleDeploymentAllowed>
+               <outputsNewCASes>false</outputsNewCASes>
+       </operationalProperties>
+  </analysisEngineMetaData>
+</analysisEngineDescription>

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java?rev=1514040&r1=1514039&r2=1514040&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
 Wed Aug 14 20:54:08 2013
@@ -132,4 +132,5 @@ public interface AggregateAnalysisEngine
   
   public void changeCollocatedDelegateState( String delegateKey, ServiceState state ) throws Exception;
 
+  public int getServiceCasPoolSize();
 }

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1514040&r1=1514039&r2=1514040&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
 Wed Aug 14 20:54:08 2013
@@ -37,6 +37,7 @@ import org.apache.uima.UIMARuntimeExcept
 import org.apache.uima.aae.AsynchAECasManager;
 import org.apache.uima.aae.InProcessCache;
 import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.AsynchAECasManager_impl;
 import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.UimaClassFactory;
@@ -160,7 +161,19 @@ public class AggregateAnalysisEngineCont
   private HashMap<String, String> casLogDirMap; // = new HashMap<String, String>();
   // Base time used for default XmiCas file names.
   private Long initializationTime = System.nanoTime();
-
+  
+  // Used by async aggregate to throttle CAS ingestion from an input queue. The 
+  // semaphore is lazily created on the first Process request and is assigned
+  // the number of permits equal to the size of CasPool configured for this
+  // service. A permit is acquired right after process() is called which means
+  // that the CAS has been handed off to another async delegate. The permit is
+  // released when the CAS is fully processed and reply sent to a client. This
+  // allows the async aggregate to concurrently process as many CASes as there 
+  // are CASes in a CasPool. It also ensures that this aggregate doesnt try
+  // to ingest more CASes as it is currently capable of processing without waiting
+  // for a free CAS from the CasPool.
+  public Semaphore semaphore = null;
+  
   /**
    * 
    * @param anEndpointName
@@ -3136,7 +3149,17 @@ public class AggregateAnalysisEngineCont
   }
 
   public void stop() {
-    super.stop(true);  // shutdown now
+         super.stop(true);  // shutdown now
+         
+       // release all permits
+         if ( semaphore != null ) {
+                 while ( semaphore.availablePermits() > 0) {
+                               semaphore.release();
+                 }
+         }
+
+   
+    
     this.cleanUp();
         // dont kill jUnit tests
     if (isTopLevelComponent() &&  System.getProperty("dontKill") == null) {
@@ -3254,4 +3277,7 @@ public class AggregateAnalysisEngineCont
     enableCasLogMap.put(key, false);
     initializationTime = System.nanoTime();
   }
+  public int getServiceCasPoolSize() {
+        return ((AsynchAECasManager_impl)casManager).getCasPoolSize();
+  }
 }

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1514040&r1=1514039&r2=1514040&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 Wed Aug 14 20:54:08 2013
@@ -122,7 +122,7 @@ public abstract class BaseAnalysisEngine
 
   private OutputChannel outputChannel;
 
-  private AsynchAECasManager casManager;
+  protected AsynchAECasManager casManager;
 
   private InProcessCache inProcessCache;
 
@@ -940,8 +940,7 @@ public abstract class BaseAnalysisEngine
                 new Object[] { getComponentName(), getUimaContextAdmin().getQualifiedContextName(),
                     aComponentCasPoolSize, anInitialCasHeapSize / 4 });
       }
-    }
-
+    } 
   }
 
   public boolean isTopLevelComponent() {
@@ -1403,12 +1402,11 @@ public abstract class BaseAnalysisEngine
     if ( entry != null ) {
       CAS cas = inProcessCache.getCasByReference(aCasReferenceId);
       if (deleteCacheEntry) {
-             // Release semaphore that is shared with a thread that received the CAS
-             // to unlock the thread. This thread is blocking to prevent it from 
-             // receiving another CAS.
-        Semaphore threadLocalSemaphore=null;
-        if ( !isPrimitive() && (threadLocalSemaphore = entry.getThreadCompletionSemaphore()) != null ) {
-          threadLocalSemaphore.release();
+             // Release semaphore which throttles ingestion of CASes from service
+             // input queue.
+        Semaphore semaphore=null;
+        if ( !isPrimitive() && (semaphore = entry.getThreadCompletionSemaphore()) != null ) {
+          semaphore.release();
         }
   
         inProcessCache.remove(aCasReferenceId);

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=1514040&r1=1514039&r2=1514040&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
 Wed Aug 14 20:54:08 2013
@@ -65,6 +65,12 @@ public class ProcessRequestHandler_impl 
 
   private Object mux = new Object();
 
+  // controlls access to Aggregates semaphore which
+  // throttles ingestion of CASes from service input queue
+  private Object lock = new Object();
+  
+  
+  
   public ProcessRequestHandler_impl(String aName) {
     super(aName);
   }
@@ -251,23 +257,32 @@ public class ProcessRequestHandler_impl 
               casReferenceId, marker, acceptsDeltaCas);
       
       /*
-       * In UIMA AS Aggregate the receiving thread must be blocked until a CAS is fully
-       * processed. This is to prevent the receiving thread from grabbing another CAS
-       * breaking prefetch throttling. The receiving thread takes a CAS from service queue,
-       * deserializes CAS, asks the FC for the next step and enqueues the CAS
-       * onto delegate's queue. Once the enqueue completes, the thread is done
-       * and ready to get more CASes from the service queue. The receiving must 
-       * therefor be blocked right after it enqueues the CAS on delegates queue. 
-       * To that end, while handling a new CAS, create a shared semaphore and
-       * associate it with a current thread as ThreadLocal variable. Also, associate the
-       * same semaphore with a CAS so that when the CAS is sent back to the client the
-       * the receiving thread is unblocked.
+               Lazily create a Semaphore that will be used to throttle CAS ingestion from service
+               input queue. This only applies to async aggregates. The semaphore is initialized
+               with the number of permits equal to the service CasPool size. The idea is that this
+               service should only ingest as many CASes as it is capable of processing without
+               waiting for a free instance of CAS from the service CasPool.
       */
-      if ( !getController().isPrimitive() ) {
-        Semaphore semaphore = new Semaphore(0);
-        //  threadCompletionMonitor is a ThreadLocal var
-        threadCompletionMonitor.set(semaphore);
-        entry.setThreadCompletionSemaphore(semaphore);
+      boolean inputCAS = aMessageContext
+           .getMessageStringProperty(AsynchAEMessage.InputCasReference) == null ? true : false;
+      if ( !getController().isPrimitive() && inputCAS) {
+        
+         try {
+                 synchronized(lock) {
+                         // lazily create a Semaphore on the first Process request. This semaphore
+                         // will throttle ingestion of CASes from service input queue.
+                         if (((AggregateAnalysisEngineController_impl) getController()).semaphore == null) {
+                                 ((AggregateAnalysisEngineController_impl) getController()).semaphore = 
+                                                 new Semaphore(
+                                                 ((AggregateAnalysisEngineController) getController())
+                                                 .getServiceCasPoolSize()-1);
+                                // semaphore.acquire();
+                         }
+                 }
+         } catch( Exception e) {
+                 throw e;
+         }
+        entry.setThreadCompletionSemaphore(((AggregateAnalysisEngineController_impl) getController()).semaphore);
       }
       long timeToDeserializeCAS = getController().getCpuTime() - t1;
       getController().incrementDeserializationTime(timeToDeserializeCAS);
@@ -473,8 +488,9 @@ public class ProcessRequestHandler_impl 
       // its queue possibly from the same client. Only the first message for any given
       // CasReferenceId
       // should be processed.
+      CasStateEntry cse = null;
       if (!getController().getInProcessCache().entryExists(casReferenceId)) {
-        CasStateEntry cse = null;
+        
         if (getController().getLocalCache().lookupEntry(casReferenceId) == null) {
           // Create a new entry in the local cache for the CAS received from the remote
           cse = getController().getLocalCache().createCasStateEntry(casReferenceId);
@@ -533,16 +549,17 @@ public class ProcessRequestHandler_impl 
          * semaphore to block the thread. It will be unblocked when the aggregate is done with
          * the CAS.
          */
-        if (!getController().isPrimitive() ) {
-          Semaphore completionSemaphore = threadCompletionMonitor.get();
+        if (!getController().isPrimitive() && 
+                       cse != null && !cse.isSubordinate() ) {
+                       
           try {
-            //  Block until the CAS is fully processed or there is an error
-            completionSemaphore.acquire();
+                 synchronized(lock) {
+                         if ( entry.getThreadCompletionSemaphore() != null) {
+                        entry.getThreadCompletionSemaphore().acquire();
+                         }
+                 }
           } catch( InterruptedException ex) {
-          } finally {
-            //  remove ThreadLocal semaphore
-            threadCompletionMonitor.remove();
-          }
+          } 
         }
         
       } else {


Reply via email to