Author: veithen
Date: Sat Sep 13 17:28:53 2008
New Revision: 695095
URL: http://svn.apache.org/viewvc?rev=695095&view=rev
Log:
SYNAPSE-434: Allow polls for different services to be executed in parallel. See
AbstractPollingTransportListener#schedulePoll for a description of the new
strategy.
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java?rev=695095&r1=695094&r2=695095&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
(original)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
Sat Sep 13 17:28:53 2008
@@ -19,6 +19,8 @@
package org.apache.synapse.transport.base;
+import java.util.TimerTask;
+
import org.apache.axis2.addressing.EndpointReference;
public abstract class AbstractPollTableEntry {
@@ -38,6 +40,10 @@
private long pollInterval;
/** state of the last poll */
private int lastPollState;
+ /** The timer task that will trigger the next poll */
+ TimerTask timerTask;
+ /** Flag indicating whether polling has been canceled. */
+ boolean canceled;
public String getServiceName() {
return serviceName;
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java?rev=695095&r1=695094&r2=695095&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
(original)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
Sat Sep 13 17:28:53 2008
@@ -29,24 +29,14 @@
import java.util.List;
import java.util.TimerTask;
import java.util.Timer;
-import java.util.Map;
-import java.util.HashMap;
public abstract class AbstractPollingTransportListener<T extends
AbstractPollTableEntry>
extends AbstractTransportListener {
/** The main timer. */
- protected Timer timer;
- /** is a poll already executing? */
- protected boolean pollInProgress = false;
- /** a lock to prevent concurrent execution of polling */
- protected final Object pollLock = new Object();
- /** a map that keeps track of services to the timer tasks created for them
*/
- protected Map serviceToTimerTaskMap = new HashMap();
+ private Timer timer;
/** Keep the list of endpoints and poll durations */
private final List<T> pollTable = new ArrayList<T>();
- /** Keep the list of removed pollTable entries */
- private final List<T> removeTable = new ArrayList<T>();
@Override
public void init(ConfigurationContext cfgCtx,
@@ -64,78 +54,42 @@
}
/**
- * Schedule a repeated poll at the specified interval for the given service
- * @param service the service to be polled
+ * Schedule a repeated poll at the specified interval for a given service.
+ * The method will schedule a single-shot timer task with executes a work
+ * task on the worker pool. At the end of this work task, a new timer task
+ * is scheduled for the next poll (except if the polling for the service
+ * has been canceled). This effectively schedules the poll repeatedly
+ * with fixed delay.
+ * @param entry the poll table entry with the configuration for the service
* @param pollInterval the interval between successive polls in
milliseconds
*/
- public void schedulePoll(AxisService service, long pollInterval) {
- TimerTask task = (TimerTask) serviceToTimerTaskMap.get(service);
-
- // if a timer task exists, cancel it first and create a new one
- if (task != null) {
- task.cancel();
- }
-
- task = new TimerTask() {
+ void schedulePoll(final T entry, final long pollInterval) {
+ TimerTask timerTask = new TimerTask() {
+ @Override
public void run() {
- if (pollInProgress) {
- if (log.isDebugEnabled()) {
- log.debug("Transport " + getTransportName() +
- " onPoll() trigger : already executing
poll..");
- }
- return;
- }
-
- if (state == BaseConstants.PAUSED) {
- if (log.isDebugEnabled()) {
- log.debug("Transport " + getTransportName() +
- " onPoll() trigger : Transport is currently
paused..");
- }
- return;
- }
-
workerPool.execute(new Runnable() {
public void run() {
- synchronized (pollLock) {
- pollInProgress = true;
- try {
- onPoll();
- } finally {
- pollInProgress = false;
+ if (state == BaseConstants.PAUSED) {
+ if (log.isDebugEnabled()) {
+ log.debug("Transport " + getTransportName() +
+ " poll trigger : Transport is
currently paused..");
+ }
+ } else {
+ poll(entry);
+ }
+ synchronized (entry) {
+ if (!entry.canceled) {
+ schedulePoll(entry, pollInterval);
}
}
}
});
}
};
- serviceToTimerTaskMap.put(service, task);
- timer.scheduleAtFixedRate(task, pollInterval, pollInterval);
- }
-
- /**
- * Cancel any pending timer tasks for the given service
- * @param service the service for which the timer task should be cancelled
- */
- public void cancelPoll(AxisService service) {
- TimerTask task = (TimerTask) serviceToTimerTaskMap.get(service);
- if (task != null) {
- task.cancel();
- }
+ entry.timerTask = timerTask;
+ timer.schedule(timerTask, pollInterval);
}
- public void onPoll() {
- if (!removeTable.isEmpty()) {
- pollTable.removeAll(removeTable);
- }
-
- for (T entry : pollTable) {
- long startTime = System.currentTimeMillis();
- if (startTime > entry.getNextPollTime()) {
- poll(entry);
- }
- }
- }
-
protected abstract void poll(T entry);
/**
@@ -184,7 +138,7 @@
disableTransportForService(service);
} else {
entry.setServiceName(service.getName());
- schedulePoll(service, pollInterval);
+ schedulePoll(entry, pollInterval);
pollTable.add(entry);
}
}
@@ -213,8 +167,11 @@
protected void stopListeningForService(AxisService service) {
for (T entry : pollTable) {
if (service.getName().equals(entry.getServiceName())) {
- cancelPoll(service);
- removeTable.add(entry);
+ synchronized (entry) {
+ entry.timerTask.cancel();
+ entry.canceled = true;
+ }
+ break;
}
}
}