Author: cwiklik Date: Thu Oct 18 14:12:00 2018 New Revision: 1844241 URL: http://svn.apache.org/viewvc?rev=1844241&view=rev Log: uima-5501
Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregate.xml uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/Lifecycle.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AggregateAnalysisEngineComponent.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AnalysisEngineComponent.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierComponent.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierNature.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/ComponentCasPool.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/PrimitiveAnalysisEngineComponent.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/RemoteAnalysisEngineComponent.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TestGenerator.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TopLevelServiceComponent.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/dd/DeploymentDescriptorProcessor.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/factory/AnalysisEngineComponentFactory.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/AbstractUimaAsConsumer.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ComponentConnector.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ConnectorFactory.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/ListenerCallback.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConnector.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsConsumer.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsEndpoint.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/UimaAsProducer.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/BasicConnector.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/basic/DirectConnector.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/definition/connectors/jms/ActiveMqConnector.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/MessageProcessor.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/Origin.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsMessage.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/message/UimaAsOrigin.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/UimaAsMessageProcessor.java Modified: uima/uima-as/branches/uima-as-3/aggregate-uima-as/pom.xml uima/uima-as/branches/uima-as-3/uima-as-parent/pom.xml uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/UimaASService.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/AbstractUimaAsServiceBuilder.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/builder/UimaAsDirectServiceBuilder.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/AbstractUimaAsCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteRequestCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CollectionProcessCompleteResponseCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/CommandFactory.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaRequestCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/GetMetaResponseCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/NoOpCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/PingRequestCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasRequestCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessChildCasResponseCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasRequestCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessInputCasResponseCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ProcessServiceInfoResponseCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/ReleaseCASRequestCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/service/command/StopRequestCommand.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectListener.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessage.java uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/as/client/DirectMessageContext.java uima/uima-as/branches/uima-as-3/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Modified: uima/uima-as/branches/uima-as-3/aggregate-uima-as/pom.xml URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/aggregate-uima-as/pom.xml?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/aggregate-uima-as/pom.xml (original) +++ uima/uima-as/branches/uima-as-3/aggregate-uima-as/pom.xml Thu Oct 18 14:12:00 2018 @@ -66,5 +66,6 @@ <module>../uimaj-as-jms</module> <module>../aggregate-uima-as-eclipse-plugins</module> <module>../uima-as-docbooks</module> + <module>../uimaj-as-connectors</module> </modules> </project> Modified: uima/uima-as/branches/uima-as-3/uima-as-parent/pom.xml URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uima-as-parent/pom.xml?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uima-as-parent/pom.xml (original) +++ uima/uima-as/branches/uima-as-3/uima-as-parent/pom.xml Thu Oct 18 14:12:00 2018 @@ -190,7 +190,6 @@ ${uimaASNoticeText} <scope>compile</scope> </dependency> - <!-- Active MQ Stuff --> <dependency> <groupId>org.apache.activemq</groupId> Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Thu Oct 18 14:12:00 2018 @@ -58,23 +58,18 @@ import org.apache.activemq.command.Activ import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.uima.UIMAFramework; -import org.apache.uima.UIMA_IllegalArgumentException; import org.apache.uima.UIMA_IllegalStateException; import org.apache.uima.aae.AsynchAECasManager_impl; -import org.apache.uima.aae.UIMAEE_Constants; -import org.apache.uima.aae.VersionCompatibilityChecker; import org.apache.uima.aae.UimaASApplicationEvent.EventTrigger; import org.apache.uima.aae.UimaASApplicationExitEvent; import org.apache.uima.aae.UimaAsVersion; +import org.apache.uima.aae.VersionCompatibilityChecker; import org.apache.uima.aae.client.UimaASStatusCallbackListener; import org.apache.uima.aae.client.UimaAsBaseCallbackListener; import org.apache.uima.aae.client.UimaAsynchronousEngine; import org.apache.uima.aae.controller.AnalysisEngineController; import org.apache.uima.aae.controller.ControllerCallbackListener; -import org.apache.uima.aae.controller.ControllerLifecycle; import org.apache.uima.aae.controller.Endpoint; -import org.apache.uima.aae.controller.UimacppServiceController; -import org.apache.uima.aae.error.AsynchAEException; import org.apache.uima.aae.error.UimaASMetaRequestTimeout; import org.apache.uima.aae.jmx.JmxManager; import org.apache.uima.aae.message.AsynchAEMessage; @@ -84,9 +79,7 @@ import org.apache.uima.aae.service.UimaA import org.apache.uima.aae.service.UimaAsServiceRegistry; import org.apache.uima.aae.service.builder.UimaAsDirectServiceBuilder; import org.apache.uima.adapter.jms.JmsConstants; -import org.apache.uima.adapter.jms.activemq.ConnectionFactoryIniter; import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer; -import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext; import org.apache.uima.adapter.jms.message.PendingMessage; import org.apache.uima.adapter.jms.message.PendingMessageImpl; import org.apache.uima.adapter.jms.service.Dd2spring; @@ -99,7 +92,6 @@ import org.apache.uima.as.deployer.UimaA import org.apache.uima.as.dispatcher.LocalDispatcher; import org.apache.uima.cas.CAS; import org.apache.uima.cas.SerialFormat; -import org.apache.uima.impl.UimaVersion; import org.apache.uima.internal.util.UUIDGenerator; import org.apache.uima.resource.Resource; import org.apache.uima.resource.ResourceConfigurationException; @@ -109,11 +101,8 @@ import org.apache.uima.resource.Resource import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument; import org.apache.uima.util.Level; import org.apache.xmlbeans.XmlDocumentProperties; -import org.apache.xmlbeans.XmlOptions; -import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; -import org.springframework.context.support.FileSystemXmlApplicationContext; import org.xml.sax.XMLReader; import org.xml.sax.helpers.XMLReaderFactory; @@ -171,12 +160,6 @@ public class BaseUIMAAsynchronousEngine_ protected volatile boolean stopped = false; public BaseUIMAAsynchronousEngine_impl() { this(Transport.JMS); // default - /* - super(); - UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO, - "UIMA Version " + UIMAFramework.getVersionString() + - " UIMA-AS Version " + UimaAsVersion.getVersionString()); - */ } public BaseUIMAAsynchronousEngine_impl(Transport transport) { super(); @@ -393,7 +376,8 @@ public class BaseUIMAAsynchronousEngine_ } public void stop() { try { - if ( brokerURI != null && !brokerURI.equals("java")) { + if ( isServiceRemote() ) { +// if ( brokerURI != null && !brokerURI.equals("java")) { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable( Level.INFO)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, @@ -475,8 +459,6 @@ public class BaseUIMAAsynchronousEngine_ } protected boolean isServiceRemote() { return transport.equals(Transport.JMS); -// return (service instanceof UimaASJmsService); -// return service == null; } private void startLocalConsumer(Map anApplicationContext) { @@ -519,7 +501,7 @@ public class BaseUIMAAsynchronousEngine_ // start dispatcher in its own thread. It will fetch messages from a shared 'pendingMessageQueue' LocalDispatcher dispatcher = new LocalDispatcher(this, service, pendingMessageQueue); - dispatchThread = new Thread(dispatcher); + dispatchThread = new Thread(dispatcher,"LocalDispatcher"); dispatchThread.start(); } @@ -1231,9 +1213,21 @@ public class BaseUIMAAsynchronousEngine_ throw new AsynchAEException("*** ERROR deployment descriptor validation failed"); } */ + // use xmlbeans framework to parse dd and create java beans for it XmlDocumentProperties dp = dd.documentProperties(); System.out.println(dp.getSourceName()); - + // based on deployment options create relevant deployer + UimaAsServiceDeployer deployer = newServiceDeployer(dd, anApplicationContext); + // deploy (instantiate) uima-as service(s) + service = deployer.deploy(dd, anApplicationContext); + + UimaAsServiceRegistry.getInstance().register(service); + + return service.getId(); + + } + + private UimaAsServiceDeployer newServiceDeployer(AnalysisEngineDeploymentDescriptionDocument dd, Map anApplicationContext) throws Exception { String protocolOverride = null; if ( anApplicationContext!= null && anApplicationContext.containsKey(UimaAsynchronousEngine.Protocol) ) { protocolOverride = (String)anApplicationContext.get(UimaAsynchronousEngine.Protocol); @@ -1247,7 +1241,7 @@ public class BaseUIMAAsynchronousEngine_ // the DD settings if ( protocolOverride == null && providerOverride == null) { // Use factory to create deployer instance for a given - // protocol and provider defined in the DD + // protocol and provider deployer = ServiceDeployers.newDeployer(protocol(dd), provider(dd)); } else { @@ -1272,15 +1266,8 @@ public class BaseUIMAAsynchronousEngine_ deployer = ServiceDeployers.newDeployer(deploymentProtocol, deploymentProvider); } - - service = deployer.deploy(dd, anApplicationContext); - - UimaAsServiceRegistry.getInstance().register(service); - - return service.getId(); - + return deployer; } - protected UimaASService getServiceReference() { return service; } Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UimaASJmsService.java Thu Oct 18 14:12:00 2018 @@ -476,5 +476,10 @@ implements UimaASService { public String getName() { return name; } + @Override + public int getScaleout() { + // TODO Auto-generated method stub + return 0; + } } Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/builder/UimaAsJmsServiceBuilder.java Thu Oct 18 14:12:00 2018 @@ -18,20 +18,29 @@ */ package org.apache.uima.adapter.jms.service.builder; +import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; import java.util.UUID; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.uima.UIMAFramework; import org.apache.uima.aae.InputChannel; import org.apache.uima.aae.InputChannel.ChannelType; +import org.apache.uima.aae.client.UimaAsynchronousEngine; import org.apache.uima.aae.client.UimaAsynchronousEngine.Transport; +import org.apache.uima.aae.component.TopLevelServiceComponent; import org.apache.uima.aae.OutputChannel; +import org.apache.uima.aae.UimaAsPriorityBasedThreadFactory; import org.apache.uima.aae.UimaClassFactory; import org.apache.uima.aae.controller.AggregateAnalysisEngineController; import org.apache.uima.aae.controller.AnalysisEngineController; @@ -46,12 +55,20 @@ import org.apache.uima.aae.error.handler import org.apache.uima.aae.error.handler.GetMetaErrorHandler; import org.apache.uima.aae.error.handler.ProcessCasErrorHandler; import org.apache.uima.aae.handler.Handler; +import org.apache.uima.aae.handler.HandlerBase; +import org.apache.uima.aae.handler.input.MetadataRequestHandler_impl; +import org.apache.uima.aae.handler.input.MetadataResponseHandler_impl; +import org.apache.uima.aae.handler.input.ProcessRequestHandler_impl; +import org.apache.uima.aae.handler.input.ProcessResponseHandler; +import org.apache.uima.aae.service.AsynchronousUimaASService; import org.apache.uima.aae.service.UimaASService; import org.apache.uima.aae.service.builder.AbstractUimaAsServiceBuilder; import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate; import org.apache.uima.aae.service.delegate.RemoteAnalysisEngineDelegate; +import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.adapter.jms.activemq.JmsInputChannel; import org.apache.uima.adapter.jms.activemq.JmsOutputChannel; +import org.apache.uima.adapter.jms.activemq.PriorityMessageHandler; import org.apache.uima.adapter.jms.activemq.TempDestinationResolver; import org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer; import org.apache.uima.adapter.jms.service.UimaASJmsService; @@ -65,6 +82,7 @@ import org.apache.uima.resourceSpecifier import org.apache.uima.resourceSpecifier.CollectionProcessCompleteErrorsType; import org.apache.uima.resourceSpecifier.ProcessCasErrorsType; import org.apache.uima.resourceSpecifier.ServiceType; +import org.apache.uima.util.Level; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; public class UimaAsJmsServiceBuilder extends AbstractUimaAsServiceBuilder{ @@ -103,6 +121,276 @@ public class UimaAsJmsServiceBuilder ext } } + /*** NEW CODE */ + + public UimaASService build(TopLevelServiceComponent topLevelComponent, ControllerCallbackListener callback) + throws Exception { + UimaASService service = null; + + // is this the only one resource specifier type supported by the current uima-as? + if (topLevelComponent.getResourceSpecifier() instanceof AnalysisEngineDescription) { + AnalysisEngineDescription aeDescriptor = + (AnalysisEngineDescription) topLevelComponent.getResourceSpecifier(); + String endpoint = resolvePlaceholder(topLevelComponent.getEndpoint().getEndpoint()); + // Create a Top Level Service (TLS) wrapper. This wrapper may contain + // references to multiple TLS service instances if the TLS is scaled + // up. + service = new UimaASJmsService(). + withName(aeDescriptor. + getAnalysisEngineMetaData().getName()) + .withResourceSpecifier(aeDescriptor). + withBrokerURL(topLevelComponent.getEndpoint().getServerURI()). + withInputQueue(endpoint); + + this.buildAndDeploy(topLevelComponent, service, callback); + + + } + return service; + } + + public UimaASService buildAndDeploy(TopLevelServiceComponent topLevelComponent, UimaASService service, ControllerCallbackListener callback) throws Exception { + // create ResourceManager, CasManager, and InProcessCache + initialize(service, topLevelComponent.getComponentCasPool(), Transport.Java); + + AnalysisEngineController topLevelController = + createController(topLevelComponent, callback, service.getId()); + + service.withInProcessCache(super.cache); + System.setProperty("BrokerURI", "Direct"); + configureTopLevelService(topLevelController, service /*, topLevelComponent.getScaleout() */); + return service; + + } + + + private void configureTopLevelService(AnalysisEngineController topLevelController, UimaASService service) throws Exception { + // First create Connection Factory. This is needed by + // JMS listeners. + createConnectionFactory(); + // counts number of initialized threads + CountDownLatch latchToCountNumberOfInitedThreads = + new CountDownLatch(service.getScaleout()); + // counts number of terminated threads + CountDownLatch latchToCountNumberOfTerminatedThreads = + new CountDownLatch(service.getScaleout()); + OutputChannel outputChannel; + ; + // Add one instance of JmsOutputChannel + if ( topLevelController.getOutputChannel(ENDPOINT_TYPE.JMS) == null ) { + outputChannel = new JmsOutputChannel(); + outputChannel.setController(topLevelController); + outputChannel.setServerURI(brokerURL); + outputChannel.setServiceInputEndpoint(service.getEndpoint()); + topLevelController.addOutputChannel(outputChannel); + } else { + outputChannel = (JmsOutputChannel)topLevelController.getOutputChannel(ENDPOINT_TYPE.JMS); + outputChannel.setServiceInputEndpoint(service.getEndpoint()); + } + JmsInputChannel inputChannel; + // Add one instance of JmsInputChannel + if ( topLevelController.getInputChannel(ENDPOINT_TYPE.JMS) == null ) { + inputChannel = new JmsInputChannel(ChannelType.REQUEST_REPLY); + topLevelController.setInputChannel(inputChannel); + } else { + inputChannel = (JmsInputChannel)topLevelController.getInputChannel(ENDPOINT_TYPE.JMS); + } + + inputChannel.setController(topLevelController); + + inputChannel.setMessageHandler(getMessageHandler(topLevelController)); + + // Create service JMS listeners to handle Process, GetMeta and optional FreeCas + // requests. + + // listener to handle process CAS requests + UimaDefaultMessageListenerContainer processListener + = createListener(Type.ProcessCAS, service.getScaleout(), inputChannel, outputChannel); + inputChannel.addListenerContainer(processListener); + + + + + + + String targetStringSelector = ""; + if ( System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty) != null ) { + targetStringSelector = System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty); + } else { + // the default selector is IP:PID + String ip = InetAddress.getLocalHost().getHostAddress(); + targetStringSelector = ip+":"+topLevelController.getPID(); + } + UimaDefaultMessageListenerContainer targetedListener = + new UimaDefaultMessageListenerContainer(); + targetedListener.setType(Type.Target); + // setup jms selector + if ( topLevelController.isCasMultiplier()) { + targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)"); + } else { + targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)"); + } + + // use shared ConnectionFactory + targetedListener.setConnectionFactory(processListener.getConnectionFactory()); + // mark the listener as a 'Targeted' listener + targetedListener.setTargetedListener(); + targetedListener.setController(topLevelController); + // there will only be one delivery thread. Its job will be to + // add a targeted message to a BlockingQueue. Such thread will block + // in an enqueue if a dequeue is not available. This will be prevent + // the overwhelming the service with messages. + ThreadPoolTaskExecutor threadExecutor = new ThreadPoolTaskExecutor(); + threadExecutor.setCorePoolSize(1); + threadExecutor.setMaxPoolSize(1); + targetedListener.setTaskExecutor(threadExecutor); + targetedListener.setConcurrentConsumers(1); + if ( processListener.getMessageListener() instanceof PriorityMessageHandler ) { + // the targeted listener will use the same message handler as the + // Process listener. This handler will add a message wrapper + // to enable prioritizing messages. + targetedListener.setMessageListener(processListener.getMessageListener()); + } + // Same queue as the Process queue + targetedListener.setDestination(processListener.getDestination()); + //registerListener(targetedListener); + // targetedListener.afterPropertiesSet(); + threadExecutor.initialize(); + + //targetedListener.initialize(); + //targetedListener.start(); + if (UIMAFramework.getLogger(getClass()).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(getClass()).logrb(Level.INFO, getClass().getName(), + "createListenerForTargetedMessages", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_TARGET_LISTENER__INFO", + new Object[] {targetedListener.getMessageSelector(), topLevelController.getComponentName() }); + } + + inputChannel.addListenerContainer(targetedListener); + + //listeners.add(processListener); + // listener to handle GetMeta requests + UimaDefaultMessageListenerContainer getMetaListener + = createListener(Type.GetMeta, 1); + inputChannel.addListenerContainer(getMetaListener); + //listeners.add(getMetaListener); + + if ( topLevelController.isCasMultiplier()) { + // listener to handle Free CAS requests + UimaDefaultMessageListenerContainer freeCasListener + = createListener(Type.FreeCAS, 1); + inputChannel.addListenerContainer(freeCasListener); + //listeners.add(freeCasListener); + } + } + + private UimaDefaultMessageListenerContainer createListener(Type type, int consumerCount, InputChannel inputChannel, OutputChannel outputChannel) throws Exception{ + PriorityMessageHandler h = null; + + ThreadPoolTaskExecutor jmsListenerThreadExecutor = + new ThreadPoolTaskExecutor(); + + if ( Type.ProcessCAS.equals(type)) { + outputChannel.setServerURI(getBrokerURL()); + if ( controller.isPrimitive() ) { + h = new PriorityMessageHandler(consumerCount); + ThreadPoolTaskExecutor threadExecutor = + new ThreadPoolTaskExecutor(); + controller.setThreadFactory(threadExecutor); + + CountDownLatch latchToCountNumberOfTerminatedThreads = + new CountDownLatch(consumerCount); + // Create a Custom Thread Factory. Provide it with an instance of + // PrimitiveController so that every thread can call it to initialize + // the next available instance of a AE. + ThreadFactory tf = + new UimaAsPriorityBasedThreadFactory(Thread.currentThread(). + getThreadGroup(), controller, latchToCountNumberOfTerminatedThreads) + .withQueue(h.getQueue()).withChannel(controller.getInputChannel(ENDPOINT_TYPE.JMS)); + + + ((UimaAsPriorityBasedThreadFactory)tf).setDaemon(true); + // This ThreadExecutor will use custom thread factory instead of default one + threadExecutor.setThreadFactory(tf); + threadExecutor.setCorePoolSize(consumerCount); + threadExecutor.setMaxPoolSize(consumerCount); + + // Initialize the thread pool + threadExecutor.initialize(); + + // Make sure all threads are started. This forces each thread to call + // PrimitiveController to initialize the next instance of AE + threadExecutor.getThreadPoolExecutor().prestartAllCoreThreads(); + // This ThreadExecutor will use custom thread factory instead of default one + + } + + } + jmsListenerThreadExecutor.setCorePoolSize(consumerCount); + jmsListenerThreadExecutor.setMaxPoolSize(consumerCount); + jmsListenerThreadExecutor.initialize(); + + +// threadExecutor.setCorePoolSize(consumerCount); +// threadExecutor.setMaxPoolSize(consumerCount); + + // destination can be NULL if this listener is meant for a + // a temp queue. Such destinations are created on demand + // using destination resolver which is plugged into the + // listener. The resolver creates a temp queue lazily on + // listener startup. + ActiveMQDestination destination = null; + + if ( !isTempQueueListener(type) ) { + destination = new ActiveMQQueue(queueName); + } + JmsMessageListenerBuilder listenerBuilder = + new JmsMessageListenerBuilder(); + + UimaDefaultMessageListenerContainer messageListener = + listenerBuilder.withController(controller) + .withType(type) + .withConectionFactory(factory) + .withThreadPoolExecutor(jmsListenerThreadExecutor) + .withConsumerCount(consumerCount) + .withInputChannel(inputChannel) + .withPriorityMessageHandler(h) + .withSelector(getSelector(type)) + .withDestination(destination) + .build(); + messageListener.setReceiveTimeout(500); +// messageListener.setMessageListener(h); + return messageListener; + } + public HandlerBase getMessageHandler(AnalysisEngineController controller) { + MetadataRequestHandler_impl metaHandler = new MetadataRequestHandler_impl("MetadataRequestHandler"); + metaHandler.setController(controller); + ProcessRequestHandler_impl processHandler = new ProcessRequestHandler_impl("ProcessRequestHandler"); + processHandler.setController(controller); + metaHandler.setDelegate(processHandler); + if ( !controller.isPrimitive() ) { + MetadataResponseHandler_impl metaResponseHandler = + new MetadataResponseHandler_impl("MetadataResponseHandler"); + metaResponseHandler.setController(controller); + processHandler.setDelegate(metaResponseHandler); + + ProcessResponseHandler processResponseHandler = + new ProcessResponseHandler("ProcessResponseHandler"); + processResponseHandler.setController(controller); + metaResponseHandler.setDelegate(processResponseHandler); + + } + return metaHandler; + } + + + + + /* OLD CODE */ + + + + public static InputChannel createInputChannel(ChannelType type) { return new JmsInputChannel(type); @@ -388,7 +676,10 @@ public class UimaAsJmsServiceBuilder ext } public UimaASService build(AnalysisEngineDeploymentDescriptionDocument dd, ControllerCallbackListener callback) - throws Exception { + throws Exception { + + + // get the top level AnalysisEngine descriptor String aeDescriptorPath = getAEDescriptorPath(dd); // parse AE descriptor Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/direct/UimaAsDirectServiceDeployer.java Thu Oct 18 14:12:00 2018 @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; +import org.apache.uima.aae.component.TopLevelServiceComponent; +import org.apache.uima.aae.component.dd.DeploymentDescriptorProcessor; import org.apache.uima.aae.service.UimaASService; import org.apache.uima.aae.service.builder.UimaAsDirectServiceBuilder; import org.apache.uima.as.deployer.AbstractUimaASDeployer; @@ -42,7 +44,21 @@ public class UimaAsDirectServiceDeployer Map<String, String> deploymentProperties) throws Exception { UimaASService uimaAsService = null; try { - uimaAsService = new UimaAsDirectServiceBuilder().build(dd, this); + DeploymentDescriptorProcessor ddProcessor = + new DeploymentDescriptorProcessor(dd); + + // process dd producing TopLevelServiceComponent. If the dd + // is an aggregate, the component object will include a tree + // of delegates. It basically combines information from both + // a dd and resource specifier for all parts of the pipeline + // aggregating instances of AnalysisEngineComponent created + // for every delegate. + TopLevelServiceComponent topLevelComponent = + ddProcessor.newComponent(); + + // create an instance of a service for the client to use + uimaAsService = new UimaAsDirectServiceBuilder().build(topLevelComponent, this); + // start listeners uimaAsService.start(); // block until all internal components initialize and are ready to process Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/deployer/jms/UimaAsJmsServiceDeployer.java Thu Oct 18 14:12:00 2018 @@ -21,7 +21,10 @@ package org.apache.uima.as.deployer.jms; import java.util.Map; import java.util.concurrent.CountDownLatch; +import org.apache.uima.aae.component.TopLevelServiceComponent; +import org.apache.uima.aae.component.dd.DeploymentDescriptorProcessor; import org.apache.uima.aae.service.UimaASService; +import org.apache.uima.aae.service.builder.UimaAsDirectServiceBuilder; import org.apache.uima.adapter.jms.service.builder.UimaAsJmsServiceBuilder; import org.apache.uima.as.deployer.AbstractUimaASDeployer; import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument; @@ -37,6 +40,33 @@ public class UimaAsJmsServiceDeployer ex Map<String, String> deploymentProperties) throws Exception { UimaASService uimaAsService = null; + + try { + DeploymentDescriptorProcessor ddProcessor = + new DeploymentDescriptorProcessor(dd); + + // process dd producing TopLevelServiceComponent. If the dd + // is an aggregate, the component object will include a tree + // of delegates. It basically combines information from both + // a dd and resource specifier for all parts of the pipeline + // aggregating instances of AnalysisEngineComponent created + // for every delegate. + TopLevelServiceComponent topLevelComponent = + ddProcessor.newComponent(); + + // create an instance of a service for the client to use + //uimaAsService = new UimaAsDirectServiceBuilder().build(topLevelComponent, this); + uimaAsService = new UimaAsJmsServiceBuilder().build(topLevelComponent, this); + // start listeners + uimaAsService.start(); + // block until all internal components initialize and are ready to process + waitUntilInitialized(); + + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + /* try { uimaAsService = new UimaAsJmsServiceBuilder().build(dd, this); // start listeners. Nothing happens unless JMS listeners start @@ -57,6 +87,7 @@ public class UimaAsJmsServiceDeployer ex throw e; } + */ return uimaAsService; } Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java Thu Oct 18 14:12:00 2018 @@ -99,7 +99,7 @@ public class LocalDispatcher implements while (client.isRunning()) { PendingMessage pm = null; try { - System.out.println("LocalDispatcher.run()- waiting for new message ..."); + System.out.println("LocalDispatcher.run()- waiting for new message ... queue hashcode:"+messageQueue.hashCode()); pm = messageQueue.take(); System.out.println("LocalDispatcher.run()-got new message to dispatch"); } catch (InterruptedException e) { @@ -122,9 +122,12 @@ public class LocalDispatcher implements } } try { + System.out.println(".................... calling LocalDispatch.beforeDispatch()"); client.beforeDispatch(pm); + System.out.println(".................... calling LocalDispatch.dispatch()"); dispatch(pm); + System.out.println(".................... LocalDispatch.dispatch() returned"); } catch (Exception e) { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "run", Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java Thu Oct 18 14:12:00 2018 @@ -51,6 +51,7 @@ import org.apache.uima.aae.service.UimaA import org.apache.uima.aae.service.UimaAsServiceRegistry; import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.adapter.jms.activemq.JmsInputChannel; +import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl; import org.apache.uima.cas.CAS; import org.apache.uima.cas.TypeSystem; import org.apache.uima.collection.CollectionReader; @@ -129,12 +130,16 @@ public class TestUimaASNoErrors extends @Test public void testDeploy() throws Exception { + UimaAsynchronousEngine uimaAS = getClient(Transport.Java); Map ctx = new HashMap<>(); - + ctx.put(UimaAsynchronousEngine.Provider,"java"); + ctx.put(UimaAsynchronousEngine.Protocol,"java"); +/* ctx.put(UimaAsynchronousEngine.Provider,"activemq"); ctx.put(UimaAsynchronousEngine.Protocol,"jms"); + */ uimaAS.deploy(relativePath + "/Deploy_NoOpAnnotator.xml", ctx); runTest2(null, uimaAS, getMasterConnectorURI(broker), @@ -143,6 +148,21 @@ public class TestUimaASNoErrors extends uimaAS.stop(); } + @Test + public void testJmsServiceAdapter() throws Exception { + Logger.getLogger(this.getClass()).info("-------------- testJmsServiceAdapter -------------"); + //setUp(); + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + try { + deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); + deployService(eeUimaEngine, relativePath + "/Deploy_SyncAggregateWithJmsService.xml"); + runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue", + 0, PROCESS_LATCH); + + } catch( Exception e ) { + throw e; + } + } /* * * @@ -1419,6 +1439,27 @@ public class TestUimaASNoErrors extends uimaAsClient.stop(); } + @Test + public void testDeployAsyncAggregateServiceOverJava() throws Exception { + testDeployAsyncAggregateService(Transport.Java); + } + + public void testDeployAsyncAggregateService(Transport transport) throws Exception { + System.out.println("-------------- testDeployAggregateService -------------"); + UimaAsynchronousEngine uimaAsClient = getClient(transport); + System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000"); + Map<String, Object> appCtx = defaultContext("TopLevelTaeQueue"); + deployTopLevelService(appCtx, transport, uimaAsClient, relativePath + "/Deploy_AsyncAggregate.xml", + "TopLevelTaeQueue"); + + appCtx.put(UimaAsynchronousEngine.Timeout, 0); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0); + + addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class); + + runTest(appCtx, uimaAsClient, "tcp://localhost:61616", "TopLevelTaeQueue", 200, PROCESS_LATCH); + } + @Test public void testDeployAggregateServiceOverJava() throws Exception { testDeployAggregateService(Transport.Java); @@ -1433,10 +1474,12 @@ public class TestUimaASNoErrors extends //BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); - // System.setProperty("BrokerURL", "tcp::/localhost:61616"); + System.setProperty("NoOpBroker", "tcp::/localhost:61616"); System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000"); // deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); - Map<String, Object> appCtx = defaultContext("TopLevelTaeQueue"); + deployJmsService(uimaAsClient, relativePath + "/Deploy_NoOpAnnotatorUsingPlaceholder.xml"); + + Map<String, Object> appCtx = defaultContext("TopLevelTaeQueue"); deployTopLevelService(appCtx, transport, uimaAsClient, relativePath + "/Deploy_AggregateAnnotator.xml","TopLevelTaeQueue"); // deployJavaService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml"); Modified: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Thu Oct 18 14:12:00 2018 @@ -659,6 +659,7 @@ public abstract class BaseTestSupport ex } // Send CPC + System.out.println("............. Sending CPC"); uimaAsClient.collectionProcessingComplete(); } } Added: uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregate.xml URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregate.xml?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregate.xml (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregate.xml Thu Oct 18 14:12:00 2018 @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> + + <!-- + *************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *************************************************************** + --> + +<analysisEngineDeploymentDescription xmlns="http://uima.apache.org/resourceSpecifier"> + + <name>Top Level TAE</name> + <description></description> + + <deployment protocol="${Protocol}" provider="${Provider}"> + <casPool numberOfCASes="5" initialFsHeapSize="500"/> + <service> + <inputQueue endpoint="TopLevelTaeQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/> + <topDescriptor> + <!--import location="../descriptors/analysis_engine/SimpleTestAggregate.xml"/--> + <import location="../descriptors/analysis_engine/ComplexNestedAggregate_TAE.xml"/> + </topDescriptor> + <analysisEngine async="true"> + <!--delegates> + + <analysisEngine key="TestMultiplier"> + <casMultiplier poolSize="5"/> + </analysisEngine> + + </delegates--> + </analysisEngine> + </service> + </deployment> + +</analysisEngineDeploymentDescription> \ No newline at end of file Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/Lifecycle.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/Lifecycle.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/Lifecycle.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/Lifecycle.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,6 @@ +package org.apache.uima.aae; + +public interface Lifecycle { + public void start() throws Exception; + public void stop() throws Exception; +} Modified: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java?rev=1844241&r1=1844240&r2=1844241&view=diff ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java (original) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java Thu Oct 18 14:12:00 2018 @@ -27,7 +27,7 @@ import org.apache.uima.UIMAFramework; import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState; import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController; import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl; -import org.apache.uima.as.client.DirectListener.DirectListenerCallback; +import org.apache.uima.aae.definition.connectors.ListenerCallback; import org.apache.uima.util.Level; /** @@ -60,7 +60,7 @@ public class UimaAsThreadFactory impleme private CountDownLatch latchToCountNumberOfInitedThreads; - private DirectListenerCallback callback = null; + private ListenerCallback callback = null; public UimaAsThreadFactory() { @@ -79,7 +79,7 @@ public class UimaAsThreadFactory impleme this.latchToCountNumberOfInitedThreads = latchToCountNumberOfInitedThreads; } - public UimaAsThreadFactory withCallback(DirectListenerCallback c) { + public UimaAsThreadFactory withCallback(ListenerCallback c) { callback = c; return this; } @@ -163,6 +163,7 @@ public class UimaAsThreadFactory impleme // TaskExecutor is terminated. r.run(); } catch (Throwable e) { + e.printStackTrace(); if ( !(e instanceof Exception) ) { // try to log. If this is OOM, logging may not succeed and we // get another OOM. Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AggregateAnalysisEngineComponent.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AggregateAnalysisEngineComponent.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AggregateAnalysisEngineComponent.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AggregateAnalysisEngineComponent.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,23 @@ +package org.apache.uima.aae.component; + +import org.apache.uima.aae.definition.connectors.basic.BasicConnector; +import org.apache.uima.resource.ResourceSpecifier; + +public class AggregateAnalysisEngineComponent extends AnalysisEngineComponent { + + public AggregateAnalysisEngineComponent(String key, ResourceSpecifier rs) { + super(key, rs); + } + + @Override + public boolean isPrimitive() { + return false; + } + + @Override + public Object getConnector() { + return new BasicConnector(); + } + + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AnalysisEngineComponent.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AnalysisEngineComponent.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AnalysisEngineComponent.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/AnalysisEngineComponent.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,147 @@ +package org.apache.uima.aae.component; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.uima.aae.controller.DelegateEndpoint; +import org.apache.uima.aae.controller.Endpoint; +import org.apache.uima.aae.controller.DelegateEndpoint.Builder; +import org.apache.uima.resource.ResourceSpecifier; + +public abstract class AnalysisEngineComponent { + + protected List<AnalysisEngineComponent> delegateList = new ArrayList<>(); + private boolean isCasMultiplier = false; + private boolean scaleable = false; + private boolean async = false; + private String componentKey; + private int scaleout = 1; + private ResourceSpecifier resourceSpecifier; + private CasMultiplierNature casMultiplier; + private int requestThreadPoolSize=1; + private int responseThreadPoolSize=1; + private Endpoint endpoint = null; + + + public abstract Object getConnector(); + + public AnalysisEngineComponent() {} + public AnalysisEngineComponent(String key, ResourceSpecifier rs) { + componentKey = key; + resourceSpecifier = rs; + } + + + public CasMultiplierNature getCasMultiplierNature() { + return casMultiplier; + } + public ResourceSpecifier getResourceSpecifier() { + return resourceSpecifier; + } + + public boolean isScaleable() { + return scaleable; + } + + public int getScaleout() { + return scaleout; + } + + public boolean isCasMultiplier() { + return isCasMultiplier; + } + public boolean isAsync() { + return async; + } + public boolean isCasConsumer() { + return false; + } + + public boolean isPrimitive() { + return true; + } + + public boolean isRemote() { + return false; + } + public String getKey() { + return componentKey; + + } + public Endpoint getEndpoint() { + if ( endpoint == null ) { + String serviceEndpoint = getKey(); + String server = "java"; + if ( this instanceof RemoteAnalysisEngineComponent ) { + serviceEndpoint = ((RemoteAnalysisEngineComponent)this).getEndpointName(); + server = ((RemoteAnalysisEngineComponent)this).getServer(); + } + endpoint = new DelegateEndpoint(). new Builder(). + withDelegateKey(getKey()). + withEndpointName(serviceEndpoint). + setRemote(isRemote()). + setServerURI(server). + withResourceSpecifier(getResourceSpecifier()). + build(); + if ( isCasMultiplier ) { + endpoint.setIsCasMultiplier(true); + endpoint.setProcessParentLast(casMultiplier.processParentLast()); + if ( casMultiplier.getPoolSize() > 1) { + endpoint.setShadowCasPoolSize(casMultiplier.getPoolSize()); + } + if ( casMultiplier.getInitialFsHeapSize() > 0 ) { + endpoint.setInitialFsHeapSize( (int) casMultiplier.getInitialFsHeapSize()); + } + endpoint.setDisableJCasCache(casMultiplier.disableJCasCache()); + } + if ( isRemote()) { + endpoint.setMetadataRequestTimeout(((RemoteAnalysisEngineComponent)this).getMetaTimeout()); + endpoint.setProcessRequestTimeout(((RemoteAnalysisEngineComponent)this).getProcessTimeout()); + endpoint.setCollectionProcessCompleteTimeout(((RemoteAnalysisEngineComponent)this).getCollectionProcessCompleteTimeout()); + } + } + + return endpoint; + } + public void add(AnalysisEngineComponent component) { + delegateList.add(component); + } + public AnalysisEngineComponent getChild(int index) { + return delegateList.get(index); + } + public List<AnalysisEngineComponent> getChildren() { + return delegateList; + } + public AnalysisEngineComponent enableScaleout() { + scaleable = true; + return this; + } + public AnalysisEngineComponent withScaleout(int howManyInstances) { + scaleout = howManyInstances; + return this; + } + + public AnalysisEngineComponent withRequestThreadPoolSize(int howManyThreads) { + requestThreadPoolSize = howManyThreads; + return this; + } + + public AnalysisEngineComponent withReplyThreadPoolSize(int howManyThreads) { + responseThreadPoolSize = howManyThreads; + return this; + } + + public AnalysisEngineComponent enableCasMultiplierNatureWith(CasMultiplierNature cm) { + this.casMultiplier = cm; + return this; + } + public AnalysisEngineComponent enableCasMultipler() { + isCasMultiplier = true; + return this; + } + public AnalysisEngineComponent enableAsync() { + async = true; + return this; + } + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierComponent.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierComponent.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierComponent.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierComponent.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,35 @@ +package org.apache.uima.aae.component; + +public class CasMultiplierComponent implements CasMultiplierNature { + boolean disableJCasCache; + long initialFsHeapSize; + int casPoolSize; + boolean processParentLast; + + public CasMultiplierComponent(boolean disableJCasCache,long initialFsHeapSize,int casPoolSize,boolean processParentLast ) { + this.disableJCasCache = disableJCasCache; + this.initialFsHeapSize = initialFsHeapSize; + this.casPoolSize = casPoolSize; + this.processParentLast = processParentLast; + } + @Override + public boolean disableJCasCache() { + return false; + } + + @Override + public long getInitialFsHeapSize() { + return 0; + } + + @Override + public int getPoolSize() { + return 0; + } + + @Override + public boolean processParentLast() { + return false; + } + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierNature.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierNature.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierNature.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/CasMultiplierNature.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,8 @@ +package org.apache.uima.aae.component; + +public interface CasMultiplierNature { + boolean disableJCasCache(); + long getInitialFsHeapSize(); + int getPoolSize(); + boolean processParentLast(); +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/ComponentCasPool.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/ComponentCasPool.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/ComponentCasPool.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/ComponentCasPool.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,22 @@ +package org.apache.uima.aae.component; + +public class ComponentCasPool { + boolean disableJCasCache; + int initialHeapSize=1000; + int poolSize=1; + public ComponentCasPool(boolean disableJCasCache, int initialHeapSize, int poolSize) { + this.disableJCasCache = disableJCasCache; + this.initialHeapSize = initialHeapSize; + this.poolSize = poolSize; + } + public boolean isDisableJCasCache() { + return disableJCasCache; + } + public int getInitialHeapSize() { + return initialHeapSize; + } + public int getPoolSize() { + return poolSize; + } + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/PrimitiveAnalysisEngineComponent.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/PrimitiveAnalysisEngineComponent.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/PrimitiveAnalysisEngineComponent.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/PrimitiveAnalysisEngineComponent.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,42 @@ +package org.apache.uima.aae.component; + +import org.apache.uima.aae.definition.connectors.basic.BasicConnector; +import org.apache.uima.resource.ResourceSpecifier; + +public class PrimitiveAnalysisEngineComponent extends AnalysisEngineComponent { + + public PrimitiveAnalysisEngineComponent(String key, ResourceSpecifier rs) { + super(key, rs); + } + /* + @Override + public boolean isScaleable() { + return false; + } + + @Override + public boolean isCasMultiplier() { + return false; + } + + @Override + public boolean isCasConsumer() { + return false; + } +*/ + @Override + public boolean isPrimitive() { + return true; + } +/* + @Override + public boolean isRemote() { + return false; + } + */ + @Override + public Object getConnector() { + return new BasicConnector(); + } + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/RemoteAnalysisEngineComponent.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/RemoteAnalysisEngineComponent.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/RemoteAnalysisEngineComponent.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/RemoteAnalysisEngineComponent.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,74 @@ +package org.apache.uima.aae.component; + +import java.util.Objects; + +import org.apache.uima.resourceSpecifier.AsyncAggregateErrorConfigurationType; +import org.apache.uima.resourceSpecifier.InputQueueType; +import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType; +import org.apache.uima.resourceSpecifier.SerializerType; + +public class RemoteAnalysisEngineComponent extends AnalysisEngineComponent { + + private Object connector; + private AnalysisEngineComponent decoratedComponent; + private SerializerType serializer; + private int replyChannelScaleout; + private InputQueueType remoteEndpoint; + private AsyncAggregateErrorConfigurationType errorConfiguration; + + public RemoteAnalysisEngineComponent(AnalysisEngineComponent component, RemoteAnalysisEngineType remoteDelegate) { + super(component.getKey(), null); + decoratedComponent = component; + errorConfiguration = + remoteDelegate.getAsyncAggregateErrorConfiguration(); + remoteEndpoint = + remoteDelegate.getInputQueue(); + replyChannelScaleout = remoteDelegate.getRemoteReplyQueueScaleout(); + serializer = + remoteDelegate.getSerializer(); + } + + @Override + public boolean isRemote() { + return true; + } + public String getServer() { + return remoteEndpoint.getBrokerURL(); + } + + public String getEndpointName() { + return remoteEndpoint.getEndpoint(); + } + public int getPrefetch() { + return remoteEndpoint.getPrefetch(); + } + + @Override + public Object getConnector() { + return connector; + } + + public int getProcessTimeout() { + return errorConfiguration.getProcessCasErrors().getTimeout(); + } + public int getMetaTimeout() { + return errorConfiguration.getGetMetadataErrors().getTimeout(); + } + public int getCollectionProcessCompleteTimeout() { + return errorConfiguration.getCollectionProcessCompleteErrors().getTimeout(); + } + public String getSupportedSerialization() { + if ( Objects.isNull( serializer ) ) { + return "xmi"; + } + return serializer.getStringValue().trim(); + } + public int getReplyConsumerCount() { + return replyChannelScaleout; + } + public RemoteAnalysisEngineComponent withConnector(Object connector) { + this.connector = connector; + return this; + } + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TestGenerator.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TestGenerator.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TestGenerator.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TestGenerator.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,133 @@ +package org.apache.uima.aae.component; + +import java.io.File; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.uima.aae.UimaClassFactory; +import org.apache.uima.aae.service.delegate.AggregateAnalysisEngineDelegate; +import org.apache.uima.aae.service.delegate.AnalysisEngineDelegate; +import org.apache.uima.analysis_engine.AnalysisEngineDescription; +import org.apache.uima.resource.ResourceSpecifier; +import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument; +import org.apache.uima.resourceSpecifier.AnalysisEngineType; +import org.apache.uima.resourceSpecifier.DelegateAnalysisEngineType; +import org.apache.uima.resourceSpecifier.RemoteAnalysisEngineType; +import org.apache.uima.resourceSpecifier.ServiceType; + +public class TestGenerator { + + public AnalysisEngineDeploymentDescriptionDocument parseDD(String descriptorPath) throws Exception { + return AnalysisEngineDeploymentDescriptionDocument.Factory.parse(new File(descriptorPath)); + + } + private boolean isAggregate(AnalysisEngineType aet) { + return ("true".equals(aet.getAsync()) || aet.isSetAsync() || aet.isSetDelegates()); + } + private boolean isAggregate( ResourceSpecifier resourceSpecifier) { + boolean aggregate = false; + if (resourceSpecifier instanceof AnalysisEngineDescription ) { + AnalysisEngineDescription aeDescriptor = + (AnalysisEngineDescription) resourceSpecifier; + + if ( !aeDescriptor.isPrimitive() ) { + aggregate = true; + } + // if ( d != null ) { +// if ((d instanceof AggregateAnalysisEngineDelegate) || +// (d.isAsync() && !d.isPrimitive()) ) { +// aggregate = true; +// } +// } else if ( !aeDescriptor.isPrimitive() ) { +// aggregate = true; +// } + } + return aggregate; + } + + private AnalysisEngineType findMatchInDD(String key) throws Exception { + + return null; + } + private AnalysisEngineDescription getAeDescription(ResourceSpecifier rs) { + return (AnalysisEngineDescription) rs; + } + public AnalysisEngineComponent parse(ResourceSpecifier rs, String key) throws Exception { + AnalysisEngineDescription aeDescriptor = getAeDescription(rs); + AnalysisEngineComponent component = null; + + //AnalysisEngineType aet = findMatchInDD(rs.) + if ( isAggregate(rs) ) { + component = + new AggregateAnalysisEngineComponent(key, rs); + + Map<String, ResourceSpecifier> delegates = + aeDescriptor.getDelegateAnalysisEngineSpecifiers(); + + for(Entry<String, ResourceSpecifier> delegateEntry: delegates.entrySet() ) { + component.add(parse(delegateEntry.getValue(), delegateEntry.getKey() )); + } + + + +/* + + + + + + // The DD object maintains two arrays, one for co-located delegates and the other for remotes. + // First handle co-located delegates. + if ( aet.getDelegates().getAnalysisEngineArray().length > 0 ) { + DelegateAnalysisEngineType[] localAnalysisEngineArray = + aet.getDelegates().getAnalysisEngineArray(); + + // Add default error handling to each co-located delegate + for( DelegateAnalysisEngineType delegate : localAnalysisEngineArray ) { + String key = delegate.getKey(); + // recursively iterate over delegates until no more aggregates found + aggregate.add(walk(delegate)); + } + + + + addColocatedDelegates(localAnalysisEngineArray,(AggregateAnalysisEngineDelegate)delegate); + } + // Next add remote delegates of this aggregate + if ( hasRemoteDelegates(aet) ) { + RemoteAnalysisEngineType[] remoteAnalysisEngineArray = + aet.getDelegates().getRemoteAnalysisEngineArray(); + addRemoteDelegates(remoteAnalysisEngineArray, (AggregateAnalysisEngineDelegate)delegate); + } +*/ + } else { + component = new PrimitiveAnalysisEngineComponent(key, rs); + } + + if ( aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().isMultipleDeploymentAllowed() ) { + component.enableCasMultipler(); + } + if ( aeDescriptor.getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes() ) { + component.enableScaleout(); + } + + return component; + } + public static void main(String[] args) { + try { + + TestGenerator generator = new TestGenerator(); + AnalysisEngineDeploymentDescriptionDocument dd = + generator.parseDD(args[0]); + ServiceType service = + dd.getAnalysisEngineDeploymentDescription().getDeployment().getService(); + ResourceSpecifier resourceSpecifier = + UimaClassFactory.produceResourceSpecifier(service.getTopDescriptor().getImport().getLocation()); + generator.walk(resourceSpecifier, null); // null= top level + + } catch( Exception e) { + e.printStackTrace(); + } + } + +} Added: uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TopLevelServiceComponent.java URL: http://svn.apache.org/viewvc/uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TopLevelServiceComponent.java?rev=1844241&view=auto ============================================================================== --- uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TopLevelServiceComponent.java (added) +++ uima/uima-as/branches/uima-as-3/uimaj-as-core/src/main/java/org/apache/uima/aae/component/TopLevelServiceComponent.java Thu Oct 18 14:12:00 2018 @@ -0,0 +1,248 @@ +package org.apache.uima.aae.component; + +import java.io.InvalidObjectException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.uima.aae.controller.DelegateEndpoint; +import org.apache.uima.aae.controller.Endpoint; +import org.apache.uima.resourceSpecifier.AnalysisEngineDeploymentDescriptionDocument; +import org.apache.uima.resourceSpecifier.AsyncPrimitiveErrorConfigurationType; +import org.apache.uima.resourceSpecifier.CasMultiplierType; +import org.apache.uima.resourceSpecifier.CasPoolType; +import org.apache.uima.resourceSpecifier.DeploymentType; +import org.apache.uima.resourceSpecifier.EnvironmentVariableType; +import org.apache.uima.resourceSpecifier.EnvironmentVariablesType; +import org.apache.uima.resourceSpecifier.ServiceType; +import org.apache.uima.resourceSpecifier.TopLevelAnalysisEngineType; + +public class TopLevelServiceComponent extends AnalysisEngineComponent{ + private String name; + private String description; + private String version; + private String vendor; + + private String protocol; + private String provider; + + private int poolSize=1; + private boolean processParentLast; + private boolean disableJCasCache; + private int initialHeapSize=500; + + private Endpoint endpoint; + + private List<EnvironmentVariable> envVariables = new ArrayList<>(); + private ComponentCasPool casPool = null; + + private AnalysisEngineComponent decoratedComponent; + + public TopLevelServiceComponent(AnalysisEngineComponent decorated, AnalysisEngineDeploymentDescriptionDocument dd) { + super(decorated.getKey(),decorated.getResourceSpecifier()); + this.decoratedComponent = decorated; + DeploymentType deployment = + dd.getAnalysisEngineDeploymentDescription().getDeployment(); + + ServiceType service = + dd.getAnalysisEngineDeploymentDescription().getDeployment().getService(); + + if ( Objects.nonNull(service.getEnvironmentVariables()) ) { + configureEnvironmentVariables(service.getEnvironmentVariables()); + } + + if ( Objects.nonNull(deployment.getCasPool()) ) { + configureCasPool(deployment.getCasPool()); + } + + if ( Objects.nonNull(service.getAnalysisEngine())&& + Objects.nonNull(service.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration()) ) { + AsyncPrimitiveErrorConfigurationType topLevelErrorConfiguration = + service.getAnalysisEngine().getAsyncPrimitiveErrorConfiguration(); + configureErrorHandling(topLevelErrorConfiguration); + } + withName(dd.getAnalysisEngineDeploymentDescription().getName()). + withDescription(dd.getAnalysisEngineDeploymentDescription().getDescription()). + withVersion(dd.getAnalysisEngineDeploymentDescription().getVersion()). + withVendor(dd.getAnalysisEngineDeploymentDescription().getVendor()). + withProtocol(deployment.getProtocol()). + withProvider(deployment.getProvider()); + + String brokerURL = service.getInputQueue().getBrokerURL(); + String queueName = service.getInputQueue().getEndpoint(); + int prefetch = service.getInputQueue().getPrefetch(); + configureEndpoint( queueName, brokerURL, prefetch); + if ( Objects.nonNull(service.getAnalysisEngine()) ) { + configureAnalysisEngine(service.getAnalysisEngine()); + + if ( Objects.nonNull(service.getAnalysisEngine().getCasMultiplier()) ) { + configureCasMultiplier(service.getAnalysisEngine().getCasMultiplier()); + } + } + } + @Override + public Endpoint getEndpoint() { + return endpoint; + } + + private void configureEndpoint(String name, String server, int prefetch) { + endpoint = new DelegateEndpoint().new Builder().withDelegateKey(getKey()).withEndpointName(name) + .setRemote(isRemote()).setServerURI(server).withResourceSpecifier(getResourceSpecifier()).build(); + if (isCasMultiplier()) { + endpoint.setIsCasMultiplier(true); + endpoint.setProcessParentLast(processParentLast); + if (poolSize > 1) { + endpoint.setShadowCasPoolSize(poolSize); + } + if (initialHeapSize > 0) { + endpoint.setInitialFsHeapSize(initialHeapSize); + } + endpoint.setDisableJCasCache(disableJCasCache); + } + } + private void configureEnvironmentVariables(EnvironmentVariablesType evt) { + for( EnvironmentVariableType ev : evt.getEnvironmentVariableArray() ) { + EnvironmentVariable envVariable = new EnvironmentVariable(ev.getName(), ev.getStringValue()); + envVariables.add(envVariable); + } + } + private int convertStringToint(String value, int defaultValue) { + try { + return Integer.valueOf(value); + } catch( Exception e ) { + return defaultValue; + } + } + private void configureErrorHandling(AsyncPrimitiveErrorConfigurationType topLevelErrorConfiguration) { + + } + private void configureAnalysisEngine(TopLevelAnalysisEngineType tlae) { + int replyQueueScaleout = convertStringToint(tlae.getInternalReplyQueueScaleout(),1); + int inputQueueScaleout = convertStringToint(tlae.getInputQueueScaleout(),1); + boolean async = false; // default + int scaleout = Objects.nonNull(tlae.getScaleout())? tlae.getScaleout().getNumberOfInstances() : 1; + if ( Objects.nonNull(tlae.getAsync()) ) { + async = Boolean.parseBoolean(tlae.getAsync()); + } + + decoratedComponent.withScaleout(scaleout) + .withReplyThreadPoolSize(replyQueueScaleout) + .withRequestThreadPoolSize(inputQueueScaleout); + // Component is async iff 'async=true' or dd has delegates + if ( !decoratedComponent.isPrimitive() && (async || Objects.nonNull(tlae.getDelegates())) ) { + decoratedComponent.enableAsync(); + } + +// String async = tlae.getAsync(); // true or false +// String replyQueueScaleout = tlae.getInternalReplyQueueScaleout(); +// String key = tlae.getKey(); +// int scaleout = tlae.getScaleout().getNumberOfInstances(); + } + public AggregateAnalysisEngineComponent aggregateComponent() throws InvalidObjectException{ + if ( !isAggregate() ) { + throw new InvalidObjectException("This component is not an aggregate"); + } + return (AggregateAnalysisEngineComponent)decoratedComponent; + } + public boolean isAggregate() { + return decoratedComponent instanceof AggregateAnalysisEngineComponent; + } + @Override + public boolean isPrimitive() { + return !isAggregate(); + } + private void configureCasPool(CasPoolType casPoolType) { + boolean disableJCasCache = casPoolType.getDisableJCasCache(); + int initialHeapSize = casPoolType.getInitialFsHeapSize(); + int poolSize = casPoolType.getNumberOfCASes(); + + casPool = new ComponentCasPool(disableJCasCache, initialHeapSize, poolSize); + + } + private void configureCasMultiplier(CasMultiplierType casMultiplierType ) { + poolSize = casMultiplierType.getPoolSize(); + disableJCasCache = casMultiplierType.getDisableJCasCache(); + initialHeapSize = Integer.parseInt(casMultiplierType.getInitialFsHeapSize()); + processParentLast = Boolean.parseBoolean(casMultiplierType.getProcessParentLast()); // true or false + } + public void addEnvVariable(String name, String value) { + envVariables.add(new EnvironmentVariable(name, value)); + } + public TopLevelServiceComponent withName(String name) { + this.name = name; + return this; + } + public TopLevelServiceComponent withDescription(String description) { + this.description = description; + return this; + } + public TopLevelServiceComponent withVersion(String version) { + this.version = version; + return this; + } + public TopLevelServiceComponent withVendor(String vendor) { + this.vendor = vendor; + return this; + } + public TopLevelServiceComponent withProtocol(String protocol) { + this.protocol = protocol; + return this; + } + public TopLevelServiceComponent withProvider(String provider) { + this.provider = provider; + return this; + } + + + public ComponentCasPool getComponentCasPool() { + if ( Objects.isNull(casPool)) { + return new ComponentCasPool(false, 1000, 1); + } else { + return casPool; + } + + } + + public String getName() { + return name; + } + public String getDescription() { + return description; + } + public String getVersion() { + return version; + } + public String getVendor() { + return vendor; + } + public String getProtocol() { + return protocol; + } + public String getProvider() { + return provider; + } + + @Override + public Object getConnector() { + return null; + } + + public class EnvironmentVariable { + String name; + String value; + + public EnvironmentVariable( String name, String value ) { + this.name = name; + this.value = value; + } + public String getName() { + return name; + } + + public String getValue() { + return value; + } + + + } +}