Author: cwiklik
Date: Wed Feb  9 20:54:47 2011
New Revision: 1069089

URL: http://svn.apache.org/viewvc?rev=1069089&view=rev
Log:
UIMA-2038 Modified to support clean shutdown

Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1069089&r1=1069088&r2=1069089&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
 Wed Feb  9 20:54:47 2011
@@ -19,16 +19,17 @@
 
 package org.apache.uima.adapter.jms.activemq;
 
-import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -37,7 +38,6 @@ import javax.jms.TemporaryQueue;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.InputChannel;
@@ -45,9 +45,9 @@ import org.apache.uima.aae.UIMAEE_Consta
 import org.apache.uima.aae.UimaAsThreadFactory;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
+import 
org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
-import 
org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.delegate.Delegate;
 import org.apache.uima.aae.error.ErrorHandler;
 import org.apache.uima.aae.error.Threshold;
@@ -104,10 +104,20 @@ public class UimaDefaultMessageListenerC
 
   private volatile boolean awaitingShutdown = false;
   
-  private boolean brokerWithDaemonThreads = false;
+  //  When set to true, this flag prevents spring from using 
refreshUntilSuccessful
+  //  logic which attempts to recover the connection. This flag is set to true 
during the
+  //  service shutdown
+  public static volatile boolean terminating;
+
+  private ThreadPoolExecutor threadPoolExecutor = null;
+  
+  private boolean pluginThreadPool;
   
   public UimaDefaultMessageListenerContainer() {
     super();
+    // reset global static. This only effects unit testing as services are 
deployed 
+    // in the same process.
+    terminating = false;
     UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
     __listenerRef = this;
     setRecoveryInterval(5);
@@ -121,7 +131,24 @@ public class UimaDefaultMessageListenerC
     this();
     this.freeCasQueueListener = freeCasQueueListener;
   }
+  /**
+   * Overriden Spring's method that tries to recover from lost connection. We 
dont 
+   * want to recover when the service is stopping.
+   */
+  protected void refreshConnectionUntilSuccessful() {
+         if ( !terminating ) {
+                 super.refreshConnectionUntilSuccessful();
+         }
+  }
+  protected void recoverAfterListenerSetupFailure() {
+         if ( !terminating ) {
+                 super.recoverAfterListenerSetupFailure();
+         }
+  }
 
+  public void setTerminating() {
+    terminating = true;
+  }
   public void setController(AnalysisEngineController aController) {
     controller = aController;
   }
@@ -389,11 +416,8 @@ public class UimaDefaultMessageListenerC
    */
   protected void handleListenerSetupFailure(Throwable t, boolean 
alreadyHandled) {
     // If shutdown already, nothing to do
-    if (awaitingShutdown) {
-      return;
-    }
-    // If controller is stopping not need to recover the connection
-    if (controller != null && controller.isStopped()) {
+           // If controller is stopping no need to recover the connection
+    if (awaitingShutdown || terminating || (controller != null && 
controller.isStopped()) ) {
       return;
     }
     if ( controller != null ) {
@@ -520,8 +544,6 @@ public class UimaDefaultMessageListenerC
     try {
 
       injectConnectionFactory();
-      initializeTaskExecutor();
-      injectTaskExecutor();
       super.initialize();
     } catch (Exception e) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -561,14 +583,15 @@ public class UimaDefaultMessageListenerC
       super.setMessageListener(messageListener);
     }
   }
-
+  public void afterPropertiesSet() {
+    afterPropertiesSet(true);
+  }
   /**
    * Called by Spring and some Uima AS components when all properties have 
been set. This method
    * spins a thread in which the listener is initialized.
    */
-  public void afterPropertiesSet() {
+  public void afterPropertiesSet(final boolean propagate) {
     if (endpoint != null) {
-
                        // Override the prefetch size. The dd2spring always 
sets this to 1 which 
                        // may effect the throughput of a service. Change the 
prefetch size to
                        // number of consumer threads defined in DD.
@@ -590,7 +613,8 @@ public class UimaDefaultMessageListenerC
       super.setConcurrentConsumers(1);
       if (cc > 1) {
         try {
-          concurrentListener = new ConcurrentMessageListener(cc, ml, 
getDestinationName());
+          String prefix = endpoint.getDelegateKey()+" Reply Thread";
+          concurrentListener = new ConcurrentMessageListener(cc, ml, 
getDestinationName(), threadGroup,prefix);
           super.setMessageListener(concurrentListener);
         } catch (Exception e) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -607,11 +631,11 @@ public class UimaDefaultMessageListenerC
           return;
         }
       } else {
-        super.setMessageListener(ml);
+        pluginThreadPool = true;
       }
     } else {
-      super.setMessageListener(ml);
       super.setConcurrentConsumers(cc);
+      pluginThreadPool = true;
     }
     Thread t = new Thread(threadGroup, new Runnable() {
       public void run() {
@@ -643,14 +667,23 @@ public class UimaDefaultMessageListenerC
           }
           // Plug in connection Factory to Spring's Listener
           __listenerRef.injectConnectionFactory();
+          
+          if ( pluginThreadPool ) {
+            setUimaASThreadPoolExecutor(cc);
+          }
+          
           // Initialize the TaskExecutor. This call injects a custom Thread 
Pool into the
           // TaskExecutor provided in the spring xml. The custom thread pool 
initializes
           // an instance of AE in a dedicated thread
           initializeTaskExecutor();
-          // Plug in TaskExecutor to Spring's Listener
-          __listenerRef.injectTaskExecutor();
-          // Notify Spring Listener that all properties are ready
-          __listenerRef.allPropertiesSet();
+          if ( threadPoolExecutor == null ) {
+              // Plug in TaskExecutor to Spring's Listener
+              __listenerRef.injectTaskExecutor();
+          }
+          if ( propagate ) {
+            // Notify Spring Listener that all properties are ready
+            __listenerRef.allPropertiesSet();
+          }
           if (isActiveMQDestination() && destination != null) {
             destinationName = ((ActiveMQDestination) 
destination).getPhysicalName();
           }
@@ -686,6 +719,9 @@ public class UimaDefaultMessageListenerC
                   "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAJMS_jms_listener_failed_WARNING",
                   new Object[] { destination, getBrokerUrl(), e });
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+                  "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_exception__WARNING", e);
         }
       }
     });
@@ -747,7 +783,9 @@ public class UimaDefaultMessageListenerC
     ((TempDestinationResolver) resolver).setListener(this);
     super.setDestinationResolver(resolver);
   }
-
+  /**
+   * Closes shares connection to a broker
+  **/
   public void closeConnection() throws Exception {
     try {
       setRecoveryInterval(0);
@@ -872,11 +910,10 @@ public class UimaDefaultMessageListenerC
    * 
    */
   public void destroy() {
+         
     if (awaitingShutdown) {
       return;
     }
-    awaitingShutdown = true;
-    
     // Spin a thread that will wait until all threads complete. This is needed 
to avoid
     // memory leak caused by the fact that we did not wait to collect the 
threads
     Thread threadGroupDestroyer = new 
Thread(threadGroup.getParent().getParent(),
@@ -884,32 +921,23 @@ public class UimaDefaultMessageListenerC
       public void run() {
         try {
                if ( !__listenerRef.awaitingShutdown && 
__listenerRef.isRunning() ) {
-                // stop Spring listener and ActiveMQ threads
+                   awaitingShutdown = true;
                 __listenerRef.stop();
-                __listenerRef.closeConnection();
+                // If using non-default TaskExecutor, stop its threads
+                if (taskExecutor != null && taskExecutor instanceof 
ThreadPoolTaskExecutor) {
+                    ((ThreadPoolTaskExecutor) 
taskExecutor).getThreadPoolExecutor().shutdownNow();
+                } else if (concurrentListener != null) {
+                  // Stop internal Executor
+                  concurrentListener.stop();
+                } else if ( threadPoolExecutor != null ) {
+                       threadPoolExecutor.shutdownNow();
+                }
+                __listenerRef.shutdown();
                }
         } catch (Exception e) {
+               e.printStackTrace();
         }
-        // If using non-default TaskExecutor, stop its threads
-        if (taskExecutor != null && taskExecutor instanceof 
ThreadPoolTaskExecutor) {
-          ((ThreadPoolTaskExecutor) 
taskExecutor).getThreadPoolExecutor().shutdown();
-          // Since the calling thread may be one of those managed by the 
executor allow
-          // for one open thread when checking active thread count.
-          while (((ThreadPoolTaskExecutor) 
taskExecutor).getThreadPoolExecutor().getActiveCount() > 1
-                  && !((ThreadPoolTaskExecutor) 
taskExecutor).getThreadPoolExecutor()
-                          .isTerminated()) {
-            try {
-              ((ThreadPoolTaskExecutor) 
taskExecutor).getThreadPoolExecutor().awaitTermination(200,
-                      TimeUnit.MILLISECONDS);
-            } catch (Exception e) {
-            }
-          }
-        } else if (concurrentListener != null) {
-          // Stop internal Executor
-          concurrentListener.stop();
-        }
-        // Shutdown the listener
-        __listenerRef.shutdown();
+
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
           threadGroup.getParent().list();
         }
@@ -957,6 +985,7 @@ public class UimaDefaultMessageListenerC
     
     
   }
+  
   private boolean isAmqThread(Thread t) {
          String tName = t.getName();
          // The following is necessary to account for the AMQ threads
@@ -969,6 +998,34 @@ public class UimaDefaultMessageListenerC
          }
          return true;
   }
+  private void setUimaASThreadPoolExecutor(int consumentCount) throws 
Exception{
+    super.setMessageListener(ml);
+    // create task executor with custom thread pool for:
+    // 1) GetMeta request processing
+    // 2) ReleaseCAS request
+    if ( taskExecutor == null ) {
+      UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
+      tf.setDaemon(true);
+      if ( isFreeCasQueueListener()) {
+        tf.setThreadNamePrefix(controller.getComponentName()+" - 
FreeCASRequest Thread");
+      } else if ( isGetMetaListener()  ) {
+        tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+      } else if ( getDestination() != null && getMessageSelector() != null ) {
+        tf.setThreadNamePrefix(controller.getComponentName() + " Process 
Thread");
+      } else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
+        tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+      } else { 
+        throw new Exception("Unknown Context Detected in 
setUimaASThreadPoolExecutor()");
+      }
+      ExecutorService es = Executors.newFixedThreadPool(consumentCount,tf);
+      if ( es instanceof ThreadPoolExecutor ) {
+          threadPoolExecutor = (ThreadPoolExecutor)es;
+          super.setTaskExecutor(es);
+      }
+    }
+  }
+
+  
   /**
    * Called by Spring to inject TaskExecutor
    */
@@ -1002,6 +1059,7 @@ public class UimaDefaultMessageListenerC
       // PrimitiveController so that every thread can call it to initialize
       // the next available instance of a AE.
       tf = new UimaAsThreadFactory(threadGroup, 
(PrimitiveAnalysisEngineController) controller);
+      ((UimaAsThreadFactory)tf).setDaemon(true);
       // This ThreadExecutor will use custom thread factory instead of defult 
one
       ((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf);
       // Initialize the thread pool
@@ -1014,6 +1072,10 @@ public class UimaDefaultMessageListenerC
         controller.changeState(ServiceState.RUNNING);
       }
     }
+    
+    if ( threadPoolExecutor != null ) {
+       threadPoolExecutor.prestartAllCoreThreads();
+    }
   }
 
   public void stop() throws JmsException {


Reply via email to