Author: cwiklik Date: Fri Oct 14 19:30:40 2016 New Revision: 1764952 URL: http://svn.apache.org/viewvc?rev=1764952&view=rev Log: UIMA-5123 refactored code that deals with recovery after broker restart
Added: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml (with props) uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml (with props) uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml (with props) Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Fri Oct 14 19:30:40 2016 @@ -19,12 +19,9 @@ package org.apache.uima.adapter.jms.activemq; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; @@ -43,7 +40,9 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageProducer; import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.AsyncCallback; import org.apache.activemq.ConnectionFailedException; import org.apache.activemq.advisory.ConsumerEvent; import org.apache.activemq.advisory.ConsumerListener; @@ -96,7 +95,7 @@ public class JmsEndpointConnection_impl private volatile boolean retryEnabled; - private AnalysisEngineController controller = null; + protected AnalysisEngineController controller = null; private volatile boolean connectionAborted = false; @@ -277,7 +276,7 @@ public class JmsEndpointConnection_impl conn.close(); } catch( Exception ee) {} } - // if ( logConnectionProblem ) { + if ( jex.getCause() != null && logConnectionProblem ) { logConnectionProblem = false; // log once // Check if unable to connect to the broker and retry ... if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { @@ -289,8 +288,15 @@ public class JmsEndpointConnection_impl UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", jex); + + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_service_lost_connectivity_WARNING", + new Object[] { controller.getComponentName(), brokerUri}); + + } - // } + } this.wait(1000); // wait between retries } catch ( Exception ee) { ee.printStackTrace(); @@ -668,7 +674,12 @@ public class JmsEndpointConnection_impl } logMessageSize(aMessage, msgSize, destinationName); synchronized (producer) { - producer.send((Destination) delegateEndpoint.getDestination(), aMessage); + // create amq async callback listener to detect jms msg delivery problems + AsyncCallback onComplete = createAMQCallbackListener(command, aMessage); + // if the msg cannot be delivered due to invalid destination, the send does + // not fail since we are using AMQ async sends. To detect delivery issues + // we use callback listener where such conditions are detected and handled + ((ActiveMQMessageProducer)producer).send((Destination) delegateEndpoint.getDestination(), aMessage, onComplete); } } else { destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName(); @@ -678,22 +689,27 @@ public class JmsEndpointConnection_impl new Object[] { destinationName }); } logMessageSize(aMessage, msgSize, destinationName); - // If in ParallelStep its possible to receive a reply from one of the delegates in parallel - // step *before* a CAS is dispatched to all of the delegates. This can cause a problem - // as replies are merged which causes the CAS to be in an inconsistent state. - // The following code calls dispatchCasToParallelDelegate() which count down - // a java latch. The same latch is used when receiving replies. If the latch is non zero - // the code blocks a thread from performing deserialization. - if ( msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process ) { + // If in ParallelStep its possible to receive a reply from one of the delegates in parallel + // step *before* a CAS is dispatched to all of the delegates. This can cause a problem + // as replies are merged which causes the CAS to be in an inconsistent state. + // The following code calls dispatchCasToParallelDelegate() which count down + // a java latch. The same latch is used when receiving replies. If the latch is non zero + // the code blocks a thread from performing deserialization. + if ( msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process ) { String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference); CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId); if ( casStateEntry.getNumberOfParallelDelegates() > 0) { casStateEntry.dispatchedCasToParallelDelegate(); } - } + } synchronized (producer) { - producer.send(aMessage); + // create amq async callback listener to detect jms msg delivery problems + AsyncCallback onComplete = createAMQCallbackListener(command, aMessage); + // if the msg cannot be delivered due to invalid destination, the send does + // not fail since we are using AMQ async sends. To detect delivery issues + // we use callback listener where such conditions are detected and handled + ((ActiveMQMessageProducer)producer).send(aMessage, onComplete); } } @@ -709,7 +725,7 @@ public class JmsEndpointConnection_impl // to find inactive sessions. lastDispatchTimestamp.set(System.currentTimeMillis()); // Succeeded sending the CAS - return true; + return true; } catch (Exception e) { // if a client terminates with an outstanding request, the service will not @@ -805,6 +821,20 @@ public class JmsEndpointConnection_impl return false; } + public AsyncCallback createAMQCallbackListener(int command, Message aMessage) throws Exception { + String cid=""; + CasStateEntry casStateEntry = null; + AsyncCallback onComplete = null; + if ( command == AsynchAEMessage.Process) { + cid = aMessage.getStringProperty(AsynchAEMessage.CasReference); + casStateEntry = controller.getLocalCache().lookupEntry(cid); + onComplete = new UimaAsAsyncCallbackListener(casStateEntry); + } else { + onComplete = new UimaAsAsyncCallbackListener(command); + } + return onComplete; + } + private void logMessageSize(Message aMessage, long msgSize, String destinationName) { if (UIMAFramework.getLogger().isLoggable(Level.FINE)) { boolean isReply = false; @@ -921,4 +951,72 @@ public class JmsEndpointConnection_impl // brokerDestinations.getConnectionTimer().stopTimer(); } + private class UimaAsAsyncCallbackListener implements AsyncCallback { + CasStateEntry casState=null; + int command; + + public UimaAsAsyncCallbackListener(int command) { + this.command = command; + } + public UimaAsAsyncCallbackListener( CasStateEntry casState ) { + this.casState = casState; + } + public void onException(JMSException exception) { + if ( casState != null ) { + + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_unable_to_deliver_msg__INFO", + new Object[] { controller.getComponentName(), casState.getCasReferenceId(),exception.getMessage() }); + } + casState.setDeliveryToClientFailed(); + if ( casState.isSubordinate()) { + try { + + String inputCasId = casState.getInputCasReferenceId(); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_force_cas_abort__INFO", + new Object[] { controller.getComponentName(), "parent", inputCasId }); + } + + + CasStateEntry parentCasStateEntry = controller.getLocalCache().lookupEntry(inputCasId); + //parentCasStateEntry.setDeliveryToClientFailed(); + parentCasStateEntry.setFailed(); + controller.addAbortedCasReferenceId(inputCasId); + if ( controller instanceof AggregateAnalysisEngineController ) { + List<AnalysisEngineController> controllers = + ((AggregateAnalysisEngineController)controller).getChildControllerList(); + for( AnalysisEngineController ctrl : controllers) { + ctrl.addAbortedCasReferenceId(inputCasId); + } + } + + } catch( Exception e) { + e.printStackTrace(); + } + controller.releaseNextCas(casState.getCasReferenceId()); + } + + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_release_cas_req__FINE", + new Object[] { controller.getComponentName(), casState.getCasReferenceId() }); + } + } else { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), "", endpointName}); + + } + } + + public void onSuccess() { + } + + } } Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Fri Oct 14 19:30:40 2016 @@ -27,12 +27,14 @@ import java.util.concurrent.CopyOnWriteA import java.util.concurrent.CountDownLatch; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.uima.UIMAFramework; @@ -57,6 +59,7 @@ import org.apache.uima.adapter.jms.JmsCo import org.apache.uima.adapter.jms.message.JmsMessageContext; import org.apache.uima.util.Level; import org.springframework.jms.listener.SessionAwareMessageListener; +import org.springframework.jms.support.destination.DestinationResolver; /** * Thin adapter for receiving JMS messages from Spring. It delegates processing of all messages to @@ -1041,7 +1044,61 @@ public class JmsInputChannel implements } } } - + public void createListenerOnTempQueue(ConnectionFactory cf, boolean isFreeCasDestination ) throws Exception { + TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName()); + UimaDefaultMessageListenerContainer connector = new UimaDefaultMessageListenerContainer(true); + connector.setConnectionFactory(cf); + resolver.setListener(connector); + connector.setConcurrentConsumers(1); + connector.setDestinationResolver(resolver); + connector.setController(getController()); + connector.setMessageListener(this); + connector.initializeContainer(); + connector.getDestination(); + connector.afterPropertiesSet(false); + UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,getController().getComponentName()+"-JmsInputChannel.createListenerOnTempQueue()-starting new Listener" ); + connector.start(); + boolean log = true; + synchronized (mux) { + while (connector.getListenerEndpoint() == null) { + try { + if ( log ) { + log = false; + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "createListenerOnTempQueue", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_temp_destination_not_available_retrying__INFO", + new Object[] { getController().getComponentName(), "5"}); + } + } + mux.wait(5000); + + } catch (InterruptedException e) { + } + } + + } + + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "createListenerOnTempQueue", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_temp_destination_available__INFO", + new Object[] { getController().getComponentName(), connector.getListenerEndpoint(), isFreeCasDestination}); + } + + if ( isFreeCasDestination ) { + ((JmsOutputChannel) getController().getOutputChannel()) + .setFreeCasQueue(connector.getListenerEndpoint()); + } + setListenerContainer(connector); + + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), + "createListenerOnTempQueue", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_activated_fcq__CONFIG", + new Object[] { getController().getComponentName(), connector.getEndpointName() }); + } + } public void createListener(String aDelegateKey, Endpoint endpointToUpdate) throws Exception { if (getController() instanceof AggregateAnalysisEngineController) { Delegate delegate = ((AggregateAnalysisEngineController) getController()) @@ -1056,7 +1113,7 @@ public class JmsInputChannel implements newListener.setMessageListener(this); newListener.setController(getController()); - TempDestinationResolver resolver = new TempDestinationResolver(); + TempDestinationResolver resolver = new TempDestinationResolver(controller.getComponentName()); resolver.setConnectionFactory(f); resolver.setListener(newListener); newListener.setDestinationResolver(resolver); Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Fri Oct 14 19:30:40 2016 @@ -530,9 +530,12 @@ public class JmsOutputChannel implements // Only one thread at a time is allowed here. synchronized( masterEndpoint ) { if ( masterEndpoint.getStatus() == Endpoint.FAILED ) { + + String name = anEndpoint.getDestination().toString(); // Returns InputChannel if the Reply Listener for the delegate has previously failed. // If the listener hasnt failed the getReplyInputChannel returns null - InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey()); +// InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey()); + InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDestination().toString()); if ( iC != null ) { try { // Create a new Listener, new Temp Queue and associate the listener with the Input Channel @@ -778,7 +781,7 @@ public class JmsOutputChannel implements public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException { - try { + try { anEndpoint.setReplyEndpoint(true); if (anEndpoint.isRemote()) { if (anEndpoint.getSerialFormat() == SerialFormat.XMI) { @@ -1670,7 +1673,6 @@ public class JmsOutputChannel implements // produced by the CAS Multiplier. The client will treat this CAS // differently from the input CAS. tm.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); - isRequest = true; // Save the id of the parent CAS tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java Fri Oct 14 19:30:40 2016 @@ -55,7 +55,7 @@ import org.springframework.jms.support.d public class SpringContainerDeployer implements ControllerCallbackListener { private static final Class CLASS_NAME = SpringContainerDeployer.class; - private static final int MAX_PREFETCH_FOR_CAS_NOTIFICATION_Q = 10; + public static final int MAX_PREFETCH_FOR_CAS_NOTIFICATION_Q = 10; public static final int QUIESCE_AND_STOP = 1000; Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java Fri Oct 14 19:30:40 2016 @@ -33,6 +33,13 @@ public class TempDestinationResolver imp private Object mutex = new Object(); + private String serviceName = ""; + + public TempDestinationResolver() { + } + public TempDestinationResolver(String name) { + serviceName = name; + } /** * This method is called by the Spring listener code. It creates a single temp queue for all * listener instances. If the Spring listener is configured with more than one concurrentConsumer, @@ -41,7 +48,6 @@ public class TempDestinationResolver imp */ public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException { - synchronized (mutex) { if (destination == null) { destination = session.createTemporaryQueue(); Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Oct 14 19:30:40 2016 @@ -19,11 +19,9 @@ package org.apache.uima.adapter.jms.activemq; -import java.io.IOException; import java.lang.reflect.Method; import java.net.ConnectException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -38,6 +36,7 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.ExceptionListener; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.TemporaryQueue; @@ -125,6 +124,7 @@ public class UimaDefaultMessageListenerC // on listener failure log once and retry silently private volatile boolean logListenerFailure=true; + private static CountDownLatch recoveryLatch = new CountDownLatch(4); public UimaDefaultMessageListenerContainer() { super(); // reset global static. This only effects unit testing as services are deployed @@ -132,7 +132,7 @@ public class UimaDefaultMessageListenerC terminating = false; UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING); __listenerRef = this; - setRecoveryInterval(30000); // increase connection recovery to 30 sec + setRecoveryInterval(400); // increase connection recovery to 30 sec setAcceptMessagesWhileStopping(true); setExceptionListener(this); threadGroup = new ThreadGroup("ListenerThreadGroup_" @@ -166,7 +166,63 @@ public class UimaDefaultMessageListenerC tcon = createConnection(); JmsUtils.closeConnection(tcon); } - logger.info("Successfully refreshed JMS Connection"); + String ctrlName = ""; + if ( controller != null ) { + ctrlName = "Controller: "+controller.getComponentName(); + } + if ( super.getMessageSelector() != null ) { + logger.info(ctrlName+" Successfully refreshed JMS Connection - Selector "+super.getMessageSelector()+" Instance hashcode:"+this.hashCode()); + + } else { + logger.info(ctrlName+" Successfully refreshed JMS Connection "); + + } + //if (controller != null && controller instanceof AggregateAnalysisEngineController) { + // If endpoint not set, this is a temp reply queue listener. + if (getDestination() != null && ((ActiveMQDestination)getDestination()).isTemporary()) { + destroy(); + logger.info("Controller:"+controller.getComponentName()+"... Destroyed Listener on a temp queue:"+getDestination()); +// ((JmsOutputChannel)controller.getOutputChannel()).setFreeCasQueue(getDestination()); + + if ( freeCasQueueListener ) { + logger.info("Controller:"+controller.getComponentName()+" ------------------- Creating new listener for the FreeCas temp queue"); + try { + ((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory(), true); + logger.info("Controller:"+controller.getComponentName()+"------------------- New listener on FreeCas temp queue is ready"); + } catch( Exception e) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_exception__WARNING", e); + } + } + } + + /* + if ( getMessageListener() instanceof JmsInputChannel ) { + System.out.println("------------------- Creating new listener for the temp queue"); + try { + ((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory()); + System.out.println("------------------- New listener on temp queue is ready"); + } catch( Exception e) { + System.out.println("------------------- Error while creating new listener on temp queue"); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_exception__WARNING", e); + } + + } + } +*/ + + } + + /* + String delegateKey = ((AggregateAnalysisEngineController) controller) + .lookUpDelegateKey(endpoint.getEndpoint()); + */ + //} break; } catch (Exception ex) { @@ -186,8 +242,8 @@ public class UimaDefaultMessageListenerC } } // sleepInbetweenRecoveryAttempts(); - setRecoveryInterval(10); - } + // setRecoveryInterval(10); + } } } catch( IllegalStateException e ) { } @@ -243,7 +299,7 @@ public class UimaDefaultMessageListenerC } else { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), - "handleTempQueueFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "handleListenerFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_lookup_input_channel__INFO", queueName); } } @@ -291,6 +347,24 @@ public class UimaDefaultMessageListenerC } catch( Exception exx ) { // shared connection may not exist yet if a broker is not up } + if (t instanceof InvalidDestinationException ) { + destroy(); + if ( getMessageListener() instanceof JmsInputChannel ) { + try { + // ((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory()); + } catch( Exception e) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_exception__WARNING", e); + } + + } + } + return; + } + + if ( (conn != null && conn.isTransportFailed() ) || t instanceof javax.jms.IllegalStateException && t.getMessage().equals("The Consumer is closed")) { @@ -340,6 +414,7 @@ public class UimaDefaultMessageListenerC } } else if (disableListener(t)) { handleQueueFailure(t); + } else { } } @@ -442,7 +517,7 @@ public class UimaDefaultMessageListenerC } - setRecoveryInterval(0); + setRecoveryInterval(1); // Spin a shutdown thread to terminate listener. new Thread() { @@ -886,7 +961,7 @@ public class UimaDefaultMessageListenerC JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST", new Object[] { msg }); } - setRecoveryInterval(0); + setRecoveryInterval(1); setAutoStartup(false); if ( getSharedConnection() != null ) { ActiveMQConnection amqc = (ActiveMQConnection)getSharedConnection(); Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Fri Oct 14 19:30:40 2016 @@ -319,7 +319,9 @@ public class BaseUIMAAsynchronousEngine_ if ( amqc != null && !amqc.isClosed() && !amqc.isClosing() && consumerDestination != null && consumerDestination instanceof ActiveMQTempDestination ) { try { - amqc.deleteTempDestination((ActiveMQTempDestination)consumerDestination); + if ( !amqc.isClosed() && !amqc.isTransportFailed()) { + amqc.deleteTempDestination((ActiveMQTempDestination)consumerDestination); + } } catch( Exception e) { e.printStackTrace(); } @@ -351,6 +353,7 @@ public class BaseUIMAAsynchronousEngine_ sender.doStop(); } try { + System.out.println(this.getClass().getName()+".stop() - Stopping UIMA-AS Client"); stopConnection(); // Undeploy all containers undeploy(); Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Fri Oct 14 19:30:40 2016 @@ -37,6 +37,8 @@ import java.util.concurrent.CountDownLat import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.Message; @@ -58,6 +60,8 @@ import org.apache.uima.aae.client.UimaAs import org.apache.uima.aae.client.UimaAsynchronousEngine; import org.apache.uima.aae.controller.Endpoint; import org.apache.uima.aae.error.ServiceShutdownException; +import org.apache.uima.aae.error.UimaASPingTimeout; +import org.apache.uima.aae.error.UimaASProcessCasTimeout; import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics; import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.adapter.jms.activemq.JmsInputChannel; @@ -120,6 +124,358 @@ public class TestUimaASExtended extends return b.getDefaultSocketURIString(); } + + + /** + * Tests error handling of the client. It deploys Aggregate service Cas Multiplier. initializes + * the client and sends a CAS for processing. The child CAS is than held in NoOp Annotator for + * 30 secs to simulate heavy processing. While the CAS is being processed, a broker is stopped. + * The client should timeout after 40 secs and attempt to send 2 more CASes. Since the broker + * is down, each of these 2 CASes goes into a retry list while a Connection is being retried. + * Both should timeout, and sendAndReceive() should fail due to a timeout. + * + * @throws Exception + */ + @Test + public void testClientRecoveryFromBrokerFailure() throws Exception { + System.out.println("-------------- testClientRecoveryFromBrokerFailure -------------"); + System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); + + BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); + deployService(uimaAsEngine, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml"); + + Map<String, Object> appCtx = + buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "TopLevelTaeQueue"); + appCtx.put(UimaAsynchronousEngine.Timeout, 40000); // AE will hold the CAS for 30 secs so this needs to be larger + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000); + initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + + ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor(); + + // schedule a thread that will stop the broker after 10 secs + s.schedule( + new Runnable() { + + @Override + public void run() { + try { + System.out.println("Stopping Broker ..."); + broker.stop(); + broker.waitUntilStopped(); + System.out.println("Broker Stopped..."); + + } catch( Exception e) { + + } + + } + + } + , 10, TimeUnit.SECONDS); + int timeoutCount=0; + + // try to send 3 CASes, each should timeout + for (int i = 0; i < 3; i++) { + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); + try { + System.out.println("............... Client Sending CAS #"+(i+1)); + uimaAsEngine.sendAndReceiveCAS(cas); + } catch( Exception e) { + if ( e instanceof UimaASProcessCasTimeout ) { + timeoutCount++; + System.out.println("Client .............. "+e.getMessage()); + if ( e.getCause() != null && e instanceof UimaASPingTimeout) { + System.out.println("Client .............. "+e.getCause().getMessage()); + } + } else if ( e.getCause() instanceof UimaASProcessCasTimeout ) { + timeoutCount++; + System.out.println("Client .............. "+e.getCause().getMessage()); + if ( e.getCause().getCause() != null && e.getCause().getCause() instanceof UimaASPingTimeout) { + System.out.println("Client .............. "+e.getCause().getCause().getMessage()); + } + } else { + e.printStackTrace(); + } + // System.out.println("Client Received Expected Error on CAS:"+(i+1)); + } finally { + cas.release(); + } + } + if ( timeoutCount != 3) { + uimaAsEngine.stop(); + fail("Expected 3 Errors Due to Timeout, Instead Got "+timeoutCount+" Timeouts"); + } else { + uimaAsEngine.stop(); + } + + } + + @Test + public void testBrokerRestartWithAggregateMultiplier() throws Exception { + System.out.println("-------------- testBrokerRestartWithAggregateMultiplier -------------"); + System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); + + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml"); + deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); + deployService(eeUimaEngine, relativePath + "/Deploy_AggregateMultiplier.xml"); + + String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(); + Map<String, Object> appCtx = buildContext(burl, "TopLevelTaeQueue"); + synchronized(this) { + this.wait(2000); + } + broker.stop(); + broker.waitUntilStopped(); + + synchronized(this) { + this.wait(2000); + } + + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + synchronized(this) { + this.wait(2000); + } + + + // reduce the cas pool size and reply window + appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize); + appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2)); + runTest(appCtx, eeUimaEngine,burl, + "TopLevelTaeQueue", 1, PROCESS_LATCH); + eeUimaEngine.stop(); + } + + /** + * Tests client and service recovery from broker restart. It deploys CM service, dispatches + * a CAS for processing and while the CAS is in process, it bounces a broker. The service + * listeners should be restored and the CAS should fail due to invalid destination. Once + * the client times out, it should send 2 more CASes which should force client to re-establish + * connection with a broker and replies should come back. + * + * @throws Exception + */ + + @Test + public void testBrokerRestartWithAggregateMultiplierWhileProcessingCAS() throws Exception { + System.out.println("-------------- testBrokerRestartWithAggregateMultiplierWhileProcessingCAS -------------"); + System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); + + BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); + deployService(uimaAsEngine, relativePath + "/Deploy_AggregateMultiplierWith30SecDelay.xml"); + + Map<String, Object> appCtx = + buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "TopLevelTaeQueue"); + appCtx.put(UimaAsynchronousEngine.Timeout, 40000); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000); + initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + + + ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor(); + + s.schedule( + new Runnable() { + + @Override + public void run() { + try { + System.out.println("Stopping Broker ..."); + broker.stop(); + broker.waitUntilStopped(); + System.out.println("Broker Stopped..."); + + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + System.out.println("Broker Restarted..."); + + + } catch( Exception e) { + + } + + } + + } + , 10, TimeUnit.SECONDS); + + + for (int i = 0; i < 3; i++) { + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); + try { + System.out.println("............... Client Sending CAS #"+(i+1)); + uimaAsEngine.sendAndReceiveCAS(cas); + } catch( Exception e) { + e.printStackTrace(); + // System.out.println("Client Received Expected Error on CAS:"+(i+1)); + } finally { + cas.release(); + } + + + } + uimaAsEngine.stop(); + } + + + @Test + public void testBrokerRestartWhileProcessingCAS() throws Exception { + System.out.println("-------------- testBrokerRestartWhileProcessingCAS -------------"); + System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); + + BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); + + deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWith30SecDelay.xml"); + Map<String, Object> appCtx = + buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "NoOpAnnotatorQueue"); + appCtx.put(UimaAsynchronousEngine.Timeout, 40000); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000); + initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + + + ScheduledExecutorService s = Executors.newSingleThreadScheduledExecutor(); + + s.schedule( + new Runnable() { + + @Override + public void run() { + try { + System.out.println("Stopping Broker ..."); + broker.stop(); + broker.waitUntilStopped(); + System.out.println("Broker Stopped..."); + + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + System.out.println("Broker Restarted..."); + + + } catch( Exception e) { + + } + + } + + } + , 10, TimeUnit.SECONDS); + + for (int i = 0; i < 1; i++) { + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); + try { + uimaAsEngine.sendAndReceiveCAS(cas); + } catch( Exception e) { + e.printStackTrace(); + // System.out.println("Client Received Expected Error on CAS:"+(i+1)); + } finally { + cas.release(); + } + + + } + uimaAsEngine.stop(); + + + } + /** + * This tests if broker keep-alive protocol is working. With AMQ 5.13.2 the test + * fails due to broker bug. What happens is that when a jms client uses http + * protocol, the connection is made but the keep-alive chat between broker and + * client is not causing a timeout and an exception. + * + * The exception is internal to the broker but it also happens within amq + * client code. To get to this, a custom spring based listener is deployed + * with some of its exception handling methods overriden to capture an exception. + * + * @throws Exception + */ + @Test + public void testServiceWithHttpListeners() throws Exception { + System.out.println("-------------- testServiceWithHttpListeners -------------"); + // Need java monitor object on which to sleep + Object waitObject = new Object(); + // Custom spring listener with handleListenerSetupFailure() overriden to + // capture AMQ exception. + TestDefaultMessageListenerContainer c = new TestDefaultMessageListenerContainer(); + c.setConnectionFactory(new ActiveMQConnectionFactory("http://localhost:18888")); + c.setDestinationName("TestQ"); + c.setConcurrentConsumers(2); + c.setBeanName("TestBean"); + c.setMessageListener(new JmsInputChannel()); + c.initialize(); + c.start(); + + if ( c.isRunning() ) { + System.out.println("... Listener Ready"); + + } + // Keep-alive has a default 30 secs timeout. Sleep for bit longer than that + // If there is an exception due to keep-alive, an exception handler will be + // called on the TestDefaultMessageListenerContainer instance where we + // capture the error. + System.out.println("... Waiting for 40 secs"); + try { + synchronized(waitObject) { + waitObject.wait(40000); + } + // had there been broker issues relateds to keep-alive the listener's failed + // flag would have been set by now. Check it and fail the test + if ( c.failed() ) { + fail("Broker Failed - Reason:"+c.getReasonForFailure()); + } else { + System.out.println("Stopping Listener"); + c.stop(); + + } + } catch( Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + + + @Test + public void testBrokerRestartWithPrimitiveMultiplier() throws Exception { + System.out.println("-------------- testBrokerRestartWithPrimitiveMultiplier -------------"); + System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); + + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + + deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml"); + + + broker.stop(); + broker.waitUntilStopped(); + + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + + String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(); + Map<String, Object> appCtx = + buildContext(burl, "TestMultiplierQueue"); + + // reduce the cas pool size and reply window + appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize); + appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2)); + runTest(appCtx, eeUimaEngine,burl, + "TestMultiplierQueue", 1, PROCESS_LATCH); + + eeUimaEngine.stop(); + } + + + /* public void testContinueOnRetryFailure2() throws Exception { System.out.println("-------------- testContinueOnRetryFailure -------------"); @@ -147,9 +503,159 @@ public class TestUimaASExtended extends } */ + + + /* + * Tests + */ + @Test + public void testSyncClientRecoveryFromBrokerStopAndRestart3() throws Exception { + System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------"); + System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); + + // Instantiate Uima AS Client + BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); + //BrokerService broker2 = setupSecondaryBroker(true); + // Deploy Uima AS Primitive Service + deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); + + Map<String, Object> appCtx = + buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(), "NoOpAnnotatorQueue"); + appCtx.put(UimaAsynchronousEngine.Timeout, 1100); + appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000); + initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + + + broker.stop(); + broker.waitUntilStopped(); + + //System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); + //broker2 = setupSecondaryBroker(true); + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + int errorCount = 0; + System.out.println("Sending CASes"); + for (int i = 0; i < 60; i++) { + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); + try { + uimaAsEngine.sendAndReceiveCAS(cas); + } catch( Exception e) { + System.out.println("Client Received Expected Error on CAS:"+(i+1)); + } finally { + cas.release(); + } - + } + uimaAsEngine.stop(); + + /* + int errorCount=0; + for (int i = 0; i < 20; i++) { + + if ( i == 5 ) { + broker2.stop(); + broker2.waitUntilStopped(); + } else if ( i == 10 ) { + // restart the broker + System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); + broker2 = setupSecondaryBroker(true); + + broker2.start(); + broker2.waitUntilStarted(); + + } + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); + // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); + try { + uimaAsEngine.sendAndReceiveCAS(cas); + } catch( Exception e) { + errorCount++; + System.out.println("Client Received Expected Error on CAS:"+(i+1)); + } finally { + cas.release(); + } + } + + uimaAsEngine.stop(); + super.cleanBroker(broker2); + + broker2.stop(); + + // expecting 5 failures due to broker missing + if ( errorCount != 5 ) { + fail("Expected 5 failures due to broker down, instead received:"+errorCount+" failures"); + } + broker2.waitUntilStopped(); +*/ + } + + /* + + @Test + public void testSyncClientRecoveryFromBrokerStopAndRestart2() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + System.out.println("-------------- testSyncClientRecoveryFromBrokerStopAndRestart -------------"); + // Instantiate Uima AS Client + BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); + BrokerService broker2 = setupSecondaryBroker(true); + // Deploy Uima AS Primitive Service + deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml"); + + Map<String, Object> appCtx = + buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), "NoOpAnnotatorQueue"); + appCtx.put(UimaAsynchronousEngine.Timeout, 5100); + appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000); + initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + + // Get meta received, bounce the broker now. + broker2.stop(); + broker2.waitUntilStopped(); + + +// ActiveMQConnectionFactory f = new ActiveMQConnectionFactory(""); +// ActiveMQConnection c = (ActiveMQConnection)f.createConnection(); + // restart the broker + System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); + broker2 = setupSecondaryBroker(true); + + broker2.start(); + broker2.waitUntilStarted(); + + // new broker is up. Send a few CASes now + for( int i=0; i < 5; i++) { + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); + try { + uimaAsEngine.sendAndReceiveCAS(cas); + } catch( Exception e) { + e.printStackTrace(); + // fail("Unexpected exception from sendAndReceive()- test Failed"); + } finally { + cas.release(); + } + + } + + + + uimaAsEngine.stop(); + super.cleanBroker(broker2); + + broker2.stop(); + broker2.waitUntilStopped(); + + } + + */ /** * Test use of a JMS Service Adapter. Invoke from a synchronous aggregate to emulate usage from @@ -636,63 +1142,6 @@ public class TestUimaASExtended extends } - /** - * This tests if broker keep-alive protocol is working. With AMQ 5.13.2 the test - * fails due to broker bug. What happens is that when a jms client uses http - * protocol, the connection is made but the keep-alive chat between broker and - * client is not causing a timeout and an exception. - * - * The exception is internal to the broker but it also happens within amq - * client code. To get to this, a custom spring based listener is deployed - * with some of its exception handling methods overriden to capture an exception. - * - * @throws Exception - */ - @Test - public void testServiceWithHttpListeners() throws Exception { - System.out.println("-------------- testServiceWithHttpListeners -------------"); - // Need java monitor object on which to sleep - Object waitObject = new Object(); - // Custom spring listener with handleListenerSetupFailure() overriden to - // capture AMQ exception. - TestDefaultMessageListenerContainer c = new TestDefaultMessageListenerContainer(); - c.setConnectionFactory(new ActiveMQConnectionFactory("http://localhost:18888")); - c.setDestinationName("TestQ"); - c.setConcurrentConsumers(2); - c.setBeanName("TestBean"); - c.setMessageListener(new JmsInputChannel()); - c.initialize(); - c.start(); - - if ( c.isRunning() ) { - System.out.println("... Listener Ready"); - - } - // Keep-alive has a default 30 secs timeout. Sleep for bit longer than that - // If there is an exception due to keep-alive, an exception handler will be - // called on the TestDefaultMessageListenerContainer instance where we - // capture the error. - System.out.println("... Waiting for 40 secs"); - try { - synchronized(waitObject) { - waitObject.wait(40000); - } - // had there been broker issues relateds to keep-alive the listener's failed - // flag would have been set by now. Check it and fail the test - if ( c.failed() ) { - fail("Broker Failed - Reason:"+c.getReasonForFailure()); - } else { - System.out.println("Stopping Listener"); - c.stop(); - - } - } catch( Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test public void testAggregateHttpTunnelling() throws Exception { System.out.println("-------------- testAggregateHttpTunnelling -------------"); @@ -1469,15 +1918,18 @@ public class TestUimaASExtended extends deployService(uimaAsEngine, relativePath + "/Deploy_NoOpAnnotatorWithLongDelay.xml"); Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)), "NoOpAnnotatorQueueLongDelay"); - appCtx.put(UimaAsynchronousEngine.Timeout, 1100); + appCtx.put(UimaAsynchronousEngine.Timeout, 300); initialize(uimaAsEngine, appCtx); waitUntilInitialized(); - - for (int i = 0; i < 1; i++) { + Object o = new Object(); + for (int i = 0; i < 6; i++) { CAS cas = uimaAsEngine.getCAS(); cas.setDocumentText("Some Text"); // System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); uimaAsEngine.sendCAS(cas); + synchronized(o) { + o.wait(1000); + } } uimaAsEngine.collectionProcessingComplete(); @@ -1882,19 +2334,26 @@ public class TestUimaASExtended extends public void testDeployAggregateService() throws Exception { System.out.println("-------------- testDeployAggregateService -------------"); BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + + + // System.setProperty("BrokerURL", "tcp::/localhost:61616"); + + System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000"); deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml"); Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)), +// Map<String, Object> appCtx = buildContext("tcp://localhost:61616", "TopLevelTaeQueue"); - appCtx.put(UimaAsynchronousEngine.Timeout, 0); + appCtx.put(UimaAsynchronousEngine.Timeout, 1000); appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0); addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class); - runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue", - 10, PROCESS_LATCH); +// runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue", + runTest(appCtx, eeUimaEngine, "tcp://localhost:61616", "TopLevelTaeQueue", + 1, PROCESS_LATCH); } /** * Sends total of 10 CASes to async aggregate configured to process 2 CASes at a time. @@ -2257,7 +2716,7 @@ public class TestUimaASExtended extends System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000"); deployService(eeUimaEngine, relativePath + "/Deploy_ComplexAggregateWithInnerUimaAggregateCM.xml"); runTest(null, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue", - 10, PROCESS_LATCH); + 1, PROCESS_LATCH); } /** @@ -2932,23 +3391,50 @@ public class TestUimaASExtended extends @Test public void testClientWithAggregateMultiplier() throws Exception { System.out.println("-------------- testClientWithAggregateMultiplier -------------"); + System.setProperty("BrokerURL", broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString()); + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml"); + broker.stop(); + broker.waitUntilStopped(); + + //System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); + //broker2 = setupSecondaryBroker(true); + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); deployService(eeUimaEngine, relativePath + "/Deploy_AggregateMultiplier.xml"); - Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)), - "TopLevelTaeQueue"); + String burl = broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(); + Map<String, Object> appCtx = + buildContext(burl, "TopLevelTaeQueue"); + +// Map<String, Object> appCtx = buildContext(String.valueOf(getMasterConnectorURI(broker)), + // "TopLevelTaeQueue"); + +broker.stop(); +broker.waitUntilStopped(); + +//System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test"); +//broker2 = setupSecondaryBroker(true); +broker = createBroker(); +broker.start(); +broker.waitUntilStarted(); + // reduce the cas pool size and reply window appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize); appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2)); - runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), +// runTest(appCtx, eeUimaEngine, String.valueOf(getMasterConnectorURI(broker)), + runTest(appCtx, eeUimaEngine,burl, "TopLevelTaeQueue", 1, PROCESS_LATCH); } @Test public void testClientProcessWithRemoteMultiplier() throws Exception { System.out.println("-------------- testClientProcessWithRemoteMultiplier -------------"); + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); deployService(eeUimaEngine, relativePath + "/Deploy_RemoteCasMultiplier.xml"); Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java Fri Oct 14 19:30:40 2016 @@ -353,8 +353,10 @@ public class ActiveMQSupport extends Tes System.clearProperty("BrokerURL"); wait(3000); - cleanBroker(broker); - stopBroker(); + if ( !broker.isStopped()) { + cleanBroker(broker); + stopBroker(); + } } public class UimaASErrorHandler implements ErrorHandler { Added: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml?rev=1764952&view=auto ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml (added) +++ uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml Fri Oct 14 19:30:40 2016 @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> + <!-- + *************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *************************************************************** + --> +<analysisEngineDeploymentDescription + xmlns="http://uima.apache.org/resourceSpecifier"> + + <name>NoOp Annotator A</name> + <description>Deploys NoOp Annotator Primitive AE</description> + + <deployment protocol="jms" provider="activemq"> + <casPool numberOfCASes="5" initialFsHeapSize="500"/> + <service> + <inputQueue endpoint="NoOpAnnotatorQueue" brokerURL="${DefaultBrokerURL}" prefetch="1"/> + <topDescriptor> + <import location="../descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml"/> + </topDescriptor> + <analysisEngine> + <asyncPrimitiveErrorConfiguration> + <!-- <processCasErrors thresholdCount="4" thresholdWindow="10" thresholdAction="terminate" /> --> + <collectionProcessCompleteErrors additionalErrorAction="terminate" /> + </asyncPrimitiveErrorConfiguration> + </analysisEngine> + + + </service> + </deployment> + +</analysisEngineDeploymentDescription> + Propchange: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml?rev=1764952&view=auto ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml (added) +++ uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml Fri Oct 14 19:30:40 2016 @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> + <!-- + *************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *************************************************************** + --> +<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier"> + <frameworkImplementation>org.apache.uima.java</frameworkImplementation> + <primitive>true</primitive> + <annotatorImplementationName>org.apache.uima.ae.noop.NoOpAnnotator</annotatorImplementationName> + <analysisEngineMetaData> + <name>NoOp Annotator</name> + <description>Annotator That Does Nothin</description> + <version>1.0</version> + <vendor>The Apache Software Foundation</vendor> + + <configurationParameters> + <configurationParameter> + <name>ErrorFrequency</name> + <description>Frequency of Generated Errors</description> + <type>Integer</type> + <multiValued>false</multiValued> + <mandatory>true</mandatory> + </configurationParameter> + + <configurationParameter> + <name>ProcessDelay</name> + <description>Process Delay</description> + <type>Integer</type> + <multiValued>false</multiValued> + <mandatory>true</mandatory> + </configurationParameter> + + + </configurationParameters> + + <configurationParameterSettings> + <nameValuePair> + <name>ErrorFrequency</name> + <value> + <integer>0</integer> + </value> + </nameValuePair> + + <nameValuePair> + <name>ProcessDelay</name> + <value> + <integer>30000</integer> + </value> + </nameValuePair> + + </configurationParameterSettings> + + + + <typeSystemDescription> + </typeSystemDescription> + + <capabilities> + </capabilities> + + <operationalProperties> + <modifiesCas>true</modifiesCas> + <multipleDeploymentAllowed>true</multipleDeploymentAllowed> + <outputsNewCASes>false</outputsNewCASes> + </operationalProperties> + </analysisEngineMetaData> +</analysisEngineDescription> + Propchange: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml?rev=1764952&view=auto ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml (added) +++ uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml Fri Oct 14 19:30:40 2016 @@ -0,0 +1,67 @@ +<?xml version="1.0" encoding="UTF-8"?> + + <!-- + *************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + *************************************************************** + --> + +<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier"> + <frameworkImplementation>org.apache.uima.java</frameworkImplementation> + <primitive>false</primitive> + <delegateAnalysisEngineSpecifiers> + + <delegateAnalysisEngine key="TestMultiplier"> + <import location="../multiplier/SimpleCasGeneratorProducing1000Cases.xml"/> + </delegateAnalysisEngine> + + + <delegateAnalysisEngine key="NoOp"> + <import location="NoOpAnnotatorWith30SecDelay.xml"/> + </delegateAnalysisEngine> + + </delegateAnalysisEngineSpecifiers> + <analysisEngineMetaData> + <name>Test Aggregate TAE</name> + <description>Detects Nothing</description> + <configurationParameters/> + <configurationParameterSettings/> + <flowConstraints> + <fixedFlow> + + <node>TestMultiplier</node> + <node>NoOp</node> + </fixedFlow> + </flowConstraints> + <capabilities> + <capability> + <inputs/> + <outputs> + </outputs> + <languagesSupported> + <language>en</language> + </languagesSupported> + </capability> + </capabilities> + <operationalProperties> + <modifiesCas>true</modifiesCas> + <multipleDeploymentAllowed>true</multipleDeploymentAllowed> + <outputsNewCASes>true</outputsNewCASes> + </operationalProperties> + </analysisEngineMetaData> +</analysisEngineDescription> Propchange: uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Fri Oct 14 19:30:40 2016 @@ -145,9 +145,9 @@ public abstract class BaseAnalysisEngine protected long errorCount = 0; - protected List inputChannelList = new ArrayList(); + protected List<InputChannel> inputChannelList = new ArrayList<InputChannel>(); - protected ConcurrentHashMap inputChannelMap = new ConcurrentHashMap(); + protected ConcurrentHashMap<String, InputChannel> inputChannelMap = new ConcurrentHashMap<String, InputChannel>(); private UimaEEAdminContext adminContext; @@ -1088,7 +1088,7 @@ public abstract class BaseAnalysisEngine public void addInputChannel(InputChannel anInputChannel) { if (!inputChannelMap.containsKey(anInputChannel.getInputQueueName())) { inputChannelMap.put(anInputChannel.getInputQueueName(), anInputChannel); - if (inputChannelList.contains(anInputChannel)) { + if (!inputChannelList.contains(anInputChannel)) { inputChannelList.add(anInputChannel); } } @@ -2410,13 +2410,34 @@ public abstract class BaseAnalysisEngine return null; } - public InputChannel getReplyInputChannel(String aDelegateKey) { - for (int i = 0; inputChannelList != null && i < inputChannelList.size(); i++) { - if (((InputChannel) inputChannelList.get(i)).isFailed(aDelegateKey)) { - return (InputChannel) inputChannelList.get(i); +// public InputChannel getReplyInputChannel(String aDelegateKey) { + public InputChannel getReplyInputChannel(String aDestination) { + InputChannel IC = null; + if ( inputChannelMap.containsKey(aDestination) ) { + return inputChannelMap.get(aDestination); + } +/* + for( InputChannel inputChannel : inputChannelList) { +// if ( inputChannel.get) + if ( inputChannel.isFailed(aDelegateKey)) { + System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Found InputChannel for Delegate:"+aDelegateKey+" hashCode="+inputChannel.hashCode()); + IC = inputChannel; + } + System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Next Input Channel - hashcode="+inputChannel.hashCode()); + + } + */ +/* + for (int i = 0; inputChannelList != null && i < inputChannelList.size(); i++) { + + if (((InputChannel) inputChannelList.get(i)).isFailed(aDelegateKey)) { + System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Found InputChannel for Delegate:"+aDelegateKey); + return (InputChannel) inputChannelList.get(i); } + System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Next Input Channel - hashcode="+); } - return null; + */ + return IC; } Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Fri Oct 14 19:30:40 2016 @@ -1034,6 +1034,11 @@ public class PrimitiveAnalysisEngineCont // Check for delivery failure. The client may have terminated while an input CAS was being processed if ( childCasStateEntry.deliveryToClientFailed() ) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "process", + UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_delivery_to_client_failed_INFO", + new Object[] { getComponentName(), aCasReferenceId }); + } clientUnreachable = true; if ( cmOutstandingCASes.containsKey(childCasStateEntry.getCasReferenceId())) { cmOutstandingCASes.remove(childCasStateEntry.getCasReferenceId()); Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Fri Oct 14 19:30:40 2016 @@ -522,7 +522,10 @@ public class ProcessResponseHandler exte getController().getEventListener()); } else { // Callback to notify that the cache is empty - getController().getEventListener().onCacheEmpty(); + + // !!!!!!!!!!!!!!! WHY DO WE NEED TO CALL onCacheEmpty() IF CAS IS ABORTED? + // !!!!!!!!!!!!!!!!!!!!!! ????????????????????????????????? +// getController().getEventListener().onCacheEmpty(); } } Modified: uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties Fri Oct 14 19:30:40 2016 @@ -266,3 +266,6 @@ UIMAEE_service_warmup_start_INFO = Servi UIMAEE_service_warmup_success_INFO = Service: {0} Thread: {1} WarmUp Has Finished Successfully - Processed: {2} CASes - Time Spent Warming Up: {3} secs- Ready For Processing UIMAEE_warmup_dropping_cas__FINE = Aggregate Warmup Stage - Dropping CAS:{0} Processing took {1} UIMAEE_warmup_start_cas__FINE = Aggregate Warmup Stage - Processing CAS id:{0} +UIMAEE_delivery_to_client_failed_INFO = Service:{0} Unable to Deliver CAS:{1} to Client - Dropping CAS +UIMAEE_unable_to_deliver_msg__INFO=Service:{0} JMS unable to Deliver CAS:{1} - Error:{2} +UIMAEE_force_cas_abort__INFO="Service:{0} Forcing {1} CAS:{1} to Abort \ No newline at end of file Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Fri Oct 14 19:30:40 2016 @@ -21,6 +21,8 @@ package org.apache.uima.adapter.jms.clie import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.ConnectException; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; @@ -41,6 +43,7 @@ import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -49,6 +52,8 @@ import javax.jms.ObjectMessage; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.DestinationDoesNotExistException; +import org.apache.activemq.transport.TransportListener; import org.apache.uima.UIMAFramework; import org.apache.uima.UIMARuntimeException; import org.apache.uima.UIMA_IllegalStateException; @@ -1498,8 +1503,15 @@ public abstract class BaseUIMAAsynchrono TextMessage msg = createTextMessage(); msg.setText(""); setReleaseCASMessage(msg, casReferenceId); - // Create Message Producer for the Destination - MessageProducer msgProducer = getMessageProducer(freeCASNotificationDestination); + MessageProducer msgProducer = null; + try { + // Create Message Producer for the Destination + msgProducer = getMessageProducer(freeCASNotificationDestination); + + } catch( DestinationDoesNotExistException ee) { + + } + if (msgProducer != null) { try { // Send FreeCAS message to a Cas Multiplier @@ -3136,10 +3148,13 @@ public abstract class BaseUIMAAsynchrono //System.out.println("------------------------ stop2? "+stop); // This loop attempts to recover broker connection every 5 seconds and ends when all clients // using this shared object terminate or a connection is recovered + boolean log = true; while( !stop ) { - //System.out.println("------------------------ clientList.size()- "+clientList.size()); if ( clientList.size() == 0 ) { break; // no more active clients - break out of connection recovery + } else { + BaseUIMAAsynchronousEngineCommon_impl c = + clientList.get(0); } try { // Attempt a new connection to a broker @@ -3154,8 +3169,22 @@ public abstract class BaseUIMAAsynchrono } break; } catch( Exception e) { - e.printStackTrace(); - synchronized( stateMonitor ) { + + if ( log ) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "retryConnectionUntilSuccessfull", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_retry__INFO", + new Object[] { brokerURL }); + } + if ( e instanceof JMSException && e.getMessage().endsWith("Connection refused") ) { + log = false; + System.out.println("Uima AS Client:"+e.getMessage()+" Retrying every 5 seconds until successfull"); + + } else { + e.printStackTrace(); + } + } + synchronized( stateMonitor ) { try { stateMonitor.wait(5000); // retry every 5 secs } catch( InterruptedException ie) {} @@ -3264,6 +3293,33 @@ public abstract class BaseUIMAAsynchrono return false; } } + public static class UimaAsTransportListener implements TransportListener { + + @Override + public void onCommand(Object arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void onException(IOException arg0) { + System.out.println("!!!!!!!!!!!!!!!!!! UimaAsTransportListener.onException() - lost connectipon to broker"); + + } + + @Override + public void transportInterupted() { + // TODO Auto-generated method stub + + } + + @Override + public void transportResumed() { + // TODO Auto-generated method stub + + } + + } public static class UimaASShutdownHook implements Runnable { UimaAsynchronousEngine asEngine=null; public UimaASShutdownHook( UimaAsynchronousEngine asEngine) { Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java Fri Oct 14 19:30:40 2016 @@ -174,7 +174,7 @@ public class ClientServiceDelegate exten try { clientUimaAsEngine.handleException(new UimaASProcessCasTimeout("Service Not Responding to Ping - CAS:"+de.getCasReferenceId(), new UimaASPingTimeout("Forced Timeout on CAS in PendingDispatch list. The CAS Has Not Been Dispatched since the Service Appears to be Unavailable")), de.getCasReferenceId(), null,cachedRequest, !cachedRequest.isSynchronousInvocation(), false); } catch( Exception ex) { - ex.printStackTrace(); + //ex.printStackTrace(); } } if ( clientUimaAsEngine.running ) { Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1764952&r1=1764951&r2=1764952&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Fri Oct 14 19:30:40 2016 @@ -244,4 +244,7 @@ UIMAJMS_retrying_jms_connection__WARNING UIMAJMS_service_recvd_new_message__FINE = > service recvd CAS RefId: {0} UIMAJMS_sent_ack_message__FINE = < service sent ACK for CAS RefId: {0} UIMAJMS_received_service_info_FINEST = Received ServiceInfo message from {0} -UIMAJMS_debug_msg__FINEST={0} \ No newline at end of file +UIMAJMS_debug_msg__FINEST={0} +UIMAJMS_temp_destination_not_available_retrying__INFO=Service:{0} Unable to refresh temp destination - retrying in {1} seconds until success ... +UIMAJMS_temp_destination_available__INFO=Service:{0} succesfully refreshed temp destination:{1} - FreeCas Queue:{2} +UIMAJMS_client_connection_retry__INFO=UIMA-AS Client Unable to Connect to Broker:{0} - Retrying Until Success ... \ No newline at end of file