Author: rajdavies
Date: Wed Mar 19 09:07:54 2008
New Revision: 638910

URL: http://svn.apache.org/viewvc?rev=638910&view=rev
Log:
Added separately configurable initial delay for timeout tasks on 
InactivityMonitor

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
 Wed Mar 19 09:07:54 2008
@@ -259,9 +259,20 @@
         return l == null ? 0 : l.longValue();
     }
 
-    public void seMaxInactivityDuration(long maxInactivityDuration) throws 
IOException {
+    public void setMaxInactivityDuration(long maxInactivityDuration) throws 
IOException {
         setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
     }
+    
+    public long getMaxInactivityDurationInitalDelay() throws IOException {
+        Long l = (Long)getProperty("MaxInactivityDurationInitalDelay");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxInactivityDurationInitalDelay(long 
maxInactivityDurationInitalDelay) throws IOException {
+        setProperty("MaxInactivityDurationInitalDelay", new 
Long(maxInactivityDurationInitalDelay));
+    }
+    
+   
 
     /**
      * @throws IOException

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
 Wed Mar 19 09:07:54 2008
@@ -36,7 +36,8 @@
     private boolean cacheEnabled = true;
     private boolean tightEncodingEnabled = true;
     private boolean sizePrefixDisabled;
-    private long maxInactivityDuration = 30 * 1000;
+    private long maxInactivityDuration = 30*1000;
+    private long maxInactivityDurationInitalDelay = 10*1000;
     private int cacheSize = 1024;
 
     public WireFormat createWireFormat() {
@@ -49,7 +50,8 @@
             info.setTcpNoDelayEnabled(tcpNoDelayEnabled);
             info.setTightEncodingEnabled(tightEncodingEnabled);
             info.setSizePrefixDisabled(sizePrefixDisabled);
-            info.seMaxInactivityDuration(maxInactivityDuration);
+            info.setMaxInactivityDuration(maxInactivityDuration);
+            
info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
             info.setCacheSize(cacheSize);
         } catch (Exception e) {
             IllegalStateException ise = new IllegalStateException("Could not 
configure WireFormatInfo");
@@ -124,5 +126,14 @@
 
     public void setCacheSize(int cacheSize) {
         this.cacheSize = cacheSize;
+    }
+
+    public long getMaxInactivityDurationInitalDelay() {
+        return maxInactivityDurationInitalDelay;
+    }
+
+    public void setMaxInactivityDurationInitalDelay(
+            long maxInactivityDurationInitalDelay) {
+        this.maxInactivityDurationInitalDelay = 
maxInactivityDurationInitalDelay;
     }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
 Wed Mar 19 09:07:54 2008
@@ -23,6 +23,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.activemq.command.KeepAliveInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.thread.SchedulerTimerTask;
@@ -51,6 +52,7 @@
     private final AtomicBoolean commandSent = new AtomicBoolean(false);
     private final AtomicBoolean inSend = new AtomicBoolean(false);
     private final AtomicBoolean failed = new AtomicBoolean(false);
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
 
     private final AtomicBoolean commandReceived = new AtomicBoolean(true);
     private final AtomicBoolean inReceive = new AtomicBoolean(false);
@@ -59,6 +61,7 @@
     
     private long readCheckTime;
     private long writeCheckTime;
+    private long initialDelayTime;
     
     private final Runnable readChecker = new Runnable() {
         long lastRunTime;
@@ -107,7 +110,7 @@
     }
 
     public void stop() throws Exception {
-        stopMonitorThreads();
+        closeDown();
         next.stop();
     }
 
@@ -125,12 +128,15 @@
             }
             ASYNC_TASKS.execute(new Runnable() {
                 public void run() {
-                    try {
-                        KeepAliveInfo info = new KeepAliveInfo();
-                        info.setResponseRequired(true);
-                        oneway(info);
-                    } catch (IOException e) {
-                        onException(e);
+                    if (stopped.get() == false) {
+                        try {
+
+                            KeepAliveInfo info = new KeepAliveInfo();
+                            info.setResponseRequired(true);
+                            oneway(info);
+                        } catch (IOException e) {
+                            onException(e);
+                        }
                     }
                 };
             });
@@ -155,9 +161,10 @@
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("No message received since last read check for " 
+ toString() + "! Throwing InactivityIOException.");
                 }
+                closeDown();
                 ASYNC_TASKS.execute(new Runnable() {  
                     public void run() {
-                       handleException(new InactivityIOException("Channel was 
inactive for too long: "+next.getRemoteAddress()));
+                        onException(new InactivityIOException("Channel was 
inactive for too long: "+next.getRemoteAddress()));
                     };
                     
                 });
@@ -218,15 +225,17 @@
         synchronized(inSend) {
             inSend.set(true);
             try {
+                
+                if( failed.get() ) {
+                    closeDown();
+                    throw new InactivityIOException("Channel was inactive for 
too long: "+next.getRemoteAddress());
+                }
                 if (o.getClass() == WireFormatInfo.class) {
                     synchronized (this) {
                         localWireFormatInfo = (WireFormatInfo)o;
                         startMonitorThreads();
                     }
                 }
-                if( failed.get() ) {
-                    throw new InactivityIOException("Channel was inactive for 
too long: "+next.getRemoteAddress());
-                }
                 next.oneway(o);
             } finally {
                 commandSent.set(true);
@@ -236,17 +245,18 @@
     }
 
     public void onException(IOException error) {
-       if( !failed.getAndSet(true) ) {
-               handleException(error);
-       }
+        closeDown();
+        if (!failed.getAndSet(true)) {
+            transportListener.onException(error);
+        }
+    }
+       
+       private void closeDown() {
+        stopped.set(true);
+        if (monitorStarted.get()) {
+            stopMonitorThreads();
+        }
     }
-
-       private void handleException(IOException error) {
-               if (monitorStarted.get()) {
-                   stopMonitorThreads();
-               }
-               transportListener.onException(error);
-       }
 
     private synchronized void startMonitorThreads() throws IOException {
         if (monitorStarted.get()) {
@@ -260,6 +270,7 @@
         }
 
         readCheckTime = 
Math.min(localWireFormatInfo.getMaxInactivityDuration(), 
remoteWireFormatInfo.getMaxInactivityDuration());
+        initialDelayTime =  
Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), 
remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
         if (readCheckTime > 0) {
             monitorStarted.set(true);
             writeCheckerTask = new SchedulerTimerTask(writeChecker);
@@ -271,8 +282,8 @@
                    WRITE_CHECK_TIMER = new Timer("InactivityMonitor 
WriteCheck");
                }
                CHECKER_COUNTER++;
-                WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, 
writeCheckTime,writeCheckTime);
-                READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, 
readCheckTime,readCheckTime);
+                WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, 
initialDelayTime,writeCheckTime);
+                READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, 
initialDelayTime,readCheckTime);
             }
         }
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
 Wed Mar 19 09:07:54 2008
@@ -45,6 +45,7 @@
        }
        public void setTransport(Transport transport) {
                this.transport = transport;
+               this.transport.setTransportListener(this);
        }
        public URI getUri() {
                return uri;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
 Wed Mar 19 09:07:54 2008
@@ -107,9 +107,12 @@
             public boolean iterate() {
                boolean result=false;
                boolean buildBackup=true;
-               if (connectedTransport.get()==null && !disposed) {
-                       result=doReconnect();
-                       buildBackup=false;
+               boolean doReconnect = !disposed;
+               synchronized(backupMutex) {
+                       if (connectedTransport.get()==null && !disposed) {
+                               result=doReconnect();
+                               buildBackup=false;
+                       }
                }
                if(buildBackup) {
                        buildBackups();
@@ -253,6 +256,10 @@
             started = false;
             disposed = true;
             connected = false;
+            for (BackupTransport t:backups) {
+                t.setDisposed(true);
+            }
+            backups.clear();
 
             if (connectedTransport.get() != null) {
                 transportToStop = connectedTransport.getAndSet(null);

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java?rev=638910&r1=638909&r2=638910&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
 Wed Mar 19 09:07:54 2008
@@ -186,7 +186,7 @@
         });
         clientTransport.start();
         WireFormatInfo info = new WireFormatInfo();
-        info.seMaxInactivityDuration(1000);
+        info.setMaxInactivityDuration(1000);
         clientTransport.oneway(info);
 
         assertEquals(0, serverErrorCount.get());


Reply via email to