Author: cwiklik
Date: Wed Mar 27 21:26:38 2013
New Revision: 1461853

URL: http://svn.apache.org/r1461853
Log:
UIMA-2776 Added jms session management 

Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1461853&r1=1461852&r2=1461853&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
 Wed Mar 27 21:26:38 2013
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -69,7 +70,13 @@ import org.apache.uima.util.Level;
 public class JmsEndpointConnection_impl implements ConsumerListener {
   private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
 
-  private Destination destination;
+  // timestamp containing time when the last message was dispatched from this 
dispatcher to 
+  // jms destination. This is updated every time a message is dispatched to a 
queue. 
+  // At fixed intervals a cleanup thread wakes up and checks for unused 
dispatchers by
+  // comparing value in lastDispatchTimestamp to a max allowed which by 
default is 5 minutes.
+  protected AtomicLong lastDispatchTimestamp = new AtomicLong(0);
+
+  protected Destination destination;
 
   protected Session producerSession;
 
@@ -83,7 +90,7 @@ public class JmsEndpointConnection_impl 
 
   private String endpointName;
 
-  private Endpoint delegateEndpoint;
+  protected Endpoint delegateEndpoint;
 
   private volatile boolean retryEnabled;
 
@@ -95,7 +102,7 @@ public class JmsEndpointConnection_impl 
 
   private Object semaphore = new Object();
 
-  private boolean isReplyEndpoint;
+  protected boolean isReplyEndpoint;
 
   private volatile boolean failed = false;
 
@@ -183,7 +190,7 @@ public class JmsEndpointConnection_impl 
                                    "open",
                                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                                    
"UIMAJMS_override_connection_to_endpoint__FINE",
-                                   new Object[] { aComponentName, 
getEndpoint(),
+                                   new Object[] {  aComponentName, 
getEndpoint(),
                                      ((JmsOutputChannel) 
aController.getOutputChannel()).getServerURI() });
                          }
                        }
@@ -193,6 +200,7 @@ public class JmsEndpointConnection_impl 
                          //  Check connection status and create a new one (if 
necessary) as an atomic operation
                          try {
                            connectionSemaphore.acquire();
+                           
                            if (connectionClosedOrFailed(brokerDestinations)) {
                              // Create one shared connection per unique 
brokerURL.
                              if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -209,6 +217,7 @@ public class JmsEndpointConnection_impl 
                                  //  Ignore exceptions on a close of a bad 
connection
                                }
                              }
+                             System.out.println("---------- Opening New Broker 
Connection ---------------");
                              ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(brokerUri);
                              //  Create shared jms connection to a broker
                              conn = factory.createConnection();
@@ -240,6 +249,8 @@ public class JmsEndpointConnection_impl 
                          
                          connectionCreationTimestamp = System.nanoTime();
                          failed = false;
+                       } else {
+                               System.out.println("...... Reusing Existing 
Broker Connetion");
                        }
                        Connection conn = brokerDestinations.getConnection();
                        if (failed) {
@@ -595,9 +606,12 @@ public class JmsEndpointConnection_impl 
       // restarted. The main purpose of the timer is to close connections
       // that are not used.
       if (startTimer) {
-        
brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp,
-                delegateEndpoint);
+//        
brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp,
+//                delegateEndpoint);
       }
+      // record the time when this dispatcher sent a message. This time will 
be used
+      // to find inactive sessions.
+         lastDispatchTimestamp.set(System.currentTimeMillis());
       
       // Succeeded sending the CAS
       return true;

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1461853&r1=1461852&r2=1461853&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
 Wed Mar 27 21:26:38 2013
@@ -84,7 +84,7 @@ public class JmsOutputChannel implements
 
   private static final Class CLASS_NAME = JmsOutputChannel.class;
 
-  private static final long INACTIVITY_TIMEOUT = 1800000; // 30 minutes in 
term of millis
+  private static final long INACTIVITY_TIMEOUT = 300000; // 5 minutes in term 
of millis
 
   private CountDownLatch controllerLatch = new CountDownLatch(1);
 
@@ -343,13 +343,13 @@ public class JmsOutputChannel implements
     Connection conn = brokerConnectionEntry.getConnection();
     try {
        if ( conn != null && ((ActiveMQConnection)conn).isClosed()) {
-         brokerConnectionEntry.getConnection().stop();
-         brokerConnectionEntry.getConnection().close();
-         brokerConnectionEntry.setConnection(null);
-         for (Entry<Object, JmsEndpointConnection_impl> endpoints : 
brokerConnectionEntry.endpointMap
-                .entrySet()) {
-           endpoints.getValue().close(); // close session and producer
-         }
+           for (Entry<Object, JmsEndpointConnection_impl> endpoints : 
brokerConnectionEntry.endpointMap
+                   .entrySet()) {
+              endpoints.getValue().close(); // close session and producer
+           }
+           brokerConnectionEntry.getConnection().stop();
+           brokerConnectionEntry.getConnection().close();
+           brokerConnectionEntry.setConnection(null);
        }
     } catch (Exception e) {
       // Ignore this for now. Attempting to close connection that has been 
closed
@@ -395,17 +395,18 @@ public class JmsOutputChannel implements
     ConnectionTimer connectionTimer = new 
ConnectionTimer(brokerConnectionEntry);
     connectionTimer.setAnalysisEngineController(getAnalysisEngineController());
     brokerConnectionEntry.setConnectionTimer(connectionTimer);
+    brokerConnectionEntry.setBrokerURL(brokerURL);
     return brokerConnectionEntry;
   }
   /**
    * Returns {@link JmsEndpointConnection_impl} instance bound to a 
destination defined in the
    * {@link Endpoint} The endpoint identifies the destination that should 
receive the message. This
-   * method refrences a cache that stores active connections. Active 
connections are those that are
+   * method references a cache that stores active connections. Active 
connections are those that are
    * fully bound and being used for communication. The key to locate the entry 
in the connection
    * cache is the queue name + broker URI. This uniquely identifies the 
destination. If an entry
    * does not exist in the cache, this routine will create a new connection, 
initialize it, and
-   * cache it for future use. The cache is purely for optimization, to prevent 
openinig a connection
-   * for every message which is a costly operation. Instead the connection is 
open, cached and
+   * cache it for future use. The cache is purely for optimization, to prevent 
opening a connection
+   * for every message which is a costly operation. Instead the connection is 
opened, cached and
    * reused. The {@link JmsEndpointConnection_impl} instance is stored in the 
cache, and uses a
    * timer to make sure stale connection are removed. If a connection is not 
used in a given time
    * interval, the connection is considered stale and is dropped from the 
cache.
@@ -435,8 +436,13 @@ public class JmsOutputChannel implements
       brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : 
anEndpoint.getServerURI();
       
     }
+    String key = getLookupKey(anEndpoint);
+    String destination = getDestinationName(anEndpoint);
+
     // First get a Map containing destinations managed by a broker provided by 
the client
     BrokerConnectionEntry brokerConnectionEntry = null;
+    boolean startInactivityReaperTimer = false;
+
     if (connectionMap.containsKey(brokerConnectionURL)) {
       brokerConnectionEntry = (BrokerConnectionEntry) 
connectionMap.get(brokerConnectionURL);
       // Findbugs thinks that the above may return null, perhaps due to a race 
condition. Add
@@ -448,14 +454,25 @@ public class JmsOutputChannel implements
       } 
       brokerConnectionEntry.setBrokerURL(brokerConnectionURL);
       if ( 
JmsEndpointConnection_impl.connectionClosedOrFailed(brokerConnectionEntry) ) {
+         brokerConnectionEntry.getConnectionTimer().cancelTimer();
         invalidateConnectionAndEndpoints(brokerConnectionEntry);
         brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
+        startInactivityReaperTimer = true;
       }
     } else {
       brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
+      System.out.println("---------------- New Broker "+brokerConnectionURL);
+//      long replyQueueInactivityTimeout = getInactivityTimeout(destination, 
brokerConnectionURL);
+      startInactivityReaperTimer = true;
+    }
+    if ( startInactivityReaperTimer ) {
+        long inactivityTimeout = getInactivityTimeout(destination, 
brokerConnectionURL);
+        
brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(inactivityTimeout);
+        // Start the FixedRate timer which wakes up at regular intervals 
defined by 'inactivityTimeout'.
+        // The purpose is to find inactive jms sessions. All sessions found 
to be inactive will be
+        // closed.
+        
brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(getAnalysisEngineController().getComponentName());
     }
-    String key = getLookupKey(anEndpoint);
-    String destination = getDestinationName(anEndpoint);
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(
               Level.FINE,
@@ -524,8 +541,12 @@ public class JmsOutputChannel implements
       endpointConnection = new 
JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint,
               getAnalysisEngineController());
       brokerConnectionEntry.addEndpointConnection(key, endpointConnection);
-      long replyQueueInactivityTimeout = getInactivityTimeout(destination, 
brokerConnectionURL);
-      
brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(replyQueueInactivityTimeout);
+//      long replyQueueInactivityTimeout = getInactivityTimeout(destination, 
brokerConnectionURL);
+//      
brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(replyQueueInactivityTimeout);
+//      // Start the FixedRate timer which wakes up at regular intervals 
defined by 'replyQueueInactivityTimeout'.
+//      // The purpose is to find inactive jms sessions. All sessions found 
the be inactive will be
+//      // closed.
+//      
brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(getAnalysisEngineController().getComponentName());
 
       // Connection is not in the cache, create a new connection, initialize 
it and cache it
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -588,6 +609,8 @@ public class JmsOutputChannel implements
         } 
       }
     }
+    
+    //System.out.println("+++++++++++++++++++++ ConnectionMap 
Size:"+connectionMap.size());
     return endpointConnection;
   }
 
@@ -1854,12 +1877,106 @@ public class JmsOutputChannel implements
       connectionCreationTimestamp = aConnectionCreationTimestamp;
     }
 
-    public void startTimer(long aConnectionCreationTimestamp, final Endpoint 
endpoint) {
-      startTimer(aConnectionCreationTimestamp, endpoint, inactivityTimeout, 
componentName);
+//    public void startTimer(long aConnectionCreationTimestamp, final Endpoint 
endpoint) {
+//      startTimer(aConnectionCreationTimestamp, endpoint, inactivityTimeout, 
componentName);
+//    }
+
+    public synchronized void startSessionReaperTimer( String aComponentName) {
+      //Date timeToRun = new Date(System.currentTimeMillis() + 
inactivityTimeout);
+      if (timer != null) {
+        timer.cancel();
+      }
+      if (controller != null) {
+        timer = new Timer("Controller:" + aComponentName + ":Session Reaper 
TimerThread-:"
+                + System.nanoTime());
+      } else {
+        timer = new Timer("Session Reaper TimerThread-:" + System.nanoTime());
+      }
+         System.out.println("Controller:"+controller.getComponentName()+" 
Starting Session Cleanup Thread with Expiration Time:"+(inactivityTimeout/1000) 
+" secs"+" Broker:"+brokerDestinations.getBrokerURL());
+      timer.scheduleAtFixedRate(new TimerTask() {
+          public void run() {
+                // System.out.println("Hashcode:"+hashCode()+" 
Controller:"+controller.getComponentName()+" Session Cleanup Thread Woke Up 
After "+(inactivityTimeout/1000) +" secs of Sleep to Clean Up Unused JMS 
Sessions");
+              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(
+                        Level.FINE,
+                        CLASS_NAME.getName(),
+                        "startTimer",
+                        JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                        "UIMAJMS_inactivity_timer_expired__FINE",
+                        new Object[] { Thread.currentThread().getId(), 
componentName,
+                            inactivityTimeout, 
brokerDestinations.getBrokerURL() });
+              }
+              try {
+                  if (brokerDestinations.getConnection() != null
+                          && !((ActiveMQConnection) 
brokerDestinations.getConnection()).isClosed()) {
+                    try {
+                       Iterator<Entry<Object, JmsEndpointConnection_impl>> it 
= 
+                                       
brokerDestinations.endpointMap.entrySet().iterator();
+                       while( it.hasNext() ) {
+                               Entry<Object, JmsEndpointConnection_impl> value 
= it.next();
+                               long lastDispatchTime = 
value.getValue().lastDispatchTimestamp.get();
+                               String dest = (value.getValue().isReplyEndpoint 
)? 
+                                               
value.getValue().delegateEndpoint.getDestination().toString(): 
+                                               
value.getValue().destination.toString();
+                       //      
System.out.println("\t\tController:"+controller.getComponentName()+" Session 
Iterator: Session:"+dest);
+                               if ( (System.currentTimeMillis() - 
lastDispatchTime) >= inactivityTimeout ) {
+                                       value.getValue().close();
+                                       
System.out.println("Controller:"+controller.getComponentName()+" Closing 
Session for Destination:"+dest);
+
+                                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                                    UIMAFramework.getLogger(CLASS_NAME).logrb(
+                                            Level.FINE,
+                                            CLASS_NAME.getName(),
+                                            "startTimer",
+                                            
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                            
"UIMAJMS_removed_expired_session__FINE",
+                                            new Object[] { 
Thread.currentThread().getId(), componentName,
+                                                inactivityTimeout, 
brokerDestinations.getBrokerURL(),dest });
+                                  }
+                                       it.remove();
+                               }
+                       }
+                    } catch (Exception e) {
+                       e.printStackTrace();
+                      // Ignore this for now. Attempting to close connection 
that has been closed
+                      // Ignore we are shutting down
+                    } finally {
+                      try {
+                         if ( brokerDestinations.endpointMap.isEmpty() ) {
+                                 brokerDestinations.getConnection().stop();
+                              brokerDestinations.getConnection().close();
+                              brokerDestinations.setConnection(null);
+                              brokerDestinations.endpointMap.clear();
+                              connectionMap.remove(brokerDestinations);
+                              
+                              if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                                  UIMAFramework.getLogger(CLASS_NAME).logrb(
+                                          Level.FINE,
+                                          CLASS_NAME.getName(),
+                                          "startTimer",
+                                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                          
"UIMAJMS_closing_broker_connection__FINE",
+                                          new Object[] { 
Thread.currentThread().getId(), componentName,
+                                              
brokerDestinations.getBrokerURL(),inactivityTimeout  });
+                                }
+                         }
+                      } catch( Exception e) {
+                      }
+                    }
+                  }
+//                  brokerDestinations.setConnection(null);
+                } catch (Exception e) {
+                } 
+//              finally {
+//                  removeDestinationFromManagedList(brokerDestinations, 
endpoint);
+//                }
+          }
+      }, inactivityTimeout, inactivityTimeout);
+      
     }
 
     public synchronized void startTimer(long aConnectionCreationTimestamp, 
final Endpoint endpoint,
-            long currentInactivityTimeout, String aComponentName) {
+            final long currentInactivityTimeout, String aComponentName) {
       final long cachedConnectionCreationTimestamp = 
aConnectionCreationTimestamp;
       Date timeToRun = new Date(System.currentTimeMillis() + 
currentInactivityTimeout);
       if (timer != null) {
@@ -1871,6 +1988,7 @@ public class JmsOutputChannel implements
       } else {
         timer = new Timer("Reply TimerThread-:" + endpoint + ":" + 
System.nanoTime());
       }
+     
       timer.schedule(new TimerTask() {
         public void run() {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
@@ -1920,8 +2038,8 @@ public class JmsOutputChannel implements
           cancelTimer();
         }
       }, timeToRun);
-    }
 
+    }
     private void removeDestinationFromManagedList(BrokerConnectionEntry 
brokerDestinations,
             Endpoint endpoint) {
       //  If this is a reply to a client, use the same broker URL that manages 
this service input queue.


Reply via email to