Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java Thu Dec 20 14:40:58 2018 @@ -23,6 +23,7 @@ import java.io.FileNotFoundException; import org.apache.uima.UIMAFramework; import org.apache.uima.UimaContext; +import org.apache.uima.aae.client.UimaAsynchronousEngine; import org.apache.uima.analysis_component.CasAnnotator_ImplBase; import org.apache.uima.analysis_engine.AnalysisEngineProcessException; import org.apache.uima.cas.CAS; @@ -44,9 +45,14 @@ public class NoOpAnnotator extends CasAn int cpcDelay = 0; + boolean addServiceTargetId = false; + + private String thisServiceTargetId = null; public void initialize(UimaContext aContext) throws ResourceInitializationException { super.initialize(aContext); - + // this can be null. The TargetSelectorProperty is optional + thisServiceTargetId = System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty); + if (getContext().getConfigParameterValue("FailDuringInitialization") != null) { throw new ResourceInitializationException(new FileNotFoundException("Simulated Exception")); } @@ -71,6 +77,9 @@ public class NoOpAnnotator extends CasAn if (getContext().getConfigParameterValue("FinalCount") != null) { finalCount = ((Integer) getContext().getConfigParameterValue("FinalCount")).intValue(); } + if (getContext().getConfigParameterValue("addServiceTargetId") != null) { + addServiceTargetId = ((Boolean) getContext().getConfigParameterValue("addServiceTargetId")).booleanValue(); + } // write log messages Logger logger = getContext().getLogger(); @@ -132,6 +141,10 @@ public class NoOpAnnotator extends CasAn throw new IndexOutOfBoundsException(); } } + if ( addServiceTargetId && thisServiceTargetId != null) { + aCAS.setDocumentText(thisServiceTargetId); + } + } catch (Exception e) { throw new AnalysisEngineProcessException(e); }
Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpCC.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpCC.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpCC.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpCC.java Thu Dec 20 14:40:58 2018 @@ -31,7 +31,7 @@ public class NoOpCC extends CasConsumer_ } public void processCas(CAS aCAS) throws ResourceProcessException { - System.out.println("NoOpCC process() called"); + //System.out.println("NoOpCC process() called"); } } Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Thu Dec 20 14:40:58 2018 @@ -33,12 +33,16 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.jms.Connection; import javax.jms.Message; @@ -84,6 +88,7 @@ import org.apache.uima.resource.Resource import org.apache.uima.resource.ResourceProcessException; import org.apache.uima.resource.ResourceSpecifier; import org.apache.uima.resource.metadata.ProcessingResourceMetaData; +import org.apache.uima.resource.metadata.TypePriorityList; import org.apache.uima.resourceSpecifier.factory.DeploymentDescriptorFactory; import org.apache.uima.resourceSpecifier.factory.ServiceContext; import org.apache.uima.resourceSpecifier.factory.UimaASPrimitiveDeploymentDescriptor; @@ -92,6 +97,7 @@ import org.apache.uima.util.XMLInputSour import org.junit.Test; import org.junit.runner.RunWith; import org.xml.sax.SAXException; +import org.xml.sax.SAXParseException; @RunWith(UimaASTestRunner.class) public class TestUimaASExtended extends BaseTestSupport { @@ -124,14 +130,219 @@ public class TestUimaASExtended extends return b.getDefaultSocketURIString(); } + /* + @Test + public void testBrokerRestartWithPrimitiveMultiplier() throws Exception { + System.out.println("-------------- testBrokerRestartWithPrimitiveMultiplier -------------"); + System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); + + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + + deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml"); + + + broker.stop(); + broker.waitUntilStopped(); + + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + + String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(); + Map<String, Object> appCtx = + buildContext(burl, "TestMultiplierQueue"); + + // reduce the cas pool size and reply window + appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize); + appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2)); + runTest(appCtx, eeUimaEngine,burl, + "TestMultiplierQueue", 1, PROCESS_LATCH); + + eeUimaEngine.stop(); + } +*/ + + /** + * This test starts a secondary broker, starts NoOp Annotator, and + * using synchronous sendAndReceive() sends 5 CASes for analysis. Before sending 6th, the test + * stops the secondary broker and sends 5 more CASes. All CASes sent after + * the broker shutdown result in GetMeta ping and a subsequent timeout. Before + * sending 11th CAS the test starts the broker again and sends 10 more CASes + * @throws Exception + */ + @Test + public void testSyncClientRecoveryFromBrokerStopAndRestart() throws Exception { + System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------"); + //System.setProperty("uima.as.enable.jmx","false"); + // Instantiate Uima AS Client + BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); + BrokerService broker2 = setupSecondaryBroker(true); + // Deploy Uima AS Primitive Service + deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); + + Map<String, Object> appCtx = + buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue"); + appCtx.put(UimaAsynchronousEngine.Timeout, 1100); + appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000); + initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + int errorCount=0; + int delay = 1; + for (int i = 0; i < 20; i++) { + + if ( i == 5 ) { + broker2.stop(); + broker2.waitUntilStopped(); + System.out.println("..... Stopped broker ............................"); + Timer timer = new Timer(); + timer.schedule(new StartBrokerTask(broker2, this),500); + delay = 200; + } + + synchronized(appCtx) { + try { + appCtx.wait(delay); + } catch( InterruptedException eee) { + + } + } + /* + else if ( i == 10 ) { + // restart the broker + System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); + broker2 = setupSecondaryBroker(true); + + broker2.start(); + broker2.waitUntilStarted(); + + } + */ + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); + // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); + try { + uimaAsEngine.sendAndReceiveCAS(cas); + } catch( Exception e) { + errorCount++; + System.out.println("Client Received Expected Error on CAS:"+(i+1)); + } finally { + cas.release(); + } + } + + uimaAsEngine.stop(); + super.cleanBroker(broker2); + + broker2.stop(); + + // expecting 5 failures due to broker missing +// if ( errorCount != 5 ) { +// fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures"); +// } + broker2.waitUntilStopped(); + + } + /** + * This test creates 4 UIMA AS clients and runs each in a separate thread. There is a single + * shared jms connection to a broker that each client uses. After initialization a client + * sends 1000 CASes to a remote service. While clients are processing the test kills + * the broker, waits for 4 seconds and restarts it. While the broker is down, clients + * keep trying sending CASes, receiving Ping timeouts. Once the broker is available again + * all clients should recover and begin processing CASes again. This tests recovery of a + * shared connection. + * + * @throws Exception + */ + @Test + public void testMultipleSyncClientsRecoveryFromBrokerStopAndRestart() throws Exception { + System.out.println("-------------- testMultipleSyncClientsRecoveryFromBrokerStopAndRestart -------------"); + //System.setProperty("uima.as.enable.jmx","false"); + final BrokerService broker2 = setupSecondaryBroker(true); + final CountDownLatch latch = new CountDownLatch(8); + Thread[] clientThreads = new Thread[8]; + + // Create 4 Uima AS clients each running in a separate thread + for(int i=0; i < 8; i++) { + clientThreads[i] = new Thread() { + public void run() { + BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); + try { + // Deploy Uima AS Primitive Service + deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); + Map<String, Object> appCtx = + buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue"); + + appCtx.put(UimaAsynchronousEngine.Timeout, 1100); + appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + for (int i = 0; i < 500; i++) { + if ( i == 5 ) { + latch.countDown(); // indicate that some CASes were processed + } + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); +// System.out.println("UIMA AS Client#"+ Thread.currentThread().getId()+" Sending CAS#"+(i + 1) + " Request to a Service"); + try { + uimaAsEngine.sendAndReceiveCAS(cas); + } catch( Exception e) { + System.out.println("Client Received Expected Error on CAS:"+(i+1)); + } finally { + cas.release(); + } + synchronized(uimaAsEngine) { + uimaAsEngine.wait(100); + } + } + System.out.println("Thread:"+Thread.currentThread().getId()+" Completed run()"); + uimaAsEngine.stop(); + } catch( Exception e) { + e.printStackTrace(); + return; + } + } + }; + clientThreads[i].start(); + } + BrokerService broker3 = null; + try { + latch.await(); // wait for all threads to process a few CASes + + broker2.stop(); + System.out.println("Stopping Broker - wait ..."); + broker2.waitUntilStopped(); + + System.out.println("Restarting Broker - wait ..."); + // restart the broker + broker3 = setupSecondaryBroker(true); + broker3.waitUntilStarted(); + + } catch ( Exception e ) { + + } finally { + for(int i=0; i < 4; i++ ) { + clientThreads[i].join(); + } + System.out.println("Stopping Broker - wait ..."); + if ( broker3 != null ) { + super.cleanBroker(broker3); + + broker3.stop(); + broker3.waitUntilStopped(); + + } + } + } @Test public void testClient() throws Exception { - System.out.println("-------------- testClientRecoveryFromBrokerFailure -------------"); + System.out.println("-------------- testClient -------------"); System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); BaseUIMAAsynchronousEngine_impl uimaAsEngine1 = new BaseUIMAAsynchronousEngine_impl(); BaseUIMAAsynchronousEngine_impl uimaAsEngine2 = new BaseUIMAAsynchronousEngine_impl(); - + File directory = new File("./"); + System.out.println(directory.getAbsolutePath()); String sid1= deployService(uimaAsEngine1, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml"); String sid2 = deployService(uimaAsEngine2, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml"); @@ -142,7 +353,7 @@ public class TestUimaASExtended extends @Test public void testClientWithPrimitives() throws Exception { - System.out.println("-------------- testClientRecoveryFromBrokerFailure -------------"); + System.out.println("-------------- testClientWithPrimitives -------------"); System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); BaseUIMAAsynchronousEngine_impl uimaAsEngine1 = new BaseUIMAAsynchronousEngine_impl(); @@ -244,7 +455,7 @@ public class TestUimaASExtended extends } } - */ + @Test public void testBrokerRestartWithAggregateMultiplier() throws Exception { System.out.println("-------------- testBrokerRestartWithAggregateMultiplier -------------"); @@ -280,9 +491,9 @@ public class TestUimaASExtended extends appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2)); runTest(appCtx, eeUimaEngine,burl, "TopLevelTaeQueue", 1, PROCESS_LATCH); - eeUimaEngine.stop(); + // eeUimaEngine.stop(); } - + */ /** * Tests client and service recovery from broker restart. It deploys CM service, dispatches * a CAS for processing and while the CAS is in process, it bounces a broker. The service @@ -353,6 +564,8 @@ public class TestUimaASExtended extends } + s.shutdownNow(); + s.awaitTermination(10, TimeUnit.HOURS); uimaAsEngine.stop(); } @@ -415,6 +628,8 @@ public class TestUimaASExtended extends } + s.shutdownNow(); + s.awaitTermination(10, TimeUnit.HOURS); uimaAsEngine.stop(); @@ -431,6 +646,7 @@ public class TestUimaASExtended extends * * @throws Exception */ + @Test public void testServiceWithHttpListeners() throws Exception { System.out.println("-------------- testServiceWithHttpListeners -------------"); @@ -442,9 +658,10 @@ public class TestUimaASExtended extends c.setConnectionFactory(new ActiveMQConnectionFactory("http://localhost:18888")); c.setDestinationName("TestQ"); c.setConcurrentConsumers(2); - c.setBeanName("TestBean"); + c.setBeanName("testServiceWithHttpListeners() - JUnit Test Listener"); c.setMessageListener(new JmsInputChannel()); - c.initialize(); + //c.initialize(); + //c.afterPropertiesSet(); c.start(); if ( c.isRunning() ) { @@ -465,49 +682,20 @@ public class TestUimaASExtended extends if ( c.failed() ) { fail("Broker Failed - Reason:"+c.getReasonForFailure()); } else { - System.out.println("Stopping Listener"); - c.stop(); - - } + } } catch( Exception e) { e.printStackTrace(); fail(e.getMessage()); } - } - - + System.out.println("Stopping Listener"); + c.stop(); + c.shutdown(); - @Test - public void testBrokerRestartWithPrimitiveMultiplier() throws Exception { - System.out.println("-------------- testBrokerRestartWithPrimitiveMultiplier -------------"); - System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); - - BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); - - deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml"); - - - broker.stop(); - broker.waitUntilStopped(); - - broker = createBroker(); - broker.start(); - broker.waitUntilStarted(); - - String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(); - Map<String, Object> appCtx = - buildContext(burl, "TestMultiplierQueue"); - - // reduce the cas pool size and reply window - appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize); - appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2)); - runTest(appCtx, eeUimaEngine,burl, - "TestMultiplierQueue", 1, PROCESS_LATCH); - - eeUimaEngine.stop(); } + + /* public void testContinueOnRetryFailure2() throws Exception { @@ -541,92 +729,92 @@ public class TestUimaASExtended extends /* * Tests */ - @Test - public void testSyncClientRecoveryFromBrokerStopAndRestart3() throws Exception { - System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------"); - System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); - - // Instantiate Uima AS Client - BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); - //BrokerService broker2 = setupSecondaryBroker(true); - // Deploy Uima AS Primitive Service - deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); - - Map<String, Object> appCtx = - buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "NoOpAnnotatorQueue"); - appCtx.put(UimaAsynchronousEngine.Timeout, 1100); - appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); - appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000); - initialize(uimaAsEngine, appCtx); - waitUntilInitialized(); - - - broker.stop(); - broker.waitUntilStopped(); - - //System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); - //broker2 = setupSecondaryBroker(true); - broker = createBroker(); - broker.start(); - broker.waitUntilStarted(); - int errorCount = 0; - System.out.println("Sending CASes"); - for (int i = 0; i < 60; i++) { - CAS cas = uimaAsEngine.getCAS(); - cas.setDocumentText("Some Text"); - try { - uimaAsEngine.sendAndReceiveCAS(cas); - } catch( Exception e) { - System.out.println("Client Received Expected Error on CAS:"+(i+1)); - } finally { - cas.release(); - } - - - } - uimaAsEngine.stop(); - - /* - int errorCount=0; - for (int i = 0; i < 20; i++) { - - if ( i == 5 ) { - broker2.stop(); - broker2.waitUntilStopped(); - } else if ( i == 10 ) { - // restart the broker - System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); - broker2 = setupSecondaryBroker(true); - - broker2.start(); - broker2.waitUntilStarted(); - - } - CAS cas = uimaAsEngine.getCAS(); - cas.setDocumentText("Some Text"); - // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); - try { - uimaAsEngine.sendAndReceiveCAS(cas); - } catch( Exception e) { - errorCount++; - System.out.println("Client Received Expected Error on CAS:"+(i+1)); - } finally { - cas.release(); - } - } - - uimaAsEngine.stop(); - super.cleanBroker(broker2); - - broker2.stop(); - - // expecting 5 failures due to broker missing - if ( errorCount != 5 ) { - fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures"); - } - broker2.waitUntilStopped(); -*/ - } +// @Test +// public void testSyncClientRecoveryFromBrokerStopAndRestart3() throws Exception { +// System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------"); +// System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); +// +// // Instantiate Uima AS Client +// BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); +// //BrokerService broker2 = setupSecondaryBroker(true); +// // Deploy Uima AS Primitive Service +// deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); +// +// Map<String, Object> appCtx = +// buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "NoOpAnnotatorQueue"); +// appCtx.put(UimaAsynchronousEngine.Timeout, 1100); +// appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); +// appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000); +// initialize(uimaAsEngine, appCtx); +// waitUntilInitialized(); +// +// +// broker.stop(); +// broker.waitUntilStopped(); +// +// //System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); +// //broker2 = setupSecondaryBroker(true); +// broker = createBroker(); +// broker.start(); +// broker.waitUntilStarted(); +// int errorCount = 0; +// System.out.println("Sending CASes"); +// for (int i = 0; i < 60; i++) { +// CAS cas = uimaAsEngine.getCAS(); +// cas.setDocumentText("Some Text"); +// try { +// uimaAsEngine.sendAndReceiveCAS(cas); +// } catch( Exception e) { +// System.out.println("Client Received Expected Error on CAS:"+(i+1)); +// } finally { +// cas.release(); +// } +// +// +// } +// uimaAsEngine.stop(); +// +// /* +// int errorCount=0; +// for (int i = 0; i < 20; i++) { +// +// if ( i == 5 ) { +// broker2.stop(); +// broker2.waitUntilStopped(); +// } else if ( i == 10 ) { +// // restart the broker +// System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); +// broker2 = setupSecondaryBroker(true); +// +// broker2.start(); +// broker2.waitUntilStarted(); +// +// } +// CAS cas = uimaAsEngine.getCAS(); +// cas.setDocumentText("Some Text"); +// // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); +// try { +// uimaAsEngine.sendAndReceiveCAS(cas); +// } catch( Exception e) { +// errorCount++; +// System.out.println("Client Received Expected Error on CAS:"+(i+1)); +// } finally { +// cas.release(); +// } +// } +// +// uimaAsEngine.stop(); +// super.cleanBroker(broker2); +// +// broker2.stop(); +// +// // expecting 5 failures due to broker missing +// if ( errorCount != 5 ) { +// fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures"); +// } +// broker2.waitUntilStopped(); +//*/ +// } /* @@ -705,12 +893,33 @@ public class TestUimaASExtended extends deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); deployService(eeUimaEngine, relativePath + "/Deploy_SyncAggregateWithJmsService.xml"); runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue", - 0, PROCESS_LATCH); + 1, PROCESS_LATCH); } catch( Exception e ) { throw e; } } + /** + * Test use of a JMS Service Adapter. Invoke from a synchronous aggregate to emulate usage from + * RunAE or RunCPE. + * + * @throws Exception + */ + @Test + public void testJmsServiceAdapterInAsyncAggregate() throws Exception { + Logger.getLogger(this.getClass()).info("-------------- testJmsServiceAdapter -------------"); + //setUp(); + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + try { + deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); + deployService(eeUimaEngine, relativePath + "/Deploy_AsyncAggregateWithJmsService.xml"); + runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue", + 1, PROCESS_LATCH); + + } catch( Exception e ) { + throw e; + } + } /* * Tests Uima AS client placeholder handling and substitution. The Uima Aggregate instantiates * UIMA AS client proxy using Jms Client Descriptor that contains a placeholder @@ -1059,7 +1268,28 @@ public class TestUimaASExtended extends public void testSendAndReceive() throws Exception { BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); - deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); + + /* + String id = deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); + deployService(uimaAsEngine, relativePath + "/Deploy_AggregateAnnotator.xml"); + + + Map<String, Object> appCtx = buildContext(getMasterConnectorURI(broker),"MeetingDetectorTaeQueue"); + appCtx.put(UimaAsynchronousEngine.Timeout, 1100); + appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + //appCtx.put(UimaAsynchronousEngine.TargetSelectorProperty, "MeetingDetector2"); + initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + for (int i = 0; i < 15; i++) { + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); + System.out.println(".... Sending CAS "+i); + uimaAsEngine.sendAndReceiveCAS(cas); + cas.release(); + } + */ + + String id = deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); deployService(uimaAsEngine, relativePath + "/Deploy_AggregateAnnotator.xml"); // Deploy Uima AS Primitive Service // deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); @@ -1081,7 +1311,7 @@ public class TestUimaASExtended extends for( AnalysisEnginePerformanceMetrics m :componentMetricsList ) { System.out.println(".............. Component:"+m.getName()+" AnalysisTime:"+m.getAnalysisTime()); } - uimaAsEngine.sendCAS(cas); + //uimaAsEngine.sendCAS(cas); System.out.println("----------------------------------------------------"); componentMetricsList.clear(); } catch( Exception e) { @@ -1092,6 +1322,7 @@ public class TestUimaASExtended extends } } uimaAsEngine.stop(); + } @Test public void testMultipleSyncClientsWithMultipleBrokers() throws Exception { @@ -1347,120 +1578,12 @@ public class TestUimaASExtended extends // Deploy Uima AS Primitive Service deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); Map<String, Object> appCtx = buildContext("failover:("+System.getProperty("BrokerURL")+","+getBrokerUri()+")?randomize=false","NoOpAnnotatorQueue"); - appCtx.put(UimaAsynchronousEngine.Timeout, 1100); - appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); - initialize(uimaAsEngine, appCtx); - waitUntilInitialized(); - int errorCount = 0; - for (int i = 0; i < 15; i++) { - CAS cas = uimaAsEngine.getCAS(); - cas.setDocumentText("Some Text"); - // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); - try { - uimaAsEngine.sendAndReceiveCAS(cas); - } catch( Exception e) { - errorCount++; - } finally { - cas.release(); - } - } - uimaAsEngine.stop(); - super.cleanBroker(broker2); - - broker2.stop(); - broker2.waitUntilStopped(); - - } - - /** - * This test starts a secondary broker, starts NoOp Annotator, and - * using synchronous sendAndReceive() sends 10 CASes for analysis. Before sending 11th, the test - * stops the secondary broker and sends 5 more CASes. All CASes sent after - * the broker shutdown result in GetMeta ping and a subsequent timeout. - * @throws Exception - */ - - @Test - public void testSyncClientRecoveryFromBrokerStop() throws Exception { - System.out.println("-------------- testSyncClientRecoveryFromBrokerStop -------------"); - // Instantiate Uima AS Client - BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); - BrokerService broker2 = setupSecondaryBroker(true); - // Deploy Uima AS Primitive Service - deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); - Map<String, Object> appCtx = - buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue"); - appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 1100); - appCtx.put(UimaAsynchronousEngine.Timeout, 1100); - appCtx.put(UimaAsynchronousEngine.CpcTimeout, 300); - initialize(uimaAsEngine, appCtx); - waitUntilInitialized(); - int errorCount = 0; - for (int i = 0; i < 15; i++) { - - if ( i == 10 ) { - // Stop the broker - broker2.stop(); - broker2.waitUntilStopped(); - } - CAS cas = uimaAsEngine.getCAS(); - cas.setDocumentText("Some Text"); - // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); - try { - uimaAsEngine.sendAndReceiveCAS(cas); - } catch( Exception e) { - errorCount++; - System.out.println("Client Received Expected Error on CAS:"+(i+1)+" ErrorCount:"+errorCount); - } finally { - cas.release(); - } - } - - uimaAsEngine.stop(); - // expecting 5 failures due to broker missing - if ( errorCount != 5 ) { - fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures"); - } - } - /** - * This test starts a secondary broker, starts NoOp Annotator, and - * using synchronous sendAndReceive() sends 5 CASes for analysis. Before sending 6th, the test - * stops the secondary broker and sends 5 more CASes. All CASes sent after - * the broker shutdown result in GetMeta ping and a subsequent timeout. Before - * sending 11th CAS the test starts the broker again and sends 10 more CASes - * @throws Exception - */ - @Test - public void testSyncClientRecoveryFromBrokerStopAndRestart() throws Exception { - System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------"); - // Instantiate Uima AS Client - BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); - BrokerService broker2 = setupSecondaryBroker(true); - // Deploy Uima AS Primitive Service - deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); - - Map<String, Object> appCtx = - buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue"); - appCtx.put(UimaAsynchronousEngine.Timeout, 1100); - appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); - appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000); - initialize(uimaAsEngine, appCtx); - waitUntilInitialized(); - int errorCount=0; - for (int i = 0; i < 20; i++) { - - if ( i == 5 ) { - broker2.stop(); - broker2.waitUntilStopped(); - } else if ( i == 10 ) { - // restart the broker - System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); - broker2 = setupSecondaryBroker(true); - - broker2.start(); - broker2.waitUntilStarted(); - - } + appCtx.put(UimaAsynchronousEngine.Timeout, 1100); + appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + int errorCount = 0; + for (int i = 0; i < 15; i++) { CAS cas = uimaAsEngine.getCAS(); cas.setDocumentText("Some Text"); // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); @@ -1468,195 +1591,134 @@ public class TestUimaASExtended extends uimaAsEngine.sendAndReceiveCAS(cas); } catch( Exception e) { errorCount++; - System.out.println("Client Received Expected Error on CAS:"+(i+1)); } finally { cas.release(); } } - uimaAsEngine.stop(); super.cleanBroker(broker2); broker2.stop(); - - // expecting 5 failures due to broker missing - if ( errorCount != 5 ) { - fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures"); - } broker2.waitUntilStopped(); } - /** - * This test creates 4 UIMA AS clients and runs each in a separate thread. There is a single - * shared jms connection to a broker that each client uses. After initialization a client - * sends 1000 CASes to a remote service. While clients are processing the test kills - * the broker, waits for 4 seconds and restarts it. While the broker is down, clients - * keep trying sending CASes, receiving Ping timeouts. Once the broker is available again - * all clients should recover and begin processing CASes again. This tests recovery of a - * shared connection. - * - * @throws Exception - */ - @Test - public void testMultipleSyncClientsRecoveryFromBrokerStopAndRestart() throws Exception { - System.out.println("-------------- testMultipleSyncClientsRecoveryFromBrokerStopAndRestart -------------"); - final BrokerService broker2 = setupSecondaryBroker(true); - final CountDownLatch latch = new CountDownLatch(4); - Thread[] clientThreads = new Thread[4]; - - // Create 4 Uima AS clients each running in a separate thread - for(int i=0; i < 4; i++) { - clientThreads[i] = new Thread() { - public void run() { - BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); - try { - // Deploy Uima AS Primitive Service - deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); - Map<String, Object> appCtx = - buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue"); - - appCtx.put(UimaAsynchronousEngine.Timeout, 1100); - appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); - initialize(uimaAsEngine, appCtx); - waitUntilInitialized(); - for (int i = 0; i < 500; i++) { - if ( i == 5 ) { - latch.countDown(); // indicate that some CASes were processed - } - CAS cas = uimaAsEngine.getCAS(); - cas.setDocumentText("Some Text"); -// System.out.println("UIMA AS Client#"+ Thread.currentThread().getId()+" Sending CAS#"+(i + 1) + " Request to a Service"); - try { - uimaAsEngine.sendAndReceiveCAS(cas); - } catch( Exception e) { - System.out.println("Client Received Expected Error on CAS:"+(i+1)); - } finally { - cas.release(); - } - synchronized(uimaAsEngine) { - uimaAsEngine.wait(100); - } - } - System.out.println("Thread:"+Thread.currentThread().getId()+" Completed run()"); - uimaAsEngine.stop(); - } catch( Exception e) { - e.printStackTrace(); - return; - } - } - }; - clientThreads[i].start(); - } - BrokerService broker3 = null; - try { - latch.await(); // wait for all threads to process a few CASes - - broker2.stop(); - System.out.println("Stopping Broker - wait ..."); - broker2.waitUntilStopped(); - - System.out.println("Restarting Broker - wait ..."); - // restart the broker - broker3 = setupSecondaryBroker(true); - broker3.waitUntilStarted(); - - } catch ( Exception e ) { - - } finally { - for(int i=0; i < 4; i++ ) { - clientThreads[i].join(); - } - System.out.println("Stopping Broker - wait ..."); - if ( broker3 != null ) { - super.cleanBroker(broker3); - - broker3.stop(); - broker3.waitUntilStopped(); - - } - } -} + /** * This test starts a secondary broker, starts NoOp Annotator, and - * using asynchronous send() sends a total of 15 CASes for analysis. After processing 11th - * the test stops the secondary broker and sends 4 more CASes which fails due to broker not running. - * + * using synchronous sendAndReceive() sends 10 CASes for analysis. Before sending 11th, the test + * stops the secondary broker and sends 5 more CASes. All CASes sent after + * the broker shutdown result in GetMeta ping and a subsequent timeout. * @throws Exception */ + @Test - public void testAsyncClientRecoveryFromBrokerStop() throws Exception { - System.out.println("-------------- testAsyncClientRecoveryFromBrokerStop -------------"); - - BrokerService broker2 = setupSecondaryBroker(true); - // Instantiate Uima AS Client + public void testSyncClientRecoveryFromBrokerStop() throws Exception { + System.out.println("-------------- testSyncClientRecoveryFromBrokerStop -------------"); + //System.setProperty("uima.as.enable.jmx","false"); + // Instantiate Uima AS Client BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); + BrokerService broker2 = setupSecondaryBroker(true); + // Deploy Uima AS Primitive Service deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); Map<String, Object> appCtx = buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue"); - appCtx.put(UimaAsynchronousEngine.Timeout, 1100); - appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 1100); + appCtx.put(UimaAsynchronousEngine.Timeout, 0); + appCtx.put(UimaAsynchronousEngine.CpcTimeout, 300); + appCtx.put(UimaAsynchronousEngine.CasPoolSize,15); initialize(uimaAsEngine, appCtx); waitUntilInitialized(); - + int errorCount = 0; for (int i = 0; i < 15; i++) { if ( i == 10 ) { + // Stop the broker broker2.stop(); broker2.waitUntilStopped(); - + Timer timer = new Timer(); + timer.schedule(new StartBrokerTask(broker2, this),10000); } CAS cas = uimaAsEngine.getCAS(); cas.setDocumentText("Some Text"); - // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); - uimaAsEngine.sendCAS(cas); + // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); + try { + uimaAsEngine.sendAndReceiveCAS(cas); + } catch( Exception e) { + errorCount++; + System.out.println("Client Received Expected Error on CAS:"+(i+1)+" ErrorCount:"+errorCount); + } finally { + cas.release(); + } } - + uimaAsEngine.stop(); - super.cleanBroker(broker2); + // expecting 5 failures due to broker missing +// if ( errorCount != 5 ) { +// fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures"); +// } broker2.stop(); broker2.waitUntilStopped(); } - + @Test public void testAsyncClientRecoveryFromBrokerStopAndRestart() throws Exception { System.out.println("-------------- testAsyncClientRecoveryFromBrokerStopAndRestart -------------"); - BrokerService broker2 = setupSecondaryBroker(true); + BrokerService broker2 = setupSecondaryBroker(true); // Instantiate Uima AS Client BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); // Deploy Uima AS Primitive Service - deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); + String id = deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); + String brokerUri = broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(); Map<String, Object> appCtx = - buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue"); + buildContext(brokerUri, "NoOpAnnotatorQueue"); - appCtx.put(UimaAsynchronousEngine.Timeout, 1100); + appCtx.put(UimaAsynchronousEngine.Timeout, 200); appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 200); initialize(uimaAsEngine, appCtx); waitUntilInitialized(); - - for (int i = 0; i < 150; i++) { + int delay = 1; + for (int i = 0; i < 50; i++) { if ( i == 10 ) { broker2.stop(); broker2.waitUntilStopped(); + System.out.println("..... Stopped broker ............................"); + Timer timer = new Timer(); + timer.schedule(new StartBrokerTask(broker2, this),500); + delay = 200; + } + synchronized(appCtx) { + try { + appCtx.wait(delay); + } catch( InterruptedException eee) { + + } + } +/* } else if ( i == 15 ) { broker2 = setupSecondaryBroker(true); broker2.waitUntilStarted(); + System.out.println("..... Restarted broker ............................"); } + + */ CAS cas = uimaAsEngine.getCAS(); cas.setDocumentText("Some Text"); - // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); + System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); uimaAsEngine.sendCAS(cas); } + uimaAsEngine.stop(); + super.cleanBroker(broker2); broker2.stop(); broker2.waitUntilStopped(); - } /** @@ -1676,6 +1738,7 @@ public class TestUimaASExtended extends * @throws Exception */ + /* @Test public void testMultipleClientsRecoveryFromBrokerStopAndRestart() throws Exception { System.out.println("-------------- testMultipleClientsRecoveryFromBrokerStopAndRestart -------------"); @@ -1690,6 +1753,7 @@ public class TestUimaASExtended extends appCtx.put(UimaAsynchronousEngine.Timeout, 1100); appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 400); initialize(uimaClient1, appCtx); waitUntilInitialized(); @@ -1701,6 +1765,7 @@ public class TestUimaASExtended extends appCtx2.put(UimaAsynchronousEngine.Timeout, 1100); appCtx2.put(UimaAsynchronousEngine.CpcTimeout, 1100); + appCtx2.put(UimaAsynchronousEngine.GetMetaTimeout, 400); initialize(uimaClient2, appCtx2); waitUntilInitialized(); @@ -1720,9 +1785,11 @@ public class TestUimaASExtended extends } CAS cas = uimaClient1.getCAS(); cas.setDocumentText("Some Text"); - // System.out.println("UIMA AS Client#1 Sending CAS#" + (i + 1) + " Request to a Service"); + System.out.println("UIMA AS Client#1 Sending CAS#" + (i + 1) + " Request to a Service"); try { uimaClient1.sendAndReceiveCAS(cas); + System.out.println("UIMA AS Client#1 Received Reply For CAS:"+(i+1)); + } catch( Exception e) { errorCount++; System.out.println("UIMA AS Client#1 Received Expected Error on CAS:"+(i+1)); @@ -1730,11 +1797,14 @@ public class TestUimaASExtended extends cas.release(); } } + System.out.println("Done with UIMA AS Client#1"); + for (int i = 0; i < 4; i++) { CAS cas = uimaClient2.getCAS(); cas.setDocumentText("Some Text"); - // System.out.println("UIMA AS Client#2 Sending CAS#" + (i + 1) + " Request to a Service"); + System.out.println("UIMA AS Client#2 Sending CAS#" + (i + 1) + " Request to a Service"); try { + uimaClient2.sendAndReceiveCAS(cas); } catch( Exception e) { errorCount++; @@ -1752,6 +1822,7 @@ public class TestUimaASExtended extends broker2.waitUntilStopped(); } + */ /** * Tests ability of an aggregate to recover from a Broker restart. The broker managing * delegate's input queue is stopped after 1st CAS is fully processed. As part of error @@ -1770,6 +1841,49 @@ public class TestUimaASExtended extends } /** + * This test starts a secondary broker, starts NoOp Annotator, and + * using asynchronous send() sends a total of 15 CASes for analysis. After processing 11th + * the test stops the secondary broker and sends 4 more CASes which fails due to broker not running. + * + * @throws Exception + */ + @Test + public void testAsyncClientRecoveryFromBrokerStop() throws Exception { + System.out.println("-------------- testAsyncClientRecoveryFromBrokerStop -------------"); + + BrokerService broker2 = setupSecondaryBroker(true); + // Instantiate Uima AS Client + BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); + deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); + Map<String, Object> appCtx = + buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue"); + appCtx.put(UimaAsynchronousEngine.Timeout, 1100); + appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + appCtx.put(UimaAsynchronousEngine.CasPoolSize, 15); + + initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + + for (int i = 0; i < 15; i++) { + + if ( i == 10 ) { + broker2.stop(); + broker2.waitUntilStopped(); + + } + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); + // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); + uimaAsEngine.sendCAS(cas); + } + + uimaAsEngine.stop(); + super.cleanBroker(broker2); + broker2.stop(); + broker2.waitUntilStopped(); + + } +/** * Tests ability of an aggregate to recover from a Broker restart. The broker managing * delegate's input queue is stopped after 1st CAS is fully processed. As part of error * handling the listener on delegate temp reply queue is stopped and a delegate marked @@ -1917,7 +2031,20 @@ public class TestUimaASExtended extends System.clearProperty("DefaultBrokerURL"); uimaAsEngine.stop(); } - +private class Killer { + Timer timer; + public Killer(int secs) { + timer = new Timer(); + timer.schedule(new KillerTask(), secs*1000); + } + class KillerTask extends TimerTask { + public void run() { + System.out.println("----------------- KillerTask calling System.exit()"); + timer.cancel(); + System.exit(0); + } + } +} @Test public void testClientProcess() throws Exception { System.out.println("-------------- testClientProcess -------------"); @@ -1931,6 +2058,8 @@ public class TestUimaASExtended extends appCtx.put(UimaAsynchronousEngine.Timeout, 0); appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); appCtx.put(UimaAsynchronousEngine.CasPoolSize,2); + appCtx.put(UimaAsynchronousEngine.SERIALIZATION_STRATEGY, "xmi"); + initialize(uimaAsEngine, appCtx); waitUntilInitialized(); @@ -2106,6 +2235,86 @@ public class TestUimaASExtended extends "PersonTitleAnnotatorQueue", 0, EXCEPTION_LATCH); } @Test + public void testDeployPrimitiveServiceWithTargeting() throws Exception { + System.out.println("-------------- testDeployPrimitiveService -------------"); + // Instantiate Uima-AS Client + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + System.setProperty(UimaAsynchronousEngine.TargetSelectorProperty,"ServiceOne"); + // Deploy Uima-AS Primitive Service + deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithTargetingSupport.xml"); + + System.setProperty(UimaAsynchronousEngine.TargetSelectorProperty,"ServiceTwo"); + // Deploy Uima-AS Primitive Service + deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithTargetingSupport.xml"); + + System.setProperty(UimaAsynchronousEngine.TargetSelectorProperty,"ServiceThree"); + // Deploy Uima-AS Primitive Service + deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotatorWithTargetingSupport.xml"); + + System.getProperties().remove(UimaAsynchronousEngine.TargetSelectorProperty); + Map<String, Object> appCtx = buildContext(getMasterConnectorURI(broker), + "NoOpAnnotatorQueue"); + int toProcess = 10; + try { + initialize(eeUimaEngine, appCtx); + waitUntilInitialized(); + String tsId = "ServiceOne"; + + for( int i=0; i < 10; i++ ) { + CAS cas = eeUimaEngine.getCAS(); + //cas.setDocumentText(""); + eeUimaEngine.sendAndReceiveCAS(cas, null, tsId); + String serviceIdWhereCasWasProcessed = cas.getDocumentText(); + if ( !serviceIdWhereCasWasProcessed.equals(tsId)) { + fail("Received Reply from a Wrong Service - Expected: "+tsId+" Instead Received "+serviceIdWhereCasWasProcessed); + } + cas.release(); + } + + tsId = "ServiceTwo"; + for( int i=0; i < 10; i++ ) { + CAS cas = eeUimaEngine.getCAS(); + //cas.setDocumentText(""); + eeUimaEngine.sendAndReceiveCAS(cas, null, tsId); + String serviceIdWhereCasWasProcessed = cas.getDocumentText(); + if ( !serviceIdWhereCasWasProcessed.equals(tsId)) { + fail("Received Reply from a Wrong Service - Expected: "+tsId+" Instead Received "+serviceIdWhereCasWasProcessed); + } + cas.release(); + } + + tsId = "ServiceThree"; + + + for( int i=0; i < toProcess; i++ ) { + CAS cas = eeUimaEngine.getCAS(); + //cas.setDocumentText(""); + eeUimaEngine.sendCAS(cas, tsId); + //cas.release(); + } + } catch( Exception e) { + e.printStackTrace(); + } finally { + + while( getNumberOfCASesProcessed() < toProcess ) { + if ( unexpectedException ) { + fail("Service Targeting Failed"); + break; + } + synchronized(this) { + wait(100); + } + } + eeUimaEngine.stop(); + } +// synchronized(this) { +// wait(0); +// } + + // runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), + // "PersonTitleAnnotatorQueue", 0, EXCEPTION_LATCH); + } + @Test public void testDeployPrimitiveServiceWithInitFailure() throws Exception { System.out.println("-------------- testDeployPrimitiveServiceWithInitFailure -------------"); // Instantiate Uima-AS Client @@ -2385,7 +2594,7 @@ public class TestUimaASExtended extends Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)), // Map<String, Object> appCtx = buildContext("tcp://localhost:61616", "TopLevelTaeQueue"); - appCtx.put(UimaAsynchronousEngine.Timeout, 1000); + appCtx.put(UimaAsynchronousEngine.Timeout, 0); appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0); addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class); @@ -2394,6 +2603,51 @@ public class TestUimaASExtended extends runTest(appCtx, eeUimaEngine, "tcp://localhost:61616", "TopLevelTaeQueue", 1, PROCESS_LATCH); } + + @Test + public void testDeployAggregateServiceWithDoctype() throws Exception { + System.out.println("-------------- testDeployAggregateServiceWithDoctype -------------"); + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000"); + + try { + deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotatorWithDOCTYPE.xml"); + } catch( Exception e ) { + if ( e.getMessage() != null && + e.getMessage().indexOf("disallow-doctype-decl") > 0 ) { + System.out.println("---- Detected expected error during parsing of a deployment descriptor - SUCCESS ----"); + return; // success - detected expected error + } + } + fail("This test should have failed with SAXParseException - Instead no error was detected"); + } + + @Test + public void testAggregateTypePriorities() throws Exception { + System.out.println("-------------- testAggregateTypePriorities -------------"); + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000"); + deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); + deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml"); + + Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)), +// Map<String, Object> appCtx = buildContext("tcp://localhost:61616", + "TopLevelTaeQueue"); + appCtx.put(UimaAsynchronousEngine.Timeout, 1000); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0); + initialize(eeUimaEngine, appCtx); + + waitUntilInitialized(); + + ProcessingResourceMetaData meta = eeUimaEngine.getMetaData(); + TypePriorityList[] pl = meta.getTypePriorities().getPriorityLists(); + for( TypePriorityList tp : pl ) { + String[] typeArray = tp.getTypes(); + Assert.assertEquals(true, typeArray[0].equals("uima.cas.TOP")); + Assert.assertEquals(true, typeArray[1].equals("uima.tcas.Annotation")); + } + eeUimaEngine.stop(); + } /** * Sends total of 10 CASes to async aggregate configured to process 2 CASes at a time. * The inner NoOp annotator is configured to sleep for 5 seconds. The client should @@ -2945,13 +3199,13 @@ public class TestUimaASExtended extends try { runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), - "PersonTitleAnnotatorQueue", 500, EXCEPTION_LATCH); + "PersonTitleAnnotatorQueue", 1000, EXCEPTION_LATCH); } catch (RuntimeException e) { System.out.println(">>> runtest generated exception: " + e); e.printStackTrace(System.out); } super.countPingRetries=false; - + // eeUimaEngine.stop(); } /** @@ -3435,6 +3689,17 @@ public class TestUimaASExtended extends BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml"); + deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); + deployService(eeUimaEngine, relativePath + "/Deploy_AggregateMultiplier.xml"); + + Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)), + "TopLevelTaeQueue"); + appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize); + appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(1)); + runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), + "TopLevelTaeQueue", 1, PROCESS_LATCH); + + /* broker.stop(); broker.waitUntilStopped(); @@ -3443,17 +3708,24 @@ public class TestUimaASExtended extends broker = createBroker(); broker.start(); broker.waitUntilStarted(); +*/ - deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); - deployService(eeUimaEngine, relativePath + "/Deploy_AggregateMultiplier.xml"); + +/* + + appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize); + appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(1)); + runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), + "TestMultiplierQueue", 1, PROCESS_LATCH); + String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(); Map<String, Object> appCtx = buildContext(burl, "TopLevelTaeQueue"); - +*/ // Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)), // "TopLevelTaeQueue"); - +/* broker.stop(); broker.waitUntilStopped(); @@ -3462,8 +3734,8 @@ broker.waitUntilStopped(); broker = createBroker(); broker.start(); broker.waitUntilStarted(); - - +*/ +/* // reduce the cas pool size and reply window appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize); appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2)); @@ -3471,7 +3743,8 @@ broker.waitUntilStarted(); runTest(appCtx, eeUimaEngine,burl, "TopLevelTaeQueue", 1, PROCESS_LATCH); - + eeUimaEngine.stop(); + */ } @Test public void testClientProcessWithRemoteMultiplier() throws Exception { @@ -4349,5 +4622,27 @@ broker.waitUntilStarted(); } } - + class StartBrokerTask extends TimerTask { + BrokerService broker = null; + TestUimaASExtended testSuiteRef = null; + StartBrokerTask(BrokerService broker, TestUimaASExtended testSuiteRef) { + this.broker = broker; + this.testSuiteRef = testSuiteRef; + } + @Override + public void run() { + try { + broker = testSuiteRef.setupSecondaryBroker(true); + broker.waitUntilStarted(); + System.out.println("..... Restarted broker ............................"); + + } catch( Exception e ) { + throw new RuntimeException(e); + + } + + } + + + } } Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java Thu Dec 20 14:40:58 2018 @@ -23,12 +23,16 @@ import java.io.IOException; import java.lang.reflect.Method; import java.net.BindException; import java.net.URI; +import java.util.Iterator; +import java.util.Set; import java.util.concurrent.Semaphore; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import javax.management.MBeanServer; +import javax.management.ObjectInstance; import junit.framework.TestCase; @@ -274,6 +278,14 @@ public class ActiveMQSupport extends Tes protected BrokerService createBroker(int port,boolean secondaryBroker) throws Exception { String hostName = "localhost"; + boolean enableJMX = true; + String jmxFlag = System.getProperty("uima.as.enable.jmx"); + + if ( secondaryBroker ) { + enableJMX = false; + } else if ( jmxFlag != null && jmxFlag.equalsIgnoreCase("false") ) { + enableJMX = false; + } BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/" + hostName + "?persistent=false")); tcpConnector = addConnector(broker, "tcp",port); @@ -281,22 +293,20 @@ public class ActiveMQSupport extends Tes Logger.getRootLogger().info(">>>> Starting Broker With URL:" + uri); int defaultJMXPort = 1098; if ( secondaryBroker ) { - defaultJMXPort = 1097; - broker.getManagementContext().setJmxDomainName(broker.getManagementContext().getJmxDomainName()+".test"); + if ( enableJMX ) { + defaultJMXPort = 1097; + broker.getManagementContext().setJmxDomainName(broker.getManagementContext().getJmxDomainName()+".test"); + } tcpConnector.setName(DEFAULT_BROKER_URL_KEY_2); } else { tcpConnector.setName(DEFAULT_BROKER_URL_KEY); } - boolean enableJMX = true; - String jmxFlag = System.getProperty("uima.as.enable.jmx"); - if ( jmxFlag != null && jmxFlag.equalsIgnoreCase("false") ) { - enableJMX = false; - } if ( enableJMX ) { broker.setUseJmx(enableJMX); broker.getManagementContext().setConnectorPort(defaultJMXPort); } else { + broker.setUseJmx(false); System.out.println("************** ACTIVEMQ JMX Connector Not Enabled ****************"); } PolicyEntry policy = new PolicyEntry(); @@ -320,7 +330,7 @@ public class ActiveMQSupport extends Tes return broker; } protected BrokerService setupSecondaryBroker(boolean addProperty) throws Exception { - System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); + System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test2"); BrokerService broker2 = createBroker(DEFAULT_BROKER_PORT_2, true); broker2.start(); @@ -345,6 +355,17 @@ public class ActiveMQSupport extends Tes } removeHttpConnector(); +// MBeanServer jmxServer = +// broker.getManagementContext().getMBeanServer(); +// if ( jmxServer != null ) { +// Set<ObjectInstance> instances = jmxServer.queryMBeans(null, null); +// Iterator<ObjectInstance> iterator = instances.iterator(); +// while (iterator.hasNext()) { +// ObjectInstance instance = iterator.next(); +// System.out.println("-------------- Object Name:t" + instance.getObjectName()); +// +// } +// } broker.deleteAllMessages(); // cleanBroker(broker); @@ -370,6 +391,7 @@ public class ActiveMQSupport extends Tes cleanBroker(broker); stopBroker(); } + System.out.println("..... Free Memory:"+Runtime.getRuntime().freeMemory()+" Total Memory:"+Runtime.getRuntime().totalMemory()); } public class UimaASErrorHandler implements ErrorHandler { Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Thu Dec 20 14:40:58 2018 @@ -727,6 +727,21 @@ public abstract class BaseTestSupport ex if (aProcessStatus instanceof UimaASProcessStatus) { casReferenceId = ((UimaASProcessStatus) aProcessStatus).getCasReferenceId(); parentCasReferenceId = ((UimaASProcessStatus) aProcessStatus).getParentCasReferenceId(); + if ( ((UimaASProcessStatus) aProcessStatus).getServiceTargetId() != null ) { + // fetch id of the service where this CAS was processed + String serviceId = aCAS.getDocumentText(); + if ( !((UimaASProcessStatus) aProcessStatus).getServiceTargetId().equals(serviceId)) { + System.out.println("Service Targeting Failed - Client received a reply from a wrong service - Expected:" + + ((UimaASProcessStatus) aProcessStatus).getServiceTargetId() + + " The CAS was Actually Processed by "+serviceId); + unexpectedException = true; + processCountLatch.countDown(); + + return; + } else { + System.out.println("Service Targeting Success - CAS Processed by Service:"+serviceId); + } + } } if (aProcessStatus.isException()) { List list = aProcessStatus.getExceptions(); Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith2SecDelay.xml URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith2SecDelay.xml?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith2SecDelay.xml (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith2SecDelay.xml Thu Dec 20 14:40:58 2018 @@ -60,7 +60,7 @@ <nameValuePair> <name>ProcessDelay</name> <value> - <integer>200000000</integer> + <integer>2000</integer> </value> </nameValuePair> Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith5SecDelay.xml URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith5SecDelay.xml?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith5SecDelay.xml (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith5SecDelay.xml Thu Dec 20 14:40:58 2018 @@ -60,7 +60,7 @@ <nameValuePair> <name>ProcessDelay</name> <value> - <integer>500000000</integer> + <integer>5000</integer> </value> </nameValuePair> Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregate.xml URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregate.xml?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregate.xml (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregate.xml Thu Dec 20 14:40:58 2018 @@ -58,6 +58,12 @@ </languagesSupported> </capability> </capabilities> + <typePriorities> + <priorityList> + <type>uima.cas.TOP</type> + <type>uima.tcas.Annotation</type> + </priorityList> +</typePriorities> <operationalProperties> <modifiesCas>true</modifiesCas> <multipleDeploymentAllowed>true</multipleDeploymentAllowed>