Author: cwiklik
Date: Thu Jul 4 12:41:00 2013
New Revision: 1499734
URL: http://svn.apache.org/r1499734
Log:
UIMA-2776 Fixed session cleanup
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL:
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1499734&r1=1499733&r2=1499734&view=diff
==============================================================================
---
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
(original)
+++
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Thu Jul 4 12:41:00 2013
@@ -25,8 +25,10 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.ConnectException;
import java.net.InetAddress;
+import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
@@ -449,7 +451,6 @@ public class JmsOutputChannel implements
// First get a Map containing destinations managed by a broker provided by
the client
BrokerConnectionEntry brokerConnectionEntry = null;
boolean startInactivityReaperTimer = false;
-
if (connectionMap.containsKey(brokerConnectionURL)) {
brokerConnectionEntry = (BrokerConnectionEntry)
connectionMap.get(brokerConnectionURL);
// Findbugs thinks that the above may return null, perhaps due to a race
condition. Add
@@ -465,10 +466,12 @@ public class JmsOutputChannel implements
invalidateConnectionAndEndpoints(brokerConnectionEntry);
brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
startInactivityReaperTimer = true;
+// System.out.println(">>>>>> Connection Map
Size:"+connectionMap.size());
}
} else {
brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
System.out.println("---------------- New Broker "+brokerConnectionURL);
+// System.out.println(">>>>>> Connection Map Size:"+connectionMap.size());
// long replyQueueInactivityTimeout = getInactivityTimeout(destination,
brokerConnectionURL);
startInactivityReaperTimer = true;
}
@@ -480,6 +483,7 @@ public class JmsOutputChannel implements
// closed.
brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(getAnalysisEngineController().getComponentName());
}
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
@@ -1856,6 +1860,7 @@ public class JmsOutputChannel implements
public void addEndpointConnection(Object key, JmsEndpointConnection_impl
endpointConnection) {
endpointMap.put(key, endpointConnection);
+// System.out.println("----------- Endpoint Map for Endpoint:"+key+"
Has:"+endpointMap.size()+" Entries");
}
public JmsEndpointConnection_impl getEndpointConnection(Object key) {
@@ -1884,7 +1889,7 @@ public class JmsOutputChannel implements
private String componentName = "";
- private ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
+ private ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
public ConnectionTimer(BrokerConnectionEntry aBrokerDestinations) {
brokerDestinations = aBrokerDestinations;
@@ -1907,7 +1912,7 @@ public class JmsOutputChannel implements
public void stopSessionReaperTimer() {
if ( scheduler != null ) {
- scheduler.shutdownNow();
+ scheduler.shutdownNow();
}
}
/**
@@ -1917,11 +1922,11 @@ public class JmsOutputChannel implements
* @param aComponentName
*/
public synchronized void startSessionReaperTimer( String aComponentName) {
-
// Fire the runnable at fixed intervals equal to inactivityTimeout
value
- scheduler.scheduleAtFixedRate(new Runnable(){
+ scheduler.scheduleWithFixedDelay(new Runnable(){
public void run() {
- //System.out.println("Hashcode:"+hashCode()+"
Controller:"+controller.getComponentName()+" Session Cleanup Thread Woke Up
After "+inactivityTimeout +" minutes of Sleep to Clean Up Unused JMS Sessions");
+ System.out.println("SessionReaper Thread Woke Up
After:"+inactivityTimeout*60*1000+" Millis");
+ long inactivityThreshold = inactivityTimeout*60*1000; //
normalize into millis
if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.INFO,
@@ -1944,20 +1949,22 @@ public class JmsOutputChannel implements
while( it.hasNext() ) {
Entry<Object, JmsEndpointConnection_impl> value
= it.next();
long lastDispatchTime =
value.getValue().lastDispatchTimestamp.get();
- if ( lastDispatchTime > 0 &&
(System.currentTimeMillis() - lastDispatchTime) >= inactivityTimeout ) {
+// System.out.println("--------
lastDispatchTime:"+lastDispatchTime+" Delta:"+(System.currentTimeMillis() -
lastDispatchTime)+" InactivityThreshold:"+inactivityThreshold);
+ if ( lastDispatchTime > 0 &&
(System.currentTimeMillis() - lastDispatchTime) >= inactivityThreshold ) {
value.getValue().close(); // close the
jms session
it.remove();
-/*
- System.out.println("-------- Closing
Session for Destination:"+value.getValue().delegateEndpoint.getDestination());
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.INFO,
- CLASS_NAME.getName(),
- "startTimer",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-
"UIMAJMS_removed_expired_session__INFO",
- new Object[] {
Thread.currentThread().getId(), componentName,
- inactivityTimeout,
value.getValue().delegateEndpoint.getDestination(),
brokerDestinations.getBrokerURL() });
-*/
+// System.out.println("-------- Closing
Session for Destination:"+value.getValue().delegateEndpoint.getDestination());
+
+// System.out.println("-------- Closing
Session for Destination:"+value.getValue().delegateEndpoint.getDestination());
+// UIMAFramework.getLogger(CLASS_NAME).logrb(
+// Level.INFO,
+// CLASS_NAME.getName(),
+// "startTimer",
+// JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+//
"UIMAJMS_removed_expired_session__INFO",
+// new Object[] {
Thread.currentThread().getId(), componentName,
+// inactivityTimeout,
value.getValue().delegateEndpoint.getDestination(),
brokerDestinations.getBrokerURL() });
+
}
}
} catch (Exception e) {
@@ -1970,7 +1977,8 @@ public class JmsOutputChannel implements
brokerDestinations.setConnection(null);
brokerDestinations.endpointMap.clear();
connectionMap.remove(brokerDestinations);
-
+ stopSessionReaperTimer();
+
if
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
@@ -1983,17 +1991,19 @@ public class JmsOutputChannel implements
}
}
} catch( Exception e) {
+ e.printStackTrace();
}
}
}
} catch (Exception e) {
+ e.printStackTrace();
}
+
}
- }, 0, inactivityTimeout * 60,TimeUnit.SECONDS);
-
+
+ }, inactivityTimeout * 60, inactivityTimeout * 60,TimeUnit.SECONDS);
}
private void cancelTimer() {
-
if ( scheduler != null ) {
scheduler.shutdownNow();
}