Author: cwiklik
Date: Thu Sep  9 13:29:35 2010
New Revision: 995417

URL: http://svn.apache.org/viewvc?rev=995417&view=rev
Log:
UIMA-1867 Modified dispatch() to detect send failures and mark CAS as 
undelivered

Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=995417&r1=995416&r2=995417&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
 Thu Sep  9 13:29:35 2010
@@ -45,6 +45,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
+import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
 import javax.management.ServiceNotFoundException;
 
@@ -522,14 +523,17 @@ public class JmsOutputChannel implements
                   new Object[] { destination });
         }
         endpointConnection.open();
-        brokerConnectionEntry.getConnectionTimer()
-                .setConnectionCreationTimestamp(System.nanoTime());
-        if ( getAnalysisEngineController() instanceof 
AggregateAnalysisEngineController ) {
-          Endpoint masterEndpoint = 
-            ((AggregateAnalysisEngineController) 
getAnalysisEngineController()).lookUpEndpoint(
-                  anEndpoint.getDelegateKey(), false);
-          masterEndpoint.setStatus(Endpoint.OK);
-        }
+        if ( endpointConnection.isOpen()) {
+            brokerConnectionEntry.getConnectionTimer()
+            .setConnectionCreationTimestamp(System.nanoTime());
+            if ( getAnalysisEngineController() instanceof 
AggregateAnalysisEngineController &&
+                       anEndpoint.getDelegateKey() != null ) {
+               Endpoint masterEndpoint = 
+                       ((AggregateAnalysisEngineController) 
getAnalysisEngineController()).lookUpEndpoint(
+                                       anEndpoint.getDelegateKey(), false);
+               masterEndpoint.setStatus(Endpoint.OK);
+            }
+        } 
       }
     }
     return endpointConnection;
@@ -1673,7 +1677,7 @@ public class JmsOutputChannel implements
       // on the delegate that we were unable to send a message to. The 
delegate state is
       // set to FAILED. If there are retries or more CASes to send to this 
delegate the
       // connection will be retried.
-      if (isRequest) {
+      if (isRequest && anEndpoint.getDelegateKey() != null) {
         // Spin recovery thread to handle send error. After the recovery thread
         // is started the current (process) thread goes back to a thread pool 
in
        // ThreadPoolExecutor. The recovery thread can then stop the listener 
and the
@@ -1684,6 +1688,30 @@ public class JmsOutputChannel implements
         Thread t = new 
Thread(Thread.currentThread().getThreadGroup().getParent(), recoveryThread);
         t.start();
       } else {
+         try {
+                 CasStateEntry casStateEntry = getAnalysisEngineController().
+                               
getLocalCache().lookupEntry(entry.getCasReferenceId());
+                 casStateEntry.setDeliveryToClientFailed();   // Mark the CAS, 
so that later we know that the delivery to client failed
+                 if ( anEndpoint != null ) {
+                                 // Add the reply destination (temp queue) to 
a dead client map
+                 Object clientDestination = anEndpoint.getDestination();
+                         if ( clientDestination != null && clientDestination 
instanceof TemporaryQueue ) {
+                                 if ( !getAnalysisEngineController().
+                                       
getDeadClientMap().containsKey(clientDestination.toString())) {
+                         getAnalysisEngineController().
+                                       getDeadClientMap().
+                                               
put(clientDestination.toString(),clientDestination.toString());
+                                 }
+                         }
+                 }
+                 
+         } catch( Exception e ) {
+                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+                       "dispatch", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                       "UIMAJMS_exception__WARNING", e);
+         }
+         
+         
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(
                   Level.INFO,
@@ -2542,15 +2570,15 @@ public class JmsOutputChannel implements
           
ic.destroyListener(delegate.getEndpoint().getDestination().toString(), endpoint
                   .getDelegateKey());
         }
+        // Setup error context and handle failure in the error handler
+        ErrorContext errorContext = new ErrorContext();
+        errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+        errorContext.add(AsynchAEMessage.CasReference, 
entry.getCasReferenceId());
+        errorContext.add(AsynchAEMessage.Endpoint, endpoint);
+        errorContext.handleSilently(true); // don't dump exception to the log
+        // Failure on send treat as timeout
+        delegate.handleError(new MessageTimeoutException(), errorContext);
       }
-      // Setup error context and handle failure in the error handler
-      ErrorContext errorContext = new ErrorContext();
-      errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
-      errorContext.add(AsynchAEMessage.CasReference, 
entry.getCasReferenceId());
-      errorContext.add(AsynchAEMessage.Endpoint, endpoint);
-      errorContext.handleSilently(true); // dont dump exception to the log
-      // Failure on send treat as timeout
-      delegate.handleError(new MessageTimeoutException(), errorContext);
 
     }
   }


Reply via email to