Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,267 @@
+package org.apache.uima.aae.component.dd;
+
+import java.io.File;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.uima.aae.UimaASUtils;
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.CasMultiplierComponent;
+import org.apache.uima.aae.component.CasMultiplierNature;
+import org.apache.uima.aae.component.RemoteAnalysisEngineComponent;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
+import org.apache.uima.aae.component.factory.AnalysisEngineComponentFactory;
+import org.apache.uima.resource.ResourceSpecifier;
+import 
org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
+import org.apache.uima.resourceSpecifier.AnalysisEngineType;
+import org.apache.uima.resourceSpecifier.DelegateAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType;
+import org.apache.uima.resourceSpecifier.ServiceType;
+import org.apache.xmlbeans.XmlDocumentProperties;
+
+public class DeploymentDescriptorProcessor {
+       
+       private AnalysisEngineDeploymentDescriptionDocument dd = null;
+       public DeploymentDescriptorProcessor() {
+               
+       }
+       public 
DeploymentDescriptorProcessor(AnalysisEngineDeploymentDescriptionDocument dd) {
+               this.dd = dd;
+       }
+       public AnalysisEngineComponent newComponent(String descriptorPath) 
throws Exception {
+               this.dd = parseDD(descriptorPath);
+               return newComponent();
+       }
+       
+       public TopLevelServiceComponent newComponent() throws Exception {
+               ServiceType service =
+                               
dd.getAnalysisEngineDeploymentDescription().getDeployment().getService();
+               XmlDocumentProperties dp = dd.documentProperties();
+               System.out.println(dp.getSourceName());
+
+               // get absolute path to resource specifier
+               String  aeDescriptor = 
+                               UimaASUtils.fixPath(dp.getSourceName(), 
getDescriptor(service));
+
+               // Get top level uima resource specifier
+               ResourceSpecifier resourceSpecifier = 
+                               
UimaClassFactory.produceResourceSpecifier(aeDescriptor);
+
+               AnalysisEngineComponentFactory componentFactory =
+                               new AnalysisEngineComponentFactory();
+               // Process top level AE resource specifier and all its 
delegates.
+               // For aggregates, recursively walk through a delegate tree, 
producing 
+               // a tree of AnalysisEngineComponent instances, one for every 
delegate.
+               
+               AnalysisEngineComponent aeComponent = 
+                               componentFactory.produce(resourceSpecifier, 
null);
+
+               // Decorate above with top level component functionality
+               TopLevelServiceComponent topLevelComponent = 
+                               new TopLevelServiceComponent(aeComponent, dd);
+
+//             if ( aeComponent.isPrimitive()) {
+//                     // The AE descriptor is for a primitive AE
+//             } else {
+                       // the AE descriptor is for an aggregate AE. Check DD to
+                       // see if its an async aggregate. Its async=true or
+                       // has delegates.
+//             if ( isAggregate(service.getAnalysisEngine()) ) {
+               if ( topLevelComponent.isAggregate()) {
+                               // All delegates will be colocated unless
+                               // a delegate is remote. That is determined 
+                               // below.
+//                             aeComponent.enableAsync();
+                               //if ( 
dd.getAnalysisEngineDeploymentDescription().getDeployment().getProtocol()
+                               DelegateAnalysisEngineType[] colocatedDelegates 
= null;
+                               if ( 
Objects.nonNull(service.getAnalysisEngine()) &&
+                                        
Objects.nonNull(service.getAnalysisEngine().getDelegates()) ) {
+                                       colocatedDelegates = 
service.getAnalysisEngine().
+                                                                               
        getDelegates().
+                                                                               
        getAnalysisEngineArray();
+                                       
+                               }
+                               handleColocatedDelegates(colocatedDelegates, 
aeComponent.getChildren());
+                               
+                               RemoteAnalysisEngineType[] remoteDelegates = 
null;
+                               if ( 
Objects.nonNull(service.getAnalysisEngine()) &&
+                                        
Objects.nonNull(service.getAnalysisEngine().getDelegates())) {
+                                       remoteDelegates = 
service.getAnalysisEngine().
+                                                                       
getDelegates().
+                                                                       
getRemoteAnalysisEngineArray();
+                               }
+                               handleRemoteDelegates(remoteDelegates, 
aeComponent.getChildren());
+
+//                             service.getAnalysisEngine().
+//                     getDelegates().
+                       //}
+               } 
+
+               
+               return topLevelComponent;
+       }
+       public AnalysisEngineDeploymentDescriptionDocument parseDD(String 
descriptorPath) throws Exception {
+               return 
AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new 
File(descriptorPath));     
+
+       }
+       private boolean isAggregate(AnalysisEngineType aet) {
+               return ("true".equals(aet.getAsync()) || aet.isSetAsync() || 
aet.isSetDelegates());
+       } 
+       private String getDescriptor(ServiceType service) {
+               String aeDescriptor = 
service.getTopDescriptor().getImport().getLocation();
+               if ( aeDescriptor == null ) {
+                       aeDescriptor = 
service.getTopDescriptor().getImport().getName();
+               }
+               return aeDescriptor;
+       }
+       
+       private void markAllDelegatesAsAsync(List<AnalysisEngineComponent> 
resourceSpecifierDelegates) {
+               for ( AnalysisEngineComponent aec : resourceSpecifierDelegates 
) {
+                       if ( !aec.isPrimitive() ) {
+                               handleColocatedDelegates(null, 
aec.getChildren());
+                       }
+                       aec.enableAsync();
+               }
+               
+       }
+       private void handleColocatedDelegates(DelegateAnalysisEngineType[] 
ddDelegates, List<AnalysisEngineComponent> resourceSpecifierDelegates ) {
+               if ( Objects.isNull(ddDelegates)) {
+                       // the dd does not include delegates but is configured 
as an asynch service
+                       // so process resource specifiers recursively marking 
each part of a pipeline
+                       // as asynch so that it is deployed as a collocated 
asynch service.
+                       
handleDefaultColocatedDelegates(resourceSpecifierDelegates);
+                       //markAllDelegatesAsAsync(resourceSpecifierDelegates);
+                       return; 
+               }
+               // go through all delegates defined in the deployment 
descriptor (dd)
+               for( DelegateAnalysisEngineType ddDelegate : ddDelegates ) {
+                       // find a matching delegate in the AE resource specifier
+                       for( AnalysisEngineComponent resourceSpecifierDelegate 
: resourceSpecifierDelegates ) {
+                               if ( 
ddDelegate.getKey().equals(resourceSpecifierDelegate.getKey())) {
+                                       if (  
resourceSpecifierDelegate.isCasMultiplier() && 
Objects.nonNull(ddDelegate.getCasMultiplier())) {
+                                               // plugin cas multiplier 
settings from dd
+                                               CasMultiplierNature 
casMultiplier =
+                                                               new 
CasMultiplierComponent(ddDelegate.getCasMultiplier().getDisableJCasCache(), 
+                                                                               
                   
TypeConverter.convertStringToLong(ddDelegate.getCasMultiplier().getInitialFsHeapSize(),
 1000), 
+                                                                               
                   ddDelegate.getCasMultiplier().getPoolSize(),
+                                                                               
                   
TypeConverter.convertStringToBoolean(ddDelegate.getCasMultiplier().getProcessParentLast(),true)
 );
+                                               
resourceSpecifierDelegate.enableCasMultiplierNatureWith(casMultiplier);
+                                               
+                                       }
+                                       
resourceSpecifierDelegate.enableAsync();   // delegate is async
+                                       
+                                       resourceSpecifierDelegate.
+                                             
withScaleout(Objects.isNull(ddDelegate.getScaleout()) ? 1 
:ddDelegate.getScaleout().getNumberOfInstances()).
+                                             withRequestThreadPoolSize( 
TypeConverter.convertStringToInt(ddDelegate.getInputQueueScaleout(), 1)).
+                                             withReplyThreadPoolSize( 
TypeConverter.convertStringToInt(ddDelegate.getInternalReplyQueueScaleout(),1));
+                                                                               
+                                       if ( isAggregate(ddDelegate) ) {
+                                               
+                                               
resourceSpecifierDelegate.enableAsync();
+                                               for ( AnalysisEngineComponent 
aec : resourceSpecifierDelegate.getChildren() ) {
+                                                       aec.enableAsync();
+                                               }
+                                               if ( ddDelegate.getDelegates() 
!= null ) {
+                                                       // recursively process 
collocated delegates
+                                                       
handleColocatedDelegates(ddDelegate.getDelegates().getAnalysisEngineArray() , 
resourceSpecifierDelegate.getChildren());
+                                                       
handleRemoteDelegates(ddDelegate.getDelegates().getRemoteAnalysisEngineArray(), 
resourceSpecifierDelegate.getChildren());
+                                               }
+                                               
+                                       } 
+                                       break;  // found a match and completed 
processing it. We are done with it.
+                               }
+                       }
+               }
+       }
+
+       private void 
handleDefaultColocatedDelegates(List<AnalysisEngineComponent> 
resourceSpecifierDelegates) {
+               // find a matching delegate in the AE resource specifier
+               for (AnalysisEngineComponent resourceSpecifierDelegate : 
resourceSpecifierDelegates) {
+                       if (resourceSpecifierDelegate.isCasMultiplier()) {
+                               // plugin cas multiplier settings from dd
+                               CasMultiplierNature casMultiplier = new 
CasMultiplierComponent(false, 1000, 1, true);
+                               
resourceSpecifierDelegate.enableCasMultiplierNatureWith(casMultiplier);
+                       }
+                       resourceSpecifierDelegate.withScaleout(1).
+                               withRequestThreadPoolSize(1).
+                               withReplyThreadPoolSize(1).
+                               enableAsync();
+                       
+                       if (!resourceSpecifierDelegate.isPrimitive()) {
+                               
handleDefaultColocatedDelegates(resourceSpecifierDelegate.getChildren());
+                       }
+               }
+       }
+       private void handleRemoteDelegates(RemoteAnalysisEngineType[] 
remoteDelegates, List<AnalysisEngineComponent> resourceSpecifierDelegates ) {
+               if ( Objects.isNull(remoteDelegates) ) {
+                       return;
+               }
+               for( RemoteAnalysisEngineType remoteDelegateType : 
remoteDelegates ) {
+                       // find a matching delegate in the AE resource specifier
+                       for( AnalysisEngineComponent resourceSpecifierDelegate 
: resourceSpecifierDelegates ) {
+                               if ( 
remoteDelegateType.getKey().equals(resourceSpecifierDelegate.getKey())) {
+                                       // find an index of the current 
component in the list. We 
+                                       // will decorate this component as a 
remote, and replace
+                                       // it in the list.
+                                       int index = 
+                                                       
resourceSpecifierDelegates.indexOf(resourceSpecifierDelegate);
+                                       // Decorate existing component with 
remote flavor
+                                       RemoteAnalysisEngineComponent 
remoteDelegate = 
+                                                       new 
RemoteAnalysisEngineComponent(resourceSpecifierDelegate, remoteDelegateType);
+                                       
+                                       //replace component with decorated 
remote
+                                       resourceSpecifierDelegates.set(index, 
remoteDelegate);
+                               }
+                               
+                       }
+                       
+               }
+       }
+/*     
+       public void parse(DelegateAnalysisEngineType colocatedDelegate, 
AnalysisEngineComponent component) {
+               if ( isAggregate(colocatedDelegate) ) {
+                       DelegateAnalysisEngineType[] colocatedDelegates = 
+                                       
colocatedDelegate.getDelegates().getAnalysisEngineArray();
+                       handleColocatedDelegates(colocatedDelegates, 
component.getChildren());
+               } else {
+                       
+               }
+       }
+*/
+       public static void main(String[] args) {
+               try {
+                       DeploymentDescriptorProcessor ddp = 
+                                       new DeploymentDescriptorProcessor();
+                       ddp.newComponent(args[0]);
+               } catch( Exception e) {
+                       e.printStackTrace();
+               }
+       }
+       private static class TypeConverter {
+               private static int convertStringToInt(String value, int 
defaultValue) {
+                       int returnValue = defaultValue;
+                       try {
+                               returnValue = Integer.parseInt(value);
+                       } catch( Exception e) {
+                       }
+                       return returnValue;
+               }
+               private static boolean convertStringToBoolean(String value, 
boolean defaultValue) {
+                       boolean returnValue = defaultValue;
+                       try {
+                               returnValue = Boolean.parseBoolean(value);
+                       } catch( Exception e) {
+                       }
+                       return returnValue;
+               }
+               private static long convertStringToLong(String value, long 
defaultValue) {
+                       long returnValue = defaultValue;
+                       try {
+                               returnValue = Long.parseLong(value);
+                       } catch( Exception e) {
+                       }
+                       return returnValue;
+               }
+       }
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,55 @@
+package org.apache.uima.aae.component.factory;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.uima.aae.UimaClassFactory;
+import org.apache.uima.aae.component.AggregateAnalysisEngineComponent;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.PrimitiveAnalysisEngineComponent;
+import org.apache.uima.analysis_engine.AnalysisEngineDescription;
+import org.apache.uima.resource.ResourceSpecifier;
+
+public class AnalysisEngineComponentFactory {
+       
+    private AnalysisEngineDescription getAeDescription(ResourceSpecifier rs) {
+               return (AnalysisEngineDescription) rs;
+    }
+    
+       public AnalysisEngineComponent produce(ResourceSpecifier rs, String 
key) throws Exception {
+               AnalysisEngineDescription aeDescriptor = getAeDescription(rs);
+               AnalysisEngineComponent component = null;
+               
+               if ( aeDescriptor.isPrimitive() ) {
+                       component = new PrimitiveAnalysisEngineComponent(key, 
rs);
+               } else {
+                       component = 
+                                       new 
AggregateAnalysisEngineComponent(key, rs);
+                       Map<String, ResourceSpecifier> delegates =
+                               
aeDescriptor.getDelegateAnalysisEngineSpecifiers();
+               for(Entry<String, ResourceSpecifier> delegateEntry: 
delegates.entrySet() ) {
+                       component.add(produce(delegateEntry.getValue(), 
delegateEntry.getKey() ));
+               }
+               }
+               
+               if ( 
aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().isMultipleDeploymentAllowed()
 ) {
+                       component.enableScaleout();
+               }
+               if ( 
aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes()
 ) {
+                       component.enableCasMultipler();
+               }
+
+               return component;
+       }
+       public static void main(String[] args ) {
+               try {
+                       AnalysisEngineComponentFactory factory = 
+                                       new AnalysisEngineComponentFactory();
+                       ResourceSpecifier resourceSpecifier = 
+                                       
UimaClassFactory.produceResourceSpecifier(args[0]);
+                       factory.produce(resourceSpecifier, "TopLevel");
+               } catch( Exception e) {
+                       e.printStackTrace();
+               }
+       }
+}

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
 Thu Oct 18 14:12:00 2018
@@ -2028,25 +2028,54 @@ implements
     return retValue;
   }
 
-  private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry 
cacheEntry, FinalStep aStep) {
-   
-         // Get the key of the Cas Producer
-    String casProducer = cacheEntry.getCasProducerAggregateName();
-    // CAS is considered new from the point of view of this service IF it was 
produced by it
-    boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && 
getComponentName().equals(
-            casProducer));
-    if (parent != null && parent.isFailed() && isNewCas) {
-      return true; // no point to continue if the CAS was produced in this 
aggregate and its parent
-                   // failed here
-    }
-    // If the CAS was generated by this component but the Flow Controller 
wants to drop the CAS OR
-    // this component
-    // is not a Cas Multiplier
-    if (isNewCas && parent.getSubordinateCasInPlayCount() == 0 && 
(aStep.getForceCasToBeDropped() || !isCasMultiplier())) {
-      return true;
-    }
-    return false;
-  }
+       private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry 
cacheEntry, FinalStep aStep) {
+
+               // Get the key of the Cas Producer
+               String casProducer = cacheEntry.getCasProducerAggregateName();
+               // CAS is considered new from the point of view of this service 
IF it was
+               // produced by it
+               boolean isNewCas = (cacheEntry.isNewCas() && casProducer != 
null && getComponentName().equals(casProducer));
+               // force to drop a child CAS if this service is not a Cas 
Multiplier
+               if ( isNewCas ) {
+                       if ( !isCasMultiplier()) {
+                               System.out.println(">>>>>>>>>>>>>>>>>>> FORCE 
TO DROP THE CAS");
+                               return true;
+                       }
+                       if (parent != null && parent.isFailed()) {
+                               return true; // no point to continue if the CAS 
was produced in this aggregate and its parent
+                                                               // failed 
+                       }
+                       // If the CAS was generated by this component but the 
Flow Controller wants to
+                       // drop the CAS OR this component is not a Cas 
Multiplier
+                       if ( parent != null && 
parent.getSubordinateCasInPlayCount() == 0
+                                       && (aStep.getForceCasToBeDropped() || 
!isCasMultiplier())) {
+                               return true;
+                       }
+               }
+
+
+               /*
+               if (isNewCas && isTopLevelComponent() && !isCasMultiplier()) {
+                       System.out.println(">>>>>>>>>>>>>>>>>>> FORCE TO DROP 
THE CAS");
+                       return true;
+               }
+               if (parent != null && parent.isFailed() && isNewCas) {
+                       return true; // no point to continue if the CAS was 
produced in this aggregate and its parent
+                                                       // failed here
+               }
+               // If the CAS was generated by this component but the Flow 
Controller wants to
+               // drop the CAS OR
+               // this component
+               // is not a Cas Multiplier
+               if (isNewCas && parent.getSubordinateCasInPlayCount() == 0
+                               && (aStep.getForceCasToBeDropped() || 
!isCasMultiplier())) {
+                       return true;
+               }
+               
+               */
+               
+               return false;
+       }
 
   private boolean casHasExceptions(CasStateEntry casStateEntry) {
     return (casStateEntry.getErrors().size() > 0) ? true : false;

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
 Thu Oct 18 14:12:00 2018
@@ -40,6 +40,7 @@ import org.apache.uima.aae.jmx.JmxManage
 import org.apache.uima.aae.jmx.ServiceErrors;
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.aae.message.Origin;
 import org.apache.uima.aae.monitor.Monitor;
 import org.apache.uima.aae.spi.transport.UimaMessageListener;
 import org.apache.uima.aae.spi.transport.UimaTransport;
@@ -55,6 +56,8 @@ public interface AnalysisEngineControlle
 
   public static final String AEInstanceCount = "AEInstanceCount";
 
+  public Origin getOrigin();
+  
   public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException;
 
   public ControllerLatch getControllerLatch();

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 Thu Oct 18 14:12:00 2018
@@ -78,6 +78,8 @@ import org.apache.uima.aae.jmx.ServiceEr
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.jmx.ServicePerformance;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.Origin;
+import org.apache.uima.aae.message.UimaAsOrigin;
 import org.apache.uima.aae.monitor.Monitor;
 import org.apache.uima.aae.monitor.MonitorBaseImpl;
 import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
@@ -111,6 +113,7 @@ public abstract class BaseAnalysisEngine
        JMS,
        DIRECT
   };
+  private final Origin origin;
   private static final Class<?> CLASS_NAME = 
BaseAnalysisEngineController.class;
   private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
   public enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED 
};
@@ -284,7 +287,7 @@ public abstract class BaseAnalysisEngine
   protected abstract void doWarmUp(CAS cas, String casReferenceId) throws 
Exception;
 
   public BaseAnalysisEngineController() {
-
+         origin = new UimaAsOrigin("");
   }
  
   public BaseAnalysisEngineController(AnalysisEngineController 
aParentController,
@@ -316,7 +319,7 @@ public abstract class BaseAnalysisEngine
           Map aDestinationMap, JmxManagement aJmxManagement,boolean 
disableJCasCache) throws Exception {
     
        System.out.println("C'tor Called Descriptor:"+aDescriptor);
-
+    origin = new UimaAsOrigin(anEndpointName);
        casManager = aCasManager;
     inProcessCache = anInProcessCache;
     localCache = new LocalCache(this);
@@ -529,7 +532,9 @@ public abstract class BaseAnalysisEngine
          return uimaContext;
   }
 
-
+  public Origin getOrigin() {
+         return origin;
+  }
   public void setThreadFactory(ThreadPoolTaskExecutor factory) {
          threadFactory = factory;
   }

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
 Thu Oct 18 14:12:00 2018
@@ -21,6 +21,7 @@ package org.apache.uima.aae.controller;
 
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.jmx.ServiceInfo;
+import org.apache.uima.aae.message.Origin;
 import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.cas.impl.TypeSystemImpl;
 
@@ -31,6 +32,12 @@ public interface Endpoint {
 
   public static final int DISABLED = 3;
 
+  public void setMessageOrigin(Origin origin);
+  
+  public Origin getMessageOrigin();
+  
+  public String getUniqueId();
+  
   public boolean isJavaRemote();
   
   public void setJavaRemote();

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
 Thu Oct 18 14:12:00 2018
@@ -20,11 +20,13 @@
 package org.apache.uima.aae.controller;
 
 import java.util.Timer;
+import java.util.UUID;
 
 import 
org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.Origin;
 import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.cas.impl.TypeSystemImpl;
 import org.apache.uima.resource.ResourceSpecifier;
@@ -32,6 +34,8 @@ import org.apache.uima.resource.Resource
 public class Endpoint_impl implements Endpoint, Cloneable {
   private static final Class<?> CLASS_NAME = Endpoint_impl.class;
 
+  private String uniqueId = UUID.randomUUID().toString();
+  
   private volatile boolean javaRemote=false;
   
   private volatile Object destination = null;
@@ -130,6 +134,18 @@ public class Endpoint_impl implements En
   
  private ResourceSpecifier resourceSpecifier;
   
+ private Origin messageOrigin;
+ 
+ public void setMessageOrigin(Origin origin) {
+        this.messageOrigin = origin;
+ }
+ 
+ public Origin getMessageOrigin() {
+        return messageOrigin;
+ }
+  public String getUniqueId() {
+         return uniqueId;
+  }
   public void setJavaRemote() {
         javaRemote = true;
   }

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,9 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.message.MessageProcessor;
+
+public abstract class AbstractUimaAsConsumer implements UimaAsConsumer{
+       
+       protected abstract void setMessageProcessor(MessageProcessor processor);
+
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,5 @@
+package org.apache.uima.aae.definition.connectors;
+
+public interface ComponentConnector {
+       public Object getConnectionInfo();
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,31 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.definition.connectors.basic.BasicConnector;
+import org.apache.uima.aae.definition.connectors.basic.DirectConnector;
+
+public class ConnectorFactory {
+
+       public static ComponentConnector newConnector(String protocol, String 
vendor) {
+               ComponentConnector connector=null;
+               switch(protocol.toLowerCase()) {
+               case "jms":
+                       connector = getJmsConnector(vendor);
+                       break;
+               case "direct":
+                       connector = new DirectConnector();
+                       break;
+                       
+               default:
+                       connector = new BasicConnector();
+               }
+               return connector;
+       }
+       private static ComponentConnector getJmsConnector(String vendor) {
+               return null;
+       }
+       public static void main(String[] args) {
+               // TODO Auto-generated method stub
+
+       }
+
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,7 @@
+package org.apache.uima.aae.definition.connectors;
+
+public interface ListenerCallback {
+       public void onInitializationError(Exception e);
+       public boolean failedInitialization();
+       public Exception getException();
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,11 @@
+package org.apache.uima.aae.definition.connectors;
+
+import java.util.Map;
+
+public interface UimaAsConnector {
+
+       public UimaAsEndpoint createEndpoint(String uri, Map<String, Object> 
params) 
+       throws Exception;
+       
+       
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,17 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.as.client.DirectMessage;
+
+public interface UimaAsConsumer extends Lifecycle {
+       public enum ConsumerType {GetMeta,ProcessCAS,Cpc,FreeCAS,Reply,Info};
+       
+       public void initialize() throws Exception;
+       public void initialize(AnalysisEngineController controller) throws 
Exception;
+       
+       public void consume(DirectMessage message) throws Exception;
+       
+       public ConsumerType getType();
+       
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,15 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
+import org.apache.uima.aae.message.MessageContext;
+
+public interface UimaAsEndpoint extends Lifecycle {
+       public UimaAsProducer createProducer(String targetUri) throws Exception;
+       public UimaAsProducer createProducer(UimaAsConsumer consumer, String 
delegateKey)  throws Exception;
+       public UimaAsConsumer createConsumer(String targetUri, ConsumerType 
type, int consumerThreadCount) throws Exception;
+       public void dispatch(MessageContext messageContext) throws Exception;
+       public UimaAsConsumer getConsumer(String targetUri, ConsumerType type);
+       public MessageContext createMessage(int command, int messageType, 
Endpoint endpoint);
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,13 @@
+package org.apache.uima.aae.definition.connectors;
+
+import org.apache.uima.aae.Lifecycle;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UimaAsMessage;
+import org.apache.uima.as.client.DirectMessage;
+
+public interface UimaAsProducer extends Lifecycle {
+
+       public void dispatch(DirectMessage message) throws Exception;
+       public void dispatch(DirectMessage message, UimaAsConsumer target) 
throws Exception;
+
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,12 @@
+package org.apache.uima.aae.definition.connectors.basic;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class BasicConnector implements ComponentConnector {
+
+       @Override
+       public Object getConnectionInfo() {
+               return "";
+       }
+
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,13 @@
+package org.apache.uima.aae.definition.connectors.basic;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class DirectConnector implements ComponentConnector {
+
+       @Override
+       public Object getConnectionInfo() {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,36 @@
+package org.apache.uima.aae.definition.connectors.jms;
+
+import org.apache.uima.aae.definition.connectors.ComponentConnector;
+
+public class ActiveMqConnector implements ComponentConnector {
+       private final Object connection;
+       
+       ActiveMqConnector(String queue, String broker, int prefetch) {
+               connection = new ActiveMqConnection(queue, broker, prefetch);
+       }
+       @Override
+       public Object getConnectionInfo() {
+               return connection;
+       }
+
+       public class ActiveMqConnection {
+               private final String queueName;
+               private final String brokerUrl;
+               private final int prefetch;
+               
+               ActiveMqConnection(String queue, String broker, int prefetch) {
+                       this.queueName = queue;
+                       this.brokerUrl = broker;
+                       this.prefetch = prefetch;
+               }
+               public String getQueueName() {
+                       return queueName;
+               }
+               public String getBrokerUrl() {
+                       return brokerUrl;
+               }
+               public int getPrefetch() {
+                       return prefetch;
+               }
+       }
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,9 @@
+package org.apache.uima.aae.message;
+
+import org.apache.uima.aae.controller.AnalysisEngineController;
+
+public interface MessageProcessor {
+
+       public void process(MessageContext message) throws Exception;
+       public AnalysisEngineController getController();
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,6 @@
+package org.apache.uima.aae.message;
+
+public interface Origin {
+       public String getUniqueId();
+       public String getName();
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,36 @@
+package org.apache.uima.aae.message;
+
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.error.AsynchAEException;
+
+public interface UimaAsMessage {
+       public enum Command {GetMetaRequest, GetMetaResponse, CpcRegeuest, 
CpcResponse, ProcessRequest, ProcessResponse };
+        
+         public String getMessageStringProperty(String aMessagePropertyName) 
throws AsynchAEException;
+
+         public int getMessageIntProperty(String aMessagePropertyName) throws 
AsynchAEException;
+
+         public long getMessageLongProperty(String aMessagePropertyName) 
throws AsynchAEException;
+
+         public Object getMessageObjectProperty(String aMessagePropertyName) 
throws AsynchAEException;
+
+         public boolean getMessageBooleanProperty(String aMessagePropertyName) 
throws AsynchAEException;
+
+         public Endpoint getEndpoint();
+
+         public String getStringMessage() throws AsynchAEException;
+
+         public Object getObjectMessage() throws AsynchAEException;
+
+         public byte[] getByteMessage() throws AsynchAEException;
+
+         public Object getRawMessage();
+
+         public boolean propertyExists(String aKey) throws AsynchAEException;
+
+         public void setMessageArrivalTime(long anArrivalTime);
+
+         public long getMessageArrivalTime();
+
+         public String getEndpointName();
+}

Added: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java?rev=1844241&view=auto
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java
 (added)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java
 Thu Oct 18 14:12:00 2018
@@ -0,0 +1,55 @@
+package org.apache.uima.aae.message;
+
+import java.util.UUID;
+
+public class UimaAsOrigin implements Origin {
+
+       private final String uniqueId = UUID.randomUUID().toString();
+       private final String name;
+       
+       public UimaAsOrigin(String name) {
+               this.name = name;
+       }
+       @Override
+       public String getUniqueId() {
+               return uniqueId;
+       }
+
+       @Override
+       public int hashCode() {
+               final int prime = 31;
+               int result = 1;
+               result = prime * result + ((name == null) ? 0 : 
name.hashCode());
+               result = prime * result + ((uniqueId == null) ? 0 : 
uniqueId.hashCode());
+               return result;
+       }
+       @Override
+       public boolean equals(Object obj) {
+               if (this == obj)
+                       return true;
+               if (obj == null)
+                       return false;
+               if (getClass() != obj.getClass())
+                       return false;
+               UimaAsOrigin other = (UimaAsOrigin) obj;
+               if (name == null) {
+                       if (other.name != null)
+                               return false;
+               } else if (!name.equals(other.name))
+                       return false;
+               if (uniqueId == null) {
+                       if (other.uniqueId != null)
+                               return false;
+               } else if (!uniqueId.equals(other.uniqueId))
+                       return false;
+               return true;
+       }
+       @Override
+       public String getName() {
+               return name;
+       }
+       @Override
+    public String toString() {
+        return "Origin[name: " + name + "] [id:"+uniqueId+"]";
+    }
+}

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java
 Thu Oct 18 14:12:00 2018
@@ -20,6 +20,7 @@ package org.apache.uima.aae.service;
 
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.uima.aae.InProcessCache;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.as.client.DirectMessage;
 import org.apache.uima.cas.CAS;
@@ -34,6 +35,7 @@ public interface UimaASService {
        public static final int STOP_NOW = 1001;
          
        public String getEndpoint();
+       public int getScaleout();
        public String getId();
        public void start() throws Exception;
        public void stop() throws Exception;
@@ -46,5 +48,5 @@ public interface UimaASService {
        public void releaseCAS(String casReferenceId, 
BlockingQueue<DirectMessage> releaseCASQueue ) throws Exception;
        public AnalysisEngineMetaData getMetaData() throws Exception; 
        public void removeFromCache(String casReferenceId);
-
+       public UimaASService withInProcessCache(InProcessCache cache);
 }

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java
 Thu Oct 18 14:12:00 2018
@@ -24,7 +24,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -37,9 +38,15 @@ import org.apache.uima.aae.OutputChannel
 import org.apache.uima.aae.UimaASUtils;
 import org.apache.uima.aae.UimaClassFactory;
 import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.component.AggregateAnalysisEngineComponent;
+import org.apache.uima.aae.component.AnalysisEngineComponent;
+import org.apache.uima.aae.component.ComponentCasPool;
+import org.apache.uima.aae.component.RemoteAnalysisEngineComponent;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
 import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.ControllerCallbackListener;
 import 
org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE;
 import org.apache.uima.aae.controller.DelegateEndpoint;
 import org.apache.uima.aae.controller.Endpoint;
@@ -51,7 +58,6 @@ import org.apache.uima.aae.error.ErrorHa
 import org.apache.uima.aae.error.Threshold;
 import org.apache.uima.aae.error.Thresholds;
 import org.apache.uima.aae.error.Thresholds.Action;
-import org.apache.uima.aae.error.UimaAsDelegateException;
 import org.apache.uima.aae.error.handler.CpcErrorHandler;
 import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
 import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
@@ -78,7 +84,6 @@ import org.apache.uima.resource.Resource
 import org.apache.uima.resource.ResourceManager;
 import org.apache.uima.resource.ResourceSpecifier;
 import 
org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
-import 
org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionType;
 import org.apache.uima.resourceSpecifier.AnalysisEngineType;
 import org.apache.uima.resourceSpecifier.AsyncAggregateErrorConfigurationType;
 import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
@@ -109,6 +114,246 @@ public abstract class AbstractUimaAsServ
        }
     
     protected abstract void addListenerForReplyHandling( 
AggregateAnalysisEngineController controller, Endpoint_impl endpoint, 
RemoteAnalysisEngineDelegate remoteDelegate) throws Exception;
+
+//    public AnalysisEngineController createController( 
AnalysisEngineComponent component, int howManyInstances) throws Exception {
+    public AnalysisEngineController createController( AnalysisEngineComponent 
component, ControllerCallbackListener aListener, String serviceId) throws 
Exception {
+       AnalysisEngineController controller =
+                       createController(component, null /*, 
component.getScaleout() */);
+       controller.setServiceId(serviceId);
+       controller.addControllerCallbackListener(aListener);
+       return controller;
+    }
+
+    /**
+     * Recursively walks through the AE descriptor creating instances of 
AnalysisEngineController
+     * and linking them in parent-child tree. 
+     * 
+     * @param d - wrapper around delegate defined in DD (may be null)
+     * @param resourceSpecifier - AE descriptor specifier
+     * @param name - name of the delegate
+     * @param parentController - reference to a parent controller. TopLevel 
has no parent
+     * @param howManyInstances - scalout for the delegate
+     * 
+     * @return
+     * @throws Exception
+     */
+    public AnalysisEngineController createController( AnalysisEngineComponent 
component, AnalysisEngineController parentController/*, int howManyInstances 
*/) throws Exception {
+
+       AnalysisEngineController controller = null;
+       System.out.println("---------Controller:"+
+                       component.getKey()+
+                       " resourceSpecifier:"+
+                       component.getResourceSpecifier().getClass().getName()+
+                       " 
ResourceCreationSpecifier:"+(component.getResourceSpecifier() instanceof 
ResourceCreationSpecifier) );
+
+       if ( component.isPrimitive()) {
+                       controller = new 
PrimitiveAnalysisEngineController_impl(parentController, component.getKey(), 
component.getResourceSpecifier().getSourceUrlString(),casManager, cache, 10, 
component.getScaleout());
+       } else {
+               // add an endpoint for each delegate in this aggregate. The 
endpoint Map is required
+               // during initialization of an aggregate controller.
+               Map<String, Endpoint> endpoints = new HashMap<>();
+               AggregateAnalysisEngineComponent aggregate;
+               if ( component instanceof AggregateAnalysisEngineComponent) {
+                       aggregate = (AggregateAnalysisEngineComponent)component;
+               } else if ( component instanceof TopLevelServiceComponent) {
+                       aggregate = 
((TopLevelServiceComponent)component).aggregateComponent();
+               } else {
+                       throw new RuntimeException("Expected instance of 
AggregateAnalysisEngineComponent, instead is instanceof 
"+component.getClass().getName());
+               }
+//             List<AnalysisEngineComponent> delegateComponents = 
((AggregateAnalysisEngineComponent)component).getChildren();
+               List<AnalysisEngineComponent> delegateComponents = 
aggregate.getChildren();
+               for( AnalysisEngineComponent delegateComponent : 
delegateComponents ) {
+                       endpoints.put(delegateComponent.getKey(), 
delegateComponent.getEndpoint());
+               }
+               controller = new 
AggregateAnalysisEngineController_impl(parentController, component.getKey(), 
component.getResourceSpecifier().getSourceUrlString(), casManager, cache, 
endpoints);
+               
addFlowController((AggregateAnalysisEngineController)controller, 
(AnalysisEngineDescription)component.getResourceSpecifier());
+               // recursively create delegate controllers for all async 
delegates
+               createDelegateControllers(aggregate, controller);
+       }
+           if ( !controller.isTopLevelComponent() ) {
+                       UimaASService service = 
createUimaASServiceWrapper(controller, component);
+           service.start();
+           }
+
+       return controller;
+    }
+
+       
+
+       private void createDelegateControllers(AggregateAnalysisEngineComponent 
aggregateComponent, AnalysisEngineController controller) throws Exception {
+               for (AnalysisEngineComponent delegateComponent : 
aggregateComponent.getChildren()) {
+                       // if error handling threshold has not been defined for 
the delegate, add
+                       // default thresholds.
+                       addDelegateDefaultErrorHandling(controller, 
delegateComponent.getKey());
+                       if (delegateComponent.isRemote()) {
+                               Endpoint endpoint = 
delegateComponent.getEndpoint();
+                               if ("java".equals(endpoint.getServerURI()) ) {
+                                       endpoint.setJavaRemote();
+                               }
+                               
+                       } else {
+                               if 
(Objects.isNull(controller.getOutputChannel(ENDPOINT_TYPE.DIRECT))) {
+                                       OutputChannel oc = new 
DirectOutputChannel().withController(controller);
+                                       oc.initialize();
+                                       controller.addOutputChannel(oc);
+                               }
+                               if 
(Objects.isNull(controller.getInputChannel(ENDPOINT_TYPE.DIRECT))) {
+                                       DirectInputChannel inputChannel = new 
DirectInputChannel(ChannelType.REQUEST_REPLY)
+                                                       
.withController(controller);
+// 10/11/18 For Direct messaging the message handlers are not needed. Its 
using command factory
+//                                     
inputChannel.setMessageHandler(getMessageHandler(controller));
+                                       
controller.addInputChannel(inputChannel);
+                                       
+                               }
+                               createController(delegateComponent,     
controller /*, scaleout */);
+                       }
+
+               }
+
+       }
+       
+       
+    private UimaASService createUimaASServiceWrapper(AnalysisEngineController 
controller, AnalysisEngineComponent component) throws Exception {
+        
+       AsynchronousUimaASService service = 
+                       new 
AsynchronousUimaASService(controller.getComponentName()).withController(controller);
+       // Need an OutputChannel to dispatch messages from this service
+       OutputChannel outputChannel;
+               if ( ( outputChannel = 
controller.getOutputChannel(ENDPOINT_TYPE.DIRECT)) == null) {
+                       outputChannel = getOutputChannel(controller);
+               }
+        
+       // Need an InputChannel to handle incoming messages
+       InputChannel inputChannel;
+       if ((inputChannel = controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) 
== null) {
+               inputChannel = getInputChannel(controller);
+               Handler messageHandlerChain = getMessageHandler(controller);
+                       inputChannel.setMessageHandler(messageHandlerChain);
+                       controller.addInputChannel(inputChannel);
+       }
+
+               // add reply queue listener to the parent aggregate controller
+               if ( !controller.isTopLevelComponent() ) {
+                       // For every delegate the parent controller needs a 
reply listener.
+                       DirectListener replyListener = 
+                                       addDelegateReplyListener(controller, 
component);
+                       // add process, getMeta, reply queues to an endpoint
+                       setDelegateDestinations(controller, service, 
replyListener);
+               }
+               DirectListener processListener =                
+                               
createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getProcessRequestQueue(),Type.ProcessCAS);
+               inputChannel.registerListener(processListener);
+               
+               DirectListener getMetaListener =
+                               
createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getMetaRequestQueue(),Type.GetMeta);
+               inputChannel.registerListener(getMetaListener);
+               if (controller.isCasMultiplier()) {
+                       DirectListener freCASChannelListener = 
+                               
createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getFreeCasQueue(),Type.FreeCAS);
       
+                       inputChannel.registerListener(freCASChannelListener);
+                       
((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue());
+               }                       
+               
+               /*
+               DirectListener processListener = new 
DirectListener(Type.ProcessCAS).
+                               withController(controller).
+                               withConsumerThreads(component.getScaleout()).
+                               
withInputChannel((DirectInputChannel)inputChannel).
+                               withQueue(service.getProcessRequestQueue()).
+                               initialize();
+               inputChannel.registerListener(processListener);
+               
+               DirectListener getMetaListener = new 
DirectListener(Type.GetMeta).
+                               withController(controller).
+                               withConsumerThreads(getReplyScaleout(d)).
+                               
withInputChannel((DirectInputChannel)inputChannel).
+                               
withQueue(service.getMetaRequestQueue()).initialize();
+               inputChannel.registerListener(getMetaListener);
+
+               if (controller.isCasMultiplier()) {
+                       DirectListener freCASChannelListener = 
+                                       new DirectListener(Type.FreeCAS).
+                                       withController(controller).
+                                       
withConsumerThreads(component.getScaleout()).
+                                       
withInputChannel((DirectInputChannel)inputChannel).
+                                       withQueue(service.getFreeCasQueue()).
+                                       initialize();
+                       inputChannel.registerListener(freCASChannelListener);
+                       
((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue());
+               }
+       */
+       return service;
+    }
+       private DirectListener createDirectListener(AnalysisEngineController 
controller, int scaleout, DirectInputChannel inputChannel, 
BlockingQueue<DirectMessage> q, Type type) throws Exception{
+//             DirectListener listener = new DirectListener(type).
+               return new DirectListener(type).
+                               withController(controller).
+                               withConsumerThreads(scaleout).
+                               withInputChannel(inputChannel).
+                               withQueue(q).initialize();
+//             inputChannel.registerListener(listener);
+//             return listener;
+       }
+    private DirectListener addDelegateReplyListener(AnalysisEngineController 
controller, AnalysisEngineComponent component) throws Exception {
+               DirectInputChannel parentInputChannel;
+               // create parent controller's input channel if necessary
+               if 
((controller.getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT)) == 
null) {
+                       // create delegate 
+                       parentInputChannel = new 
DirectInputChannel(ChannelType.REQUEST_REPLY).
+                                       
withController(controller.getParentController());
+                       Handler messageHandlerChain = 
getMessageHandler(controller.getParentController());
+                       
parentInputChannel.setMessageHandler(messageHandlerChain);
+                       
controller.getParentController().addInputChannel(parentInputChannel);
+               } else {
+                       parentInputChannel = (DirectInputChannel) controller.
+                                       
getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT);
+               }
+               int replyScaleout = 1;
+               if ( component instanceof RemoteAnalysisEngineComponent) {
+                       
((RemoteAnalysisEngineComponent)component).getReplyConsumerCount();
+               }
+
+               // USE FACTORY HERE. CHANGE DirectListener to interface
+               // DirectListner replyListener = 
DirectListenerFactory.newReplyListener();
+               DirectListener replyListener = new DirectListener(Type.Reply).
+                               
withController(controller.getParentController()).
+                               withConsumerThreads(replyScaleout).
+                               withInputChannel(parentInputChannel).
+                               withQueue(new 
LinkedBlockingQueue<DirectMessage>()).
+                               withName(controller.getKey()).
+                               initialize();
+               parentInputChannel.registerListener(replyListener);
+               
+               return replyListener;
+    }
+       protected void initialize(UimaASService service, ComponentCasPool cp, 
Transport transport) {
+
+               resourceManager = UimaClassFactory.produceResourceManager();
+               casManager = new AsynchAECasManager_impl(resourceManager);
+               casManager.setCasPoolSize(cp.getPoolSize());
+               casManager.setDisableJCasCache(cp.isDisableJCasCache());
+               casManager.setInitialFsHeapSize(cp.getInitialHeapSize());
+
+               if ( transport.equals(Transport.JMS)) {
+                       cache = new InProcessCache();
+               } else if ( transport.equals(Transport.Java)) {
+                       
+                       // ?????????????????????????? is this test necessary?
+                       if ( (cache = 
(InProcessCache)System.getProperties().get("InProcessCache")) == null) {
+                               cache = new InProcessCache();
+                               System.getProperties().put("InProcessCache", 
cache);
+                       } 
+       
+               }
+//             if ( cache == null ) {
+//                     cache = new InProcessCache();
+//             }
+       }
+       
+       
+    /* 
+     * OLD CODE *****************************
+     */
     
     public AsyncPrimitiveErrorConfigurationType 
addDefaultErrorHandling(ServiceType s) {
        AsyncPrimitiveErrorConfigurationType pec;
@@ -196,13 +441,16 @@ public abstract class AbstractUimaAsServ
                return null;
     }
     private void addDelegateDefaultErrorHandling(AnalysisEngineController 
controller, String delegatKey) {
-       ErrorHandlerChain erc = controller.getErrorHandlerChain();
-       for( ErrorHandler eh : erc ) {
-               if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) {
-                       // add default error handling
-                       eh.getEndpointThresholdMap().put(delegatKey, 
Thresholds.newThreshold());
-               }
-       }       
+       if ( Objects.nonNull(controller.getErrorHandlerChain()) ) {
+               ErrorHandlerChain erc = controller.getErrorHandlerChain();
+               for( ErrorHandler eh : erc ) {
+                       if ( 
!eh.getEndpointThresholdMap().containsKey(delegatKey) ) {
+                               // add default error handling
+                               eh.getEndpointThresholdMap().put(delegatKey, 
Thresholds.newThreshold());
+                       }
+               }       
+       }
+ 
     }
     private OutputChannel getOutputChannel(AnalysisEngineController controller 
) throws Exception {
        OutputChannel outputChannel = null;
@@ -1244,7 +1492,7 @@ public abstract class AbstractUimaAsServ
        private boolean isAggregate(AnalysisEngineType aet) {
                // Is this an aggregate? An aggregate has a property async=true 
or has delegates.
                System.out.println("......"+aet.getKey()+" 
aet.getAsync()="+aet.getAsync()+" aet.isSetAsync()="+aet.isSetAsync()+" 
aet.isSetDelegates()="+aet.isSetDelegates() );
-
+               Objects.requireNonNull(aet, "AnalysisEngineType must be 
non-null");
                if ( "true".equals(aet.getAsync()) || aet.isSetDelegates() ) {
                        return true;
                }

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java
 Thu Oct 18 14:12:00 2018
@@ -24,13 +24,12 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import javax.management.ServiceNotFoundException;
-
 import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.InputChannel.ChannelType;
 import org.apache.uima.aae.OutputChannel;
 import org.apache.uima.aae.UimaClassFactory;
 import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport;
+import org.apache.uima.aae.component.TopLevelServiceComponent;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
 import org.apache.uima.aae.controller.AnalysisEngineController;
@@ -46,7 +45,6 @@ import org.apache.uima.aae.error.handler
 import org.apache.uima.aae.error.handler.ProcessCasErrorHandler;
 import org.apache.uima.aae.handler.Handler;
 import org.apache.uima.aae.service.AsynchronousUimaASService;
-import org.apache.uima.aae.service.ServiceRegistry;
 import org.apache.uima.aae.service.UimaASService;
 import org.apache.uima.aae.service.UimaAsServiceRegistry;
 import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate;
@@ -61,8 +59,6 @@ import org.apache.uima.resource.Resource
 import 
org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument;
 import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType;
 import org.apache.uima.resourceSpecifier.CasPoolType;
-import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType;
-import org.apache.uima.resourceSpecifier.ProcessCasErrorsType;
 import org.apache.uima.resourceSpecifier.ServiceType;
 
 public class UimaAsDirectServiceBuilder extends AbstractUimaAsServiceBuilder  {
@@ -92,6 +88,144 @@ public class UimaAsDirectServiceBuilder
                }
        }
 
+
+       public UimaASService build(TopLevelServiceComponent topLevelComponent, 
ControllerCallbackListener callback)
+                       throws Exception {
+               AsynchronousUimaASService service = null;
+               
+               // is this the only one resource specifier type supported  by 
the current uima-as?
+               if (topLevelComponent.getResourceSpecifier() instanceof 
AnalysisEngineDescription) {
+                       AnalysisEngineDescription aeDescriptor = 
+                                       (AnalysisEngineDescription) 
topLevelComponent.getResourceSpecifier();
+                       String endpoint = 
resolvePlaceholder(topLevelComponent.getEndpoint().getEndpoint());
+                       // Create a Top Level Service (TLS) wrapper. This 
wrapper may contain
+                       // references to multiple TLS service instances if the 
TLS is scaled
+                       // up.
+                       service = new AsynchronousUimaASService(endpoint)
+                                       
.withName(aeDescriptor.getAnalysisEngineMetaData().getName())
+                                       
.withResourceSpecifier(topLevelComponent.getResourceSpecifier())
+                                       
.withScaleout(topLevelComponent.getScaleout());
+
+                       this.buildAndDeploy(topLevelComponent, service, 
callback);
+                       
+
+               }
+               return service;
+       }
+       public UimaASService buildAndDeploy(TopLevelServiceComponent 
topLevelComponent, AsynchronousUimaASService service, 
ControllerCallbackListener callback) throws Exception {
+               // create ResourceManager, CasManager, and InProcessCache
+               initialize(service, topLevelComponent.getComponentCasPool(), 
Transport.Java); 
+
+               AnalysisEngineController topLevelController = 
+                               createController(topLevelComponent, callback, 
service.getId());
+               
+               //topLevelController.addControllerCallbackListener(callback);
+
+               //topLevelController.setServiceId(service.getId());
+               
+               service.withInProcessCache(super.cache);
+               System.setProperty("BrokerURI", "Direct");
+               configureTopLevelService(topLevelController, service);//, 
topLevelComponent.getScaleout());
+               return service;
+
+       }
+       
+       private DirectOutputChannel outputChannel(AnalysisEngineController 
topLevelController) throws Exception {
+               DirectOutputChannel outputChannel = null;
+               if (topLevelController.getOutputChannel(ENDPOINT_TYPE.DIRECT) 
== null) {
+                       outputChannel = new 
DirectOutputChannel().withController(topLevelController);
+                       topLevelController.addOutputChannel(outputChannel);
+               } else {
+                       outputChannel = (DirectOutputChannel) 
topLevelController.
+                                       getOutputChannel(ENDPOINT_TYPE.DIRECT);
+               }
+               return outputChannel;
+       }
+       private DirectInputChannel inputChannel(AnalysisEngineController 
topLevelController) throws Exception {
+               DirectInputChannel inputChannel;
+               if ((topLevelController.getInputChannel(ENDPOINT_TYPE.DIRECT)) 
== null) {
+                       inputChannel = new 
DirectInputChannel(ChannelType.REQUEST_REPLY).
+                                       withController(topLevelController);
+                       Handler messageHandlerChain = 
getMessageHandler(topLevelController);
+                       inputChannel.setMessageHandler(messageHandlerChain);
+                       topLevelController.addInputChannel(inputChannel);
+               } else {
+                       inputChannel = (DirectInputChannel) topLevelController.
+                                       getInputChannel(ENDPOINT_TYPE.DIRECT);
+               }
+               return inputChannel;
+       }
+/*     
+       private void configureTopLevelService(AnalysisEngineController 
topLevelController,
+                       AsynchronousUimaASService service,  int howMany) throws 
Exception {
+*/
+       private void configureTopLevelService(AnalysisEngineController 
topLevelController,
+                       AsynchronousUimaASService service) throws Exception {
+               
+               //addErrorHandling(topLevelController, pec);
+
+
+               // create a single instance of OutputChannel for Direct 
communication if
+               // necessary
+               DirectOutputChannel outputChannel = 
outputChannel(topLevelController);
+
+               DirectInputChannel inputChannel = 
inputChannel(topLevelController);
+
+               if ( topLevelController instanceof 
AggregateAnalysisEngineController ) {
+                       
((AggregateAnalysisEngineController_impl)topLevelController).
+                               setServiceEndpointName(service.getEndpoint());
+               }
+               BlockingQueue<DirectMessage> pQ = null; 
+               BlockingQueue<DirectMessage> mQ = null; 
+
+               // Lookup queue name in service registry. If this queue exists, 
the new service
+               // being
+               // created here will share the same queue to balance the load.
+               UimaASService s;
+               try {
+                       s = 
UimaAsServiceRegistry.getInstance().lookupByEndpoint(service.getEndpoint());
+                       if ( s instanceof AsynchronousUimaASService) {
+                               pQ = ((AsynchronousUimaASService) 
s).getProcessRequestQueue();
+                               mQ = ((AsynchronousUimaASService) 
s).getMetaRequestQueue();
+                       }
+
+               } catch( Exception ee) {
+                       pQ = service.getProcessRequestQueue();
+                       mQ = service.getMetaRequestQueue();
+               }
+
+               scaleout = service.getScaleout();
+               DirectListener processListener = new 
DirectListener(Type.ProcessCAS).withController(topLevelController)
+                               
.withConsumerThreads(scaleout).withInputChannel(inputChannel).withQueue(pQ).
+                               initialize();
+
+               DirectListener getMetaListener = new 
DirectListener(Type.GetMeta).withController(topLevelController)
+                               
.withConsumerThreads(1).withInputChannel(inputChannel).
+                               withQueue(mQ).initialize();
+
+               addFreeCASListener(service, topLevelController, inputChannel, 
outputChannel, scaleout );
+
+               inputChannel.registerListener(getMetaListener);
+               inputChannel.registerListener(processListener);
+
+               service.withController(topLevelController);
+               
+       }
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       /* 
+        * OLD CODE 
**********************************************************************************
+        */
+       
+       
        
        public UimaASService build(AnalysisEngineDeploymentDescriptionDocument 
dd, ControllerCallbackListener callback)
                        throws Exception {

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java
 Thu Oct 18 14:12:00 2018
@@ -47,27 +47,30 @@ import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.cas.impl.BinaryCasSerDes6.ReuseInfo;
 import org.apache.uima.cas.impl.Serialization;
 import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.resource.metadata.ResourceMetaData;
 import org.apache.uima.util.Level;
 
 public abstract class AbstractUimaAsCommand implements UimaAsCommand {
        protected AnalysisEngineController controller;
        private Object mux = new Object();
-
-       protected AbstractUimaAsCommand(AnalysisEngineController controller) {
+       private final MessageContext messageContext;
+       
+       protected AbstractUimaAsCommand(AnalysisEngineController controller, 
MessageContext aMessageContext) {
                this.controller = controller;
+               this.messageContext = aMessageContext;
        }
 
-       protected String getCasReferenceId(Class<?> concreteClassName, 
MessageContext aMessageContext) throws AsynchAEException {
-               if 
(!aMessageContext.propertyExists(AsynchAEMessage.CasReference)) {
+       protected String getCasReferenceId(Class<?> concreteClassName/*, 
MessageContext aMessageContext */) throws AsynchAEException {
+               if 
(!messageContext.propertyExists(AsynchAEMessage.CasReference)) {
                        if 
(UIMAFramework.getLogger(concreteClassName).isLoggable(Level.INFO)) {
                                
UIMAFramework.getLogger(concreteClassName).logrb(Level.INFO, 
concreteClassName.getName(),
                                                "getCasReferenceId", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                                                
"UIMAEE_message_has_cas_refid__INFO",
-                                               new Object[] { 
aMessageContext.getEndpoint().getEndpoint() });
+                                               new Object[] { 
messageContext.getEndpoint().getEndpoint() });
                        }
                        return null;
                }
-               return 
aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+               return 
messageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
        }
 
        protected CacheEntry getCacheEntryForCas(String casReferenceId) {
@@ -95,7 +98,7 @@ public abstract class AbstractUimaAsComm
                return (controller.isTopLevelComponent() && controller 
instanceof AggregateAnalysisEngineController);
        }
 
-       protected void handleError(Exception e, CacheEntry cacheEntry, 
MessageContext mc) {
+       protected void handleError(Exception e, CacheEntry cacheEntry/*, 
MessageContext mc */) {
                if 
(UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) {
                        
UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), 
"handleError",
                                        
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING",
@@ -105,7 +108,7 @@ public abstract class AbstractUimaAsComm
                                        
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
                }
                ErrorContext errorContext = new ErrorContext();
-               errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+               errorContext.add(AsynchAEMessage.Endpoint, 
messageContext.getEndpoint());
                errorContext.add(AsynchAEMessage.Command, 
AsynchAEMessage.Process);
                errorContext.add(AsynchAEMessage.CasReference, 
cacheEntry.getCasReferenceId());
                controller.dropCAS(cacheEntry.getCas());
@@ -154,31 +157,69 @@ public abstract class AbstractUimaAsComm
 
        }
 
-       protected static ErrorContext populateErrorContext(MessageContext 
aMessageCtx) {
+       protected ErrorContext populateErrorContext(/*MessageContext 
aMessageCtx */) {
                ErrorContext errorContext = new ErrorContext();
-               if (aMessageCtx != null) {
+               if (messageContext != null) {
                        try {
-                               if 
(aMessageCtx.propertyExists(AsynchAEMessage.Command)) {
+                               if 
(messageContext.propertyExists(AsynchAEMessage.Command)) {
                                        
errorContext.add(AsynchAEMessage.Command,
-                                                       
aMessageCtx.getMessageIntProperty(AsynchAEMessage.Command));
+                                                       
messageContext.getMessageIntProperty(AsynchAEMessage.Command));
                                }
 
-                               if 
(aMessageCtx.propertyExists(AsynchAEMessage.MessageType)) {
+                               if 
(messageContext.propertyExists(AsynchAEMessage.MessageType)) {
                                        
errorContext.add(AsynchAEMessage.MessageType,
-                                                       
aMessageCtx.getMessageIntProperty(AsynchAEMessage.MessageType));
+                                                       
messageContext.getMessageIntProperty(AsynchAEMessage.MessageType));
                                }
 
-                               if 
(aMessageCtx.propertyExists(AsynchAEMessage.CasReference)) {
+                               if 
(messageContext.propertyExists(AsynchAEMessage.CasReference)) {
                                        
errorContext.add(AsynchAEMessage.CasReference,
-                                                       
aMessageCtx.getMessageStringProperty(AsynchAEMessage.CasReference));
+                                                       
messageContext.getMessageStringProperty(AsynchAEMessage.CasReference));
                                }
-                               errorContext.add(UIMAMessage.RawMsg, 
aMessageCtx.getRawMessage());
+                               errorContext.add(UIMAMessage.RawMsg, 
messageContext.getRawMessage());
                        } catch (Exception e) { /* ignore */
                        }
                }
                return errorContext;
        }
-
+       protected Endpoint getEndpoint() {
+               return messageContext.getEndpoint();
+       }
+       protected int getMessageIntProperty(String propertyName) throws 
Exception {
+               return messageContext.getMessageIntProperty(propertyName);
+       }
+       protected String getMessageStringProperty(String propertyName) throws 
Exception {
+               return messageContext.getMessageStringProperty(propertyName);
+       }
+       protected ResourceMetaData getResourceMetaData() throws Exception {
+               return 
(ResourceMetaData)messageContext.getMessageObjectProperty(AsynchAEMessage.AEMetadata);
+       }
+       protected String getStringMessage() throws Exception {
+               return messageContext.getStringMessage();
+       }
+       protected Object getMessageObjectProperty(String propertyName) throws 
Exception {
+               return messageContext.getMessageObjectProperty(propertyName);
+       }
+       protected boolean getMessageBooleanProperty(String propertyName) throws 
Exception {
+               return messageContext.getMessageBooleanProperty(propertyName);
+       }
+       protected long getMessageLongProperty( String propertyName) throws 
Exception {
+               return messageContext.getMessageLongProperty(propertyName);
+       }
+       protected boolean propertyExists(String propertyName) throws Exception {
+               return messageContext.propertyExists(propertyName);
+       }
+       protected String getEndpointName() {
+               return messageContext.getEndpointName();
+       }
+       protected Object getObjectMessage() throws Exception {
+               return messageContext.getObjectMessage();
+       }
+       protected byte[] getByteMessage() throws Exception {
+               return messageContext.getByteMessage();
+       }
+       protected MessageContext getMessageContext() {
+               return messageContext;
+       }
        protected Endpoint fetchParentCasOrigin(String parentCasId) throws 
AsynchAEException {
                Endpoint endpoint = null;
                String parentId = parentCasId;
@@ -224,8 +265,8 @@ public abstract class AbstractUimaAsComm
                return cas;
        }
 
-       protected SerializationResult deserializeChildCAS(String 
casMultiplierDelegateKey, Endpoint endpoint,
-                       MessageContext mc) throws Exception {
+       protected SerializationResult deserializeChildCAS(String 
casMultiplierDelegateKey, Endpoint endpoint
+                       /*MessageContext mc*/) throws Exception {
                SerializationResult result = new SerializationResult();
 
                // Aggregate time spent waiting for a CAS in the shadow cas pool
@@ -250,17 +291,17 @@ public abstract class AbstractUimaAsComm
                // Create deserialized wrapper for XMI, BINARY, COMPRESSED 
formats. To add
                // a new serialization format add a new class which implements
                // UimaASDeserializer and modify DeserializerFactory class.
-               UimaASDeserializer deserializer = 
DeserializerFactory.newDeserializer(endpoint, mc);
+               UimaASDeserializer deserializer = 
DeserializerFactory.newDeserializer(endpoint, messageContext);
                deserializer.deserialize(result);
 
                return result;
        }
 
-       protected SerializationResult deserializeInputCAS( MessageContext mc)
+       protected SerializationResult deserializeInputCAS()
                        throws Exception {
                SerializationResult result = new SerializationResult();
-               String origin = mc.getEndpoint().getEndpoint();
-               Endpoint endpoint = mc.getEndpoint();
+               String origin = messageContext.getEndpoint().getEndpoint();
+               Endpoint endpoint = messageContext.getEndpoint();
                
                // Time how long we wait on Cas Pool to fetch a new CAS
                long t1 = controller.getCpuTime();
@@ -276,20 +317,20 @@ public abstract class AbstractUimaAsComm
                        return null;
                }
 
-               UimaASDeserializer deserializer = 
DeserializerFactory.newDeserializer(endpoint, mc);
+               UimaASDeserializer deserializer = 
DeserializerFactory.newDeserializer(endpoint, messageContext);
                deserializer.deserialize(result);
 
                return result;
        }
-       protected Delegate getDelegate(MessageContext mc) throws 
AsynchAEException {
+       protected Delegate getDelegate(/* MessageContext mc */) throws 
AsynchAEException {
                String delegateKey = null;
-               if (mc.getEndpoint().getEndpoint() == null || 
mc.getEndpoint().getEndpoint().trim().length() == 0) {
-                       String fromEndpoint = 
mc.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+               if (messageContext.getEndpoint().getEndpoint() == null || 
messageContext.getEndpoint().getEndpoint().trim().length() == 0) {
+                       String fromEndpoint = 
messageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
                        delegateKey = ((AggregateAnalysisEngineController) 
controller)
                                        .lookUpDelegateKey(fromEndpoint);
                } else {
                        delegateKey = ((AggregateAnalysisEngineController) 
controller)
-                                       
.lookUpDelegateKey(mc.getEndpoint().getEndpoint());
+                                       
.lookUpDelegateKey(messageContext.getEndpoint().getEndpoint());
                }
                return ((AggregateAnalysisEngineController) 
controller).lookupDelegate(delegateKey);
        }

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java
 Thu Oct 18 14:12:00 2018
@@ -23,14 +23,14 @@ import org.apache.uima.aae.controller.En
 import org.apache.uima.aae.message.MessageContext;
 
 public class CollectionProcessCompleteRequestCommand  extends 
AbstractUimaAsCommand {
-       private MessageContext mc;
+//     private MessageContext mc;
        
        public CollectionProcessCompleteRequestCommand(MessageContext mc, 
AnalysisEngineController controller) {
-               super(controller);
-               this.mc = mc;
+               super(controller, mc);
+//             this.mc = mc;
        }
        public void execute() throws Exception {
-        Endpoint endpoint = mc.getEndpoint();
-        controller.collectionProcessComplete(endpoint);
+//        Endpoint endpoint = mc.getEndpoint();
+        controller.collectionProcessComplete(super.getEndpoint());
        }
 }

Modified: 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff
==============================================================================
--- 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
 (original)
+++ 
uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java
 Thu Oct 18 14:12:00 2018
@@ -26,14 +26,14 @@ import org.apache.uima.aae.message.Async
 import org.apache.uima.aae.message.MessageContext;
 
 public class CollectionProcessCompleteResponseCommand  extends 
AbstractUimaAsCommand {
-       private MessageContext mc;
+//     private MessageContext mc;
        
        public CollectionProcessCompleteResponseCommand(MessageContext mc, 
AnalysisEngineController controller) {
-               super(controller);
-               this.mc = mc;
+               super(controller,mc);
+//             this.mc = mc;
        }
        public void execute() throws Exception {
-               Delegate delegate = super.getDelegate(mc);
+               Delegate delegate = super.getDelegate();
            try {
                System.out.println("..... 
Controller:"+controller.getComponentName()+" Handling CPC From 
"+delegate.getKey());
                  ((AggregateAnalysisEngineController)controller)
@@ -41,7 +41,7 @@ public class CollectionProcessCompleteRe
              } catch (Exception e) {
                ErrorContext errorContext = new ErrorContext();
                errorContext.add(AsynchAEMessage.Command, 
AsynchAEMessage.CollectionProcessComplete);
-               errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint());
+               errorContext.add(AsynchAEMessage.Endpoint, super.getEndpoint());
                controller.getErrorHandlerChain().handle(e, errorContext, 
controller);
              }
        }


Reply via email to