Author: cwiklik
Date: Tue Nov 24 19:13:33 2015
New Revision: 1716236

URL: http://svn.apache.org/viewvc?rev=1716236&view=rev
Log:
UIMA-2492 fixed performance metrics for asynchronous pipelines

Added:
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java
   (with props)
Modified:
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1716236&r1=1716235&r2=1716236&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
 Tue Nov 24 19:13:33 2015
@@ -37,6 +37,7 @@ import org.apache.uima.aae.controller.Ev
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.monitor.statistics.AEMetrics;
 import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
 import org.apache.uima.aae.monitor.statistics.DelegateStats;
 import org.apache.uima.cas.CAS;
@@ -67,6 +68,8 @@ public class InProcessCache implements I
 
   private BaseAnalysisEngineController controller;
   
+  
+  
   /**
          Register controller to call when the cache becomes empty.
     This call is made when the controller enters quiesce
@@ -614,9 +617,16 @@ public class InProcessCache implements I
                //  via ThreadLocal var
     private Semaphore threadCompletionSemaphore;
     
-    private Map<String,List<AnalysisEnginePerformanceMetrics>> delegateMetrics 
=
-            new ConcurrentHashMap<String, 
List<AnalysisEnginePerformanceMetrics>>();
+//    private Map<String,List<AnalysisEnginePerformanceMetrics>> 
delegateMetrics =
+//            new ConcurrentHashMap<String, 
List<AnalysisEnginePerformanceMetrics>>();
+
+    private Map<String,AEMetrics> delegateMetrics =
+            new ConcurrentHashMap<String, AEMetrics>();
+
     
+    private List<AnalysisEnginePerformanceMetrics> performanceBreakdownList = 
+                 new ArrayList<AnalysisEnginePerformanceMetrics>();
+
     public Semaphore getThreadCompletionSemaphore() {
       return threadCompletionSemaphore;
     }
@@ -961,36 +971,40 @@ public class InProcessCache implements I
     }
 
     public void addDelegateMetrics(String delegateKey, 
List<AnalysisEnginePerformanceMetrics> metrics, boolean remote) {
-/*
-      System.out.println("................ Adding metrics for 
delegate:"+delegateKey+" Metrics Size:"+metrics.size()+" 
CAS:"+getCasReferenceId());
-      if ( remote && delegateMetrics.containsKey(delegateKey)) {
-//        List<AnalysisEnginePerformanceMetrics> delegateMetrics = 
-//                delegateMetrics.get(delegateKey);
-        List<AnalysisEnginePerformanceMetrics> currentMetrics = 
-            delegateMetrics.get(delegateKey);
-        for( AnalysisEnginePerformanceMetrics rm : metrics) {
-          for( AnalysisEnginePerformanceMetrics cm : currentMetrics ) {
-            if ( cm.getUniqueName().equals(rm.getUniqueName())) {
-              AnalysisEnginePerformanceMetrics apm = 
-                      new 
AnalysisEnginePerformanceMetrics(rm.getName(),rm.getUniqueName(),rm.getAnalysisTime(),cm.getNumProcessed()+rm.getNumProcessed());
-              currentMetrics.remove(cm);
-              currentMetrics.add(apm);
-              break;
-            }
-          }
-        }
-      } else {
-        delegateMetrics.put(delegateKey, metrics);
+      // Store AE performance metrics in a Map where the key
+      // is the unique name of AE. The value is AEMetrics
+      // instance which aggregates stats in AtomicLongs. The
+      // AEMetrics is used internally only. The service 
+      // returns metrics to the client in AnalysisEnginePerformanceMetrics
+      // instance. The AnalysisEnginePerformanceMetrics is immutable so
+      // once populated no changes can be done. Also, this class is 
+      // used by user code so its best to not change it.
+      for( AnalysisEnginePerformanceMetrics m : metrics ) {
+         AEMetrics aem;
+         // If there are multiple instances of AE (each in its own thread)
+         // performance stats are aggregated for all of them in 
+         // in a single instance of AEMetrics.
+         if ( delegateMetrics.containsKey(m.getUniqueName())) {
+                 aem = delegateMetrics.get(m.getUniqueName());
+         } else {
+                 aem = new AEMetrics();
+                 aem.setName(m.getName());
+                 delegateMetrics.put(m.getUniqueName(), aem);
+         }
+         aem.incrementAnalysisTime(m.getAnalysisTime());
+         aem.incrementNumProcessed(1);
       }
-*/
-      delegateMetrics.put(delegateKey, metrics);
+      
     }
     public List<AnalysisEnginePerformanceMetrics> getDelegateMetrics() {
-      List<AnalysisEnginePerformanceMetrics> metrics = new 
ArrayList<AnalysisEnginePerformanceMetrics>();
-      for( Entry<String,List<AnalysisEnginePerformanceMetrics>> dm : 
delegateMetrics.entrySet()) {
-        for(AnalysisEnginePerformanceMetrics metric : dm.getValue()) {
-          metrics.add(metric);
-        }
+  
+       List<AnalysisEnginePerformanceMetrics> metrics = new 
ArrayList<AnalysisEnginePerformanceMetrics>();
+      for( Entry<String,AEMetrics> dm : delegateMetrics.entrySet()) {
+         metrics.add(
+                         new 
AnalysisEnginePerformanceMetrics(dm.getValue().getName(),
+                                         dm.getKey(), 
+                                         
dm.getValue().getAnalysisTime().get(), 
+                                         
dm.getValue().getNumProcessed().get()));
       }
       return metrics;
     }

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1716236&r1=1716235&r2=1716236&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 Tue Nov 24 19:13:33 2015
@@ -133,7 +133,7 @@ public abstract class BaseAnalysisEngine
 
   protected AnalysisEngineController parentController;
 
-  private String endpointName;
+  protected String endpointName;
 
   protected ResourceSpecifier resourceSpecifier;
 
@@ -453,6 +453,7 @@ public abstract class BaseAnalysisEngine
         }
       }
       paramsMap.put(Resource.PARAM_UIMA_CONTEXT, childContext);
+
       initialize(resourceSpecifier, paramsMap);
       initializeComponentCasPool(aComponentCasPoolSize, anInitialCasHeapSize, 
disableJCasCache);
       if (parentController instanceof AggregateAnalysisEngineController) {
@@ -1051,7 +1052,6 @@ public abstract class BaseAnalysisEngine
           key = aDelegateEndpointName;
         }
       }
-
       if (key == null) {
         throw new AsynchAEException(getName() + "-Unable to look up delegate "
                 + aDelegateEndpointName + " in internal map");

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=1716236&r1=1716235&r2=1716236&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
 Tue Nov 24 19:13:33 2015
@@ -21,10 +21,10 @@ package org.apache.uima.aae.controller;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
@@ -32,8 +32,8 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UIMAEE_Constants;
-import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.delegate.Delegate;
+import org.apache.uima.aae.monitor.statistics.AEMetrics;
 import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
 import org.apache.uima.flow.FinalStep;
 import org.apache.uima.util.Level;
@@ -246,6 +246,8 @@ public class LocalCache extends Concurre
     // reply from the 1st delegate.
     private CountDownLatch latch=null;
     
+    protected HashMap<String, AEMetrics > casMetrics = new HashMap<String, 
AEMetrics>();
+    
     public boolean waitingForChildrenToFinish() {
        return waitingForChildren;
     }
@@ -256,6 +258,33 @@ public class LocalCache extends Concurre
       return hostIpProcessingCAS;
     }
 
+    public void addMetrics(List<AnalysisEnginePerformanceMetrics> metrics) {
+       for( AnalysisEnginePerformanceMetrics m : metrics) {
+               AEMetrics aeMetrics;
+               if ( casMetrics.containsKey(m.getUniqueName()) ) {
+                       aeMetrics = casMetrics.get(m.getUniqueName());
+               } else {
+                       aeMetrics = new AEMetrics();
+                       aeMetrics.setName(m.getName());
+                       casMetrics.put(m.getUniqueName(), aeMetrics);
+               }
+                       aeMetrics.incrementAnalysisTime(m.getAnalysisTime());
+                       aeMetrics.incrementNumProcessed(m.getNumProcessed());
+       }
+    }
+    public List<AnalysisEnginePerformanceMetrics> getCASMetrics() {
+       List<AnalysisEnginePerformanceMetrics> list = new 
ArrayList<AnalysisEnginePerformanceMetrics>();
+       for( Entry<String, AEMetrics> m : casMetrics.entrySet()) {
+               AnalysisEnginePerformanceMetrics metrics = 
+                               new AnalysisEnginePerformanceMetrics(
+                                               m.getValue().getName(),
+                                               m.getKey(),
+                                               
m.getValue().getAnalysisTime().get(),
+                                               
m.getValue().getNumProcessed().get() );
+               list.add(metrics);
+       }
+       return list;
+    }
     public void setHostIpProcessingCAS(String hostIpProcessingCAS) {
       this.hostIpProcessingCAS = hostIpProcessingCAS;
     }

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1716236&r1=1716235&r2=1716236&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
 Tue Nov 24 19:13:33 2015
@@ -19,13 +19,17 @@
 
 package org.apache.uima.aae.controller;
 
-import java.lang.reflect.Field;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -33,12 +37,15 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.Semaphore;
 
 import org.apache.uima.UIMAFramework;
+import org.apache.uima.UimaContext;
+import org.apache.uima.UimaContextAdmin;
 import org.apache.uima.aae.AsynchAECasManager;
 import org.apache.uima.aae.InProcessCache;
 import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.UimaClassFactory;
 import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
+import org.apache.uima.aae.delegate.Delegate;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.error.ErrorContext;
 import org.apache.uima.aae.error.ErrorHandler;
@@ -55,23 +62,19 @@ import org.apache.uima.analysis_engine.A
 import org.apache.uima.analysis_engine.AnalysisEngineDescription;
 import org.apache.uima.analysis_engine.AnalysisEngineManagement;
 import org.apache.uima.analysis_engine.CasIterator;
-import org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.CASImpl;
-import org.apache.uima.cas.impl.OutOfTypeSystemData;
 import org.apache.uima.collection.CollectionReaderDescription;
+import org.apache.uima.impl.RootUimaContext_impl;
+import org.apache.uima.resource.Resource;
 import org.apache.uima.resource.ResourceInitializationException;
-import org.apache.uima.resource.ResourceProcessException;
 import org.apache.uima.resource.ResourceSpecifier;
 import org.apache.uima.resource.metadata.ConfigurationParameter;
 import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
 import org.apache.uima.resource.metadata.impl.ConfigurationParameter_impl;
 import org.apache.uima.util.Level;
 
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.io.xml.DomDriver;
-
 public class PrimitiveAnalysisEngineController_impl extends 
BaseAnalysisEngineController implements
         PrimitiveAnalysisEngineController {
   private static final Class CLASS_NAME = 
PrimitiveAnalysisEngineController_impl.class;
@@ -97,7 +100,6 @@ public class PrimitiveAnalysisEngineCont
   // meaning each thread executes the same AE instance.
   protected AnalysisEngineInstancePool aeInstancePool = null;
 
-  private String abortedCASReferenceId = null;
   // Create a shared semaphore to serialize creation of AE instances.
   // There is a single instance of this semaphore per JVM and it
   // guards uima core code that is not thread safe.
@@ -105,7 +107,6 @@ public class PrimitiveAnalysisEngineCont
 
   static private Object threadDumpMonitor = new Object();
   static private Long lastDump = Long.valueOf(0);
-  private XStream xstream = new XStream(new DomDriver());
 
   // 6 args
   public PrimitiveAnalysisEngineController_impl(String anEndpointName,
@@ -186,7 +187,40 @@ public class PrimitiveAnalysisEngineCont
   public int getAEInstanceCount() {
     return analysisEnginePoolSize;
   }
-
+  public static Object copy(Object orig) {
+      Object obj = null;
+      try {
+          // Write the object out to a byte array
+          ByteArrayOutputStream bos = new ByteArrayOutputStream();
+          ObjectOutputStream out = new ObjectOutputStream(bos);
+          out.writeObject(orig);
+          out.flush();
+          out.close();
+
+          // Make an input stream from the byte array and read
+          // a copy of the object back in.
+          ObjectInputStream in = new ObjectInputStream(
+              new ByteArrayInputStream(bos.toByteArray()));
+          obj = in.readObject();
+      }
+      catch(IOException e) {
+          e.printStackTrace();
+      }
+      catch(ClassNotFoundException cnfe) {
+          cnfe.printStackTrace();
+      }
+      return obj;
+  }
+  public void dumpContext(UimaContextAdmin ctx) {
+         if ( !(ctx instanceof RootUimaContext_impl) ) {
+                 dumpContext(ctx.getRootContext());
+         } else {
+                 Map<String, AnalysisEngineManagement> m = 
ctx.getManagementInterface().getComponents();
+                 for( Entry<String, AnalysisEngineManagement> e : 
m.entrySet()) {
+                         System.out.println(">>>>>>>> "+e.getKey()+" 
"+e.getValue().getUniqueMBeanName());
+                 }
+         }
+  }
   public void initializeAnalysisEngine() throws 
ResourceInitializationException {
     ResourceSpecifier rSpecifier = null;
     
@@ -198,27 +232,38 @@ public class PrimitiveAnalysisEngineCont
       sharedInitSemaphore.acquire();
       // Parse the descriptor in the calling thread.
       rSpecifier = 
UimaClassFactory.produceResourceSpecifier(super.aeDescriptor);
-/*      
-      if ( rSpecifier instanceof AnalysisEngineDescription ) {
-          String name = 
((AnalysisEngineDescription)rSpecifier).getAnalysisEngineMetaData().getName();
-          
((AnalysisEngineDescription)rSpecifier).getAnalysisEngineMetaData().setName(name+"-"+Thread.currentThread().getId());
-      //    System.out.println(getUimaContextAdmin().);
-          Field f 
=getUimaContextAdmin().getClass().getDeclaredField("mQualifiedContextName");//getQualifiedContextName()
-          f.setAccessible(true);
-         
-          
f.get(getUimaContextAdmin().getQualifiedContextName()+Thread.currentThread().getId());
-      }
-      */
-      //paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, 
String.valueOf(Thread.currentThread().getId()));
-      //String p = 
(String)paramsMap.get(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX);//      
+"-"+Thread.currentThread().getId();
-      //p = p.substring(0, p.lastIndexOf(","))+" 
"+Thread.currentThread().getId()+",";
-      //paramsMap.remove(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX);
-      //paramsMap.put(AnalysisEngine.PARAM_MBEAN_NAME_PREFIX, p);
+      UimaContextAdmin uctx = null;
+      if ( parentController != null ) {
+               int scaleout = 1;
+           if (parentController instanceof AggregateAnalysisEngineController) {
+               String key = ((AggregateAnalysisEngineController) 
parentController)
+                       .lookUpDelegateKey(endpointName);
+               if (key == null) {
+                 if (((AggregateAnalysisEngineController) 
parentController).isDelegateKeyValid(endpointName)) {
+                   key = endpointName;
+                 }
+               }
+               if (key == null) {
+                 throw new AsynchAEException(getName() + "-Unable to look up 
delegate "
+                         + endpointName + " in internal map");
+               }
+               Delegate d = ((AggregateAnalysisEngineController) 
parentController)
+                       .lookupDelegate(key);
+               scaleout = d.getEndpoint().getConcurrentRequestConsumers();
+           }
+           if ( scaleout > 1) {
+                   uctx = 
(UimaContextAdmin)parentController.getChildUimaContext(endpointName);
+                   if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                      dumpContext(uctx);
+                   }
+               paramsMap.remove(Resource.PARAM_UIMA_CONTEXT);
+               paramsMap.put(Resource.PARAM_UIMA_CONTEXT, uctx);
+           } else {
+               uctx = 
(UimaContextAdmin)paramsMap.get(Resource.PARAM_UIMA_CONTEXT);
+           }
+      }
       AnalysisEngine ae = UIMAFramework.produceAnalysisEngine(rSpecifier, 
paramsMap);
    
-      //AnalysisEngineManagementImpl aemi = 
(AnalysisEngineManagementImpl)ae.getManagementInterface();
-      //System.out.println("..... Created AE instance - Mgmt Instance 
Hashcode:"+aemi.hashCode()+" Unique MBean Name:"+aemi.getUniqueMBeanName());
-         // ae.getManagementInterface().getClass().
       //  Call to produceAnalysisEngine() may take a long time to complete. 
While this
         //  method was executing, the service may have been stopped. Before 
continuing 
         //  check if the service has been stopped. If so, destroy AE instance 
and return.
@@ -578,10 +623,7 @@ public class PrimitiveAnalysisEngineCont
    * @param uimaFullyQualifiedAEContext
    */
   private void getLeafManagementObjects(AnalysisEngineManagement aem, 
List<AnalysisEnginePerformanceMetrics> result, String 
uimaFullyQualifiedAEContext) {
-   
-    if (aem.getComponents().isEmpty()) {
-      // skip Flow Controller
-      if (!aem.getName().equals("Fixed Flow Controller")) {
+    if (aem.getComponents().isEmpty() && Thread.currentThread().getId() == 
aem.getThreadId()) {
         // is this primitive AE delegate in an aggregate. If so the mbean 
unique name will have "p0=" string. An examples mbean
         // name looks like this:
         // org.apache.uima:type=ee.jms.services,s=Top Level Aggregate TAE Uima 
EE Service,p0=Top Level Aggregate TAE Components,p1=SecondLevelAggregateCM 
Components,p2=ThirdLevelAggregateCM Components,name=Multiplier1
@@ -602,11 +644,11 @@ public class PrimitiveAnalysisEngineCont
             }
           }
         }
-        result.add(deepCopyMetrics(aem, uimaFullyQualifiedAEContext));
-      } 
+        AnalysisEnginePerformanceMetrics m = deepCopyMetrics(aem, 
uimaFullyQualifiedAEContext);
+        result.add(m);
     } else {
       for (AnalysisEngineManagement child : 
(Iterable<AnalysisEngineManagement>) aem.getComponents().values()) {
-        getLeafManagementObjects(child, result, produceUniqueName(aem));
+         getLeafManagementObjects(child, result, produceUniqueName(aem));
       }
     }
   }
@@ -628,7 +670,6 @@ public class PrimitiveAnalysisEngineCont
   }
    
   private String produceUniqueName(AnalysisEngineManagement aem) {
-//       System.out.println(">>>>>>>>>>>>>>>>>>> 
Thread:"+Thread.currentThread().getId()+" MBean:"+aem.getUniqueMBeanName());
     String[] parts = aem.getUniqueMBeanName().split(",");
     StringBuffer sb = new StringBuffer();
     for( String part : parts) {
@@ -652,8 +693,6 @@ public class PrimitiveAnalysisEngineCont
         sb.append("/").append(part.substring(part.trim().indexOf("=")+1));
       }
     }
-       //  System.out.println("<<<<<<<<<<<<<<<<<<< 
Thread:"+Thread.currentThread().getId()+" MBean:"+sb.toString());
-
     return sb.toString();
   }
 
@@ -699,6 +738,7 @@ public class PrimitiveAnalysisEngineCont
             uimaFullyQualifiedAEContext,
             aem.getAnalysisTime(),
             aem.getNumberOfCASesProcessed());
+    
   }
   
   /**
@@ -762,7 +802,6 @@ public class PrimitiveAnalysisEngineCont
       
       
       AnalysisEngineManagement rootAem = ae.getManagementInterface();
-      //System.out.println("%%%%%%%%%%%%%%%%%%%% Unique MBean 
Name:"+rootAem.getUniqueMBeanName()+" AE Instance Hashcode"+ae.hashCode());
       if ( rootAem.getComponents().size() > 0 ) {
           getLeafManagementObjects(rootAem, beforeAnalysisManagementObjects);
       } else {
@@ -1056,7 +1095,6 @@ public class PrimitiveAnalysisEngineCont
       // Set total number of children generated from this CAS
       // Store total time spent processing this input CAS
       
getCasStatistics(aCasReferenceId).incrementAnalysisTime(totalProcessTime);
-
       //  Fetch AE's management information that includes per component 
performance stats
       //  These stats are internally maintained in a Map. If the AE is an 
aggregate
       //  the Map will contain AnalysisEngineManagement instance for each AE.
@@ -1066,16 +1104,11 @@ public class PrimitiveAnalysisEngineCont
           //  primitive AE's AnalysisEngineManagement instance and placing it 
in 
           //  afterAnalysisManagementObjects List.
           getLeafManagementObjects(aem, afterAnalysisManagementObjects);
-         // 
System.out.println("-----------------Unique1:"+aem.getUniqueMBeanName());
-          //System.out.println("-----------------Simple1:"+aem.getName());
       } else {
            String path=produceUniqueName(aem);
-        //   
System.out.println("-----------------Unique2:"+aem.getUniqueMBeanName());
-         // System.out.println("-----------------Simple2:"+aem.getName());
           afterAnalysisManagementObjects.add(deepCopyMetrics(aem, path));   
           
       }
-
       //  Create a List to hold per CAS analysisTime and total number of CASes 
processed
       //  by each AE. This list will be serialized and sent to the client
       List<AnalysisEnginePerformanceMetrics> performanceList = 
@@ -1084,23 +1117,43 @@ public class PrimitiveAnalysisEngineCont
       //  metrics
       for (AnalysisEnginePerformanceMetrics after : 
afterAnalysisManagementObjects) {
         for( AnalysisEnginePerformanceMetrics before: 
beforeAnalysisManagementObjects) {
-          if ( before.getUniqueName().equals(after.getUniqueName())) {
-            
-            AnalysisEnginePerformanceMetrics metrics = 
-              new AnalysisEnginePerformanceMetrics(after.getName(),
-                      after.getUniqueName(),
-                      after.getAnalysisTime()- before.getAnalysisTime(),
-                      after.getNumProcessed());
-           // 
System.out.println("********************"+metrics.getUniqueName()+" Analysis 
Time:"+metrics.getAnalysisTime());
-            //System.out.println("********************"+metrics.getName()+" 
Analysis Time:"+metrics.getAnalysisTime());
+               if ( before.getUniqueName().equals(after.getUniqueName())) {
+                 boolean found = false;
+                 AnalysisEnginePerformanceMetrics metrics = null;
+                 for( AnalysisEnginePerformanceMetrics met : 
parentCasStateEntry.getAEPerformanceList() ) {
+                  String un = after.getUniqueName();
+                         if ( un.indexOf("Components") >= -1 ) {
+                                 un = un.substring(un.indexOf("/"));
+                         }
+                         if ( met.getUniqueName().equals(un)) {
+                      long at = after.getAnalysisTime()- 
before.getAnalysisTime();
+                      metrics = new 
AnalysisEnginePerformanceMetrics(after.getName(),
+                              un,//after.getUniqueName(),
+                              met.getAnalysisTime()+at,
+                              after.getNumProcessed());
+                      found = true;
+                      parentCasStateEntry.getAEPerformanceList().remove(met);
+                      break;
+                         } 
+                 }
+                 if ( !found ) {
+                         String un = after.getUniqueName();
+                         
+                         if ( un.indexOf("Components") >= -1 ) {
+                                 un = un.substring(un.indexOf("/"));
+                         }
+                  metrics = new 
AnalysisEnginePerformanceMetrics(after.getName(),
+                          un,//after.getUniqueName(),
+                          after.getAnalysisTime()- before.getAnalysisTime(),
+                          after.getNumProcessed());
+                         
+                 }
             performanceList.add(metrics);
             break;
           }
         }
       }
-      //  Save this component performance metrics
       parentCasStateEntry.getAEPerformanceList().addAll(performanceList);
-      
       if (!anEndpoint.isRemote()) {
         inputCASReturned = true;
         UimaTransport transport = getTransport(anEndpoint.getEndpoint());
@@ -1112,7 +1165,6 @@ public class PrimitiveAnalysisEngineCont
                       getInProcessCache().
                         
getTopAncestorCasEntry(getInProcessCache().getCacheEntryForCAS(aCasReferenceId));
             if ( ancestor != null ) {
-                // Set a flag on the input CAS to indicate that the processing 
was aborted
                ancestor.addDelegateMetrics(getKey(), performanceList);
             }
           } catch (Exception e) {

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=1716236&r1=1716235&r2=1716236&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
 Tue Nov 24 19:13:33 2015
@@ -196,27 +196,51 @@ public class ProcessResponseHandler exte
       // during deserialization
       CacheEntry cacheEntry = 
getController().getInProcessCache().getCacheEntryForCAS(
               casReferenceId);
+      // check if the client requested Performance Metrics for the CAS
       if ( 
aMessageContext.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
         try {
+          // find top ancestor of this CAS. All metrics are accumulated there 
since
+          // this is what will be returned to the client
           CacheEntry ancestor = 
                   getController().
                     getInProcessCache().
                       getTopAncestorCasEntry(cacheEntry);
           if ( ancestor != null ) {
+               // fetch Performance Metrics from remote delegate reply
             List<AnalysisEnginePerformanceMetrics> metrics = 
                     
UimaSerializer.deserializePerformanceMetrics(aMessageContext.getMessageStringProperty(AsynchAEMessage.CASPerComponentMetrics));
-            
             List<AnalysisEnginePerformanceMetrics> adjustedMetrics =
                     new ArrayList<AnalysisEnginePerformanceMetrics>();
             for(AnalysisEnginePerformanceMetrics delegateMetric : metrics ) {
-              String tmp =
-                      
delegateMetric.getUniqueName().substring(delegateMetric.getUniqueName().indexOf(","));
-              String adjustedUniqueName =
-                ((AggregateAnalysisEngineController) 
getController()).getJMXDomain()+((AggregateAnalysisEngineController) 
getController()).getJmxContext()+tmp;
-              AnalysisEnginePerformanceMetrics metric =
-                      new 
AnalysisEnginePerformanceMetrics(delegateMetric.getName(),adjustedUniqueName,delegateMetric.getAnalysisTime(),delegateMetric.getNumProcessed());
+              String adjustedUniqueName = ((AggregateAnalysisEngineController) 
getController()).getJmxContext();
+
+              if ( adjustedUniqueName.startsWith("p0=")) {
+                 adjustedUniqueName = adjustedUniqueName.substring(3);  // 
skip p0=
+              }
+              adjustedUniqueName = adjustedUniqueName.replaceAll(" 
Components", "");
+              if (!adjustedUniqueName.startsWith("/")) {
+                 adjustedUniqueName = "/"+adjustedUniqueName;
+              }
+              adjustedUniqueName += delegateMetric.getUniqueName();
+              
+              boolean found = false;
+              AnalysisEnginePerformanceMetrics metric = null;
+              for( AnalysisEnginePerformanceMetrics met : 
ancestor.getDelegateMetrics() ) {
+                 if ( met.getUniqueName().equals(adjustedUniqueName)) {
+                         long at = delegateMetric.getAnalysisTime();
+                         long count = delegateMetric.getNumProcessed();
+                         metric = new 
AnalysisEnginePerformanceMetrics(delegateMetric.getName(),adjustedUniqueName,at,count);
+                         found = true;
+                         ancestor.getDelegateMetrics().remove(met);
+                         break;
+                 }
+              }
+              if ( !found ) {
+                  metric = new 
AnalysisEnginePerformanceMetrics(delegateMetric.getName(),adjustedUniqueName,delegateMetric.getAnalysisTime(),delegateMetric.getNumProcessed());
+              }
               adjustedMetrics.add(metric);
             }
+            
             ancestor.addDelegateMetrics(delegateKey, adjustedMetrics, true);  
// true=remote
           }
         } catch (Exception e) {

Added: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java?rev=1716236&view=auto
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java
 (added)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java
 Tue Nov 24 19:13:33 2015
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.aae.monitor.statistics;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AEMetrics {
+         private String name;
+         private AtomicLong analysisTime = new AtomicLong();
+         private AtomicLong numProcessed = new AtomicLong();
+       public String getName() {
+               return name;
+       }
+       public void setName(String name) {
+               this.name = name;
+       }
+       public AtomicLong getAnalysisTime() {
+               return analysisTime;
+       }
+       public void incrementAnalysisTime(long analysisTime) {
+               this.analysisTime.addAndGet( analysisTime );
+       }
+       public AtomicLong getNumProcessed() {
+               return numProcessed;
+       }
+       public void incrementNumProcessed(long numProcessed) {
+               this.numProcessed.addAndGet(numProcessed);
+       }
+}

Propchange: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/monitor/statistics/AEMetrics.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain


Reply via email to