Author: cwiklik
Date: Wed Feb  9 19:45:32 2011
New Revision: 1069054

URL: http://svn.apache.org/viewvc?rev=1069054&view=rev
Log:
UIMA-2038 Modified to support clean shutdown

Modified:
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1069054&r1=1069053&r2=1069054&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 Wed Feb  9 19:45:32 2011
@@ -106,6 +106,7 @@ public abstract class BaseAnalysisEngine
   private static final Class CLASS_NAME = BaseAnalysisEngineController.class;
   private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
   public static enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, 
FAILED };
+  public static final boolean NO_RECOVERY = true;
   
   protected ServiceState currentState = ServiceState.INITIALIZING;
   
@@ -246,7 +247,6 @@ public abstract class BaseAnalysisEngine
   private static final UimaAsVersion uimaAsVersion = new UimaAsVersion();
   // Holds destination names of clients known to be dead
   protected ConcurrentHashMap<String,String> deadClientDestinationMap = new 
ConcurrentHashMap<String, String>();
-
   
   public BaseAnalysisEngineController() {
 
@@ -1880,6 +1880,8 @@ public abstract class BaseAnalysisEngine
       // then wait until all CASes still in play are processed. When all CASes 
are processed
       // we proceed with the shutdown of delegates and finally of the top 
level service.
       if (isTopLevelComponent()) {
+          getInputChannel().setTerminating();
+
         // Stops all input channels of this service, but keep temp reply queue 
input channels open
         // to process replies.
         stopInputChannels(InputChannel.InputChannels);
@@ -1916,6 +1918,7 @@ public abstract class BaseAnalysisEngine
                   "quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAEE_onEmpty_callback_received__INFO", new Object[] { 
getComponentName() });
         }
+        getInputChannel().terminate();
         stop();
       }
     }
@@ -1958,8 +1961,13 @@ public abstract class BaseAnalysisEngine
     } else if (!isStopped()) {
       stopDelegateTimers();
       getOutputChannel().cancelTimers();
+      InputChannel iC = getInputChannel(endpointName);
+      if ( iC != null) {
+          iC.setTerminating();
+      }
       // Stop the inflow of new input CASes
       stopInputChannel();
+      iC.terminate();
       stopCasMultipliers();
       stopTransportLayer();
       if (cause != null && aCasReferenceId != null) {
@@ -2103,54 +2111,61 @@ public abstract class BaseAnalysisEngine
       }
     }
   }
-
+  private void setInputChannelForNoRecovery() {
+         if ( inputChannelMap.size() > 0 ) {
+                 InputChannel iC = getInputChannel();
+                 iC.setTerminating();
+         }
+  }
+  protected void stopInputChannels( int channelsToStop) {   //, boolean 
norecovery) {
+           InputChannel iC = null;
+           setInputChannelForNoRecovery();
+           Iterator it = inputChannelMap.keySet().iterator();
+           int i = 1;
+           while (it.hasNext()) {
+             try {
+               String key = (String) it.next();
+               if (key != null && key.trim().length() > 0) {
+                 iC = (InputChannel) inputChannelMap.get(key);
+                 if (iC != null) {
+                   if (channelsToStop == InputChannel.InputChannels && 
iC.getServiceInfo() != null
+                           && 
iC.getServiceInfo().getInputQueueName().startsWith("top_level_input_queue")) {
+                     // This closes both listeners on the input queue: Process 
Listener and GetMeta
+                     // Listener
+                       iC.stop(channelsToStop);
+                     return; // Just closed input channels. Keep the others 
open
+                   }
+                   iC.stop(channelsToStop);
+                 }
+               }
+               i++;
+             } catch (Exception e) {
+               if (iC != null) {
+                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
getClass().getName(),
+                           "stopInputChannels", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                           "UIMAEE_unable_to_stop_inputchannel__INFO",
+                           new Object[] { getComponentName(), 
iC.getInputQueueName() });
+                 }
+               } else {
+                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+                           "stopInputChannels", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                           "UIMAEE_service_exception_WARNING", 
getComponentName());
+                   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
+                           "stopInputChannels", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                           "UIMAEE_exception__WARNING", e);
+                 }
+               }
+             }
+           }
+         
+  }
   /**
    * Aggregates have more than one Listener channel. This method stops all 
configured input channels
    * this service is configured with.
    * 
    */
-  protected void stopInputChannels(int channelsToStop) {
-    InputChannel iC = null;
-    Iterator it = inputChannelMap.keySet().iterator();
-    int i = 1;
-    while (it.hasNext()) {
-      try {
-        String key = (String) it.next();
-        if (key != null && key.trim().length() > 0) {
-          iC = (InputChannel) inputChannelMap.get(key);
-          if (iC != null) {
-            if (channelsToStop == InputChannel.InputChannels && 
iC.getServiceInfo() != null
-                    && 
iC.getServiceInfo().getInputQueueName().startsWith("top_level_input_queue")) {
-              // This closes both listeners on the input queue: Process 
Listener and GetMeta
-              // Listener
-              iC.stop(channelsToStop);
-              return; // Just closed input channels. Keep the others open
-            }
-            iC.stop(channelsToStop);
-          }
-        }
-        i++;
-      } catch (Exception e) {
-        if (iC != null) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
getClass().getName(),
-                    "stopInputChannels", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_unable_to_stop_inputchannel__INFO",
-                    new Object[] { getComponentName(), iC.getInputQueueName() 
});
-          }
-        } else {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
-                    "stopInputChannels", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_service_exception_WARNING", getComponentName());
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
-                    "stopInputChannels", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_exception__WARNING", e);
-          }
-        }
-      }
-    }
-  }
 
   public AnalysisEngineController getCasMultiplierController(String cmKey) {
     List<AnalysisEngineController> colocatedControllerList = 


Reply via email to