Author: cwiklik
Date: Tue Nov 24 19:13:33 2015
New Revision: 1716236
URL: http://svn.apache.org/viewvc?rev=1716236&view=rev
Log:
UIMA-2492 fixed performance metrics for asynchronous pipelines
Added:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java
(with props)
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1716236&r1=1716235&r2=1716236&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
(original)
+++
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
Tue Nov 24 19:13:33 2015
@@ -37,6 +37,7 @@ import org.apache.uima.aae.controller.Ev
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.monitor.statistics.AEMetrics;
import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
import org.apache.uima.aae.monitor.statistics.DelegateStats;
import org.apache.uima.cas.CAS;
@@ -67,6 +68,8 @@ public class InProcessCache implements I
private BaseAnalysisEngineController controller;
+
+
/**
Register controller to call when the cache becomes empty.
This call is made when the controller enters quiesce
@@ -614,9 +617,16 @@ public class InProcessCache implements I
// via ThreadLocal var
private Semaphore threadCompletionSemaphore;
- private Map<String,List<AnalysisEnginePerformanceMetrics>> delegateMetrics
=
- new ConcurrentHashMap<String,
List<AnalysisEnginePerformanceMetrics>>();
+// private Map<String,List<AnalysisEnginePerformanceMetrics>>
delegateMetrics =
+// new ConcurrentHashMap<String,
List<AnalysisEnginePerformanceMetrics>>();
+
+ private Map<String,AEMetrics> delegateMetrics =
+ new ConcurrentHashMap<String, AEMetrics>();
+
+ private List<AnalysisEnginePerformanceMetrics> performanceBreakdownList =
+ new ArrayList<AnalysisEnginePerformanceMetrics>();
+
public Semaphore getThreadCompletionSemaphore() {
return threadCompletionSemaphore;
}
@@ -961,36 +971,40 @@ public class InProcessCache implements I
}
public void addDelegateMetrics(String delegateKey,
List<AnalysisEnginePerformanceMetrics> metrics, boolean remote) {
-/*
- System.out.println("................ Adding metrics for
delegate:"+delegateKey+" Metrics Size:"+metrics.size()+"
CAS:"+getCasReferenceId());
- if ( remote && delegateMetrics.containsKey(delegateKey)) {
-// List<AnalysisEnginePerformanceMetrics> delegateMetrics =
-// delegateMetrics.get(delegateKey);
- List<AnalysisEnginePerformanceMetrics> currentMetrics =
- delegateMetrics.get(delegateKey);
- for( AnalysisEnginePerformanceMetrics rm : metrics) {
- for( AnalysisEnginePerformanceMetrics cm : currentMetrics ) {
- if ( cm.getUniqueName().equals(rm.getUniqueName())) {
- AnalysisEnginePerformanceMetrics apm =
- new
AnalysisEnginePerformanceMetrics(rm.getName(),rm.getUniqueName(),rm.getAnalysisTime(),cm.getNumProcessed()+rm.getNumProcessed());
- currentMetrics.remove(cm);
- currentMetrics.add(apm);
- break;
- }
- }
- }
- } else {
- delegateMetrics.put(delegateKey, metrics);
+ // Store AE performance metrics in a Map where the key
+ // is the unique name of the AE. The value is an AEMetrics
+ // instance which aggregates stats in AtomicLongs. AEMetrics
+ // is used internally only. The service returns metrics to
+ // the client as AnalysisEnginePerformanceMetrics instances.
+ // AnalysisEnginePerformanceMetrics is immutable, so once
+ // populated it cannot be changed. It is also used by user
+ // code, so it's best not to change that class.
+ for( AnalysisEnginePerformanceMetrics m : metrics ) {
+ AEMetrics aem;
+ // If there are multiple instances of an AE (each in its own thread),
+ // performance stats for all of them are aggregated
+ // in a single instance of AEMetrics.
+ if ( delegateMetrics.containsKey(m.getUniqueName())) { // NOTE(review): containsKey/get/put are separate map calls, not one atomic operation - confirm callers cannot race here
+ aem = delegateMetrics.get(m.getUniqueName());
+ } else {
+ aem = new AEMetrics();
+ aem.setName(m.getName());
+ delegateMetrics.put(m.getUniqueName(), aem);
+ }
+ aem.incrementAnalysisTime(m.getAnalysisTime());
+ aem.incrementNumProcessed(1); // adds 1 per entry; m.getNumProcessed() is not consulted
}
-*/
- delegateMetrics.put(delegateKey, metrics);
+ // NOTE(review): delegateKey and remote are not read by the added code above.
}
public List<AnalysisEnginePerformanceMetrics> getDelegateMetrics() {
- List<AnalysisEnginePerformanceMetrics> metrics = new
ArrayList<AnalysisEnginePerformanceMetrics>();
- for( Entry<String,List<AnalysisEnginePerformanceMetrics>> dm :
delegateMetrics.entrySet()) {
- for(AnalysisEnginePerformanceMetrics metric : dm.getValue()) {
- metrics.add(metric);
- }
+ // Returns a newly built list (one element per delegateMetrics entry); mutating the returned list does not change delegateMetrics.
+ List<AnalysisEnginePerformanceMetrics> metrics = new
ArrayList<AnalysisEnginePerformanceMetrics>();
+ for( Entry<String,AEMetrics> dm : delegateMetrics.entrySet()) { // each AEMetrics value is converted to an AnalysisEnginePerformanceMetrics
+ metrics.add(
+ new
AnalysisEnginePerformanceMetrics(dm.getValue().getName(),
+ dm.getKey(), // map key: the unique name under which addDelegateMetrics stored the entry
+
dm.getValue().getAnalysisTime().get(),
+
dm.getValue().getNumProcessed().get()));
}
return metrics;
}
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1716236&r1=1716235&r2=1716236&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
(original)
+++
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
Tue Nov 24 19:13:33 2015
@@ -133,7 +133,7 @@ public abstract class BaseAnalysisEngine
protected AnalysisEngineController parentController;
- private String endpointName;
+ protected String endpointName;
protected ResourceSpecifier resourceSpecifier;
@@ -453,6 +453,7 @@ public abstract class BaseAnalysisEngine
}
}
paramsMap.put(Resource.PARAM_UIMA_CONTEXT, childContext);
+
initialize(resourceSpecifier, paramsMap);
initializeComponentCasPool(aComponentCasPoolSize, anInitialCasHeapSize,
disableJCasCache);
if (parentController instanceof AggregateAnalysisEngineController) {
@@ -1051,7 +1052,6 @@ public abstract class BaseAnalysisEngine
key = aDelegateEndpointName;
}
}
-
if (key == null) {
throw new AsynchAEException(getName() + "-Unable to look up delegate "
+ aDelegateEndpointName + " in internal map");
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=1716236&r1=1716235&r2=1716236&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
(original)
+++
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
Tue Nov 24 19:13:33 2015
@@ -21,10 +21,10 @@ package org.apache.uima.aae.controller;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@@ -32,8 +32,8 @@ import java.util.concurrent.atomic.Atomi
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.UIMAEE_Constants;
-import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.monitor.statistics.AEMetrics;
import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
import org.apache.uima.flow.FinalStep;
import org.apache.uima.util.Level;
@@ -246,6 +246,8 @@ public class LocalCache extends Concurre
// reply from the 1st delegate.
private CountDownLatch latch=null;
+ protected HashMap<String, AEMetrics > casMetrics = new HashMap<String,
AEMetrics>();
+
public boolean waitingForChildrenToFinish() {
return waitingForChildren;
}
@@ -256,6 +258,33 @@ public class LocalCache extends Concurre
return hostIpProcessingCAS;
}
+ public void addMetrics(List<AnalysisEnginePerformanceMetrics> metrics) { // Folds every entry of metrics into casMetrics, keyed by the entry's unique name.
+ for( AnalysisEnginePerformanceMetrics m : metrics) {
+ AEMetrics aeMetrics;
+ if ( casMetrics.containsKey(m.getUniqueName()) ) { // an accumulator already exists for this unique name: reuse it
+ aeMetrics = casMetrics.get(m.getUniqueName());
+ } else { // first entry seen for this unique name: create an accumulator and register it
+ aeMetrics = new AEMetrics();
+ aeMetrics.setName(m.getName());
+ casMetrics.put(m.getUniqueName(), aeMetrics); // NOTE(review): casMetrics is declared as a plain HashMap - confirm this method is not called concurrently
+ }
+ aeMetrics.incrementAnalysisTime(m.getAnalysisTime()); // add this entry's analysis time to the running total
+ aeMetrics.incrementNumProcessed(m.getNumProcessed()); // add this entry's processed count to the running total
+ }
+ }
+ public List<AnalysisEnginePerformanceMetrics> getCASMetrics() { // Returns a newly built list with one AnalysisEnginePerformanceMetrics per casMetrics entry.
+ List<AnalysisEnginePerformanceMetrics> list = new
ArrayList<AnalysisEnginePerformanceMetrics>();
+ for( Entry<String, AEMetrics> m : casMetrics.entrySet()) { // iteration order is whatever the HashMap entry set yields
+ AnalysisEnginePerformanceMetrics metrics =
+ new AnalysisEnginePerformanceMetrics(
+ m.getValue().getName(),
+ m.getKey(), // map key: the unique name under which addMetrics stored the entry
+
m.getValue().getAnalysisTime().get(),
+
m.getValue().getNumProcessed().get() );
+ list.add(metrics);
+ }
+ return list; // a fresh list: changes made to it by callers do not reach casMetrics
+ }
public void setHostIpProcessingCAS(String hostIpProcessingCAS) {
this.hostIpProcessingCAS = hostIpProcessingCAS;
}
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1716236&r1=1716235&r2=1716236&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
(original)
+++
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
Tue Nov 24 19:13:33 2015
@@ -19,13 +19,17 @@
package org.apache.uima.aae.controller;
-import java.lang.reflect.Field;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
@@ -33,12 +37,15 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.Semaphore;
import org.apache.uima.UIMAFramework;
+import org.apache.uima.UimaContext;
+import org.apache.uima.UimaContextAdmin;
import org.apache.uima.aae.AsynchAECasManager;
import org.apache.uima.aae.InProcessCache;
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaClassFactory;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.ErrorHandler;
@@ -55,23 +62,19 @@ import org.apache.uima.analysis_engine.A
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.analysis_engine.AnalysisEngineManagement;
import org.apache.uima.analysis_engine.CasIterator;
-import org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.CASImpl;
-import org.apache.uima.cas.impl.OutOfTypeSystemData;
import org.apache.uima.collection.CollectionReaderDescription;
+import org.apache.uima.impl.RootUimaContext_impl;
+import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceInitializationException;
-import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.ResourceSpecifier;
import org.apache.uima.resource.metadata.ConfigurationParameter;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.resource.metadata.impl.ConfigurationParameter_impl;
import org.apache.uima.util.Level;
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.io.xml.DomDriver;
-
public class PrimitiveAnalysisEngineController_impl extends
BaseAnalysisEngineController implements
PrimitiveAnalysisEngineController {
private static final Class CLASS_NAME =
PrimitiveAnalysisEngineController_impl.class;
@@ -97,7 +100,6 @@ public class PrimitiveAnalysisEngineCont
// meaning each thread executes the same AE instance.
protected AnalysisEngineInstancePool aeInstancePool = null;
- private String abortedCASReferenceId = null;
// Create a shared semaphore to serialize creation of AE instances.
// There is a single instance of this semaphore per JVM and it
// guards uima core code that is not thread safe.
@@ -105,7 +107,6 @@ public class PrimitiveAnalysisEngineCont
static private Object threadDumpMonitor = new Object();
static private Long lastDump = Long.valueOf(0);
- private XStream xstream = new XStream(new DomDriver());
// 6 args
public PrimitiveAnalysisEngineController_impl(String anEndpointName,
@@ -186,7 +187,40 @@ public class PrimitiveAnalysisEngineCont
public int getAEInstanceCount() {
return analysisEnginePoolSize;
}
-
+ public static Object copy(Object orig) { // Copies orig by serializing it into a byte array and deserializing that array again.
+ Object obj = null; // remains null, and is returned as null, if either catch block below runs
+ try {
+ // Write the object out to a byte array
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bos);
+ out.writeObject(orig); // per the JDK docs a non-Serializable orig raises NotSerializableException, an IOException caught below
+ out.flush();
+ out.close(); // skipped if writeObject or flush throws; there is no finally block
+
+ // Make an input stream from the byte array and read
+ // a copy of the object back in.
+ ObjectInputStream in = new ObjectInputStream(
+ new ByteArrayInputStream(bos.toByteArray()));
+ obj = in.readObject(); // 'in' is not closed anywhere in this method
+ }
+ catch(IOException e) { // failure is reported only through printStackTrace(); nothing is rethrown
+ e.printStackTrace();
+ }
+ catch(ClassNotFoundException cnfe) { // same handling: print and fall through to return null
+ cnfe.printStackTrace();
+ }
+ return obj;
+ }
+ public void dumpContext(UimaContextAdmin ctx) { // Prints, via System.out, the key and unique MBean name of each component listed by the root context's management interface.
+ if ( !(ctx instanceof RootUimaContext_impl) ) { // not a root context instance: delegate to the root instead
+ dumpContext(ctx.getRootContext()); // NOTE(review): recursion ends only once a RootUimaContext_impl is reached - confirm getRootContext() always returns one
+ } else { // ctx is the root: list the components registered on its management interface
+ Map<String, AnalysisEngineManagement> m =
ctx.getManagementInterface().getComponents();
+ for( Entry<String, AnalysisEngineManagement> e :
m.entrySet()) {
+ System.out.println(">>>>>>>> "+e.getKey()+"
"+e.getValue().getUniqueMBeanName());
+ }
+ }
+ }
public void initializeAnalysisEngine() throws
ResourceInitializationException {
ResourceSpecifier rSpecifier = null;
@@ -198,27 +232,38 @@ public class PrimitiveAnalysisEngineCont
sharedInitSemaphore.acquire();
// Parse the descriptor in the calling thread.
rSpecifier =
UimaClassFactory.produceResourceSpecifier(super.aeDescriptor);
-/*
- if ( rSpecifier instanceof AnalysisEngineDescription ) {
- String name =
((AnalysisEngineDescription)rSpecifier).getAnalysisEngineMetaData().getName();
-
((AnalysisEngineDescription)rSpecifier).getAnalysisEngineMetaData().setName(name+"-"+Thread.currentThread().getId());
- // System.out.println(getUimaContextAdmin().);
- Field f
=getUimaContextAdmin().getClass().getDeclaredField("mQualifiedContextName");//getQualifiedContextName()
- f.setAccessible(true);
-
-
f.get(getUimaContextAdmin().getQualifiedContextName()+Thread.currentThread().getId());
- }
- */
- //paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX,
String.valueOf(Thread.currentThread().getId()));
- //String p =
(String)paramsMap.get(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX);//
+"-"+Thread.currentThread().getId();
- //p = p.substring(0, p.lastIndexOf(","))+"
"+Thread.currentThread().getId()+",";
- //paramsMap.remove(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX);
- //paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, p);
+ UimaContextAdmin uctx = null;
+ if ( parentController != null ) {
+ int scaleout = 1;
+ if (parentController instanceof AggregateAnalysisEngineController) {
+ String key = ((AggregateAnalysisEngineController)
parentController)
+ .lookUpDelegateKey(endpointName);
+ if (key == null) {
+ if (((AggregateAnalysisEngineController)
parentController).isDelegateKeyValid(endpointName)) {
+ key = endpointName;
+ }
+ }
+ if (key == null) {
+ throw new AsynchAEException(getName() + "-Unable to look up
delegate "
+ + endpointName + " in internal map");
+ }
+ Delegate d = ((AggregateAnalysisEngineController)
parentController)
+ .lookupDelegate(key);
+ scaleout = d.getEndpoint().getConcurrentRequestConsumers();
+ }
+ if ( scaleout > 1) {
+ uctx =
(UimaContextAdmin)parentController.getChildUimaContext(endpointName);
+ if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ dumpContext(uctx);
+ }
+ paramsMap.remove(Resource.PARAM_UIMA_CONTEXT);
+ paramsMap.put(Resource.PARAM_UIMA_CONTEXT, uctx);
+ } else {
+ uctx =
(UimaContextAdmin)paramsMap.get(Resource.PARAM_UIMA_CONTEXT);
+ }
+ }
AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier,
paramsMap);
- //AnalysisEngineManagementImpl aemi =
(AnalysisEngineManagementImpl)ae.getManagementInterface();
- //System.out.println("..... Created AE instance - Mgmt Instance
Hashcode:"+aemi.hashCode()+" Unique MBean Name:"+aemi.getUniqueMBeanName());
- // ae.getManagementInterface().getClass().
// Call to produceAnalysisEngine() may take a long time to complete.
While this
// method was executing, the service may have been stopped. Before
continuing
// check if the service has been stopped. If so, destroy AE instance
and return.
@@ -578,10 +623,7 @@ public class PrimitiveAnalysisEngineCont
* @param uimaFullyQualifiedAEContext
*/
private void getLeafManagementObjects(AnalysisEngineManagement aem,
List<AnalysisEnginePerformanceMetrics> result, String
uimaFullyQualifiedAEContext) {
-
- if (aem.getComponents().isEmpty()) {
- // skip Flow Controller
- if (!aem.getName().equals("Fixed Flow Controller")) {
+ if (aem.getComponents().isEmpty() && Thread.currentThread().getId() ==
aem.getThreadId()) {
// is this primitive AE delegate in an aggregate. If so the mbean
unique name will have "p0=" string. An examples mbean
// name looks like this:
// org.apache.uima:type=ee.jms.services,s=Top Level Aggregate TAE Uima
EE Service,p0=Top Level Aggregate TAE Components,p1=SecondLevelAggregateCM
Components,p2=ThirdLevelAggregateCM Components,name=Multiplier1
@@ -602,11 +644,11 @@ public class PrimitiveAnalysisEngineCont
}
}
}
- result.add(deepCopyMetrics(aem, uimaFullyQualifiedAEContext));
- }
+ AnalysisEnginePerformanceMetrics m = deepCopyMetrics(aem,
uimaFullyQualifiedAEContext);
+ result.add(m);
} else {
for (AnalysisEngineManagement child :
(Iterable<AnalysisEngineManagement>) aem.getComponents().values()) {
- getLeafManagementObjects(child, result, produceUniqueName(aem));
+ getLeafManagementObjects(child, result, produceUniqueName(aem));
}
}
}
@@ -628,7 +670,6 @@ public class PrimitiveAnalysisEngineCont
}
private String produceUniqueName(AnalysisEngineManagement aem) {
-// System.out.println(">>>>>>>>>>>>>>>>>>>
Thread:"+Thread.currentThread().getId()+" MBean:"+aem.getUniqueMBeanName());
String[] parts = aem.getUniqueMBeanName().split(",");
StringBuffer sb = new StringBuffer();
for( String part : parts) {
@@ -652,8 +693,6 @@ public class PrimitiveAnalysisEngineCont
sb.append("/").append(part.substring(part.trim().indexOf("=")+1));
}
}
- // System.out.println("<<<<<<<<<<<<<<<<<<<
Thread:"+Thread.currentThread().getId()+" MBean:"+sb.toString());
-
return sb.toString();
}
@@ -699,6 +738,7 @@ public class PrimitiveAnalysisEngineCont
uimaFullyQualifiedAEContext,
aem.getAnalysisTime(),
aem.getNumberOfCASesProcessed());
+
}
/**
@@ -762,7 +802,6 @@ public class PrimitiveAnalysisEngineCont
AnalysisEngineManagement rootAem = ae.getManagementInterface();
- //System.out.println("%%%%%%%%%%%%%%%%%%%% Unique MBean
Name:"+rootAem.getUniqueMBeanName()+" AE Instance Hashcode"+ae.hashCode());
if ( rootAem.getComponents().size() > 0 ) {
getLeafManagementObjects(rootAem, beforeAnalysisManagementObjects);
} else {
@@ -1056,7 +1095,6 @@ public class PrimitiveAnalysisEngineCont
// Set total number of children generated from this CAS
// Store total time spent processing this input CAS
getCasStatistics(aCasReferenceId).incrementAnalysisTime(totalProcessTime);
-
// Fetch AE's management information that includes per component
performance stats
// These stats are internally maintained in a Map. If the AE is an
aggregate
// the Map will contain AnalysisEngineManagement instance for each AE.
@@ -1066,16 +1104,11 @@ public class PrimitiveAnalysisEngineCont
// primitive AE's AnalysisEngineManagement instance and placing it
in
// afterAnalysisManagementObjects List.
getLeafManagementObjects(aem, afterAnalysisManagementObjects);
- //
System.out.println("-----------------Unique1:"+aem.getUniqueMBeanName());
- //System.out.println("-----------------Simple1:"+aem.getName());
} else {
String path=produceUniqueName(aem);
- //
System.out.println("-----------------Unique2:"+aem.getUniqueMBeanName());
- // System.out.println("-----------------Simple2:"+aem.getName());
afterAnalysisManagementObjects.add(deepCopyMetrics(aem, path));
}
-
// Create a List to hold per CAS analysisTime and total number of CASes
processed
// by each AE. This list will be serialized and sent to the client
List<AnalysisEnginePerformanceMetrics> performanceList =
@@ -1084,23 +1117,43 @@ public class PrimitiveAnalysisEngineCont
// metrics
for (AnalysisEnginePerformanceMetrics after :
afterAnalysisManagementObjects) {
for( AnalysisEnginePerformanceMetrics before:
beforeAnalysisManagementObjects) {
- if ( before.getUniqueName().equals(after.getUniqueName())) {
-
- AnalysisEnginePerformanceMetrics metrics =
- new AnalysisEnginePerformanceMetrics(after.getName(),
- after.getUniqueName(),
- after.getAnalysisTime()- before.getAnalysisTime(),
- after.getNumProcessed());
- //
System.out.println("********************"+metrics.getUniqueName()+" Analysis
Time:"+metrics.getAnalysisTime());
- //System.out.println("********************"+metrics.getName()+"
Analysis Time:"+metrics.getAnalysisTime());
+ if ( before.getUniqueName().equals(after.getUniqueName())) {
+ boolean found = false;
+ AnalysisEnginePerformanceMetrics metrics = null;
+ for( AnalysisEnginePerformanceMetrics met :
parentCasStateEntry.getAEPerformanceList() ) {
+ String un = after.getUniqueName();
+ if ( un.indexOf("Components") >= -1 ) {
+ un = un.substring(un.indexOf("/"));
+ }
+ if ( met.getUniqueName().equals(un)) {
+ long at = after.getAnalysisTime()-
before.getAnalysisTime();
+ metrics = new
AnalysisEnginePerformanceMetrics(after.getName(),
+ un,//after.getUniqueName(),
+ met.getAnalysisTime()+at,
+ after.getNumProcessed());
+ found = true;
+ parentCasStateEntry.getAEPerformanceList().remove(met);
+ break;
+ }
+ }
+ if ( !found ) {
+ String un = after.getUniqueName();
+
+ if ( un.indexOf("Components") >= -1 ) {
+ un = un.substring(un.indexOf("/"));
+ }
+ metrics = new
AnalysisEnginePerformanceMetrics(after.getName(),
+ un,//after.getUniqueName(),
+ after.getAnalysisTime()- before.getAnalysisTime(),
+ after.getNumProcessed());
+
+ }
performanceList.add(metrics);
break;
}
}
}
- // Save this component performance metrics
parentCasStateEntry.getAEPerformanceList().addAll(performanceList);
-
if (!anEndpoint.isRemote()) {
inputCASReturned = true;
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
@@ -1112,7 +1165,6 @@ public class PrimitiveAnalysisEngineCont
getInProcessCache().
getTopAncestorCasEntry(getInProcessCache().getCacheEntryForCAS(aCasReferenceId));
if ( ancestor != null ) {
- // Set a flag on the input CAS to indicate that the processing
was aborted
ancestor.addDelegateMetrics(getKey(), performanceList);
}
} catch (Exception e) {
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=1716236&r1=1716235&r2=1716236&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
(original)
+++
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
Tue Nov 24 19:13:33 2015
@@ -196,27 +196,51 @@ public class ProcessResponseHandler exte
// during deserialization
CacheEntry cacheEntry =
getController().getInProcessCache().getCacheEntryForCAS(
casReferenceId);
+ // check if the client requested Performance Metrics for the CAS
if (
aMessageContext.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
try {
+ // find top ancestor of this CAS. All metrics are accumulated there
since
+ // this is what will be returned to the client
CacheEntry ancestor =
getController().
getInProcessCache().
getTopAncestorCasEntry(cacheEntry);
if ( ancestor != null ) {
+ // fetch Performance Metrics from remote delegate reply
List<AnalysisEnginePerformanceMetrics> metrics =
UimaSerializer.deserializePerformanceMetrics(aMessageContext.getMessageStringProperty(AsynchAEMessage.CASPerComponentMetrics));
-
List<AnalysisEnginePerformanceMetrics> adjustedMetrics =
new ArrayList<AnalysisEnginePerformanceMetrics>();
for(AnalysisEnginePerformanceMetrics delegateMetric : metrics ) {
- String tmp =
-
delegateMetric.getUniqueName().substring(delegateMetric.getUniqueName().indexOf(","));
- String adjustedUniqueName =
- ((AggregateAnalysisEngineController)
getController()).getJMXDomain()+((AggregateAnalysisEngineController)
getController()).getJmxContext()+tmp;
- AnalysisEnginePerformanceMetrics metric =
- new
AnalysisEnginePerformanceMetrics(delegateMetric.getName(),adjustedUniqueName,delegateMetric.getAnalysisTime(),delegateMetric.getNumProcessed());
+ String adjustedUniqueName = ((AggregateAnalysisEngineController)
getController()).getJmxContext();
+
+ if ( adjustedUniqueName.startsWith("p0=")) {
+ adjustedUniqueName = adjustedUniqueName.substring(3); //
skip p0=
+ }
+ adjustedUniqueName = adjustedUniqueName.replaceAll("
Components", "");
+ if (!adjustedUniqueName.startsWith("/")) {
+ adjustedUniqueName = "/"+adjustedUniqueName;
+ }
+ adjustedUniqueName += delegateMetric.getUniqueName();
+
+ boolean found = false;
+ AnalysisEnginePerformanceMetrics metric = null;
+ for( AnalysisEnginePerformanceMetrics met :
ancestor.getDelegateMetrics() ) {
+ if ( met.getUniqueName().equals(adjustedUniqueName)) {
+ long at = delegateMetric.getAnalysisTime();
+ long count = delegateMetric.getNumProcessed();
+ metric = new
AnalysisEnginePerformanceMetrics(delegateMetric.getName(),adjustedUniqueName,at,count);
+ found = true;
+ ancestor.getDelegateMetrics().remove(met);
+ break;
+ }
+ }
+ if ( !found ) {
+ metric = new
AnalysisEnginePerformanceMetrics(delegateMetric.getName(),adjustedUniqueName,delegateMetric.getAnalysisTime(),delegateMetric.getNumProcessed());
+ }
adjustedMetrics.add(metric);
}
+
ancestor.addDelegateMetrics(delegateKey, adjustedMetrics, true);
// true=remote
}
} catch (Exception e) {
Added:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java?rev=1716236&view=auto
==============================================================================
---
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java
(added)
+++
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java
Tue Nov 24 19:13:33 2015
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.monitor.statistics;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AEMetrics { // Mutable holder for one AE's name plus running totals of analysis time and processed count.
+ private String name; // assigned through setName(); neither final nor volatile
+ private AtomicLong analysisTime = new AtomicLong(); // running total, starts at 0; incrementAnalysisTime() adds to it
+ private AtomicLong numProcessed = new AtomicLong(); // running total, starts at 0; incrementNumProcessed() adds to it
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public AtomicLong getAnalysisTime() { // returns the live AtomicLong itself, not a snapshot of its value
+ return analysisTime;
+ }
+ public void incrementAnalysisTime(long analysisTime) { // adds the given amount to the total; the parameter shadows the field
+ this.analysisTime.addAndGet( analysisTime );
+ }
+ public AtomicLong getNumProcessed() { // returns the live AtomicLong itself, not a snapshot of its value
+ return numProcessed;
+ }
+ public void incrementNumProcessed(long numProcessed) { // adds the given amount to the total; the parameter shadows the field
+ this.numProcessed.addAndGet(numProcessed);
+ }
+}
Propchange:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java
------------------------------------------------------------------------------
svn:mime-type = text/plain