Author: cwiklik
Date: Wed Aug 14 20:54:08 2013
New Revision: 1514040
URL: http://svn.apache.org/r1514040
Log:
UIMA-3160 added support for concurrent CAS processing in the async aggregate
Added:
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateNoOpWithLongDelay.xml
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1514040&r1=1514039&r2=1514040&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
(original)
+++
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
Wed Aug 14 20:54:08 2013
@@ -723,7 +723,7 @@ public abstract class BaseTestSupport ex
System.out.println("runTest: Incrementing ProcessTimeout
Counter");
timeoutCounter++;
}
- } else if (engine != null && e instanceof UimaASPingTimeout) {
+ } else if (engine != null && (e instanceof UimaASPingTimeout ||
(e.getCause() != null && e.getCause() instanceof UimaASPingTimeout) )) {
System.out.println("runTest: Ping Timeout - service Not Responding
To Ping");
if (cpcLatch != null) {
cpcLatch.countDown();
Added:
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateNoOpWithLongDelay.xml
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateNoOpWithLongDelay.xml?rev=1514040&view=auto
==============================================================================
---
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateNoOpWithLongDelay.xml
(added)
+++
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateNoOpWithLongDelay.xml
Wed Aug 14 20:54:08 2013
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+ <!--
+ ***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ ***************************************************************
+ -->
+
+<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier">
+ <frameworkImplementation>org.apache.uima.java</frameworkImplementation>
+ <primitive>false</primitive>
+ <delegateAnalysisEngineSpecifiers>
+
+ <delegateAnalysisEngine key="TestMultiplier">
+ <import location="../multiplier/SimpleCasGenerator.xml"/>
+ </delegateAnalysisEngine>
+
+
+ <delegateAnalysisEngine key="NoOp">
+ <import location="NoOpAnnotatorWithLongDelay.xml"/>
+ </delegateAnalysisEngine>
+
+ </delegateAnalysisEngineSpecifiers>
+ <analysisEngineMetaData>
+ <name>Test Aggregate TAE</name>
+ <description>Detects Nothing</description>
+ <configurationParameters/>
+ <configurationParameterSettings/>
+ <flowConstraints>
+ <fixedFlow>
+
+ <node>TestMultiplier</node>
+ <node>NoOp</node>
+ </fixedFlow>
+ </flowConstraints>
+ <capabilities>
+ <capability>
+ <inputs/>
+ <outputs>
+ </outputs>
+ <languagesSupported>
+ <language>en</language>
+ </languagesSupported>
+ </capability>
+ </capabilities>
+ <operationalProperties>
+ <modifiesCas>true</modifiesCas>
+ <multipleDeploymentAllowed>true</multipleDeploymentAllowed>
+ <outputsNewCASes>false</outputsNewCASes>
+ </operationalProperties>
+ </analysisEngineMetaData>
+</analysisEngineDescription>
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java?rev=1514040&r1=1514039&r2=1514040&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
(original)
+++
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
Wed Aug 14 20:54:08 2013
@@ -132,4 +132,5 @@ public interface AggregateAnalysisEngine
public void changeCollocatedDelegateState( String delegateKey, ServiceState
state ) throws Exception;
+ public int getServiceCasPoolSize();
}
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1514040&r1=1514039&r2=1514040&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
(original)
+++
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Wed Aug 14 20:54:08 2013
@@ -37,6 +37,7 @@ import org.apache.uima.UIMARuntimeExcept
import org.apache.uima.aae.AsynchAECasManager;
import org.apache.uima.aae.InProcessCache;
import org.apache.uima.aae.InProcessCache.CacheEntry;
+import org.apache.uima.aae.AsynchAECasManager_impl;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaClassFactory;
@@ -160,7 +161,19 @@ public class AggregateAnalysisEngineCont
private HashMap<String, String> casLogDirMap; // = new HashMap<String,
String>();
// Base time used for default XmiCas file names.
private Long initializationTime = System.nanoTime();
-
+
+ // Used by async aggregate to throttle CAS ingestion from an input queue.
The
+ // semaphore is lazily created on the first Process request and is assigned
+ // the number of permits equal to the size of CasPool configured for this
+ // service. A permit is acquired right after process() is called which means
+ // that the CAS has been handed off to another async delegate. The permit is
+ // released when the CAS is fully processed and reply sent to a client. This
+ // allows the async aggregate to concurrently process as many CASes as there
+ // are CASes in a CasPool. It also ensures that this aggregate doesn't try
+ // to ingest more CASes than it is currently capable of processing without
waiting
+ // for a free CAS from the CasPool.
+ public Semaphore semaphore = null;
+
/**
*
* @param anEndpointName
@@ -3136,7 +3149,17 @@ public class AggregateAnalysisEngineCont
}
public void stop() {
- super.stop(true); // shutdown now
+ super.stop(true); // shutdown now
+
+ // release all permits
+ if ( semaphore != null ) {
+ while ( semaphore.availablePermits() > 0) {
+ semaphore.release();
+ }
+ }
+
+
+
this.cleanUp();
// dont kill jUnit tests
if (isTopLevelComponent() && System.getProperty("dontKill") == null) {
@@ -3254,4 +3277,7 @@ public class AggregateAnalysisEngineCont
enableCasLogMap.put(key, false);
initializationTime = System.nanoTime();
}
+ public int getServiceCasPoolSize() {
+ return ((AsynchAECasManager_impl)casManager).getCasPoolSize();
+ }
}
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1514040&r1=1514039&r2=1514040&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
(original)
+++
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
Wed Aug 14 20:54:08 2013
@@ -122,7 +122,7 @@ public abstract class BaseAnalysisEngine
private OutputChannel outputChannel;
- private AsynchAECasManager casManager;
+ protected AsynchAECasManager casManager;
private InProcessCache inProcessCache;
@@ -940,8 +940,7 @@ public abstract class BaseAnalysisEngine
new Object[] { getComponentName(),
getUimaContextAdmin().getQualifiedContextName(),
aComponentCasPoolSize, anInitialCasHeapSize / 4 });
}
- }
-
+ }
}
public boolean isTopLevelComponent() {
@@ -1403,12 +1402,11 @@ public abstract class BaseAnalysisEngine
if ( entry != null ) {
CAS cas = inProcessCache.getCasByReference(aCasReferenceId);
if (deleteCacheEntry) {
- // Release semaphore that is shared with a thread that received
the CAS
- // to unlock the thread. This thread is blocking to prevent it
from
- // receiving another CAS.
- Semaphore threadLocalSemaphore=null;
- if ( !isPrimitive() && (threadLocalSemaphore =
entry.getThreadCompletionSemaphore()) != null ) {
- threadLocalSemaphore.release();
+ // Release semaphore which throttles ingestion of CASes from
service
+ // input queue.
+ Semaphore semaphore=null;
+ if ( !isPrimitive() && (semaphore =
entry.getThreadCompletionSemaphore()) != null ) {
+ semaphore.release();
}
inProcessCache.remove(aCasReferenceId);
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=1514040&r1=1514039&r2=1514040&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
(original)
+++
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
Wed Aug 14 20:54:08 2013
@@ -65,6 +65,12 @@ public class ProcessRequestHandler_impl
private Object mux = new Object();
+ // controls access to the Aggregate's semaphore, which
+ // throttles ingestion of CASes from service input queue
+ private Object lock = new Object();
+
+
+
public ProcessRequestHandler_impl(String aName) {
super(aName);
}
@@ -251,23 +257,32 @@ public class ProcessRequestHandler_impl
casReferenceId, marker, acceptsDeltaCas);
/*
- * In UIMA AS Aggregate the receiving thread must be blocked until a CAS
is fully
- * processed. This is to prevent the receiving thread from grabbing
another CAS
- * breaking prefetch throttling. The receiving thread takes a CAS from
service queue,
- * deserializes CAS, asks the FC for the next step and enqueues the CAS
- * onto delegate's queue. Once the enqueue completes, the thread is done
- * and ready to get more CASes from the service queue. The receiving
must
- * therefor be blocked right after it enqueues the CAS on delegates
queue.
- * To that end, while handling a new CAS, create a shared semaphore and
- * associate it with a current thread as ThreadLocal variable. Also,
associate the
- * same semaphore with a CAS so that when the CAS is sent back to the
client the
- * the receiving thread is unblocked.
+ Lazily create a Semaphore that will be used to throttle CAS
ingestion from service
+ input queue. This only applies to async aggregates. The
semaphore is initialized
+ with the number of permits equal to the service CasPool size.
The idea is that this
+ service should only ingest as many CASes as it is capable of
processing without
+ waiting for a free instance of CAS from the service CasPool.
*/
- if ( !getController().isPrimitive() ) {
- Semaphore semaphore = new Semaphore(0);
- // threadCompletionMonitor is a ThreadLocal var
- threadCompletionMonitor.set(semaphore);
- entry.setThreadCompletionSemaphore(semaphore);
+ boolean inputCAS = aMessageContext
+ .getMessageStringProperty(AsynchAEMessage.InputCasReference) ==
null ? true : false;
+ if ( !getController().isPrimitive() && inputCAS) {
+
+ try {
+ synchronized(lock) {
+ // lazily create a Semaphore on the first Process
request. This semaphore
+ // will throttle ingestion of CASes from service
input queue.
+ if (((AggregateAnalysisEngineController_impl)
getController()).semaphore == null) {
+ ((AggregateAnalysisEngineController_impl)
getController()).semaphore =
+ new Semaphore(
+
((AggregateAnalysisEngineController) getController())
+ .getServiceCasPoolSize()-1);
+ // semaphore.acquire();
+ }
+ }
+ } catch( Exception e) {
+ throw e;
+ }
+
entry.setThreadCompletionSemaphore(((AggregateAnalysisEngineController_impl)
getController()).semaphore);
}
long timeToDeserializeCAS = getController().getCpuTime() - t1;
getController().incrementDeserializationTime(timeToDeserializeCAS);
@@ -473,8 +488,9 @@ public class ProcessRequestHandler_impl
// its queue possibly from the same client. Only the first message for
any given
// CasReferenceId
// should be processed.
+ CasStateEntry cse = null;
if (!getController().getInProcessCache().entryExists(casReferenceId)) {
- CasStateEntry cse = null;
+
if (getController().getLocalCache().lookupEntry(casReferenceId) ==
null) {
// Create a new entry in the local cache for the CAS received from
the remote
cse =
getController().getLocalCache().createCasStateEntry(casReferenceId);
@@ -533,16 +549,17 @@ public class ProcessRequestHandler_impl
* semaphore to block the thread. It will be unblocked when the
aggregate is done with
* the CAS.
*/
- if (!getController().isPrimitive() ) {
- Semaphore completionSemaphore = threadCompletionMonitor.get();
+ if (!getController().isPrimitive() &&
+ cse != null && !cse.isSubordinate() ) {
+
try {
- // Block until the CAS is fully processed or there is an error
- completionSemaphore.acquire();
+ synchronized(lock) {
+ if ( entry.getThreadCompletionSemaphore() != null) {
+ entry.getThreadCompletionSemaphore().acquire();
+ }
+ }
} catch( InterruptedException ex) {
- } finally {
- // remove ThreadLocal semaphore
- threadCompletionMonitor.remove();
- }
+ }
}
} else {