Author: cwiklik
Date: Fri Oct 14 19:30:40 2016
New Revision: 1764952

URL: http://svn.apache.org/viewvc?rev=1764952&view=rev
Log:
UIMA-5123 refactored code that deals with recovery after broker restart

Added:
    
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml
   (with props)
    
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml
   (with props)
    
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml
   (with props)
Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
    
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
    
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
    
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
 Fri Oct 14 19:30:40 2016
@@ -19,12 +19,9 @@
 
 package org.apache.uima.adapter.jms.activemq;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
@@ -43,7 +40,9 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
 import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.AsyncCallback;
 import org.apache.activemq.ConnectionFailedException;
 import org.apache.activemq.advisory.ConsumerEvent;
 import org.apache.activemq.advisory.ConsumerListener;
@@ -96,7 +95,7 @@ public class JmsEndpointConnection_impl
 
   private volatile boolean retryEnabled;
 
-  private AnalysisEngineController controller = null;
+  protected AnalysisEngineController controller = null;
 
   private volatile boolean connectionAborted = false;
 
@@ -277,7 +276,7 @@ public class JmsEndpointConnection_impl
                                                          conn.close();
                                                  } catch( Exception ee) {}
                                          }
-                                       //  if ( logConnectionProblem ) {
+                                         if ( jex.getCause() != null && 
logConnectionProblem ) {
                                                  logConnectionProblem = false; 
  // log once
                                                  // Check if unable to connect 
to the broker and retry ...
                                                  if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -289,8 +288,15 @@ public class JmsEndpointConnection_impl
                                                  
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                                                          "openChannel", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                                                          
"UIMAJMS_exception__WARNING", jex);
+                                                 
+                                                 
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                                                         "openChannel", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                                                         
"UIMAEE_service_lost_connectivity_WARNING",
+                                                         new Object[] { 
controller.getComponentName(), brokerUri});
+                                                 
+                                                 
                                                }
-                               //        } 
+                                         } 
                                         this.wait(1000);  // wait between 
retries 
                                  } catch ( Exception ee) {
                                          ee.printStackTrace();
@@ -668,7 +674,12 @@ public class JmsEndpointConnection_impl
         }
         logMessageSize(aMessage, msgSize, destinationName);
         synchronized (producer) {
-          producer.send((Destination) delegateEndpoint.getDestination(), 
aMessage);
+            // create amq async callback listener to detect jms msg delivery 
problems
+               AsyncCallback onComplete = createAMQCallbackListener(command, 
aMessage);
+               // if the msg cannot be delivered due to invalid destination, 
the send does
+               // not fail since we are using AMQ async sends. To detect 
delivery issues
+               // we use callback listener where such conditions are detected 
and handled
+               ((ActiveMQMessageProducer)producer).send((Destination) 
delegateEndpoint.getDestination(), aMessage, onComplete);
         }
       } else {
         destinationName = ((ActiveMQQueue) 
producer.getDestination()).getPhysicalName();
@@ -678,22 +689,27 @@ public class JmsEndpointConnection_impl
                   new Object[] { destinationName });
         }
         logMessageSize(aMessage, msgSize, destinationName);
-         // If in ParallelStep its possible to receive a reply from one of the 
delegates in parallel 
-         // step *before* a CAS is dispatched to all of the delegates. This 
can cause a problem
-         // as replies are merged which causes the CAS to be in an 
inconsistent state.
-         // The following code calls dispatchCasToParallelDelegate() which 
count down
-         // a java latch. The same latch is used when receiving replies. If 
the latch is non zero
-         // the code blocks a thread from performing deserialization.
-         if ( msgType == AsynchAEMessage.Request && command == 
AsynchAEMessage.Process ) {
+           // If in ParallelStep its possible to receive a reply from one of 
the delegates in parallel 
+           // step *before* a CAS is dispatched to all of the delegates. This 
can cause a problem
+           // as replies are merged which causes the CAS to be in an 
inconsistent state.
+           // The following code calls dispatchCasToParallelDelegate() which 
count down
+           // a java latch. The same latch is used when receiving replies. If 
the latch is non zero
+           // the code blocks a thread from performing deserialization.
+           if ( msgType == AsynchAEMessage.Request && command == 
AsynchAEMessage.Process ) {
                  String casReferenceId = 
aMessage.getStringProperty(AsynchAEMessage.CasReference);
                  CasStateEntry casStateEntry = 
controller.getLocalCache().lookupEntry(casReferenceId);
                  if ( casStateEntry.getNumberOfParallelDelegates() > 0) {
                          casStateEntry.dispatchedCasToParallelDelegate();
                  }
-         }
+           }
 
         synchronized (producer) {
-          producer.send(aMessage);
+            // create amq async callback listener to detect jms msg delivery 
problems
+               AsyncCallback onComplete = createAMQCallbackListener(command, 
aMessage);
+               // if the msg cannot be delivered due to invalid destination, 
the send does
+               // not fail since we are using AMQ async sends. To detect 
delivery issues
+               // we use callback listener where such conditions are detected 
and handled
+               ((ActiveMQMessageProducer)producer).send(aMessage, onComplete);
         }
 
       }
@@ -709,7 +725,7 @@ public class JmsEndpointConnection_impl
       // to find inactive sessions.
          lastDispatchTimestamp.set(System.currentTimeMillis());
       // Succeeded sending the CAS
-      return true;
+         return true;
     } catch (Exception e) {
        
         // if a client terminates with an outstanding request, the service 
will not
@@ -805,6 +821,20 @@ public class JmsEndpointConnection_impl
     return false;
   }
 
+  public AsyncCallback createAMQCallbackListener(int command, Message 
aMessage) throws Exception {
+       String cid="";
+       CasStateEntry casStateEntry = null;
+       AsyncCallback onComplete = null;
+       if ( command == AsynchAEMessage.Process) {
+               cid = aMessage.getStringProperty(AsynchAEMessage.CasReference);
+               casStateEntry = controller.getLocalCache().lookupEntry(cid);
+               onComplete = new UimaAsAsyncCallbackListener(casStateEntry);
+       } else {
+               onComplete = new UimaAsAsyncCallbackListener(command);
+       }
+       return onComplete;
+  }
+  
   private void logMessageSize(Message aMessage, long msgSize, String 
destinationName) {
     if (UIMAFramework.getLogger().isLoggable(Level.FINE)) {
       boolean isReply = false;
@@ -921,4 +951,72 @@ public class JmsEndpointConnection_impl
 //    brokerDestinations.getConnectionTimer().stopTimer();
   }
 
+  private class UimaAsAsyncCallbackListener implements AsyncCallback {
+       CasStateEntry casState=null;
+       int command;
+       
+       public UimaAsAsyncCallbackListener(int command) {
+               this.command = command;
+       }
+       public UimaAsAsyncCallbackListener( CasStateEntry casState ) {
+               this.casState = casState;
+       }
+       public void onException(JMSException exception) {
+               if ( casState != null ) {
+                       
+                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                                               
"UimaAsAsyncCallbackListener.onException()", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                                               
"UIMAEE_unable_to_deliver_msg__INFO",
+                                               new Object[] { 
controller.getComponentName(), 
casState.getCasReferenceId(),exception.getMessage() });
+                   }
+                       casState.setDeliveryToClientFailed();
+                       if ( casState.isSubordinate()) {
+                               try {
+                                       
+                                       String inputCasId = 
casState.getInputCasReferenceId();
+                                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                                                               
"UimaAsAsyncCallbackListener.onException()", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                                                               
"UIMAEE_force_cas_abort__INFO",
+                                                               new Object[] { 
controller.getComponentName(), "parent", inputCasId });
+                                   }
+                                       
+                                       
+                                       CasStateEntry parentCasStateEntry = 
controller.getLocalCache().lookupEntry(inputCasId);
+                                       
//parentCasStateEntry.setDeliveryToClientFailed();
+                                       parentCasStateEntry.setFailed();
+                                       
controller.addAbortedCasReferenceId(inputCasId);
+                                       if ( controller instanceof 
AggregateAnalysisEngineController ) {
+                                               List<AnalysisEngineController> 
controllers = 
+                                                               
((AggregateAnalysisEngineController)controller).getChildControllerList();
+                                               for( AnalysisEngineController 
ctrl : controllers) {
+                                                       
ctrl.addAbortedCasReferenceId(inputCasId);
+                                               }
+                                       }
+                                       
+                               } catch( Exception e) {
+                                       e.printStackTrace();
+                               }
+                               
controller.releaseNextCas(casState.getCasReferenceId());
+                       }
+                       
+                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                                                       
"UimaAsAsyncCallbackListener.onException()", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                                                       
"UIMAEE_release_cas_req__FINE",
+                                                       new Object[] { 
controller.getComponentName(), casState.getCasReferenceId() });
+                       }
+               } else {
+                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+                         "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                         "UIMAEE_service_delivery_exception__WARNING",new 
Object[] { controller.getComponentName(), "", endpointName});
+
+               }
+       }
+
+       public void onSuccess() {
+       }
+         
+  }
 }

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
 Fri Oct 14 19:30:40 2016
@@ -27,12 +27,14 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.CountDownLatch;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.uima.UIMAFramework;
@@ -57,6 +59,7 @@ import org.apache.uima.adapter.jms.JmsCo
 import org.apache.uima.adapter.jms.message.JmsMessageContext;
 import org.apache.uima.util.Level;
 import org.springframework.jms.listener.SessionAwareMessageListener;
+import org.springframework.jms.support.destination.DestinationResolver;
 
 /**
  * Thin adapter for receiving JMS messages from Spring. It delegates 
processing of all messages to
@@ -1041,7 +1044,61 @@ public class JmsInputChannel implements
       }
     }
   }
-
+  public void createListenerOnTempQueue(ConnectionFactory cf, boolean 
isFreeCasDestination ) throws Exception {
+         TempDestinationResolver resolver = new 
TempDestinationResolver(controller.getComponentName());
+         UimaDefaultMessageListenerContainer connector = new 
UimaDefaultMessageListenerContainer(true);
+         connector.setConnectionFactory(cf);
+         resolver.setListener(connector);
+         connector.setConcurrentConsumers(1);
+         connector.setDestinationResolver(resolver);
+         connector.setController(getController());
+         connector.setMessageListener(this);
+         connector.initializeContainer();
+         connector.getDestination();
+         connector.afterPropertiesSet(false);
+         
UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,getController().getComponentName()+"-JmsInputChannel.createListenerOnTempQueue()-starting
 new Listener" );
+         connector.start();
+         boolean log = true;
+         synchronized (mux) {
+                 while (connector.getListenerEndpoint() == null) {
+                         try {
+                                 if ( log ) {
+                                         log = false;
+                                         if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                                                 
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                                                                 
"createListenerOnTempQueue", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                                                 
"UIMAJMS_temp_destination_not_available_retrying__INFO",
+                                                                 new Object[] 
{ getController().getComponentName(), "5"});
+                                         }
+                                 }
+                                 mux.wait(5000);
+                                 
+                         } catch (InterruptedException e) {
+                         }
+                 }
+
+         }
+
+         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
+                                 "createListenerOnTempQueue", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                 "UIMAJMS_temp_destination_available__INFO",
+                                 new Object[] { 
getController().getComponentName(), connector.getListenerEndpoint(), 
isFreeCasDestination});
+         }
+
+         if ( isFreeCasDestination ) {
+                 ((JmsOutputChannel) getController().getOutputChannel())
+          .setFreeCasQueue(connector.getListenerEndpoint());
+         }
+      setListenerContainer(connector);
+
+         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.CONFIG)) {
+                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, 
CLASS_NAME.getName(),
+                                 "createListenerOnTempQueue", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                 "UIMAJMS_activated_fcq__CONFIG",
+                                 new Object[] { 
getController().getComponentName(), connector.getEndpointName() });
+         }
+  }
   public void createListener(String aDelegateKey, Endpoint endpointToUpdate) 
throws Exception {
     if (getController() instanceof AggregateAnalysisEngineController) {
       Delegate delegate = ((AggregateAnalysisEngineController) getController())
@@ -1056,7 +1113,7 @@ public class JmsInputChannel implements
         newListener.setMessageListener(this);
         newListener.setController(getController());
 
-        TempDestinationResolver resolver = new TempDestinationResolver();
+        TempDestinationResolver resolver = new 
TempDestinationResolver(controller.getComponentName());
         resolver.setConnectionFactory(f);
         resolver.setListener(newListener);
         newListener.setDestinationResolver(resolver);

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
 Fri Oct 14 19:30:40 2016
@@ -530,9 +530,12 @@ public class JmsOutputChannel implements
                 //  Only one thread at a time is allowed here.
                 synchronized( masterEndpoint ) {
                   if ( masterEndpoint.getStatus() == Endpoint.FAILED ) {
+                         
+                       String name =  anEndpoint.getDestination().toString();
                     //  Returns InputChannel if the Reply Listener for the 
delegate has previously failed.
                     //  If the listener hasnt failed the getReplyInputChannel 
returns null
-                    InputChannel iC = 
getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey());
+//                    InputChannel iC = 
getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey());
+                    InputChannel iC = 
getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDestination().toString());
                     if ( iC != null ) { 
                       try {
                         // Create a new Listener, new Temp Queue and associate 
the listener with the Input Channel
@@ -778,7 +781,7 @@ public class JmsOutputChannel implements
 
 
   public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws 
AsynchAEException {
-    try {
+         try {
       anEndpoint.setReplyEndpoint(true);
       if (anEndpoint.isRemote()) {
         if (anEndpoint.getSerialFormat() == SerialFormat.XMI) {
@@ -1670,7 +1673,6 @@ public class JmsOutputChannel implements
         // produced by the CAS Multiplier. The client will treat this CAS
         // differently from the input CAS.
         tm.setIntProperty(AsynchAEMessage.MessageType, 
AsynchAEMessage.Request);
-
         isRequest = true;
         // Save the id of the parent CAS
         tm.setStringProperty(AsynchAEMessage.InputCasReference, 
getTopParentCasReferenceId(entry

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/SpringContainerDeployer.java
 Fri Oct 14 19:30:40 2016
@@ -55,7 +55,7 @@ import org.springframework.jms.support.d
 public class SpringContainerDeployer implements ControllerCallbackListener {
   private static final Class CLASS_NAME = SpringContainerDeployer.class;
 
-  private static final int MAX_PREFETCH_FOR_CAS_NOTIFICATION_Q = 10;
+  public static final int MAX_PREFETCH_FOR_CAS_NOTIFICATION_Q = 10;
 
   public static final int QUIESCE_AND_STOP = 1000;
 

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/TempDestinationResolver.java
 Fri Oct 14 19:30:40 2016
@@ -33,6 +33,13 @@ public class TempDestinationResolver imp
 
   private Object mutex = new Object();
 
+  private String serviceName = "";
+  
+  public TempDestinationResolver() {
+  }
+  public TempDestinationResolver(String name) {
+         serviceName = name;
+  }
   /**
    * This method is called by the Spring listener code. It creates a single 
temp queue for all
    * listener instances. If the Spring listener is configured with more than 
one concurrentConsumer,
@@ -41,7 +48,6 @@ public class TempDestinationResolver imp
    */
   public Destination resolveDestinationName(Session session, String 
destinationName,
           boolean pubSubDomain) throws JMSException {
-
     synchronized (mutex) {
       if (destination == null) {
         destination = session.createTemporaryQueue();

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
 Fri Oct 14 19:30:40 2016
@@ -19,11 +19,9 @@
 
 package org.apache.uima.adapter.jms.activemq;
 
-import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +36,7 @@ import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
+import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
 
@@ -125,6 +124,7 @@ public class UimaDefaultMessageListenerC
   //    on listener failure log once and retry silently
   private volatile boolean logListenerFailure=true;
   
+  private static CountDownLatch recoveryLatch = new CountDownLatch(4);
   public UimaDefaultMessageListenerContainer() {
     super();
     // reset global static. This only effects unit testing as services are 
deployed 
@@ -132,7 +132,7 @@ public class UimaDefaultMessageListenerC
     terminating = false;
     UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
     __listenerRef = this;
-    setRecoveryInterval(30000);  // increase connection recovery to 30 sec
+    setRecoveryInterval(400);  // increase connection recovery to 30 sec
     setAcceptMessagesWhileStopping(true);
     setExceptionListener(this);
     threadGroup = new ThreadGroup("ListenerThreadGroup_"
@@ -166,7 +166,63 @@ public class UimaDefaultMessageListenerC
                          tcon = createConnection();
                          JmsUtils.closeConnection(tcon);
                        }
-                       logger.info("Successfully refreshed JMS Connection");
+                       String ctrlName = "";
+                       if ( controller != null ) {
+                               ctrlName = "Controller: 
"+controller.getComponentName();
+                       }
+                       if ( super.getMessageSelector() != null ) {
+                               logger.info(ctrlName+" Successfully refreshed 
JMS Connection - Selector "+super.getMessageSelector()+" Instance 
hashcode:"+this.hashCode());
+                               
+                       } else {
+                               logger.info(ctrlName+" Successfully refreshed 
JMS Connection ");
+                               
+                       }
+                       //if (controller != null && controller instanceof 
AggregateAnalysisEngineController) {
+                                       //      If endpoint not set, this is a 
temp reply queue listener.
+                      if (getDestination() != null && 
((ActiveMQDestination)getDestination()).isTemporary()) {
+                          destroy();
+                          
logger.info("Controller:"+controller.getComponentName()+"... Destroyed Listener 
on a temp queue:"+getDestination());
+//                        
((JmsOutputChannel)controller.getOutputChannel()).setFreeCasQueue(getDestination());
+                        
+                          if ( freeCasQueueListener ) {
+                                       
logger.info("Controller:"+controller.getComponentName()+" ------------------- 
Creating new listener for the FreeCas temp queue");
+                                       try {
+                                           
((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory(),
 true);
+                                               
logger.info("Controller:"+controller.getComponentName()+"------------------- 
New listener on FreeCas temp queue is ready");
+                                       } catch( Exception e) {
+                                         if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                                             
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                                                     "handleTempQueueFailure", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                                     
"UIMAJMS_exception__WARNING", e);
+                                           }
+                                       }
+                          }
+                          
+                                               /*  
+                       if ( getMessageListener() instanceof JmsInputChannel ) {
+                               System.out.println("------------------- 
Creating new listener for the temp queue");
+                               try {
+                               
((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory());
+                                       System.out.println("------------------- 
New listener on temp queue is ready");
+                               } catch( Exception e) {
+                                       System.out.println("------------------- 
Error while creating new listener on temp queue");
+                                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                                     
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                                             "handleTempQueueFailure", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                             "UIMAJMS_exception__WARNING", e);
+                                   }
+                                       
+                               }
+                       }
+*/
+                      
+                      }
+                      
+         /*
+                      String delegateKey = 
((AggregateAnalysisEngineController) controller)
+                       .lookUpDelegateKey(endpoint.getEndpoint());
+               */
+                       //}
                        break;
                      }
                      catch (Exception ex) {
@@ -186,8 +242,8 @@ public class UimaDefaultMessageListenerC
                        }
                      }
                     // sleepInbetweenRecoveryAttempts();
-                     setRecoveryInterval(10);
-                   }       
+   //                setRecoveryInterval(10);
+               }           
        }
     } catch( IllegalStateException e ) {
     }
@@ -243,7 +299,7 @@ public class UimaDefaultMessageListenerC
         } else {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
-                        "handleTempQueueFailure", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                        "handleListenerFailure", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                         "UIMAJMS_unable_to_lookup_input_channel__INFO", 
queueName);
           }
         }
@@ -291,6 +347,24 @@ public class UimaDefaultMessageListenerC
       
     } catch( Exception exx ) { // shared connection  may not exist yet if a 
broker is not up
     }
+    if (t instanceof InvalidDestinationException ) {
+       destroy();
+       if ( getMessageListener() instanceof JmsInputChannel ) {
+               try {
+               //      
((JmsInputChannel)getMessageListener()).createListenerOnTempQueue(getConnectionFactory());
+               } catch( Exception e) {
+                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                     UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+                             "handleTempQueueFailure", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                             "UIMAJMS_exception__WARNING", e);
+                   }
+                       
+               }
+       }
+       return;
+    }
+
+    
     if ( (conn != null && conn.isTransportFailed() ) || 
             t instanceof javax.jms.IllegalStateException
             && t.getMessage().equals("The Consumer is closed")) {
@@ -340,6 +414,7 @@ public class UimaDefaultMessageListenerC
       }
     } else if (disableListener(t)) {
       handleQueueFailure(t);
+    } else {
     }
   }
 
@@ -442,7 +517,7 @@ public class UimaDefaultMessageListenerC
       }
     
     
-    setRecoveryInterval(0);
+    setRecoveryInterval(1);
 
     // Spin a shutdown thread to terminate listener.
     new Thread() {
@@ -886,7 +961,7 @@ public class UimaDefaultMessageListenerC
                     JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_debug_msg__FINEST",
                      new Object[] { msg });
          }
-       setRecoveryInterval(0);
+       setRecoveryInterval(1);
       setAutoStartup(false);
       if ( getSharedConnection() != null ) {
         ActiveMQConnection amqc = (ActiveMQConnection)getSharedConnection();

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
 Fri Oct 14 19:30:40 2016
@@ -319,7 +319,9 @@ public class BaseUIMAAsynchronousEngine_
              if ( amqc != null && !amqc.isClosed() && !amqc.isClosing() && 
consumerDestination != null && 
                   consumerDestination instanceof ActiveMQTempDestination ) {
                try {
-                 
amqc.deleteTempDestination((ActiveMQTempDestination)consumerDestination);
+                       if ( !amqc.isClosed() && !amqc.isTransportFailed()) {
+                         
amqc.deleteTempDestination((ActiveMQTempDestination)consumerDestination);
+                       }
                } catch( Exception e) {
                  e.printStackTrace();
                }
@@ -351,6 +353,7 @@ public class BaseUIMAAsynchronousEngine_
                        sender.doStop();
                      }
                          try {
+                                 
System.out.println(this.getClass().getName()+".stop() - Stopping UIMA-AS 
Client");
                                stopConnection();
                                // Undeploy all containers
                                undeploy();

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
 Fri Oct 14 19:30:40 2016
@@ -37,6 +37,8 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -58,6 +60,8 @@ import org.apache.uima.aae.client.UimaAs
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.error.ServiceShutdownException;
+import org.apache.uima.aae.error.UimaASPingTimeout;
+import org.apache.uima.aae.error.UimaASProcessCasTimeout;
 import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.JmsInputChannel;
@@ -120,6 +124,358 @@ public class TestUimaASExtended extends
 
        return b.getDefaultSocketURIString();
     }  
+    
+    
+    /**
+     * Tests error handling of the client. It deploys Aggregate service Cas 
Multiplier. initializes
+     * the client and sends a CAS for processing. The child CAS is than held 
in NoOp Annotator for
+     * 30 secs to simulate heavy processing. While the CAS is being processed, 
a broker is stopped.
+     * The client should timeout after 40 secs and attempt to send 2 more 
CASes. Since the broker
+     * is down, each of these 2 CASes goes into a retry list while a 
Connection is being retried.
+     * Both should timeout, and sendAndReceive() should fail due to a timeout.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testClientRecoveryFromBrokerFailure() throws Exception {
+      System.out.println("-------------- testClientRecoveryFromBrokerFailure 
-------------");
+      System.setProperty("BrokerURL", 
broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new 
BaseUIMAAsynchronousEngine_impl();
+      deployService(uimaAsEngine, relativePath + 
"/Deploy_AggregateMultiplierWith30SecDelay.xml");
+
+      Map<String, Object> appCtx = 
+         
buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(),
 "TopLevelTaeQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 40000);   // AE will hold the 
CAS for 30 secs so this needs to be larger
+      appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+
+      ScheduledExecutorService s = 
Executors.newSingleThreadScheduledExecutor();
+
+      // schedule a thread that will stop the broker after 10 secs
+      s.schedule(
+                 new Runnable() {
+
+                               @Override
+                               public void run() {
+                                       try {
+                                               System.out.println("Stopping 
Broker ...");
+                                               broker.stop();
+                                       broker.waitUntilStopped();
+                                               System.out.println("Broker 
Stopped...");
+
+                                       } catch( Exception e) {
+                                               
+                                       }
+                                       
+                               }
+                         
+                 }
+                 , 10, TimeUnit.SECONDS);
+      int timeoutCount=0;
+
+      // try to send 3 CASes, each should timeout
+      for (int i = 0; i < 3; i++) {
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+          try {
+                 System.out.println("............... Client Sending CAS 
#"+(i+1));
+              uimaAsEngine.sendAndReceiveCAS(cas);
+            } catch( Exception e) {
+               if ( e instanceof UimaASProcessCasTimeout ) {
+                       timeoutCount++;
+                       System.out.println("Client .............. 
"+e.getMessage());
+                       if ( e.getCause() != null && e instanceof 
UimaASPingTimeout) {
+                               System.out.println("Client .............. 
"+e.getCause().getMessage());
+                       }
+               } else if ( e.getCause() instanceof UimaASProcessCasTimeout ) {
+                       timeoutCount++;
+                       System.out.println("Client .............. 
"+e.getCause().getMessage());
+                       if ( e.getCause().getCause() != null && 
e.getCause().getCause() instanceof UimaASPingTimeout) {
+                               System.out.println("Client .............. 
"+e.getCause().getCause().getMessage());
+                       }
+               } else {
+                       e.printStackTrace();
+               }
+               //              System.out.println("Client Received Expected 
Error on CAS:"+(i+1));
+            } finally {
+              cas.release();
+            }
+      }
+      if ( timeoutCount != 3) {
+          uimaAsEngine.stop();
+         fail("Expected 3 Errors Due to Timeout, Instead Got "+timeoutCount+" 
Timeouts");
+      } else {
+          uimaAsEngine.stop();
+      }
+
+ }
+    
+    @Test
+    public void testBrokerRestartWithAggregateMultiplier() throws Exception {
+      System.out.println("-------------- 
testBrokerRestartWithAggregateMultiplier -------------");
+      System.setProperty("BrokerURL", 
broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+      BaseUIMAAsynchronousEngine_impl eeUimaEngine = new 
BaseUIMAAsynchronousEngine_impl();
+      deployService(eeUimaEngine, relativePath + 
"/Deploy_RemoteCasMultiplier.xml");
+      deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
+      deployService(eeUimaEngine, relativePath + 
"/Deploy_AggregateMultiplier.xml");
+
+      String burl = 
broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
+      Map<String, Object> appCtx = buildContext(burl, "TopLevelTaeQueue");
+      synchronized(this) {
+         this.wait(2000);
+      }
+      broker.stop();
+      broker.waitUntilStopped();
+
+      synchronized(this) {
+         this.wait(2000);
+      }
+
+      broker = createBroker();
+      broker.start();
+      broker.waitUntilStarted();
+      synchronized(this) {
+         this.wait(2000);
+      }
+
+
+      // reduce the cas pool size and reply window
+      appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
+      appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
+      runTest(appCtx, eeUimaEngine,burl,
+              "TopLevelTaeQueue", 1, PROCESS_LATCH);
+      eeUimaEngine.stop();
+ }
+    
+    /**
+     * Tests client and service recovery from broker restart. It deploys CM 
service, dispatches
+     * a CAS for processing and while the CAS is in process, it bounces a 
broker. The service
+     * listeners should be restored and the CAS should fail due to invalid 
destination. Once
+     * the client times out, it should send 2 more CASes which should force 
client to re-establish
+     * connection with a broker and replies should come back.
+     * 
+     * @throws Exception
+     */
+
+    @Test
+    public void testBrokerRestartWithAggregateMultiplierWhileProcessingCAS() 
throws Exception {
+      System.out.println("-------------- 
testBrokerRestartWithAggregateMultiplierWhileProcessingCAS -------------");
+      System.setProperty("BrokerURL", 
broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new 
BaseUIMAAsynchronousEngine_impl();
+      deployService(uimaAsEngine, relativePath + 
"/Deploy_AggregateMultiplierWith30SecDelay.xml");
+
+      Map<String, Object> appCtx = 
+         
buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(),
 "TopLevelTaeQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 40000);
+      appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+
+
+      ScheduledExecutorService s = 
Executors.newSingleThreadScheduledExecutor();
+
+      s.schedule(
+                 new Runnable() {
+
+                               @Override
+                               public void run() {
+                                       try {
+                                               System.out.println("Stopping 
Broker ...");
+                                               broker.stop();
+                                       broker.waitUntilStopped();
+                                               System.out.println("Broker 
Stopped...");
+
+                                           broker = createBroker();
+                                           broker.start();
+                                           broker.waitUntilStarted();
+                                           System.out.println("Broker 
Restarted...");
+
+                                       
+                                       } catch( Exception e) {
+                                               
+                                       }
+                                       
+                               }
+                         
+                 }
+                 , 10, TimeUnit.SECONDS);
+
+
+      for (int i = 0; i < 3; i++) {
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+          try {
+                 System.out.println("............... Client Sending CAS 
#"+(i+1));
+              uimaAsEngine.sendAndReceiveCAS(cas);
+            } catch( Exception e) {
+               e.printStackTrace();
+               //              System.out.println("Client Received Expected 
Error on CAS:"+(i+1));
+            } finally {
+              cas.release();
+            }
+
+
+      }
+      uimaAsEngine.stop();
+ }
+
+    
+    @Test
+    public void testBrokerRestartWhileProcessingCAS() throws Exception {
+      System.out.println("--------------  testBrokerRestartWhileProcessingCAS 
-------------");
+      System.setProperty("BrokerURL", 
broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+      BaseUIMAAsynchronousEngine_impl uimaAsEngine = new 
BaseUIMAAsynchronousEngine_impl();
+      
+      deployService(uimaAsEngine, relativePath + 
"/Deploy_NoOpAnnotatorWith30SecDelay.xml");
+      Map<String, Object> appCtx = 
+      
buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(),
 "NoOpAnnotatorQueue");
+      appCtx.put(UimaAsynchronousEngine.Timeout, 40000);
+      appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+      initialize(uimaAsEngine, appCtx);
+      waitUntilInitialized();
+
+
+      ScheduledExecutorService s = 
Executors.newSingleThreadScheduledExecutor();
+      
+      s.schedule(
+                 new Runnable() {
+
+                               @Override
+                               public void run() {
+                                       try {
+                                               System.out.println("Stopping 
Broker ...");
+                                               broker.stop();
+                                       broker.waitUntilStopped();
+                                               System.out.println("Broker 
Stopped...");
+
+                                           broker = createBroker();
+                                           broker.start();
+                                           broker.waitUntilStarted();
+                                           System.out.println("Broker 
Restarted...");
+
+                                       
+                                       } catch( Exception e) {
+                                               
+                                       }
+                                       
+                               }
+                         
+                 }
+                 , 10, TimeUnit.SECONDS);
+
+      for (int i = 0; i < 1; i++) {
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+          try {
+              uimaAsEngine.sendAndReceiveCAS(cas);
+            } catch( Exception e) {
+               e.printStackTrace();
+               //              System.out.println("Client Received Expected 
Error on CAS:"+(i+1));
+            } finally {
+              cas.release();
+            }
+
+
+      }
+      uimaAsEngine.stop();
+      
+      
+    }
+    /**
+     * This tests if broker keep-alive protocol is working. With AMQ 5.13.2 
the test
+     * fails due to broker bug. What happens is that when a jms client uses 
http
+     * protocol, the connection is made but the keep-alive chat between broker 
and
+     * client is not causing a timeout and an exception. 
+     * 
+     * The exception is internal to the broker but it also happens within amq
+     * client code. To get to this, a custom spring based listener is deployed 
+     * with some of its exception handling methods overriden to capture an 
exception. 
+     *  
+     * @throws Exception
+     */
+    @Test
+    public void testServiceWithHttpListeners() throws Exception {
+           System.out.println("-------------- testServiceWithHttpListeners 
-------------");
+           // Need java monitor object on which to sleep
+           Object waitObject = new Object();
+           // Custom spring listener with handleListenerSetupFailure() 
overriden to 
+           // capture AMQ exception.
+           TestDefaultMessageListenerContainer c = new 
TestDefaultMessageListenerContainer();
+           c.setConnectionFactory(new 
ActiveMQConnectionFactory("http://localhost:18888";));
+           c.setDestinationName("TestQ");
+           c.setConcurrentConsumers(2);
+           c.setBeanName("TestBean");
+           c.setMessageListener(new JmsInputChannel());
+           c.initialize();
+           c.start();
+           
+           if ( c.isRunning() ) {
+                   System.out.println("... Listener Ready");
+               
+           }
+           // Keep-alive has a default 30 secs timeout. Sleep for bit longer 
than that
+           // If there is an exception due to keep-alive, an exception handler 
will be
+           // called on the TestDefaultMessageListenerContainer instance where 
we 
+           // capture the error.
+           System.out.println("... Waiting for 40 secs");
+           try {
+               synchronized(waitObject) {
+                       waitObject.wait(40000);
+               }
+               // had there been broker issues relateds to keep-alive the 
listener's failed
+               // flag would have been set by now. Check it and fail the test 
+               if ( c.failed() ) {
+                       fail("Broker Failed - Reason:"+c.getReasonForFailure());
+               } else {
+                       System.out.println("Stopping Listener");
+                       c.stop();
+
+               }
+           } catch( Exception e) {
+               e.printStackTrace();
+               fail(e.getMessage());
+           }
+    }
+
+    
+
+    @Test
+    public void testBrokerRestartWithPrimitiveMultiplier() throws Exception {
+      System.out.println("-------------- 
testBrokerRestartWithPrimitiveMultiplier -------------");
+      System.setProperty("BrokerURL", 
broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
+      BaseUIMAAsynchronousEngine_impl eeUimaEngine = new 
BaseUIMAAsynchronousEngine_impl();
+      
+      deployService(eeUimaEngine, relativePath + 
"/Deploy_RemoteCasMultiplier.xml");
+     
+      
+      broker.stop();
+      broker.waitUntilStopped();
+
+      broker = createBroker();
+      broker.start();
+      broker.waitUntilStarted();
+
+      String burl = 
broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
+      Map<String, Object> appCtx = 
+      buildContext(burl, "TestMultiplierQueue");
+
+      // reduce the cas pool size and reply window
+      appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
+      appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
+      runTest(appCtx, eeUimaEngine,burl,
+              "TestMultiplierQueue", 1, PROCESS_LATCH);
+      
+      eeUimaEngine.stop();
+    }
+
+    
+    
   /*
   public void testContinueOnRetryFailure2() throws Exception {
            System.out.println("-------------- testContinueOnRetryFailure 
-------------");
@@ -147,9 +503,159 @@ public class TestUimaASExtended extends
          }
 */
   
+    
+    
+    /*
+     * Tests 
+     */
+    @Test
+    public void testSyncClientRecoveryFromBrokerStopAndRestart3() throws 
Exception  {
+      System.out.println("-------------- 
testSyncClientRecoveryFromBrokerStopAndRestart -------------");
+      System.setProperty("BrokerURL", 
broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+ 
+      // Instantiate Uima AS Client
+        BaseUIMAAsynchronousEngine_impl uimaAsEngine = new 
BaseUIMAAsynchronousEngine_impl();
+        //BrokerService broker2 = setupSecondaryBroker(true);
+        // Deploy Uima AS Primitive Service
+        deployService(uimaAsEngine, relativePath + 
"/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+        
+        Map<String, Object> appCtx = 
+        
buildContext(broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString(),
 "NoOpAnnotatorQueue");
+        appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+        appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+        appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+        initialize(uimaAsEngine, appCtx);
+        waitUntilInitialized();
+
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        
//System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+        //broker2 = setupSecondaryBroker(true);
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+        int errorCount = 0;
+        System.out.println("Sending CASes");
+        for (int i = 0; i < 60; i++) {
+            CAS cas = uimaAsEngine.getCAS();
+            cas.setDocumentText("Some Text");
+            try {
+                uimaAsEngine.sendAndReceiveCAS(cas);
+              } catch( Exception e) {
+                System.out.println("Client Received Expected Error on 
CAS:"+(i+1));
+              } finally {
+                cas.release();
+              }
 
 
- 
+        }
+        uimaAsEngine.stop();
+        
+        /*
+        int errorCount=0;
+        for (int i = 0; i < 20; i++) {
+          
+          if ( i == 5 ) {
+            broker2.stop();
+            broker2.waitUntilStopped();
+          } else if ( i == 10 ) {
+            //  restart the broker 
+            
System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+            broker2 = setupSecondaryBroker(true);
+            
+            broker2.start();
+            broker2.waitUntilStarted();
+
+          }
+          CAS cas = uimaAsEngine.getCAS();
+          cas.setDocumentText("Some Text");
+        //  System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " 
Request to a Service");
+          try {
+            uimaAsEngine.sendAndReceiveCAS(cas);
+          } catch( Exception e) {
+            errorCount++;
+            System.out.println("Client Received Expected Error on CAS:"+(i+1));
+          } finally {
+            cas.release();
+          }
+        }
+        
+        uimaAsEngine.stop();
+        super.cleanBroker(broker2);
+
+        broker2.stop();
+
+        //  expecting 5 failures due to broker missing
+        if ( errorCount != 5 ) {
+          fail("Expected 5 failures due to broker down, instead 
received:"+errorCount+" failures");
+        }
+        broker2.waitUntilStopped();
+*/
+    }
+
+    /*
+    
+    @Test
+    public void testSyncClientRecoveryFromBrokerStopAndRestart2() throws 
Exception  {
+        broker.stop();
+        broker.waitUntilStopped();
+       System.out.println("-------------- 
testSyncClientRecoveryFromBrokerStopAndRestart -------------");
+         // Instantiate Uima AS Client
+          BaseUIMAAsynchronousEngine_impl uimaAsEngine = new 
BaseUIMAAsynchronousEngine_impl();
+          BrokerService broker2 = setupSecondaryBroker(true);
+          // Deploy Uima AS Primitive Service
+          deployService(uimaAsEngine, relativePath + 
"/Deploy_NoOpAnnotatorWithPlaceholder.xml");
+          
+          Map<String, Object> appCtx = 
+          
buildContext(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(),
 "NoOpAnnotatorQueue");
+          appCtx.put(UimaAsynchronousEngine.Timeout, 5100);
+          appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+          appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 20000);
+          initialize(uimaAsEngine, appCtx);
+          waitUntilInitialized();
+          
+          // Get meta received, bounce the broker now. 
+          broker2.stop();
+          broker2.waitUntilStopped();
+
+          
+//          ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("");
+//          ActiveMQConnection c = (ActiveMQConnection)f.createConnection();
+          //  restart the broker 
+          
System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+          broker2 = setupSecondaryBroker(true);
+          
+          broker2.start();
+          broker2.waitUntilStarted();
+        
+          // new broker is up. Send a few CASes now
+          for( int i=0; i < 5; i++) {
+              CAS cas = uimaAsEngine.getCAS();
+              cas.setDocumentText("Some Text");
+              try {
+                uimaAsEngine.sendAndReceiveCAS(cas);
+              } catch( Exception e) {
+                 e.printStackTrace();
+                // fail("Unexpected exception from sendAndReceive()- test 
Failed");
+              } finally {
+                cas.release();
+              }
+                 
+          }
+
+          
+          
+          uimaAsEngine.stop();
+          super.cleanBroker(broker2);
+
+          broker2.stop();
+          broker2.waitUntilStopped();
+
+      }
+
+ */
   
   /**
    * Test use of a JMS Service Adapter. Invoke from a synchronous aggregate to 
emulate usage from
@@ -636,63 +1142,6 @@ public class TestUimaASExtended extends
        }
   
 
-  /**
-   * This tests if broker keep-alive protocol is working. With AMQ 5.13.2 the 
test
-   * fails due to broker bug. What happens is that when a jms client uses http
-   * protocol, the connection is made but the keep-alive chat between broker 
and
-   * client is not causing a timeout and an exception. 
-   * 
-   * The exception is internal to the broker but it also happens within amq
-   * client code. To get to this, a custom spring based listener is deployed 
-   * with some of its exception handling methods overriden to capture an 
exception. 
-   *  
-   * @throws Exception
-   */
-  @Test
-  public void testServiceWithHttpListeners() throws Exception {
-           System.out.println("-------------- testServiceWithHttpListeners 
-------------");
-           // Need java monitor object on which to sleep
-           Object waitObject = new Object();
-           // Custom spring listener with handleListenerSetupFailure() 
overriden to 
-           // capture AMQ exception.
-           TestDefaultMessageListenerContainer c = new 
TestDefaultMessageListenerContainer();
-           c.setConnectionFactory(new 
ActiveMQConnectionFactory("http://localhost:18888";));
-           c.setDestinationName("TestQ");
-           c.setConcurrentConsumers(2);
-           c.setBeanName("TestBean");
-           c.setMessageListener(new JmsInputChannel());
-           c.initialize();
-           c.start();
-           
-           if ( c.isRunning() ) {
-                   System.out.println("... Listener Ready");
-               
-           }
-           // Keep-alive has a default 30 secs timeout. Sleep for bit longer 
than that
-           // If there is an exception due to keep-alive, an exception handler 
will be
-           // called on the TestDefaultMessageListenerContainer instance where 
we 
-           // capture the error.
-           System.out.println("... Waiting for 40 secs");
-           try {
-               synchronized(waitObject) {
-                       waitObject.wait(40000);
-               }
-               // had there been broker issues relateds to keep-alive the 
listener's failed
-               // flag would have been set by now. Check it and fail the test 
-               if ( c.failed() ) {
-                       fail("Broker Failed - Reason:"+c.getReasonForFailure());
-               } else {
-                       System.out.println("Stopping Listener");
-                       c.stop();
-
-               }
-           } catch( Exception e) {
-               e.printStackTrace();
-               fail(e.getMessage());
-           }
-  }
-
-  
   @Test
   public void testAggregateHttpTunnelling() throws Exception {
     System.out.println("-------------- testAggregateHttpTunnelling 
-------------");
@@ -1469,15 +1918,18 @@ public class TestUimaASExtended extends
     deployService(uimaAsEngine, relativePath + 
"/Deploy_NoOpAnnotatorWithLongDelay.xml");
     Map<String, Object> appCtx = 
buildContext(String.valueOf(getMasterConnectorURI(broker)),
             "NoOpAnnotatorQueueLongDelay");
-    appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+    appCtx.put(UimaAsynchronousEngine.Timeout, 300);
     initialize(uimaAsEngine, appCtx);
     waitUntilInitialized();
-
-    for (int i = 0; i < 1; i++) {
+    Object o = new Object();
+    for (int i = 0; i < 6; i++) {
       CAS cas = uimaAsEngine.getCAS();
       cas.setDocumentText("Some Text");
  //     System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request 
to a Service");
       uimaAsEngine.sendCAS(cas);
+      synchronized(o) {
+         o.wait(1000);
+      }
     }
     
     uimaAsEngine.collectionProcessingComplete();
@@ -1882,19 +2334,26 @@ public class TestUimaASExtended extends
   public void testDeployAggregateService() throws Exception {
     System.out.println("-------------- testDeployAggregateService 
-------------");
     BaseUIMAAsynchronousEngine_impl eeUimaEngine = new 
BaseUIMAAsynchronousEngine_impl();
+    
+    
+ //   System.setProperty("BrokerURL", "tcp::/localhost:61616");
+
+    
     System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
     deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
     deployService(eeUimaEngine, relativePath + 
"/Deploy_AggregateAnnotator.xml");
     
     Map<String, Object> appCtx = 
buildContext(String.valueOf(getMasterConnectorURI(broker)),
+//             Map<String, Object> appCtx = 
buildContext("tcp://localhost:61616",
             "TopLevelTaeQueue");
-    appCtx.put(UimaAsynchronousEngine.Timeout, 0);
+    appCtx.put(UimaAsynchronousEngine.Timeout, 1000);
     appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
     
     
addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class); 
     
-    runTest(appCtx, eeUimaEngine, 
String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
-            10, PROCESS_LATCH);
+//    runTest(appCtx, eeUimaEngine, 
String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
+    runTest(appCtx, eeUimaEngine, "tcp://localhost:61616", "TopLevelTaeQueue",
+            1, PROCESS_LATCH);
   }
   /**
    * Sends total of 10 CASes to async aggregate configured to process 2 CASes 
at a time.
@@ -2257,7 +2716,7 @@ public class TestUimaASExtended extends
            System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
            deployService(eeUimaEngine, relativePath + 
"/Deploy_ComplexAggregateWithInnerUimaAggregateCM.xml");
            runTest(null, eeUimaEngine, 
String.valueOf(getMasterConnectorURI(broker)), "TopLevelTaeQueue",
-                   10, PROCESS_LATCH);
+                   1, PROCESS_LATCH);
          }
 
   /**
@@ -2932,23 +3391,50 @@ public class TestUimaASExtended extends
   @Test
   public void testClientWithAggregateMultiplier() throws Exception {
     System.out.println("-------------- testClientWithAggregateMultiplier 
-------------");
+    System.setProperty("BrokerURL", 
broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString());
+
     BaseUIMAAsynchronousEngine_impl eeUimaEngine = new 
BaseUIMAAsynchronousEngine_impl();
     deployService(eeUimaEngine, relativePath + 
"/Deploy_RemoteCasMultiplier.xml");
+    broker.stop();
+    broker.waitUntilStopped();
+
+    
//System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+    //broker2 = setupSecondaryBroker(true);
+    broker = createBroker();
+    broker.start();
+    broker.waitUntilStarted();
+
     deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
     deployService(eeUimaEngine, relativePath + 
"/Deploy_AggregateMultiplier.xml");
 
-    Map<String, Object> appCtx = 
buildContext(String.valueOf(getMasterConnectorURI(broker)),
-            "TopLevelTaeQueue");
+    String burl = 
broker.getConnectorByName(DEFAULT_BROKER_URL_KEY).getUri().toString();
+    Map<String, Object> appCtx = 
+    buildContext(burl, "TopLevelTaeQueue");
+
+//    Map<String, Object> appCtx = 
buildContext(String.valueOf(getMasterConnectorURI(broker)),
+  //          "TopLevelTaeQueue");
+
+broker.stop();
+broker.waitUntilStopped();
+
+//System.setProperty("activemq.broker.jmx.domain","org.apache.activemq.test");
+//broker2 = setupSecondaryBroker(true);
+broker = createBroker();
+broker.start();
+broker.waitUntilStarted();
+
 
     // reduce the cas pool size and reply window
     appCtx.remove(UimaAsynchronousEngine.ShadowCasPoolSize);
     appCtx.put(UimaAsynchronousEngine.ShadowCasPoolSize, Integer.valueOf(2));
-    runTest(appCtx, eeUimaEngine, 
String.valueOf(getMasterConnectorURI(broker)),
+//    runTest(appCtx, eeUimaEngine, 
String.valueOf(getMasterConnectorURI(broker)),
+    runTest(appCtx, eeUimaEngine,burl,
             "TopLevelTaeQueue", 1, PROCESS_LATCH);
   }
   @Test
   public void testClientProcessWithRemoteMultiplier() throws Exception {
     System.out.println("-------------- testClientProcessWithRemoteMultiplier 
-------------");
+   
     BaseUIMAAsynchronousEngine_impl eeUimaEngine = new 
BaseUIMAAsynchronousEngine_impl();
     deployService(eeUimaEngine, relativePath + 
"/Deploy_RemoteCasMultiplier.xml");
 

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
 Fri Oct 14 19:30:40 2016
@@ -353,8 +353,10 @@ public class ActiveMQSupport extends Tes
     System.clearProperty("BrokerURL");
   
     wait(3000);
-    cleanBroker(broker);
-    stopBroker();
+    if ( !broker.isStopped()) {
+        cleanBroker(broker);
+        stopBroker();
+    }
   }
   
   public class UimaASErrorHandler implements ErrorHandler {

Added: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml?rev=1764952&view=auto
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml
 (added)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml
 Fri Oct 14 19:30:40 2016
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+   <!--
+   ***************************************************************
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+         *
+   *   http://www.apache.org/licenses/LICENSE-2.0
+   * 
+   * Unless required by applicable law or agreed to in writing,
+   * software distributed under the License is distributed on an
+   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   * KIND, either express or implied.  See the License for the
+   * specific language governing permissions and limitations
+   * under the License.
+   ***************************************************************
+   -->
+<analysisEngineDeploymentDescription 
+  xmlns="http://uima.apache.org/resourceSpecifier";>
+  
+  <name>NoOp Annotator A</name>
+  <description>Deploys NoOp Annotator Primitive AE</description>
+  
+  <deployment protocol="jms" provider="activemq">
+    <casPool numberOfCASes="5" initialFsHeapSize="500"/> 
+    <service>
+      <inputQueue endpoint="NoOpAnnotatorQueue" 
brokerURL="${DefaultBrokerURL}" prefetch="1"/>
+      <topDescriptor>
+       <import 
location="../descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml"/> 
+      </topDescriptor>
+      <analysisEngine>
+      <asyncPrimitiveErrorConfiguration>
+           <!-- <processCasErrors thresholdCount="4" thresholdWindow="10" 
thresholdAction="terminate" /> -->
+           <collectionProcessCompleteErrors additionalErrorAction="terminate" 
/>
+      </asyncPrimitiveErrorConfiguration>
+      </analysisEngine>
+    
+      
+    </service>
+  </deployment>
+
+</analysisEngineDeploymentDescription>
+

Propchange: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWith30SecDelay.xml
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml?rev=1764952&view=auto
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml
 (added)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml
 Fri Oct 14 19:30:40 2016
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+  <!--
+   ***************************************************************
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+         *
+   *   http://www.apache.org/licenses/LICENSE-2.0
+   * 
+   * Unless required by applicable law or agreed to in writing,
+   * software distributed under the License is distributed on an
+   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   * KIND, either express or implied.  See the License for the
+   * specific language governing permissions and limitations
+   * under the License.
+   ***************************************************************
+   -->
+<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier";>
+  <frameworkImplementation>org.apache.uima.java</frameworkImplementation>
+  <primitive>true</primitive>
+  
<annotatorImplementationName>org.apache.uima.ae.noop.NoOpAnnotator</annotatorImplementationName>
+  <analysisEngineMetaData>
+    <name>NoOp Annotator</name>
+    <description>Annotator That Does Nothin</description>
+    <version>1.0</version>
+    <vendor>The Apache Software Foundation</vendor>
+    
+    <configurationParameters>
+          <configurationParameter>
+        <name>ErrorFrequency</name>
+        <description>Frequency of Generated Errors</description>
+        <type>Integer</type>
+        <multiValued>false</multiValued>
+        <mandatory>true</mandatory>
+      </configurationParameter>
+
+      <configurationParameter>
+        <name>ProcessDelay</name>
+        <description>Process Delay</description>
+        <type>Integer</type>
+        <multiValued>false</multiValued>
+        <mandatory>true</mandatory>
+      </configurationParameter>
+
+    
+    </configurationParameters>
+    
+    <configurationParameterSettings>
+          <nameValuePair>
+        <name>ErrorFrequency</name>
+        <value>
+          <integer>0</integer>
+        </value>
+      </nameValuePair>
+
+          <nameValuePair>
+        <name>ProcessDelay</name>
+        <value>
+          <integer>30000</integer>
+        </value>
+      </nameValuePair>
+    
+    </configurationParameterSettings>
+    
+    
+    
+    <typeSystemDescription>
+    </typeSystemDescription>
+    
+    <capabilities>
+    </capabilities>
+       
+    <operationalProperties>
+               <modifiesCas>true</modifiesCas>
+               <multipleDeploymentAllowed>true</multipleDeploymentAllowed>
+               <outputsNewCASes>false</outputsNewCASes>
+       </operationalProperties>          
+  </analysisEngineMetaData>
+</analysisEngineDescription>
+

Propchange: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith30SecDelay.xml
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml?rev=1764952&view=auto
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml
 (added)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml
 Fri Oct 14 19:30:40 2016
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+       <!--
+        ***************************************************************
+        * Licensed to the Apache Software Foundation (ASF) under one
+        * or more contributor license agreements.  See the NOTICE file
+        * distributed with this work for additional information
+        * regarding copyright ownership.  The ASF licenses this file
+        * to you under the Apache License, Version 2.0 (the
+        * "License"); you may not use this file except in compliance
+        * with the License.  You may obtain a copy of the License at
+         *
+        *   http://www.apache.org/licenses/LICENSE-2.0
+        * 
+        * Unless required by applicable law or agreed to in writing,
+        * software distributed under the License is distributed on an
+        * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+        * KIND, either express or implied.  See the License for the
+        * specific language governing permissions and limitations
+        * under the License.
+        ***************************************************************
+   -->
+   
+<analysisEngineDescription xmlns="http://uima.apache.org/resourceSpecifier";>
+  <frameworkImplementation>org.apache.uima.java</frameworkImplementation>
+  <primitive>false</primitive>
+  <delegateAnalysisEngineSpecifiers>
+    
+    <delegateAnalysisEngine key="TestMultiplier">
+      <import 
location="../multiplier/SimpleCasGeneratorProducing1000Cases.xml"/>
+    </delegateAnalysisEngine>
+
+
+      <delegateAnalysisEngine key="NoOp">
+      <import location="NoOpAnnotatorWith30SecDelay.xml"/>
+    </delegateAnalysisEngine>
+  
+  </delegateAnalysisEngineSpecifiers>
+  <analysisEngineMetaData>
+    <name>Test Aggregate TAE</name>
+    <description>Detects Nothing</description>
+    <configurationParameters/>
+    <configurationParameterSettings/>
+    <flowConstraints>
+      <fixedFlow>
+      
+        <node>TestMultiplier</node>
+        <node>NoOp</node> 
+      </fixedFlow>
+    </flowConstraints>
+    <capabilities>
+      <capability>
+        <inputs/>
+        <outputs>
+        </outputs>
+        <languagesSupported>
+          <language>en</language>
+        </languagesSupported>
+      </capability>
+    </capabilities>
+       <operationalProperties>
+               <modifiesCas>true</modifiesCas>
+               <multipleDeploymentAllowed>true</multipleDeploymentAllowed>
+               <outputsNewCASes>true</outputsNewCASes>
+       </operationalProperties>
+  </analysisEngineMetaData>
+</analysisEngineDescription>

Propchange: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregateCasMultiplierWith30SecDelay.xml
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 Fri Oct 14 19:30:40 2016
@@ -145,9 +145,9 @@ public abstract class BaseAnalysisEngine
 
   protected long errorCount = 0;
 
-  protected List inputChannelList = new ArrayList();
+  protected List<InputChannel> inputChannelList = new 
ArrayList<InputChannel>();
 
-  protected ConcurrentHashMap inputChannelMap = new ConcurrentHashMap();
+  protected ConcurrentHashMap<String, InputChannel> inputChannelMap = new 
ConcurrentHashMap<String, InputChannel>();
 
   private UimaEEAdminContext adminContext;
 
@@ -1088,7 +1088,7 @@ public abstract class BaseAnalysisEngine
   public void addInputChannel(InputChannel anInputChannel) {
     if (!inputChannelMap.containsKey(anInputChannel.getInputQueueName())) {
       inputChannelMap.put(anInputChannel.getInputQueueName(), anInputChannel);
-      if (inputChannelList.contains(anInputChannel)) {
+      if (!inputChannelList.contains(anInputChannel)) {
         inputChannelList.add(anInputChannel);
       }
     }
@@ -2410,13 +2410,34 @@ public abstract class BaseAnalysisEngine
     return null;
   }
 
-  public InputChannel getReplyInputChannel(String aDelegateKey) {
-    for (int i = 0; inputChannelList != null && i < inputChannelList.size(); 
i++) {
-      if (((InputChannel) inputChannelList.get(i)).isFailed(aDelegateKey)) {
-        return (InputChannel) inputChannelList.get(i);
+//  public InputChannel getReplyInputChannel(String aDelegateKey) {
+  public InputChannel getReplyInputChannel(String aDestination) {
+         InputChannel IC = null;
+         if ( inputChannelMap.containsKey(aDestination) ) {
+                 return inputChannelMap.get(aDestination);
+         }
+/*
+         for( InputChannel inputChannel : inputChannelList) {
+//               if ( inputChannel.get)
+               if ( inputChannel.isFailed(aDelegateKey)) {
+                       
System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Found 
InputChannel for Delegate:"+aDelegateKey+" hashCode="+inputChannel.hashCode());
+                       IC = inputChannel;
+             }
+                       
System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Next 
Input Channel - hashcode="+inputChannel.hashCode());
+
+         }
+         */
+/*
+         for (int i = 0; inputChannelList != null && i < 
inputChannelList.size(); i++) {
+
+       if (((InputChannel) inputChannelList.get(i)).isFailed(aDelegateKey)) {
+               
System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Found 
InputChannel for Delegate:"+aDelegateKey);
+               return (InputChannel) inputChannelList.get(i);
       }
+               
System.out.println("BaseAnalysisEngineController.gerReplyInputChannel()-Next 
Input Channel - hashcode="+);
     }
-    return null;
+    */
+    return IC;
 
   }
 

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
 Fri Oct 14 19:30:40 2016
@@ -1034,6 +1034,11 @@ public class PrimitiveAnalysisEngineCont
             
               //       Check for delivery failure. The client may have 
terminated while an input CAS was being processed
             if ( childCasStateEntry.deliveryToClientFailed() ) {
+              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(), "process",
+                            UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAEE_delivery_to_client_failed_INFO",
+                            new Object[] { getComponentName(), aCasReferenceId 
});
+              }
               clientUnreachable = true;
               if ( 
cmOutstandingCASes.containsKey(childCasStateEntry.getCasReferenceId())) {
                  
cmOutstandingCASes.remove(childCasStateEntry.getCasReferenceId());

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
 Fri Oct 14 19:30:40 2016
@@ -522,7 +522,10 @@ public class ProcessResponseHandler exte
                         getController().getEventListener());
               } else {
                 // Callback to notify that the cache is empty
-                getController().getEventListener().onCacheEmpty();
+                 
+                 // !!!!!!!!!!!!!!! WHY DO WE NEED TO CALL onCacheEmpty() IF 
CAS IS ABORTED?
+                 // !!!!!!!!!!!!!!!!!!!!!! ?????????????????????????????????
+//                getController().getEventListener().onCacheEmpty();
               }
             }
 

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties 
(original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties 
Fri Oct 14 19:30:40 2016
@@ -266,3 +266,6 @@ UIMAEE_service_warmup_start_INFO = Servi
 UIMAEE_service_warmup_success_INFO = Service: {0} Thread: {1} WarmUp Has 
Finished Successfully - Processed: {2} CASes - Time Spent Warming Up: {3} secs- 
Ready For Processing
 UIMAEE_warmup_dropping_cas__FINE = Aggregate Warmup Stage - Dropping CAS:{0} 
Processing took {1}
 UIMAEE_warmup_start_cas__FINE = Aggregate Warmup Stage - Processing CAS id:{0}
+UIMAEE_delivery_to_client_failed_INFO = Service:{0} Unable to Deliver CAS:{1} 
to Client - Dropping CAS
+UIMAEE_unable_to_deliver_msg__INFO=Service:{0} JMS unable to Deliver CAS:{1} - 
Error:{2}
+UIMAEE_force_cas_abort__INFO="Service:{0} Forcing {1} CAS:{1} to Abort
\ No newline at end of file

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
 Fri Oct 14 19:30:40 2016
@@ -21,6 +21,8 @@ package org.apache.uima.adapter.jms.clie
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
@@ -41,6 +43,7 @@ import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -49,6 +52,8 @@ import javax.jms.ObjectMessage;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.DestinationDoesNotExistException;
+import org.apache.activemq.transport.TransportListener;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UIMARuntimeException;
 import org.apache.uima.UIMA_IllegalStateException;
@@ -1498,8 +1503,15 @@ public abstract class BaseUIMAAsynchrono
       TextMessage msg = createTextMessage();
       msg.setText("");
       setReleaseCASMessage(msg, casReferenceId);
-      // Create Message Producer for the Destination
-      MessageProducer msgProducer = 
getMessageProducer(freeCASNotificationDestination);
+       MessageProducer msgProducer = null;
+       try {
+        // Create Message Producer for the Destination
+        msgProducer = getMessageProducer(freeCASNotificationDestination);
+               
+       } catch( DestinationDoesNotExistException ee) {
+               
+       }
+
       if (msgProducer != null) {
         try {
           // Send FreeCAS message to a Cas Multiplier
@@ -3136,10 +3148,13 @@ public abstract class BaseUIMAAsynchrono
       //System.out.println("------------------------ stop2? "+stop);
       //  This loop attempts to recover broker connection every 5 seconds and 
ends when all clients 
       //  using this shared object terminate or a connection is recovered
+      boolean log = true;
       while( !stop ) {
-          //System.out.println("------------------------ clientList.size()- 
"+clientList.size());
         if ( clientList.size() == 0 ) {
           break; // no more active clients - break out of connection recovery
+        } else {
+               BaseUIMAAsynchronousEngineCommon_impl c =
+                               clientList.get(0);
         }
         try {
           //  Attempt a new connection to a broker
@@ -3154,8 +3169,22 @@ public abstract class BaseUIMAAsynchrono
           }
           break;
         } catch( Exception e) {
-               e.printStackTrace();
-          synchronized( stateMonitor ) {
+               
+               if ( log ) {
+                if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(), "retryConnectionUntilSuccessfull",
+                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_client_connection_retry__INFO",
+                          new Object[] { brokerURL });
+                }
+                       if ( e instanceof JMSException && 
e.getMessage().endsWith("Connection refused") ) {
+                               log = false;
+                       System.out.println("Uima AS Client:"+e.getMessage()+" 
Retrying every 5 seconds until successfull");
+                    
+                   } else {
+                   e.printStackTrace();
+                   }
+               }
+           synchronized( stateMonitor ) {
             try {
               stateMonitor.wait(5000); // retry every 5 secs
             } catch( InterruptedException ie) {}
@@ -3264,6 +3293,33 @@ public abstract class BaseUIMAAsynchrono
         return false;
       }
   }
+  public static class UimaAsTransportListener implements TransportListener {
+
+       @Override
+       public void onCommand(Object arg0) {
+               // TODO Auto-generated method stub
+               
+       }
+
+       @Override
+       public void onException(IOException arg0) {
+               System.out.println("!!!!!!!!!!!!!!!!!! 
UimaAsTransportListener.onException() - lost connectipon to broker");
+               
+       }
+
+       @Override
+       public void transportInterupted() {
+               // TODO Auto-generated method stub
+               
+       }
+
+       @Override
+       public void transportResumed() {
+               // TODO Auto-generated method stub
+               
+       }
+         
+  }
   public static class UimaASShutdownHook implements Runnable {
     UimaAsynchronousEngine asEngine=null;
     public UimaASShutdownHook( UimaAsynchronousEngine asEngine) {

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
 Fri Oct 14 19:30:40 2016
@@ -174,7 +174,7 @@ public class ClientServiceDelegate exten
                       try {
                         clientUimaAsEngine.handleException(new 
UimaASProcessCasTimeout("Service Not Responding to Ping - 
CAS:"+de.getCasReferenceId(), new UimaASPingTimeout("Forced Timeout on CAS in 
PendingDispatch list. The CAS Has Not Been Dispatched since the Service Appears 
to be Unavailable")), de.getCasReferenceId(), null,cachedRequest, 
!cachedRequest.isSynchronousInvocation(), false);
                       } catch( Exception ex) {
-                        ex.printStackTrace();
+                        //ex.printStackTrace();
                       }
                     }
                     if ( clientUimaAsEngine.running ) {

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1764952&r1=1764951&r2=1764952&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
 Fri Oct 14 19:30:40 2016
@@ -244,4 +244,7 @@ UIMAJMS_retrying_jms_connection__WARNING
 UIMAJMS_service_recvd_new_message__FINE = > service recvd CAS RefId: {0}
 UIMAJMS_sent_ack_message__FINE = < service sent ACK for CAS RefId: {0}
 UIMAJMS_received_service_info_FINEST = Received ServiceInfo message from {0}
-UIMAJMS_debug_msg__FINEST={0}
\ No newline at end of file
+UIMAJMS_debug_msg__FINEST={0}
+UIMAJMS_temp_destination_not_available_retrying__INFO=Service:{0} Unable to 
refresh temp destination - retrying in {1} seconds until success ...
+UIMAJMS_temp_destination_available__INFO=Service:{0} succesfully refreshed 
temp destination:{1} - FreeCas Queue:{2}
+UIMAJMS_client_connection_retry__INFO=UIMA-AS Client Unable to Connect to 
Broker:{0} - Retrying Until Success ...
\ No newline at end of file



Reply via email to