Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,267 @@ +package org.apache.uima.aae.component.dd; + +import java.io.File; +import java.util.List; +import java.util.Objects; + +import org.apache.uima.aae.UimaASUtils; +import org.apache.uima.aae.UimaClassFactory; +import org.apache.uima.aae.component.AnalysisEngineComponent; +import org.apache.uima.aae.component.CasMultiplierComponent; +import org.apache.uima.aae.component.CasMultiplierNature; +import org.apache.uima.aae.component.RemoteAnalysisEngineComponent; +import org.apache.uima.aae.component.TopLevelServiceComponent; +import org.apache.uima.aae.component.factory.AnalysisEngineComponentFactory; +import org.apache.uima.resource.ResourceSpecifier; +import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument; +import org.apache.uima.resourceSpecifier.AnalysisEngineType; +import org.apache.uima.resourceSpecifier.DelegateAnalysisEngineType; +import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType; +import org.apache.uima.resourceSpecifier.ServiceType; +import org.apache.xmlbeans.XmlDocumentProperties; + +public class DeploymentDescriptorProcessor { + + private AnalysisEngineDeploymentDescriptionDocument dd = null; + public DeploymentDescriptorProcessor() { + + } + public DeploymentDescriptorProcessor(AnalysisEngineDeploymentDescriptionDocument dd) { + this.dd = dd; + } + public AnalysisEngineComponent newComponent(String descriptorPath) throws Exception { + this.dd = parseDD(descriptorPath); + return newComponent(); + } + + public TopLevelServiceComponent newComponent() throws Exception { + ServiceType service = + dd.getAnalysisEngineDeploymentDescription().getDeployment().getService(); + XmlDocumentProperties dp = dd.documentProperties(); + System.out.println(dp.getSourceName()); + + // get absolute path to resource specifier + String aeDescriptor = + UimaASUtils.fixPath(dp.getSourceName(), getDescriptor(service)); + + // Get top level uima resource specifier + ResourceSpecifier resourceSpecifier = + UimaClassFactory.produceResourceSpecifier(aeDescriptor); + + AnalysisEngineComponentFactory componentFactory = + new AnalysisEngineComponentFactory(); + // Process top level AE resource specifier and all its delegates. + // For aggregates, recursively walk through a delegate tree, producing + // a tree of AnalysisEngineComponent instances, one for every delegate. + + AnalysisEngineComponent aeComponent = + componentFactory.produce(resourceSpecifier, null); + + // Decorate above with top level component functionality + TopLevelServiceComponent topLevelComponent = + new TopLevelServiceComponent(aeComponent, dd); + +// if ( aeComponent.isPrimitive()) { +// // The AE descriptor is for a primitive AE +// } else { + // the AE descriptor is for an aggregate AE. Check DD to + // see if its an async aggregate. Its async=true or + // has delegates. +// if ( isAggregate(service.getAnalysisEngine()) ) { + if ( topLevelComponent.isAggregate()) { + // All delegates will be colocated unless + // a delegate is remote. That is determined + // below. +// aeComponent.enableAsync(); + //if ( dd.getAnalysisEngineDeploymentDescription().getDeployment().getProtocol() + DelegateAnalysisEngineType[] colocatedDelegates = null; + if ( Objects.nonNull(service.getAnalysisEngine()) && + Objects.nonNull(service.getAnalysisEngine().getDelegates()) ) { + colocatedDelegates = service.getAnalysisEngine(). + getDelegates(). + getAnalysisEngineArray(); + + } + handleColocatedDelegates(colocatedDelegates, aeComponent.getChildren()); + + RemoteAnalysisEngineType[] remoteDelegates = null; + if ( Objects.nonNull(service.getAnalysisEngine()) && + Objects.nonNull(service.getAnalysisEngine().getDelegates())) { + remoteDelegates = service.getAnalysisEngine(). + getDelegates(). + getRemoteAnalysisEngineArray(); + } + handleRemoteDelegates(remoteDelegates, aeComponent.getChildren()); + +// service.getAnalysisEngine(). +// getDelegates(). + //} + } + + + return topLevelComponent; + } + public AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception { + return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath)); + + } + private boolean isAggregate(AnalysisEngineType aet) { + return ("true".equals(aet.getAsync()) || aet.isSetAsync() || aet.isSetDelegates()); + } + private String getDescriptor(ServiceType service) { + String aeDescriptor = service.getTopDescriptor().getImport().getLocation(); + if ( aeDescriptor == null ) { + aeDescriptor = service.getTopDescriptor().getImport().getName(); + } + return aeDescriptor; + } + + private void markAllDelegatesAsAsync(List<AnalysisEngineComponent> resourceSpecifierDelegates) { + for ( AnalysisEngineComponent aec : resourceSpecifierDelegates ) { + if ( !aec.isPrimitive() ) { + handleColocatedDelegates(null, aec.getChildren()); + } + aec.enableAsync(); + } + + } + private void handleColocatedDelegates(DelegateAnalysisEngineType[] ddDelegates, List<AnalysisEngineComponent> resourceSpecifierDelegates ) { + if ( Objects.isNull(ddDelegates)) { + // the dd does not include delegates but is configured as an asynch service + // so process resource specifiers recursively marking each part of a pipeline + // as asynch so that it is deployed as a collocated asynch service. + handleDefaultColocatedDelegates(resourceSpecifierDelegates); + //markAllDelegatesAsAsync(resourceSpecifierDelegates); + return; + } + // go through all delegates defined in the deployment descriptor (dd) + for( DelegateAnalysisEngineType ddDelegate : ddDelegates ) { + // find a matching delegate in the AE resource specifier + for( AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates ) { + if ( ddDelegate.getKey().equals(resourceSpecifierDelegate.getKey())) { + if ( resourceSpecifierDelegate.isCasMultiplier() && Objects.nonNull(ddDelegate.getCasMultiplier())) { + // plugin cas multiplier settings from dd + CasMultiplierNature casMultiplier = + new CasMultiplierComponent(ddDelegate.getCasMultiplier().getDisableJCasCache(), + TypeConverter.convertStringToLong(ddDelegate.getCasMultiplier().getInitialFsHeapSize(), 1000), + ddDelegate.getCasMultiplier().getPoolSize(), + TypeConverter.convertStringToBoolean(ddDelegate.getCasMultiplier().getProcessParentLast(),true) ); + resourceSpecifierDelegate.enableCasMultiplierNatureWith(casMultiplier); + + } + resourceSpecifierDelegate.enableAsync(); // delegate is async + + resourceSpecifierDelegate. + withScaleout(Objects.isNull(ddDelegate.getScaleout()) ? 1 :ddDelegate.getScaleout().getNumberOfInstances()). + withRequestThreadPoolSize( TypeConverter.convertStringToInt(ddDelegate.getInputQueueScaleout(), 1)). + withReplyThreadPoolSize( TypeConverter.convertStringToInt(ddDelegate.getInternalReplyQueueScaleout(),1)); + + if ( isAggregate(ddDelegate) ) { + + resourceSpecifierDelegate.enableAsync(); + for ( AnalysisEngineComponent aec : resourceSpecifierDelegate.getChildren() ) { + aec.enableAsync(); + } + if ( ddDelegate.getDelegates() != null ) { + // recursively process collocated delegates + handleColocatedDelegates(ddDelegate.getDelegates().getAnalysisEngineArray() , resourceSpecifierDelegate.getChildren()); + handleRemoteDelegates(ddDelegate.getDelegates().getRemoteAnalysisEngineArray(), resourceSpecifierDelegate.getChildren()); + } + + } + break; // found a match and completed processing it. We are done with it. + } + } + } + } + + private void handleDefaultColocatedDelegates(List<AnalysisEngineComponent> resourceSpecifierDelegates) { + // find a matching delegate in the AE resource specifier + for (AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates) { + if (resourceSpecifierDelegate.isCasMultiplier()) { + // plugin cas multiplier settings from dd + CasMultiplierNature casMultiplier = new CasMultiplierComponent(false, 1000, 1, true); + resourceSpecifierDelegate.enableCasMultiplierNatureWith(casMultiplier); + } + resourceSpecifierDelegate.withScaleout(1). + withRequestThreadPoolSize(1). + withReplyThreadPoolSize(1). + enableAsync(); + + if (!resourceSpecifierDelegate.isPrimitive()) { + handleDefaultColocatedDelegates(resourceSpecifierDelegate.getChildren()); + } + } + } + private void handleRemoteDelegates(RemoteAnalysisEngineType[] remoteDelegates, List<AnalysisEngineComponent> resourceSpecifierDelegates ) { + if ( Objects.isNull(remoteDelegates) ) { + return; + } + for( RemoteAnalysisEngineType remoteDelegateType : remoteDelegates ) { + // find a matching delegate in the AE resource specifier + for( AnalysisEngineComponent resourceSpecifierDelegate : resourceSpecifierDelegates ) { + if ( remoteDelegateType.getKey().equals(resourceSpecifierDelegate.getKey())) { + // find an index of the current component in the list. We + // will decorate this component as a remote, and replace + // it in the list. + int index = + resourceSpecifierDelegates.indexOf(resourceSpecifierDelegate); + // Decorate existing component with remote flavor + RemoteAnalysisEngineComponent remoteDelegate = + new RemoteAnalysisEngineComponent(resourceSpecifierDelegate, remoteDelegateType); + + //replace component with decorated remote + resourceSpecifierDelegates.set(index, remoteDelegate); + } + + } + + } + } +/* + public void parse(DelegateAnalysisEngineType colocatedDelegate, AnalysisEngineComponent component) { + if ( isAggregate(colocatedDelegate) ) { + DelegateAnalysisEngineType[] colocatedDelegates = + colocatedDelegate.getDelegates().getAnalysisEngineArray(); + handleColocatedDelegates(colocatedDelegates, component.getChildren()); + } else { + + } + } +*/ + public static void main(String[] args) { + try { + DeploymentDescriptorProcessor ddp = + new DeploymentDescriptorProcessor(); + ddp.newComponent(args[0]); + } catch( Exception e) { + e.printStackTrace(); + } + } + private static class TypeConverter { + private static int convertStringToInt(String value, int defaultValue) { + int returnValue = defaultValue; + try { + returnValue = Integer.parseInt(value); + } catch( Exception e) { + } + return returnValue; + } + private static boolean convertStringToBoolean(String value, boolean defaultValue) { + boolean returnValue = defaultValue; + try { + returnValue = Boolean.parseBoolean(value); + } catch( Exception e) { + } + return returnValue; + } + private static long convertStringToLong(String value, long defaultValue) { + long returnValue = defaultValue; + try { + returnValue = Long.parseLong(value); + } catch( Exception e) { + } + return returnValue; + } + } +}
Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,55 @@ +package org.apache.uima.aae.component.factory; + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.uima.aae.UimaClassFactory; +import org.apache.uima.aae.component.AggregateAnalysisEngineComponent; +import org.apache.uima.aae.component.AnalysisEngineComponent; +import org.apache.uima.aae.component.PrimitiveAnalysisEngineComponent; +import org.apache.uima.analysis_engine.AnalysisEngineDescription; +import org.apache.uima.resource.ResourceSpecifier; + +public class AnalysisEngineComponentFactory { + + private AnalysisEngineDescription getAeDescription(ResourceSpecifier rs) { + return (AnalysisEngineDescription) rs; + } + + public AnalysisEngineComponent produce(ResourceSpecifier rs, String key) throws Exception { + AnalysisEngineDescription aeDescriptor = getAeDescription(rs); + AnalysisEngineComponent component = null; + + if ( aeDescriptor.isPrimitive() ) { + component = new PrimitiveAnalysisEngineComponent(key, rs); + } else { + component = + new AggregateAnalysisEngineComponent(key, rs); + Map<String, ResourceSpecifier> delegates = + aeDescriptor.getDelegateAnalysisEngineSpecifiers(); + for(Entry<String, ResourceSpecifier> delegateEntry: delegates.entrySet() ) { + component.add(produce(delegateEntry.getValue(), delegateEntry.getKey() )); + } + } + + if ( aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().isMultipleDeploymentAllowed() ) { + component.enableScaleout(); + } + if ( aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes() ) { + component.enableCasMultipler(); + } + + return component; + } + public static void main(String[] args ) { + try { + AnalysisEngineComponentFactory factory = + new AnalysisEngineComponentFactory(); + ResourceSpecifier resourceSpecifier = + UimaClassFactory.produceResourceSpecifier(args[0]); + factory.produce(resourceSpecifier, "TopLevel"); + } catch( Exception e) { + e.printStackTrace(); + } + } +} Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Thu Oct 18 14:12:00 2018 @@ -2028,25 +2028,54 @@ implements return retValue; } - private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry cacheEntry, FinalStep aStep) { - - // Get the key of the Cas Producer - String casProducer = cacheEntry.getCasProducerAggregateName(); - // CAS is considered new from the point of view of this service IF it was produced by it - boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals( - casProducer)); - if (parent != null && parent.isFailed() && isNewCas) { - return true; // no point to continue if the CAS was produced in this aggregate and its parent - // failed here - } - // If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR - // this component - // is not a Cas Multiplier - if (isNewCas && parent.getSubordinateCasInPlayCount() == 0 && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) { - return true; - } - return false; - } + private boolean forceToDropTheCas(CasStateEntry parent, CacheEntry cacheEntry, FinalStep aStep) { + + // Get the key of the Cas Producer + String casProducer = cacheEntry.getCasProducerAggregateName(); + // CAS is considered new from the point of view of this service IF it was + // produced by it + boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(casProducer)); + // force to drop a child CAS if this service is not a Cas Multiplier + if ( isNewCas ) { + if ( !isCasMultiplier()) { + System.out.println(">>>>>>>>>>>>>>>>>>> FORCE TO DROP THE CAS"); + return true; + } + if (parent != null && parent.isFailed()) { + return true; // no point to continue if the CAS was produced in this aggregate and its parent + // failed + } + // If the CAS was generated by this component but the Flow Controller wants to + // drop the CAS OR this component is not a Cas Multiplier + if ( parent != null && parent.getSubordinateCasInPlayCount() == 0 + && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) { + return true; + } + } + + + /* + if (isNewCas && isTopLevelComponent() && !isCasMultiplier()) { + System.out.println(">>>>>>>>>>>>>>>>>>> FORCE TO DROP THE CAS"); + return true; + } + if (parent != null && parent.isFailed() && isNewCas) { + return true; // no point to continue if the CAS was produced in this aggregate and its parent + // failed here + } + // If the CAS was generated by this component but the Flow Controller wants to + // drop the CAS OR + // this component + // is not a Cas Multiplier + if (isNewCas && parent.getSubordinateCasInPlayCount() == 0 + && (aStep.getForceCasToBeDropped() || !isCasMultiplier())) { + return true; + } + + */ + + return false; + } private boolean casHasExceptions(CasStateEntry casStateEntry) { return (casStateEntry.getErrors().size() > 0) ? true : false; Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Thu Oct 18 14:12:00 2018 @@ -40,6 +40,7 @@ import org.apache.uima.aae.jmx.JmxManage import org.apache.uima.aae.jmx.ServiceErrors; import org.apache.uima.aae.jmx.ServiceInfo; import org.apache.uima.aae.jmx.ServicePerformance; +import org.apache.uima.aae.message.Origin; import org.apache.uima.aae.monitor.Monitor; import org.apache.uima.aae.spi.transport.UimaMessageListener; import org.apache.uima.aae.spi.transport.UimaTransport; @@ -55,6 +56,8 @@ public interface AnalysisEngineControlle public static final String AEInstanceCount = "AEInstanceCount"; + public Origin getOrigin(); + public void sendMetadata(Endpoint anEndpoint) throws AsynchAEException; public ControllerLatch getControllerLatch(); Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Thu Oct 18 14:12:00 2018 @@ -78,6 +78,8 @@ import org.apache.uima.aae.jmx.ServiceEr import org.apache.uima.aae.jmx.ServiceInfo; import org.apache.uima.aae.jmx.ServicePerformance; import org.apache.uima.aae.message.AsynchAEMessage; +import org.apache.uima.aae.message.Origin; +import org.apache.uima.aae.message.UimaAsOrigin; import org.apache.uima.aae.monitor.Monitor; import org.apache.uima.aae.monitor.MonitorBaseImpl; import org.apache.uima.aae.monitor.statistics.LongNumericStatistic; @@ -111,6 +113,7 @@ public abstract class BaseAnalysisEngine JMS, DIRECT }; + private final Origin origin; private static final Class<?> CLASS_NAME = BaseAnalysisEngineController.class; private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME"; public enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED }; @@ -284,7 +287,7 @@ public abstract class BaseAnalysisEngine protected abstract void doWarmUp(CAS cas, String casReferenceId) throws Exception; public BaseAnalysisEngineController() { - + origin = new UimaAsOrigin(""); } public BaseAnalysisEngineController(AnalysisEngineController aParentController, @@ -316,7 +319,7 @@ public abstract class BaseAnalysisEngine Map aDestinationMap, JmxManagement aJmxManagement,boolean disableJCasCache) throws Exception { System.out.println("C'tor Called Descriptor:"+aDescriptor); - + origin = new UimaAsOrigin(anEndpointName); casManager = aCasManager; inProcessCache = anInProcessCache; localCache = new LocalCache(this); @@ -529,7 +532,9 @@ public abstract class BaseAnalysisEngine return uimaContext; } - + public Origin getOrigin() { + return origin; + } public void setThreadFactory(ThreadPoolTaskExecutor factory) { threadFactory = factory; } Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java Thu Oct 18 14:12:00 2018 @@ -21,6 +21,7 @@ package org.apache.uima.aae.controller; import org.apache.uima.aae.error.AsynchAEException; import org.apache.uima.aae.jmx.ServiceInfo; +import org.apache.uima.aae.message.Origin; import org.apache.uima.cas.SerialFormat; import org.apache.uima.cas.impl.TypeSystemImpl; @@ -31,6 +32,12 @@ public interface Endpoint { public static final int DISABLED = 3; + public void setMessageOrigin(Origin origin); + + public Origin getMessageOrigin(); + + public String getUniqueId(); + public boolean isJavaRemote(); public void setJavaRemote(); Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java Thu Oct 18 14:12:00 2018 @@ -20,11 +20,13 @@ package org.apache.uima.aae.controller; import java.util.Timer; +import java.util.UUID; import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState; import org.apache.uima.aae.error.AsynchAEException; import org.apache.uima.aae.jmx.ServiceInfo; import org.apache.uima.aae.message.AsynchAEMessage; +import org.apache.uima.aae.message.Origin; import org.apache.uima.cas.SerialFormat; import org.apache.uima.cas.impl.TypeSystemImpl; import org.apache.uima.resource.ResourceSpecifier; @@ -32,6 +34,8 @@ import org.apache.uima.resource.Resource public class Endpoint_impl implements Endpoint, Cloneable { private static final Class<?> CLASS_NAME = Endpoint_impl.class; + private String uniqueId = UUID.randomUUID().toString(); + private volatile boolean javaRemote=false; private volatile Object destination = null; @@ -130,6 +134,18 @@ public class Endpoint_impl implements En private ResourceSpecifier resourceSpecifier; + private Origin messageOrigin; + + public void setMessageOrigin(Origin origin) { + this.messageOrigin = origin; + } + + public Origin getMessageOrigin() { + return messageOrigin; + } + public String getUniqueId() { + return uniqueId; + } public void setJavaRemote() { javaRemote = true; } Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,9 @@ +package org.apache.uima.aae.definition.connectors; + +import org.apache.uima.aae.message.MessageProcessor; + +public abstract class AbstractUimaAsConsumer implements UimaAsConsumer{ + + protected abstract void setMessageProcessor(MessageProcessor processor); + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,5 @@ +package org.apache.uima.aae.definition.connectors; + +public interface ComponentConnector { + public Object getConnectionInfo(); +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,31 @@ +package org.apache.uima.aae.definition.connectors; + +import org.apache.uima.aae.definition.connectors.basic.BasicConnector; +import org.apache.uima.aae.definition.connectors.basic.DirectConnector; + +public class ConnectorFactory { + + public static ComponentConnector newConnector(String protocol, String vendor) { + ComponentConnector connector=null; + switch(protocol.toLowerCase()) { + case "jms": + connector = getJmsConnector(vendor); + break; + case "direct": + connector = new DirectConnector(); + break; + + default: + connector = new BasicConnector(); + } + return connector; + } + private static ComponentConnector getJmsConnector(String vendor) { + return null; + } + public static void main(String[] args) { + // TODO Auto-generated method stub + + } + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,7 @@ +package org.apache.uima.aae.definition.connectors; + +public interface ListenerCallback { + public void onInitializationError(Exception e); + public boolean failedInitialization(); + public Exception getException(); +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,11 @@ +package org.apache.uima.aae.definition.connectors; + +import java.util.Map; + +public interface UimaAsConnector { + + public UimaAsEndpoint createEndpoint(String uri, Map<String, Object> params) + throws Exception; + + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,17 @@ +package org.apache.uima.aae.definition.connectors; + +import org.apache.uima.aae.Lifecycle; +import org.apache.uima.aae.controller.AnalysisEngineController; +import org.apache.uima.as.client.DirectMessage; + +public interface UimaAsConsumer extends Lifecycle { + public enum ConsumerType {GetMeta,ProcessCAS,Cpc,FreeCAS,Reply,Info}; + + public void initialize() throws Exception; + public void initialize(AnalysisEngineController controller) throws Exception; + + public void consume(DirectMessage message) throws Exception; + + public ConsumerType getType(); + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,15 @@ +package org.apache.uima.aae.definition.connectors; + +import org.apache.uima.aae.Lifecycle; +import org.apache.uima.aae.controller.Endpoint; +import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType; +import org.apache.uima.aae.message.MessageContext; + +public interface UimaAsEndpoint extends Lifecycle { + public UimaAsProducer createProducer(String targetUri) throws Exception; + public UimaAsProducer createProducer(UimaAsConsumer consumer, String delegateKey) throws Exception; + public UimaAsConsumer createConsumer(String targetUri, ConsumerType type, int consumerThreadCount) throws Exception; + public void dispatch(MessageContext messageContext) throws Exception; + public UimaAsConsumer getConsumer(String targetUri, ConsumerType type); + public MessageContext createMessage(int command, int messageType, Endpoint endpoint); +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,13 @@ +package org.apache.uima.aae.definition.connectors; + +import org.apache.uima.aae.Lifecycle; +import org.apache.uima.aae.message.MessageContext; +import org.apache.uima.aae.message.UimaAsMessage; +import org.apache.uima.as.client.DirectMessage; + +public interface UimaAsProducer extends Lifecycle { + + public void dispatch(DirectMessage message) throws Exception; + public void dispatch(DirectMessage message, UimaAsConsumer target) throws Exception; + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,12 @@ +package org.apache.uima.aae.definition.connectors.basic; + +import org.apache.uima.aae.definition.connectors.ComponentConnector; + +public class BasicConnector implements ComponentConnector { + + @Override + public Object getConnectionInfo() { + return ""; + } + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,13 @@ +package org.apache.uima.aae.definition.connectors.basic; + +import org.apache.uima.aae.definition.connectors.ComponentConnector; + +public class DirectConnector implements ComponentConnector { + + @Override + public Object getConnectionInfo() { + // TODO Auto-generated method stub + return null; + } + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,36 @@ +package org.apache.uima.aae.definition.connectors.jms; + +import org.apache.uima.aae.definition.connectors.ComponentConnector; + +public class ActiveMqConnector implements ComponentConnector { + private final Object connection; + + ActiveMqConnector(String queue, String broker, int prefetch) { + connection = new ActiveMqConnection(queue, broker, prefetch); + } + @Override + public Object getConnectionInfo() { + return connection; + } + + public class ActiveMqConnection { + private final String queueName; + private final String brokerUrl; + private final int prefetch; + + ActiveMqConnection(String queue, String broker, int prefetch) { + this.queueName = queue; + this.brokerUrl = broker; + this.prefetch = prefetch; + } + public String getQueueName() { + return queueName; + } + public String getBrokerUrl() { + return brokerUrl; + } + public int getPrefetch() { + return prefetch; + } + } +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,9 @@ +package org.apache.uima.aae.message; + +import org.apache.uima.aae.controller.AnalysisEngineController; + +public interface MessageProcessor { + + public void process(MessageContext message) throws Exception; + public AnalysisEngineController getController(); +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,6 @@ +package org.apache.uima.aae.message; + +public interface Origin { + public String getUniqueId(); + public String getName(); +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,36 @@ +package org.apache.uima.aae.message; + +import org.apache.uima.aae.controller.Endpoint; +import org.apache.uima.aae.error.AsynchAEException; + +public interface UimaAsMessage { + public enum Command {GetMetaRequest, GetMetaResponse, CpcRegeuest, CpcResponse, ProcessRequest, ProcessResponse }; + + public String getMessageStringProperty(String aMessagePropertyName) throws AsynchAEException; + + public int getMessageIntProperty(String aMessagePropertyName) throws AsynchAEException; + + public long getMessageLongProperty(String aMessagePropertyName) throws AsynchAEException; + + public Object getMessageObjectProperty(String aMessagePropertyName) throws AsynchAEException; + + public boolean getMessageBooleanProperty(String aMessagePropertyName) throws AsynchAEException; + + public Endpoint getEndpoint(); + + public String getStringMessage() throws AsynchAEException; + + public Object getObjectMessage() throws AsynchAEException; + + public byte[] getByteMessage() throws AsynchAEException; + + public Object getRawMessage(); + + public boolean propertyExists(String aKey) throws AsynchAEException; + + public void setMessageArrivalTime(long anArrivalTime); + + public long getMessageArrivalTime(); + + public String getEndpointName(); +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,55 @@ +package org.apache.uima.aae.message; + +import java.util.UUID; + +public class UimaAsOrigin implements Origin { + + private final String uniqueId = UUID.randomUUID().toString(); + private final String name; + + public UimaAsOrigin(String name) { + this.name = name; + } + @Override + public String getUniqueId() { + return uniqueId; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((uniqueId == null) ? 0 : uniqueId.hashCode()); + return result; + } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + UimaAsOrigin other = (UimaAsOrigin) obj; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + if (uniqueId == null) { + if (other.uniqueId != null) + return false; + } else if (!uniqueId.equals(other.uniqueId)) + return false; + return true; + } + @Override + public String getName() { + return name; + } + @Override + public String toString() { + return "Origin[name: " + name + "] [id:"+uniqueId+"]"; + } +} Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java Thu Oct 18 14:12:00 2018 @@ -20,6 +20,7 @@ package org.apache.uima.aae.service; import java.util.concurrent.BlockingQueue; +import org.apache.uima.aae.InProcessCache; import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData; import org.apache.uima.as.client.DirectMessage; import org.apache.uima.cas.CAS; @@ -34,6 +35,7 @@ public interface UimaASService { public static final int STOP_NOW = 1001; public String getEndpoint(); + public int getScaleout(); public String getId(); public void start() throws Exception; public void stop() throws Exception; @@ -46,5 +48,5 @@ public interface UimaASService { public void releaseCAS(String casReferenceId, BlockingQueue<DirectMessage> releaseCASQueue ) throws Exception; public AnalysisEngineMetaData getMetaData() throws Exception; public void removeFromCache(String casReferenceId); - + public UimaASService withInProcessCache(InProcessCache cache); } Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java Thu Oct 18 14:12:00 2018 @@ -24,7 +24,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -37,9 +38,15 @@ import org.apache.uima.aae.OutputChannel import org.apache.uima.aae.UimaASUtils; import org.apache.uima.aae.UimaClassFactory; import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport; +import org.apache.uima.aae.component.AggregateAnalysisEngineComponent; +import org.apache.uima.aae.component.AnalysisEngineComponent; +import org.apache.uima.aae.component.ComponentCasPool; +import org.apache.uima.aae.component.RemoteAnalysisEngineComponent; +import org.apache.uima.aae.component.TopLevelServiceComponent; import org.apache.uima.aae.controller.AggregateAnalysisEngineController; import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl; import org.apache.uima.aae.controller.AnalysisEngineController; +import org.apache.uima.aae.controller.ControllerCallbackListener; import org.apache.uima.aae.controller.BaseAnalysisEngineController.ENDPOINT_TYPE; import org.apache.uima.aae.controller.DelegateEndpoint; import org.apache.uima.aae.controller.Endpoint; @@ -51,7 +58,6 @@ import org.apache.uima.aae.error.ErrorHa import org.apache.uima.aae.error.Threshold; import org.apache.uima.aae.error.Thresholds; import org.apache.uima.aae.error.Thresholds.Action; -import org.apache.uima.aae.error.UimaAsDelegateException; import org.apache.uima.aae.error.handler.CpcErrorHandler; import org.apache.uima.aae.error.handler.GetMetaErrorHandler; import org.apache.uima.aae.error.handler.ProcessCasErrorHandler; @@ -78,7 +84,6 @@ import org.apache.uima.resource.Resource import org.apache.uima.resource.ResourceManager; import org.apache.uima.resource.ResourceSpecifier; import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument; -import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionType; import org.apache.uima.resourceSpecifier.AnalysisEngineType; import org.apache.uima.resourceSpecifier.AsyncAggregateErrorConfigurationType; import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType; @@ -109,6 +114,246 @@ public abstract class AbstractUimaAsServ } protected abstract void addListenerForReplyHandling( AggregateAnalysisEngineController controller, Endpoint_impl endpoint, RemoteAnalysisEngineDelegate remoteDelegate) throws Exception; + +// public AnalysisEngineController createController( AnalysisEngineComponent component, int howManyInstances) throws Exception { + public AnalysisEngineController createController( AnalysisEngineComponent component, ControllerCallbackListener aListener, String serviceId) throws Exception { + AnalysisEngineController controller = + createController(component, null /*, component.getScaleout() */); + controller.setServiceId(serviceId); + controller.addControllerCallbackListener(aListener); + return controller; + } + + /** + * Recursively walks through the AE descriptor creating instances of AnalysisEngineController + * and linking them in parent-child tree. + * + * @param d - wrapper around delegate defined in DD (may be null) + * @param resourceSpecifier - AE descriptor specifier + * @param name - name of the delegate + * @param parentController - reference to a parent controller. TopLevel has no parent + * @param howManyInstances - scalout for the delegate + * + * @return + * @throws Exception + */ + public AnalysisEngineController createController( AnalysisEngineComponent component, AnalysisEngineController parentController/*, int howManyInstances */) throws Exception { + + AnalysisEngineController controller = null; + System.out.println("---------Controller:"+ + component.getKey()+ + " resourceSpecifier:"+ + component.getResourceSpecifier().getClass().getName()+ + " ResourceCreationSpecifier:"+(component.getResourceSpecifier() instanceof ResourceCreationSpecifier) ); + + if ( component.isPrimitive()) { + controller = new PrimitiveAnalysisEngineController_impl(parentController, component.getKey(), component.getResourceSpecifier().getSourceUrlString(),casManager, cache, 10, component.getScaleout()); + } else { + // add an endpoint for each delegate in this aggregate. The endpoint Map is required + // during initialization of an aggregate controller. + Map<String, Endpoint> endpoints = new HashMap<>(); + AggregateAnalysisEngineComponent aggregate; + if ( component instanceof AggregateAnalysisEngineComponent) { + aggregate = (AggregateAnalysisEngineComponent)component; + } else if ( component instanceof TopLevelServiceComponent) { + aggregate = ((TopLevelServiceComponent)component).aggregateComponent(); + } else { + throw new RuntimeException("Expected instance of AggregateAnalysisEngineComponent, instead is instanceof "+component.getClass().getName()); + } +// List<AnalysisEngineComponent> delegateComponents = ((AggregateAnalysisEngineComponent)component).getChildren(); + List<AnalysisEngineComponent> delegateComponents = aggregate.getChildren(); + for( AnalysisEngineComponent delegateComponent : delegateComponents ) { + endpoints.put(delegateComponent.getKey(), delegateComponent.getEndpoint()); + } + controller = new AggregateAnalysisEngineController_impl(parentController, component.getKey(), component.getResourceSpecifier().getSourceUrlString(), casManager, cache, endpoints); + addFlowController((AggregateAnalysisEngineController)controller, (AnalysisEngineDescription)component.getResourceSpecifier()); + // recursively create delegate controllers for all async delegates + createDelegateControllers(aggregate, controller); + } + if ( !controller.isTopLevelComponent() ) { + UimaASService service = createUimaASServiceWrapper(controller, component); + service.start(); + } + + return controller; + } + + + + private void createDelegateControllers(AggregateAnalysisEngineComponent aggregateComponent, AnalysisEngineController controller) throws Exception { + for (AnalysisEngineComponent delegateComponent : aggregateComponent.getChildren()) { + // if error handling threshold has not been defined for the delegate, add + // default thresholds. + addDelegateDefaultErrorHandling(controller, delegateComponent.getKey()); + if (delegateComponent.isRemote()) { + Endpoint endpoint = delegateComponent.getEndpoint(); + if ("java".equals(endpoint.getServerURI()) ) { + endpoint.setJavaRemote(); + } + + } else { + if (Objects.isNull(controller.getOutputChannel(ENDPOINT_TYPE.DIRECT))) { + OutputChannel oc = new DirectOutputChannel().withController(controller); + oc.initialize(); + controller.addOutputChannel(oc); + } + if (Objects.isNull(controller.getInputChannel(ENDPOINT_TYPE.DIRECT))) { + DirectInputChannel inputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY) + .withController(controller); +// 10/11/18 For Direct messaging the message handlers are not needed. Its using command factory +// inputChannel.setMessageHandler(getMessageHandler(controller)); + controller.addInputChannel(inputChannel); + + } + createController(delegateComponent, controller /*, scaleout */); + } + + } + + } + + + private UimaASService createUimaASServiceWrapper(AnalysisEngineController controller, AnalysisEngineComponent component) throws Exception { + + AsynchronousUimaASService service = + new AsynchronousUimaASService(controller.getComponentName()).withController(controller); + // Need an OutputChannel to dispatch messages from this service + OutputChannel outputChannel; + if ( ( outputChannel = controller.getOutputChannel(ENDPOINT_TYPE.DIRECT)) == null) { + outputChannel = getOutputChannel(controller); + } + + // Need an InputChannel to handle incoming messages + InputChannel inputChannel; + if ((inputChannel = controller.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) { + inputChannel = getInputChannel(controller); + Handler messageHandlerChain = getMessageHandler(controller); + inputChannel.setMessageHandler(messageHandlerChain); + controller.addInputChannel(inputChannel); + } + + // add reply queue listener to the parent aggregate controller + if ( !controller.isTopLevelComponent() ) { + // For every delegate the parent controller needs a reply listener. + DirectListener replyListener = + addDelegateReplyListener(controller, component); + // add process, getMeta, reply queues to an endpoint + setDelegateDestinations(controller, service, replyListener); + } + DirectListener processListener = + createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getProcessRequestQueue(),Type.ProcessCAS); + inputChannel.registerListener(processListener); + + DirectListener getMetaListener = + createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getMetaRequestQueue(),Type.GetMeta); + inputChannel.registerListener(getMetaListener); + if (controller.isCasMultiplier()) { + DirectListener freCASChannelListener = + createDirectListener(controller,component.getScaleout(),(DirectInputChannel)inputChannel,service.getFreeCasQueue(),Type.FreeCAS); + inputChannel.registerListener(freCASChannelListener); + ((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue()); + } + + /* + DirectListener processListener = new DirectListener(Type.ProcessCAS). + withController(controller). + withConsumerThreads(component.getScaleout()). + withInputChannel((DirectInputChannel)inputChannel). + withQueue(service.getProcessRequestQueue()). + initialize(); + inputChannel.registerListener(processListener); + + DirectListener getMetaListener = new DirectListener(Type.GetMeta). + withController(controller). + withConsumerThreads(getReplyScaleout(d)). + withInputChannel((DirectInputChannel)inputChannel). + withQueue(service.getMetaRequestQueue()).initialize(); + inputChannel.registerListener(getMetaListener); + + if (controller.isCasMultiplier()) { + DirectListener freCASChannelListener = + new DirectListener(Type.FreeCAS). + withController(controller). + withConsumerThreads(component.getScaleout()). + withInputChannel((DirectInputChannel)inputChannel). + withQueue(service.getFreeCasQueue()). + initialize(); + inputChannel.registerListener(freCASChannelListener); + ((DirectOutputChannel)outputChannel).setFreeCASQueue(service.getFreeCasQueue()); + } + */ + return service; + } + private DirectListener createDirectListener(AnalysisEngineController controller, int scaleout, DirectInputChannel inputChannel, BlockingQueue<DirectMessage> q, Type type) throws Exception{ +// DirectListener listener = new DirectListener(type). + return new DirectListener(type). + withController(controller). + withConsumerThreads(scaleout). + withInputChannel(inputChannel). + withQueue(q).initialize(); +// inputChannel.registerListener(listener); +// return listener; + } + private DirectListener addDelegateReplyListener(AnalysisEngineController controller, AnalysisEngineComponent component) throws Exception { + DirectInputChannel parentInputChannel; + // create parent controller's input channel if necessary + if ((controller.getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) { + // create delegate + parentInputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY). + withController(controller.getParentController()); + Handler messageHandlerChain = getMessageHandler(controller.getParentController()); + parentInputChannel.setMessageHandler(messageHandlerChain); + controller.getParentController().addInputChannel(parentInputChannel); + } else { + parentInputChannel = (DirectInputChannel) controller. + getParentController().getInputChannel(ENDPOINT_TYPE.DIRECT); + } + int replyScaleout = 1; + if ( component instanceof RemoteAnalysisEngineComponent) { + ((RemoteAnalysisEngineComponent)component).getReplyConsumerCount(); + } + + // USE FACTORY HERE. CHANGE DirectListener to interface + // DirectListner replyListener = DirectListenerFactory.newReplyListener(); + DirectListener replyListener = new DirectListener(Type.Reply). + withController(controller.getParentController()). + withConsumerThreads(replyScaleout). + withInputChannel(parentInputChannel). + withQueue(new LinkedBlockingQueue<DirectMessage>()). + withName(controller.getKey()). + initialize(); + parentInputChannel.registerListener(replyListener); + + return replyListener; + } + protected void initialize(UimaASService service, ComponentCasPool cp, Transport transport) { + + resourceManager = UimaClassFactory.produceResourceManager(); + casManager = new AsynchAECasManager_impl(resourceManager); + casManager.setCasPoolSize(cp.getPoolSize()); + casManager.setDisableJCasCache(cp.isDisableJCasCache()); + casManager.setInitialFsHeapSize(cp.getInitialHeapSize()); + + if ( transport.equals(Transport.JMS)) { + cache = new InProcessCache(); + } else if ( transport.equals(Transport.Java)) { + + // ?????????????????????????? is this test necessary? + if ( (cache = (InProcessCache)System.getProperties().get("InProcessCache")) == null) { + cache = new InProcessCache(); + System.getProperties().put("InProcessCache", cache); + } + + } +// if ( cache == null ) { +// cache = new InProcessCache(); +// } + } + + + /* + * OLD CODE ***************************** + */ public AsyncPrimitiveErrorConfigurationType addDefaultErrorHandling(ServiceType s) { AsyncPrimitiveErrorConfigurationType pec; @@ -196,13 +441,16 @@ public abstract class AbstractUimaAsServ return null; } private void addDelegateDefaultErrorHandling(AnalysisEngineController controller, String delegatKey) { - ErrorHandlerChain erc = controller.getErrorHandlerChain(); - for( ErrorHandler eh : erc ) { - if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) { - // add default error handling - eh.getEndpointThresholdMap().put(delegatKey, Thresholds.newThreshold()); - } - } + if ( Objects.nonNull(controller.getErrorHandlerChain()) ) { + ErrorHandlerChain erc = controller.getErrorHandlerChain(); + for( ErrorHandler eh : erc ) { + if ( !eh.getEndpointThresholdMap().containsKey(delegatKey) ) { + // add default error handling + eh.getEndpointThresholdMap().put(delegatKey, Thresholds.newThreshold()); + } + } + } + } private OutputChannel getOutputChannel(AnalysisEngineController controller ) throws Exception { OutputChannel outputChannel = null; @@ -1244,7 +1492,7 @@ public abstract class AbstractUimaAsServ private boolean isAggregate(AnalysisEngineType aet) { // Is this an aggregate? An aggregate has a property async=true or has delegates. System.out.println("......"+aet.getKey()+" aet.getAsync()="+aet.getAsync()+" aet.isSetAsync()="+aet.isSetAsync()+" aet.isSetDelegates()="+aet.isSetDelegates() ); - + Objects.requireNonNull(aet, "AnalysisEngineType must be non-null"); if ( "true".equals(aet.getAsync()) || aet.isSetDelegates() ) { return true; } Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java Thu Oct 18 14:12:00 2018 @@ -24,13 +24,12 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import javax.management.ServiceNotFoundException; - import org.apache.uima.aae.InputChannel; import org.apache.uima.aae.InputChannel.ChannelType; import org.apache.uima.aae.OutputChannel; import org.apache.uima.aae.UimaClassFactory; import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport; +import org.apache.uima.aae.component.TopLevelServiceComponent; import org.apache.uima.aae.controller.AggregateAnalysisEngineController; import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl; import org.apache.uima.aae.controller.AnalysisEngineController; @@ -46,7 +45,6 @@ import org.apache.uima.aae.error.handler import org.apache.uima.aae.error.handler.ProcessCasErrorHandler; import org.apache.uima.aae.handler.Handler; import org.apache.uima.aae.service.AsynchronousUimaASService; -import org.apache.uima.aae.service.ServiceRegistry; import org.apache.uima.aae.service.UimaASService; import org.apache.uima.aae.service.UimaAsServiceRegistry; import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate; @@ -61,8 +59,6 @@ import org.apache.uima.resource.Resource import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument; import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType; import org.apache.uima.resourceSpecifier.CasPoolType; -import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType; -import org.apache.uima.resourceSpecifier.ProcessCasErrorsType; import org.apache.uima.resourceSpecifier.ServiceType; public class UimaAsDirectServiceBuilder extends AbstractUimaAsServiceBuilder { @@ -92,6 +88,144 @@ public class UimaAsDirectServiceBuilder } } + + public UimaASService build(TopLevelServiceComponent topLevelComponent, ControllerCallbackListener callback) + throws Exception { + AsynchronousUimaASService service = null; + + // is this the only one resource specifier type supported by the current uima-as? + if (topLevelComponent.getResourceSpecifier() instanceof AnalysisEngineDescription) { + AnalysisEngineDescription aeDescriptor = + (AnalysisEngineDescription) topLevelComponent.getResourceSpecifier(); + String endpoint = resolvePlaceholder(topLevelComponent.getEndpoint().getEndpoint()); + // Create a Top Level Service (TLS) wrapper. This wrapper may contain + // references to multiple TLS service instances if the TLS is scaled + // up. + service = new AsynchronousUimaASService(endpoint) + .withName(aeDescriptor.getAnalysisEngineMetaData().getName()) + .withResourceSpecifier(topLevelComponent.getResourceSpecifier()) + .withScaleout(topLevelComponent.getScaleout()); + + this.buildAndDeploy(topLevelComponent, service, callback); + + + } + return service; + } + public UimaASService buildAndDeploy(TopLevelServiceComponent topLevelComponent, AsynchronousUimaASService service, ControllerCallbackListener callback) throws Exception { + // create ResourceManager, CasManager, and InProcessCache + initialize(service, topLevelComponent.getComponentCasPool(), Transport.Java); + + AnalysisEngineController topLevelController = + createController(topLevelComponent, callback, service.getId()); + + //topLevelController.addControllerCallbackListener(callback); + + //topLevelController.setServiceId(service.getId()); + + service.withInProcessCache(super.cache); + System.setProperty("BrokerURI", "Direct"); + configureTopLevelService(topLevelController, service);//, topLevelComponent.getScaleout()); + return service; + + } + + private DirectOutputChannel outputChannel(AnalysisEngineController topLevelController) throws Exception { + DirectOutputChannel outputChannel = null; + if (topLevelController.getOutputChannel(ENDPOINT_TYPE.DIRECT) == null) { + outputChannel = new DirectOutputChannel().withController(topLevelController); + topLevelController.addOutputChannel(outputChannel); + } else { + outputChannel = (DirectOutputChannel) topLevelController. + getOutputChannel(ENDPOINT_TYPE.DIRECT); + } + return outputChannel; + } + private DirectInputChannel inputChannel(AnalysisEngineController topLevelController) throws Exception { + DirectInputChannel inputChannel; + if ((topLevelController.getInputChannel(ENDPOINT_TYPE.DIRECT)) == null) { + inputChannel = new DirectInputChannel(ChannelType.REQUEST_REPLY). + withController(topLevelController); + Handler messageHandlerChain = getMessageHandler(topLevelController); + inputChannel.setMessageHandler(messageHandlerChain); + topLevelController.addInputChannel(inputChannel); + } else { + inputChannel = (DirectInputChannel) topLevelController. + getInputChannel(ENDPOINT_TYPE.DIRECT); + } + return inputChannel; + } +/* + private void configureTopLevelService(AnalysisEngineController topLevelController, + AsynchronousUimaASService service, int howMany) throws Exception { +*/ + private void configureTopLevelService(AnalysisEngineController topLevelController, + AsynchronousUimaASService service) throws Exception { + + //addErrorHandling(topLevelController, pec); + + + // create a single instance of OutputChannel for Direct communication if + // necessary + DirectOutputChannel outputChannel = outputChannel(topLevelController); + + DirectInputChannel inputChannel = inputChannel(topLevelController); + + if ( topLevelController instanceof AggregateAnalysisEngineController ) { + ((AggregateAnalysisEngineController_impl)topLevelController). + setServiceEndpointName(service.getEndpoint()); + } + BlockingQueue<DirectMessage> pQ = null; + BlockingQueue<DirectMessage> mQ = null; + + // Lookup queue name in service registry. If this queue exists, the new service + // being + // created here will share the same queue to balance the load. + UimaASService s; + try { + s = UimaAsServiceRegistry.getInstance().lookupByEndpoint(service.getEndpoint()); + if ( s instanceof AsynchronousUimaASService) { + pQ = ((AsynchronousUimaASService) s).getProcessRequestQueue(); + mQ = ((AsynchronousUimaASService) s).getMetaRequestQueue(); + } + + } catch( Exception ee) { + pQ = service.getProcessRequestQueue(); + mQ = service.getMetaRequestQueue(); + } + + scaleout = service.getScaleout(); + DirectListener processListener = new DirectListener(Type.ProcessCAS).withController(topLevelController) + .withConsumerThreads(scaleout).withInputChannel(inputChannel).withQueue(pQ). + initialize(); + + DirectListener getMetaListener = new DirectListener(Type.GetMeta).withController(topLevelController) + .withConsumerThreads(1).withInputChannel(inputChannel). + withQueue(mQ).initialize(); + + addFreeCASListener(service, topLevelController, inputChannel, outputChannel, scaleout ); + + inputChannel.registerListener(getMetaListener); + inputChannel.registerListener(processListener); + + service.withController(topLevelController); + + } + + + + + + + + + + + /* + * OLD CODE ********************************************************************************** + */ + + public UimaASService build(AnalysisEngineDeploymentDescriptionDocument dd, ControllerCallbackListener callback) throws Exception { Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java Thu Oct 18 14:12:00 2018 @@ -47,27 +47,30 @@ import org.apache.uima.cas.SerialFormat; import org.apache.uima.cas.impl.BinaryCasSerDes6.ReuseInfo; import org.apache.uima.cas.impl.Serialization; import org.apache.uima.cas.impl.XmiSerializationSharedData; +import org.apache.uima.resource.metadata.ResourceMetaData; import org.apache.uima.util.Level; public abstract class AbstractUimaAsCommand implements UimaAsCommand { protected AnalysisEngineController controller; private Object mux = new Object(); - - protected AbstractUimaAsCommand(AnalysisEngineController controller) { + private final MessageContext messageContext; + + protected AbstractUimaAsCommand(AnalysisEngineController controller, MessageContext aMessageContext) { this.controller = controller; + this.messageContext = aMessageContext; } - protected String getCasReferenceId(Class<?> concreteClassName, MessageContext aMessageContext) throws AsynchAEException { - if (!aMessageContext.propertyExists(AsynchAEMessage.CasReference)) { + protected String getCasReferenceId(Class<?> concreteClassName/*, MessageContext aMessageContext */) throws AsynchAEException { + if (!messageContext.propertyExists(AsynchAEMessage.CasReference)) { if (UIMAFramework.getLogger(concreteClassName).isLoggable(Level.INFO)) { UIMAFramework.getLogger(concreteClassName).logrb(Level.INFO, concreteClassName.getName(), "getCasReferenceId", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_cas_refid__INFO", - new Object[] { aMessageContext.getEndpoint().getEndpoint() }); + new Object[] { messageContext.getEndpoint().getEndpoint() }); } return null; } - return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference); + return messageContext.getMessageStringProperty(AsynchAEMessage.CasReference); } protected CacheEntry getCacheEntryForCas(String casReferenceId) { @@ -95,7 +98,7 @@ public abstract class AbstractUimaAsComm return (controller.isTopLevelComponent() && controller instanceof AggregateAnalysisEngineController); } - protected void handleError(Exception e, CacheEntry cacheEntry, MessageContext mc) { + protected void handleError(Exception e, CacheEntry cacheEntry/*, MessageContext mc */) { if (UIMAFramework.getLogger(getClass()).isLoggable(Level.WARNING)) { UIMAFramework.getLogger(getClass()).logrb(Level.WARNING, getClass().getName(), "handleError", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_exception_WARNING", @@ -105,7 +108,7 @@ public abstract class AbstractUimaAsComm UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e); } ErrorContext errorContext = new ErrorContext(); - errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint()); + errorContext.add(AsynchAEMessage.Endpoint, messageContext.getEndpoint()); errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process); errorContext.add(AsynchAEMessage.CasReference, cacheEntry.getCasReferenceId()); controller.dropCAS(cacheEntry.getCas()); @@ -154,31 +157,69 @@ public abstract class AbstractUimaAsComm } - protected static ErrorContext populateErrorContext(MessageContext aMessageCtx) { + protected ErrorContext populateErrorContext(/*MessageContext aMessageCtx */) { ErrorContext errorContext = new ErrorContext(); - if (aMessageCtx != null) { + if (messageContext != null) { try { - if (aMessageCtx.propertyExists(AsynchAEMessage.Command)) { + if (messageContext.propertyExists(AsynchAEMessage.Command)) { errorContext.add(AsynchAEMessage.Command, - aMessageCtx.getMessageIntProperty(AsynchAEMessage.Command)); + messageContext.getMessageIntProperty(AsynchAEMessage.Command)); } - if (aMessageCtx.propertyExists(AsynchAEMessage.MessageType)) { + if (messageContext.propertyExists(AsynchAEMessage.MessageType)) { errorContext.add(AsynchAEMessage.MessageType, - aMessageCtx.getMessageIntProperty(AsynchAEMessage.MessageType)); + messageContext.getMessageIntProperty(AsynchAEMessage.MessageType)); } - if (aMessageCtx.propertyExists(AsynchAEMessage.CasReference)) { + if (messageContext.propertyExists(AsynchAEMessage.CasReference)) { errorContext.add(AsynchAEMessage.CasReference, - aMessageCtx.getMessageStringProperty(AsynchAEMessage.CasReference)); + messageContext.getMessageStringProperty(AsynchAEMessage.CasReference)); } - errorContext.add(UIMAMessage.RawMsg, aMessageCtx.getRawMessage()); + errorContext.add(UIMAMessage.RawMsg, messageContext.getRawMessage()); } catch (Exception e) { /* ignore */ } } return errorContext; } - + protected Endpoint getEndpoint() { + return messageContext.getEndpoint(); + } + protected int getMessageIntProperty(String propertyName) throws Exception { + return messageContext.getMessageIntProperty(propertyName); + } + protected String getMessageStringProperty(String propertyName) throws Exception { + return messageContext.getMessageStringProperty(propertyName); + } + protected ResourceMetaData getResourceMetaData() throws Exception { + return (ResourceMetaData)messageContext.getMessageObjectProperty(AsynchAEMessage.AEMetadata); + } + protected String getStringMessage() throws Exception { + return messageContext.getStringMessage(); + } + protected Object getMessageObjectProperty(String propertyName) throws Exception { + return messageContext.getMessageObjectProperty(propertyName); + } + protected boolean getMessageBooleanProperty(String propertyName) throws Exception { + return messageContext.getMessageBooleanProperty(propertyName); + } + protected long getMessageLongProperty( String propertyName) throws Exception { + return messageContext.getMessageLongProperty(propertyName); + } + protected boolean propertyExists(String propertyName) throws Exception { + return messageContext.propertyExists(propertyName); + } + protected String getEndpointName() { + return messageContext.getEndpointName(); + } + protected Object getObjectMessage() throws Exception { + return messageContext.getObjectMessage(); + } + protected byte[] getByteMessage() throws Exception { + return messageContext.getByteMessage(); + } + protected MessageContext getMessageContext() { + return messageContext; + } protected Endpoint fetchParentCasOrigin(String parentCasId) throws AsynchAEException { Endpoint endpoint = null; String parentId = parentCasId; @@ -224,8 +265,8 @@ public abstract class AbstractUimaAsComm return cas; } - protected SerializationResult deserializeChildCAS(String casMultiplierDelegateKey, Endpoint endpoint, - MessageContext mc) throws Exception { + protected SerializationResult deserializeChildCAS(String casMultiplierDelegateKey, Endpoint endpoint + /*MessageContext mc*/) throws Exception { SerializationResult result = new SerializationResult(); // Aggregate time spent waiting for a CAS in the shadow cas pool @@ -250,17 +291,17 @@ public abstract class AbstractUimaAsComm // Create deserialized wrapper for XMI, BINARY, COMPRESSED formats. To add // a new serialization format add a new class which implements // UimaASDeserializer and modify DeserializerFactory class. - UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc); + UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, messageContext); deserializer.deserialize(result); return result; } - protected SerializationResult deserializeInputCAS( MessageContext mc) + protected SerializationResult deserializeInputCAS() throws Exception { SerializationResult result = new SerializationResult(); - String origin = mc.getEndpoint().getEndpoint(); - Endpoint endpoint = mc.getEndpoint(); + String origin = messageContext.getEndpoint().getEndpoint(); + Endpoint endpoint = messageContext.getEndpoint(); // Time how long we wait on Cas Pool to fetch a new CAS long t1 = controller.getCpuTime(); @@ -276,20 +317,20 @@ public abstract class AbstractUimaAsComm return null; } - UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, mc); + UimaASDeserializer deserializer = DeserializerFactory.newDeserializer(endpoint, messageContext); deserializer.deserialize(result); return result; } - protected Delegate getDelegate(MessageContext mc) throws AsynchAEException { + protected Delegate getDelegate(/* MessageContext mc */) throws AsynchAEException { String delegateKey = null; - if (mc.getEndpoint().getEndpoint() == null || mc.getEndpoint().getEndpoint().trim().length() == 0) { - String fromEndpoint = mc.getMessageStringProperty(AsynchAEMessage.MessageFrom); + if (messageContext.getEndpoint().getEndpoint() == null || messageContext.getEndpoint().getEndpoint().trim().length() == 0) { + String fromEndpoint = messageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom); delegateKey = ((AggregateAnalysisEngineController) controller) .lookUpDelegateKey(fromEndpoint); } else { delegateKey = ((AggregateAnalysisEngineController) controller) - .lookUpDelegateKey(mc.getEndpoint().getEndpoint()); + .lookUpDelegateKey(messageContext.getEndpoint().getEndpoint()); } return ((AggregateAnalysisEngineController) controller).lookupDelegate(delegateKey); } Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java Thu Oct 18 14:12:00 2018 @@ -23,14 +23,14 @@ import org.apache.uima.aae.controller.En import org.apache.uima.aae.message.MessageContext; public class CollectionProcessCompleteRequestCommand extends AbstractUimaAsCommand { - private MessageContext mc; +// private MessageContext mc; public CollectionProcessCompleteRequestCommand(MessageContext mc, AnalysisEngineController controller) { - super(controller); - this.mc = mc; + super(controller, mc); +// this.mc = mc; } public void execute() throws Exception { - Endpoint endpoint = mc.getEndpoint(); - controller.collectionProcessComplete(endpoint); +// Endpoint endpoint = mc.getEndpoint(); + controller.collectionProcessComplete(super.getEndpoint()); } } Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java Thu Oct 18 14:12:00 2018 @@ -26,14 +26,14 @@ import org.apache.uima.aae.message.Async import org.apache.uima.aae.message.MessageContext; public class CollectionProcessCompleteResponseCommand extends AbstractUimaAsCommand { - private MessageContext mc; +// private MessageContext mc; public CollectionProcessCompleteResponseCommand(MessageContext mc, AnalysisEngineController controller) { - super(controller); - this.mc = mc; + super(controller,mc); +// this.mc = mc; } public void execute() throws Exception { - Delegate delegate = super.getDelegate(mc); + Delegate delegate = super.getDelegate(); try { System.out.println("..... Controller:"+controller.getComponentName()+" Handling CPC From "+delegate.getKey()); ((AggregateAnalysisEngineController)controller) @@ -41,7 +41,7 @@ public class CollectionProcessCompleteRe } catch (Exception e) { ErrorContext errorContext = new ErrorContext(); errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.CollectionProcessComplete); - errorContext.add(AsynchAEMessage.Endpoint, mc.getEndpoint()); + errorContext.add(AsynchAEMessage.Endpoint, super.getEndpoint()); controller.getErrorHandlerChain().handle(e, errorContext, controller); } }