Author: cwiklik
Date: Thu Jul  4 12:41:00 2013
New Revision: 1499734

URL: http://svn.apache.org/r1499734
Log:
UIMA-2776 Fixed session cleanup

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1499734&r1=1499733&r2=1499734&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Thu Jul  4 12:41:00 2013
@@ -25,8 +25,10 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.ConnectException;
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Timer;
@@ -449,7 +451,6 @@ public class JmsOutputChannel implements
     // First get a Map containing destinations managed by a broker provided by the client
     BrokerConnectionEntry brokerConnectionEntry = null;
     boolean startInactivityReaperTimer = false;
-
     if (connectionMap.containsKey(brokerConnectionURL)) {
       brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(brokerConnectionURL);
       // Findbugs thinks that the above may return null, perhaps due to a race condition. Add
@@ -465,10 +466,12 @@ public class JmsOutputChannel implements
         invalidateConnectionAndEndpoints(brokerConnectionEntry);
         brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
         startInactivityReaperTimer = true;
+//        System.out.println(">>>>>> Connection Map Size:"+connectionMap.size());
       }
     } else {
       brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
       System.out.println("---------------- New Broker "+brokerConnectionURL);
+//      System.out.println(">>>>>> Connection Map Size:"+connectionMap.size());
 //      long replyQueueInactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
       startInactivityReaperTimer = true;
     }
@@ -480,6 +483,7 @@ public class JmsOutputChannel implements
         // closed.
         brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(getAnalysisEngineController().getComponentName());
     }
+    
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(
               Level.FINE,
@@ -1856,6 +1860,7 @@ public class JmsOutputChannel implements
 
     public void addEndpointConnection(Object key, JmsEndpointConnection_impl endpointConnection) {
       endpointMap.put(key, endpointConnection);
+//      System.out.println("----------- Endpoint Map for Endpoint:"+key+" Has:"+endpointMap.size()+" Entries");
     }
 
     public JmsEndpointConnection_impl getEndpointConnection(Object key) {
@@ -1884,7 +1889,7 @@ public class JmsOutputChannel implements
 
     private String componentName = "";
 
-    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
     
     public ConnectionTimer(BrokerConnectionEntry aBrokerDestinations) {
       brokerDestinations = aBrokerDestinations;
@@ -1907,7 +1912,7 @@ public class JmsOutputChannel implements
 
     public void stopSessionReaperTimer() {
       if ( scheduler != null ) {
-        scheduler.shutdownNow();
+         scheduler.shutdownNow();
       }
     }
     /**
@@ -1917,11 +1922,11 @@ public class JmsOutputChannel implements
      * @param aComponentName
      */
     public synchronized void startSessionReaperTimer( String aComponentName) {
-        
         //     Fire the runnable at fixed intervals equal to inactivityTimeout value
-        scheduler.scheduleAtFixedRate(new Runnable(){
+        scheduler.scheduleWithFixedDelay(new Runnable(){
             public void run() {
-                  //System.out.println("Hashcode:"+hashCode()+" Controller:"+controller.getComponentName()+" Session Cleanup Thread Woke Up After "+inactivityTimeout +" minutes of Sleep to Clean Up Unused JMS Sessions");
+                System.out.println("SessionReaper Thread Woke Up After:"+inactivityTimeout*60*1000+" Millis");
+                long inactivityThreshold = inactivityTimeout*60*1000;  // normalize into millis
                 if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
                   UIMAFramework.getLogger(CLASS_NAME).logrb(
                           Level.INFO,
@@ -1944,20 +1949,22 @@ public class JmsOutputChannel implements
                        while( it.hasNext() ) {
                                Entry<Object, JmsEndpointConnection_impl> value = it.next();
                                long lastDispatchTime = value.getValue().lastDispatchTimestamp.get();
-                               if ( lastDispatchTime > 0 && (System.currentTimeMillis() - lastDispatchTime) >= inactivityTimeout ) {
+//                                     System.out.println("-------- lastDispatchTime:"+lastDispatchTime+" Delta:"+(System.currentTimeMillis() - lastDispatchTime)+" InactivityThreshold:"+inactivityThreshold);
+                               if ( lastDispatchTime > 0 && (System.currentTimeMillis() - lastDispatchTime) >= inactivityThreshold ) {
                                        value.getValue().close();  // close the jms session
                                        it.remove();
-/*
-                                       System.out.println("-------- Closing Session for Destination:"+value.getValue().delegateEndpoint.getDestination());
-                                UIMAFramework.getLogger(CLASS_NAME).logrb(
-                                        Level.INFO,
-                                        CLASS_NAME.getName(),
-                                        "startTimer",
-                                        JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                                        "UIMAJMS_removed_expired_session__INFO",
-                                        new Object[] { Thread.currentThread().getId(), componentName,
-                                            inactivityTimeout, value.getValue().delegateEndpoint.getDestination(), brokerDestinations.getBrokerURL()  });
-*/
+//                                     System.out.println("-------- Closing Session for Destination:"+value.getValue().delegateEndpoint.getDestination());
+
+//                                     System.out.println("-------- Closing Session for Destination:"+value.getValue().delegateEndpoint.getDestination());
+//                                UIMAFramework.getLogger(CLASS_NAME).logrb(
+//                                        Level.INFO,
+//                                        CLASS_NAME.getName(),
+//                                        "startTimer",
+//                                        JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+//                                        "UIMAJMS_removed_expired_session__INFO",
+//                                        new Object[] { Thread.currentThread().getId(), componentName,
+//                                            inactivityTimeout, value.getValue().delegateEndpoint.getDestination(), brokerDestinations.getBrokerURL()  });
+
                                }
                        }
                       } catch (Exception e) {
@@ -1970,7 +1977,8 @@ public class JmsOutputChannel implements
                                 brokerDestinations.setConnection(null);
                                 brokerDestinations.endpointMap.clear();
                                 connectionMap.remove(brokerDestinations);
-                                
+                                stopSessionReaperTimer();
+
                                 if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
                                     UIMAFramework.getLogger(CLASS_NAME).logrb(
                                             Level.FINE,
@@ -1983,17 +1991,19 @@ public class JmsOutputChannel implements
                                   }
                          }
                         } catch( Exception e) {
+                               e.printStackTrace();
                         }
                       }
                     }
                   } catch (Exception e) {
+                       e.printStackTrace();
                   } 
+
             }
-        }, 0, inactivityTimeout * 60,TimeUnit.SECONDS);
-        
+
+        }, inactivityTimeout * 60, inactivityTimeout * 60,TimeUnit.SECONDS);
       }
     private void cancelTimer() {
-      
       if ( scheduler != null ) {
         scheduler.shutdownNow();
       }


Reply via email to