Author: veithen
Date: Sat Sep 13 17:28:53 2008
New Revision: 695095

URL: http://svn.apache.org/viewvc?rev=695095&view=rev
Log:
SYNAPSE-434: Allow polls for different services to be executed in parallel. See 
AbstractPollingTransportListener#schedulePoll for a description of the new 
strategy.

Modified:
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java

Modified: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java?rev=695095&r1=695094&r2=695095&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
 (original)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
 Sat Sep 13 17:28:53 2008
@@ -19,6 +19,8 @@
 
 package org.apache.synapse.transport.base;
 
+import java.util.TimerTask;
+
 import org.apache.axis2.addressing.EndpointReference;
 
 public abstract class AbstractPollTableEntry {
@@ -38,6 +40,10 @@
     private long pollInterval;
     /** state of the last poll */
     private int lastPollState;
+    /** The timer task that will trigger the next poll */
+    TimerTask timerTask;
+    /** Flag indicating whether polling has been canceled. */
+    boolean canceled;
     
     public String getServiceName() {
         return serviceName;

Modified: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java?rev=695095&r1=695094&r2=695095&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
 (original)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
 Sat Sep 13 17:28:53 2008
@@ -29,24 +29,14 @@
 import java.util.List;
 import java.util.TimerTask;
 import java.util.Timer;
-import java.util.Map;
-import java.util.HashMap;
 
 public abstract class AbstractPollingTransportListener<T extends 
AbstractPollTableEntry>
         extends AbstractTransportListener {
 
     /** The main timer. */
-    protected Timer timer;
-    /** is a poll already executing? */
-    protected boolean pollInProgress = false;
-    /** a lock to prevent concurrent execution of polling */
-    protected final Object pollLock = new Object();
-    /** a map that keeps track of services to the timer tasks created for them 
*/
-    protected Map serviceToTimerTaskMap = new HashMap();
+    private Timer timer;
     /** Keep the list of endpoints and poll durations */
     private final List<T> pollTable = new ArrayList<T>();
-    /** Keep the list of removed pollTable entries */
-    private final List<T> removeTable = new ArrayList<T>();
 
     @Override
     public void init(ConfigurationContext cfgCtx,
@@ -64,78 +54,42 @@
     }
 
     /**
-     * Schedule a repeated poll at the specified interval for the given service
-     * @param service the service to be polled
+     * Schedule a repeated poll at the specified interval for a given service.
+     * The method will schedule a single-shot timer task with executes a work
+     * task on the worker pool. At the end of this work task, a new timer task
+     * is scheduled for the next poll (except if the polling for the service
+     * has been canceled). This effectively schedules the poll repeatedly
+     * with fixed delay.
+     * @param entry the poll table entry with the configuration for the service
      * @param pollInterval the interval between successive polls in 
milliseconds
      */
-    public void schedulePoll(AxisService service, long pollInterval) {
-        TimerTask task = (TimerTask) serviceToTimerTaskMap.get(service);
-
-        // if a timer task exists, cancel it first and create a new one
-        if (task != null) {
-            task.cancel();
-        }
-
-        task = new TimerTask() {
+    void schedulePoll(final T entry, final long pollInterval) {
+        TimerTask timerTask = new TimerTask() {
+            @Override
             public void run() {
-                if (pollInProgress) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Transport " + getTransportName() +
-                                " onPoll() trigger : already executing 
poll..");
-                    }
-                    return;
-                }
-
-                if (state == BaseConstants.PAUSED) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Transport " + getTransportName() +
-                                " onPoll() trigger : Transport is currently 
paused..");
-                    }
-                    return;
-                }
-
                 workerPool.execute(new Runnable() {
                     public void run() {
-                        synchronized (pollLock) {
-                            pollInProgress = true;
-                            try {
-                                onPoll();
-                            } finally {
-                                pollInProgress = false;
+                        if (state == BaseConstants.PAUSED) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Transport " + getTransportName() +
+                                        " poll trigger : Transport is 
currently paused..");
+                            }
+                        } else {
+                            poll(entry);
+                        }
+                        synchronized (entry) {
+                            if (!entry.canceled) {
+                                schedulePoll(entry, pollInterval);
                             }
                         }
                     }
                 });
             }
         };
-        serviceToTimerTaskMap.put(service, task);
-        timer.scheduleAtFixedRate(task, pollInterval, pollInterval);
-    }
-
-    /**
-     * Cancel any pending timer tasks for the given service
-     * @param service the service for which the timer task should be cancelled
-     */
-    public void cancelPoll(AxisService service) {
-        TimerTask task = (TimerTask) serviceToTimerTaskMap.get(service);
-        if (task != null) {
-            task.cancel();
-        }
+        entry.timerTask = timerTask;
+        timer.schedule(timerTask, pollInterval);
     }
 
-    public void onPoll() {
-        if (!removeTable.isEmpty()) {
-            pollTable.removeAll(removeTable);
-        }
-
-        for (T entry : pollTable) {
-            long startTime = System.currentTimeMillis();
-            if (startTime > entry.getNextPollTime()) {
-                poll(entry);
-            }
-        }
-    }
-    
     protected abstract void poll(T entry);
 
     /**
@@ -184,7 +138,7 @@
             disableTransportForService(service);
         } else {
             entry.setServiceName(service.getName());
-            schedulePoll(service, pollInterval);
+            schedulePoll(entry, pollInterval);
             pollTable.add(entry);
         }
     }
@@ -213,8 +167,11 @@
     protected void stopListeningForService(AxisService service) {
         for (T entry : pollTable) {
             if (service.getName().equals(entry.getServiceName())) {
-                cancelPoll(service);
-                removeTable.add(entry);
+                synchronized (entry) {
+                    entry.timerTask.cancel();
+                    entry.canceled = true;
+                }
+                break;
             }
         }
     }


Reply via email to