Author: veithen
Date: Sat Aug 23 10:41:44 2008
New Revision: 688383
URL: http://svn.apache.org/viewvc?rev=688383&view=rev
Log:
Merged duplicate code in MailTransportListener and VFSTransportListener into
their base class AbstractPollingTransportListener. This should improve
maintainability and make it easier to solve SYNAPSE-434.
Added:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/PollTableEntry.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
Added:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java?rev=688383&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
(added)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
Sat Aug 23 10:41:44 2008
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.transport.base;
+
+import org.apache.axis2.addressing.EndpointReference;
+
+public abstract class AbstractPollTableEntry {
+ // status of last scan
+ public static final int SUCCSESSFUL = 0;
+ public static final int WITH_ERRORS = 1;
+ public static final int FAILED = 2;
+ public static final int NONE = 3;
+
+ /** Axis2 service name */
+ private String serviceName;
+ /** next poll time */
+ private long nextPollTime;
+ /** last poll performed at */
+ private long lastPollTime;
+ /** duration in ms between successive polls */
+ private long pollInterval;
+ /** state of the last poll */
+ private int lastPollState;
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public abstract EndpointReference getEndpointReference();
+
+ public long getNextPollTime() {
+ return nextPollTime;
+ }
+
+ public void setNextPollTime(long nextPollTime) {
+ this.nextPollTime = nextPollTime;
+ }
+
+ public long getLastPollTime() {
+ return lastPollTime;
+ }
+
+ public void setLastPollTime(long lastPollTime) {
+ this.lastPollTime = lastPollTime;
+ }
+
+ public long getPollInterval() {
+ return pollInterval;
+ }
+
+ public void setPollInterval(long pollInterval) {
+ this.pollInterval = pollInterval;
+ }
+
+ public int getLastPollState() {
+ return lastPollState;
+ }
+
+ public void setLastPollState(int lastPollState) {
+ this.lastPollState = lastPollState;
+ }
+}
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java?rev=688383&r1=688382&r2=688383&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
(original)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
Sat Aug 23 10:41:44 2008
@@ -18,26 +18,23 @@
*/
package org.apache.synapse.transport.base;
+import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.AxisFault;
+import java.util.ArrayList;
+import java.util.List;
import java.util.TimerTask;
import java.util.Timer;
import java.util.Map;
import java.util.HashMap;
-public abstract class AbstractPollingTransportListener extends
AbstractTransportListener {
+public abstract class AbstractPollingTransportListener<T extends
AbstractPollTableEntry>
+ extends AbstractTransportListener {
- /** the parameter in the services.xml that specifies the poll interval for
a service */
- public static final String TRANSPORT_POLL_INTERVAL =
"transport.PollInterval";
- /** the default poll interval */
- public static final int DEFAULT_POLL_INTERVAL = 5 * 60; // 5 mins by
default
-
- /** default interval in ms before polls */
- protected int pollInterval = DEFAULT_POLL_INTERVAL;
/** The main timer. */
protected Timer timer;
/** is a poll already executing? */
@@ -46,6 +43,10 @@
protected final Object pollLock = new Object();
/** a map that keeps track of services to the timer tasks created for them
*/
protected Map serviceToTimerTaskMap = new HashMap();
+ /** Keep the list of endpoints and poll durations */
+ private final List<T> pollTable = new ArrayList<T>();
+ /** Keep the list of removed pollTable entries */
+ private final List<T> removeTable = new ArrayList<T>();
@Override
public void init(ConfigurationContext cfgCtx,
@@ -124,38 +125,92 @@
}
}
- public void onPoll() {}
+ public void onPoll() {
+ if (!removeTable.isEmpty()) {
+ pollTable.removeAll(removeTable);
+ }
- protected void startListeningForService(AxisService service) {
+ for (T entry : pollTable) {
+ long startTime = System.currentTimeMillis();
+ if (startTime > entry.getNextPollTime()) {
+ poll(entry);
+ }
+ }
+ }
+
+ protected abstract void poll(T entry);
- if (service.getName().startsWith("__")) {
- return;
+ /**
+ * method to log a failure to the log file and to update the last poll
status and time
+ * @param msg text for the log message
+ * @param e optional exception encountered or null
+ * @param entry the PollTableEntry
+ */
+ protected void processFailure(String msg, Exception e, T entry) {
+ if (e == null) {
+ log.error(msg);
+ } else {
+ log.error(msg, e);
}
+ long now = System.currentTimeMillis();
+ entry.setLastPollState(AbstractPollTableEntry.FAILED);
+ entry.setLastPollTime(now);
+ entry.setNextPollTime(now + entry.getPollInterval());
+ }
+
+ @Override
+ protected void startListeningForService(AxisService service) {
- Parameter param = service.getParameter(TRANSPORT_POLL_INTERVAL);
- long pollInterval = DEFAULT_POLL_INTERVAL;
+ Parameter param =
service.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
+ long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
if (param != null && param.getValue() instanceof String) {
try {
pollInterval = Integer.parseInt(param.getValue().toString());
} catch (NumberFormatException e) {
log.error("Invalid poll interval : " + param.getValue() + "
for service : " +
- service.getName() + " Using defaults", e);
- disableTransportForService(service);
+ service.getName() + " default to : "
+ + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000) +
"sec", e);
}
}
- schedulePoll(service, pollInterval);
- }
-
- protected void stopListeningForService(AxisService service) {
- cancelPoll(service);
+
+ T entry = createPollTableEntry(service);
+ if (entry == null) {
+ disableTransportForService(service);
+ } else {
+ entry.setServiceName(service.getName());
+ schedulePoll(service, pollInterval);
+ pollTable.add(entry);
+ }
}
+
+ protected abstract T createPollTableEntry(AxisService service);
- public int getPollInterval() {
- return pollInterval;
+ /**
+ * Get the EPR for the given service
+ *
+ * @param serviceName service name
+ * @param ip ignored
+ * @return the EPR for the service
+ * @throws AxisFault not used
+ */
+ public EndpointReference[] getEPRsForService(String serviceName, String
ip) throws AxisFault {
+ for (T entry : pollTable) {
+ if (entry.getServiceName().equals(serviceName) ||
+ serviceName.startsWith(entry.getServiceName() + ".")) {
+ return new EndpointReference[]{ entry.getEndpointReference() };
+ }
+ }
+ return null;
}
- public void setPollInterval(int pollInterval) {
- this.pollInterval = pollInterval;
+ @Override
+ protected void stopListeningForService(AxisService service) {
+ for (T entry : pollTable) {
+ if (service.getName().equals(entry.getServiceName())) {
+ cancelPoll(service);
+ removeTable.add(entry);
+ }
+ }
}
// -- jmx/management methods--
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java?rev=688383&r1=688382&r2=688383&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java
(original)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java
Sat Aug 23 10:41:44 2008
@@ -52,17 +52,12 @@
* (e.g. with imap). When checking for new mail, the transport ignores
messages already flaged as
* SEEN and DELETED
*/
-public class MailTransportListener extends AbstractPollingTransportListener
+public class MailTransportListener extends
AbstractPollingTransportListener<PollTableEntry>
implements ManagementSupport {
public static final String DELETE = "DELETE";
public static final String MOVE = "MOVE";
- /** Keep the list of email accounts and poll durations */
- private final List<PollTableEntry> pollTable = new
ArrayList<PollTableEntry>();
- /** Keep the list of removed pollTable entries */
- private final List<PollTableEntry> removeTable = new
ArrayList<PollTableEntry>();
-
/**
* Initializes the Mail transport
*
@@ -70,26 +65,15 @@
* @param trpInDesc the POP3 transport in description from the axis2.xml
* @throws AxisFault on error
*/
+ @Override
public void init(ConfigurationContext cfgCtx, TransportInDescription
trpInDesc)
throws AxisFault {
super.init(cfgCtx, trpInDesc);
}
- /**
- * On a poller tick, iterate over the list of mail accounts and check if
- * it is time to scan the contents for new files
- */
- public void onPoll() {
- if (!removeTable.isEmpty()) {
- pollTable.removeAll(removeTable);
- }
-
- for (PollTableEntry entry : pollTable) {
- long startTime = System.currentTimeMillis();
- if (startTime > entry.getNextPollTime()) {
- checkMail(entry, entry.getEmailAddress());
- }
- }
+ @Override
+ protected void poll(PollTableEntry entry) {
+ checkMail(entry, entry.getEmailAddress());
}
/**
@@ -445,58 +429,8 @@
}
}
-
- /**
- * method to log a failure to the log file and to update the last poll
status and time
- *
- * @param msg text for the log message
- * @param e optiona exception encountered or null
- * @param entry the PollTableEntry
- */
- private void processFailure(String msg, Exception e, PollTableEntry entry)
{
- if (e == null) {
- log.error(msg);
- } else {
- log.error(msg, e);
- }
- long now = System.currentTimeMillis();
- entry.setLastPollState(PollTableEntry.FAILED);
- entry.setLastPollTime(now);
- entry.setNextPollTime(now + entry.getPollInterval());
- }
-
-
- /**
- * Get the EPR for the given service over the Mail transport
- *
- * @param serviceName service name
- * @param ip ignored
- * @return the EPR for the service
- * @throws AxisFault not used
- */
- public EndpointReference[] getEPRsForService(String serviceName, String
ip) throws AxisFault {
- for (PollTableEntry entry : pollTable) {
- if (entry.getServiceName().equals(serviceName) ||
- serviceName.startsWith(entry.getServiceName() + ".")) {
- return new EndpointReference[]{new EndpointReference(
- MailConstants.TRANSPORT_PREFIX +
entry.getEmailAddress())};
- }
- }
- return null;
- }
-
- protected void startListeningForService(AxisService service) {
-
- Parameter param =
service.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
- long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
- if (param != null && param.getValue() instanceof String) {
- try {
- pollInterval = Integer.parseInt(param.getValue().toString());
- } catch (NumberFormatException e) {
- log.error("Invalid poll interval : " + param.getValue() + "
for service : " +
- service.getName() + " default to : " +
(BaseConstants.DEFAULT_POLL_INTERVAL / 1000) + "sec", e);
- }
- }
+ @Override
+ protected PollTableEntry createPollTableEntry(AxisService service) {
PollTableEntry entry = new PollTableEntry();
try {
@@ -558,30 +492,19 @@
if (strReconnectTimeout != null)
entry.setReconnectTimeout(Integer.parseInt(strReconnectTimeout) * 1000);
- entry.setServiceName(service.getName());
- schedulePoll(service, pollInterval);
- pollTable.add(entry);
-
+ return entry;
+
} catch (AxisFault axisFault) {
String msg = "Error configuring the Mail transport for Service : "
+
service.getName() + " :: " + axisFault.getMessage();
log.warn(msg);
- disableTransportForService(service);
+ return null;
} catch (AddressException e) {
String msg = "Error configuring the Mail transport for Service : "
+
" Invalid email address specified by '" +
MailConstants.TRANSPORT_MAIL_ADDRESS +
"'parameter for service : " + service.getName() + " :: " +
e.getMessage();
log.warn(msg);
- disableTransportForService(service);
- }
- }
-
- protected void stopListeningForService(AxisService service) {
- for (PollTableEntry entry : pollTable) {
- if (service.getName().equals(entry.getServiceName())) {
- cancelPoll(service);
- removeTable.add(entry);
- }
+ return null;
}
}
}
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/PollTableEntry.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/PollTableEntry.java?rev=688383&r1=688382&r2=688383&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/PollTableEntry.java
(original)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/PollTableEntry.java
Sat Aug 23 10:41:44 2008
@@ -19,31 +19,28 @@
package org.apache.synapse.transport.mail;
-import javax.mail.Session;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.AddressException;
-import java.util.List;
import java.util.ArrayList;
-import java.util.StringTokenizer;
+import java.util.List;
import java.util.Properties;
+import java.util.StringTokenizer;
+
+import javax.mail.Session;
+import javax.mail.internet.AddressException;
+import javax.mail.internet.InternetAddress;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.synapse.transport.base.AbstractPollTableEntry;
/**
* Holds information about an entry in the VFS transport poll table used by the
* VFS Transport Listener
*/
-public class PollTableEntry {
-
- // status of last mail check
- public static final int SUCCSESSFUL = 0;
- public static final int FAILED = 2;
- public static final int NONE = 3;
+public class PollTableEntry extends AbstractPollTableEntry {
// operation after mail check
public static final int DELETE = 0;
public static final int MOVE = 1;
- /** Axis2 service name */
- private String serviceName;
/** The email address mapped to the service */
private InternetAddress emailAddress = null;
@@ -66,18 +63,9 @@
private InternetAddress replyAddress = null;
/** list of mail headers to be preserved into the Axis2 message as
transport headers */
- private List preserveHeaders = null;
+ private List<String> preserveHeaders = null;
/** list of mail headers to be removed from the Axis2 message transport
headers */
- private List removeHeaders = null;
-
- /** last poll performed at */
- private long lastPollTime;
- /** duration in ms between successive polls */
- private long pollInterval;
- /** next poll time */
- private long nextPollTime;
- /** state of the last poll */
- private int lastPollState;
+ private List<String> removeHeaders = null;
/** action to take after a successful poll */
private int actionAfterProcess = DELETE;
@@ -92,6 +80,10 @@
private int maxRetryCount;
private long reconnectTimeout;
+ @Override
+ public EndpointReference getEndpointReference() {
+ return new EndpointReference(MailConstants.TRANSPORT_PREFIX +
emailAddress);
+ }
public InternetAddress getEmailAddress() {
return emailAddress;
@@ -101,14 +93,6 @@
this.emailAddress = new InternetAddress(emailAddress);
}
- public String getServiceName() {
- return serviceName;
- }
-
- public void setServiceName(String serviceName) {
- this.serviceName = serviceName;
- }
-
public String getUserName() {
return userName;
}
@@ -141,38 +125,6 @@
this.contentType = contentType;
}
- public long getLastPollTime() {
- return lastPollTime;
- }
-
- public void setLastPollTime(long lastPollTime) {
- this.lastPollTime = lastPollTime;
- }
-
- public long getPollInterval() {
- return pollInterval;
- }
-
- public void setPollInterval(long pollInterval) {
- this.pollInterval = pollInterval;
- }
-
- public long getNextPollTime() {
- return nextPollTime;
- }
-
- public void setNextPollTime(long nextPollTime) {
- this.nextPollTime = nextPollTime;
- }
-
- public int getLastPollState() {
- return lastPollState;
- }
-
- public void setLastPollState(int lastPollState) {
- this.lastPollState = lastPollState;
- }
-
public int getActionAfterProcess() {
return actionAfterProcess;
}
@@ -270,7 +222,7 @@
public void addPreserveHeaders(String headerList) {
if (headerList == null) return;
StringTokenizer st = new StringTokenizer(headerList, " ,");
- preserveHeaders = new ArrayList();
+ preserveHeaders = new ArrayList<String>();
while (st.hasMoreTokens()) {
String token = st.nextToken();
if (token.length() != 0) {
@@ -282,7 +234,7 @@
public void addRemoveHeaders(String headerList) {
if (headerList == null) return;
StringTokenizer st = new StringTokenizer(headerList, " ,");
- removeHeaders = new ArrayList();
+ removeHeaders = new ArrayList<String>();
while (st.hasMoreTokens()) {
String token = st.nextToken();
if (token.length() != 0) {
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java?rev=688383&r1=688382&r2=688383&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
(original)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
Sat Aug 23 10:41:44 2008
@@ -18,27 +18,21 @@
*/
package org.apache.synapse.transport.vfs;
-import java.text.SimpleDateFormat;
import java.text.DateFormat;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.synapse.transport.base.AbstractPollTableEntry;
+
/**
* Holds information about an entry in the VFS transport poll table used by the
* VFS Transport Listener
*/
-public class PollTableEntry {
-
- // status of last scan
- public static final int SUCCSESSFUL = 0;
- public static final int WITH_ERRORS = 1;
- public static final int FAILED = 2;
- public static final int NONE = 3;
+public class PollTableEntry extends AbstractPollTableEntry {
// operation after scan
public static final int DELETE = 0;
public static final int MOVE = 1;
- /** Axis2 service name */
- private String serviceName;
/** File or Directory to scan */
private String fileURI;
/** file name pattern for a directory or compressed file entry */
@@ -46,14 +40,6 @@
/** Content-Type to use for the message */
private String contentType;
- /** last poll performed at */
- private long lastPollTime;
- /** duration in ms between successive polls */
- private long pollInterval;
- /** next poll time */
- private long nextPollTime;
- /** state of the last poll */
- private int lastPollState;
/** action to take after a successful poll */
private int actionAfterProcess = DELETE;
/** action to take after a poll with errors */
@@ -73,12 +59,9 @@
private int maxRetryCount;
private long reconnectTimeout;
- public String getServiceName() {
- return serviceName;
- }
-
- public void setServiceName(String serviceName) {
- this.serviceName = serviceName;
+ @Override
+ public EndpointReference getEndpointReference() {
+ return new EndpointReference("vfs:" + fileURI);
}
public String getFileURI() {
@@ -109,38 +92,6 @@
this.contentType = contentType;
}
- public long getLastPollTime() {
- return lastPollTime;
- }
-
- public void setLastPollTime(long lastPollTime) {
- this.lastPollTime = lastPollTime;
- }
-
- public long getPollInterval() {
- return pollInterval;
- }
-
- public void setPollInterval(long pollInterval) {
- this.pollInterval = pollInterval;
- }
-
- public long getNextPollTime() {
- return nextPollTime;
- }
-
- public void setNextPollTime(long nextPollTime) {
- this.nextPollTime = nextPollTime;
- }
-
- public int getLastPollState() {
- return lastPollState;
- }
-
- public void setLastPollState(int lastPollState) {
- this.lastPollState = lastPollState;
- }
-
public int getActionAfterProcess() {
return actionAfterProcess;
}
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java?rev=688383&r1=688382&r2=688383&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
(original)
+++
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
Sat Aug 23 10:41:44 2008
@@ -24,7 +24,6 @@
import org.apache.synapse.transport.base.ManagementSupport;
import org.apache.synapse.transport.base.ParamUtils;
import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.description.*;
@@ -98,7 +97,7 @@
* ftp://ftpuser:[EMAIL PROTECTED]/somefile.csv?passive=true
* ftp://vfs:[EMAIL PROTECTED]/somepath/somefile.xml?passive=true
*/
-public class VFSTransportListener extends AbstractPollingTransportListener
+public class VFSTransportListener extends
AbstractPollingTransportListener<PollTableEntry>
implements ManagementSupport {
public static final String TRANSPORT_NAME = "vfs";
@@ -106,10 +105,6 @@
public static final String DELETE = "DELETE";
public static final String MOVE = "MOVE";
- /** Keep the list of directories/files and poll durations */
- private final List<PollTableEntry> pollTable = new
ArrayList<PollTableEntry>();
- /** Keep the list of removed pollTable entries */
- private final List<PollTableEntry> removeTable = new
ArrayList<PollTableEntry>();
/** The VFS file system manager */
private FileSystemManager fsManager = null;
@@ -119,6 +114,7 @@
* @param trpInDesc the VFS transport in description from the axis2.xml
* @throws AxisFault on error
*/
+ @Override
public void init(ConfigurationContext cfgCtx, TransportInDescription
trpInDesc)
throws AxisFault {
super.init(cfgCtx, trpInDesc);
@@ -132,21 +128,9 @@
}
}
- /**
- * On a poller tick, iterate over the list of directories/files and check
if
- * it is time to scan the contents for new files
- */
- public void onPoll() {
- if (!removeTable.isEmpty()) {
- pollTable.removeAll(removeTable);
- }
-
- for (PollTableEntry entry : pollTable) {
- long startTime = System.currentTimeMillis();
- if (startTime > entry.getNextPollTime()) {
- scanFileOrDirectory(entry, entry.getFileURI());
- }
- }
+ @Override
+ protected void poll(PollTableEntry entry) {
+ scanFileOrDirectory(entry, entry.getFileURI());
}
/**
@@ -483,56 +467,8 @@
}
}
- /**
- * method to log a failure to the log file and to update the last poll
status and time
- * @param msg text for the log message
- * @param e optiona exception encountered or null
- * @param entry the PollTableEntry
- */
- private void processFailure(String msg, Exception e, PollTableEntry entry)
{
- if (e == null) {
- log.error(msg);
- } else {
- log.error(msg, e);
- }
- long now = System.currentTimeMillis();
- entry.setLastPollState(PollTableEntry.FAILED);
- entry.setLastPollTime(now);
- entry.setNextPollTime(now + entry.getPollInterval());
- }
-
- /**
- * Get the EPR for the given service over the VFS transport
- * vfs:uri (@see http://jakarta.apache.org/commons/vfs/filesystems.html
for the URI formats)
- * @param serviceName service name
- * @param ip ignored
- * @return the EPR for the service
- * @throws AxisFault not used
- */
- public EndpointReference[] getEPRsForService(String serviceName, String
ip) throws AxisFault {
- for (PollTableEntry entry : pollTable) {
- if (entry.getServiceName().equals(serviceName)
- || serviceName.startsWith(entry.getServiceName() + ".")) {
- return new EndpointReference[]{new EndpointReference("vfs:" +
entry.getFileURI())};
- }
- }
- return null;
- }
-
- protected void startListeningForService(AxisService service) {
-
- Parameter param =
service.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
- long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
- if (param != null && param.getValue() instanceof String) {
- try {
- pollInterval = Integer.parseInt(param.getValue().toString());
- } catch (NumberFormatException e) {
- log.error("Invalid poll interval : " + param.getValue() + "
for service : " +
- service.getName() + " default to : "
- + (BaseConstants.DEFAULT_POLL_INTERVAL/1000) + "sec",
e);
- disableTransportForService(service);
- }
- }
+ @Override
+ protected PollTableEntry createPollTableEntry(AxisService service) {
PollTableEntry entry = new PollTableEntry();
try {
@@ -582,26 +518,13 @@
if(strReconnectTimeout != null)
entry.setReconnectTimeout(Integer.parseInt(strReconnectTimeout)
* 1000);
- entry.setServiceName(service.getName());
- schedulePoll(service, pollInterval);
- pollTable.add(entry);
-
+ return entry;
+
} catch (AxisFault axisFault) {
String msg = "Error configuring the File/VFS transport for Service
: " +
service.getName() + " :: " + axisFault.getMessage();
log.warn(msg);
- disableTransportForService(service);
- }
- }
-
- protected void stopListeningForService(AxisService service) {
- Iterator iter = pollTable.iterator();
- while (iter.hasNext()) {
- PollTableEntry entry = (PollTableEntry) iter.next();
- if (service.getName().equals(entry.getServiceName())) {
- cancelPoll(service);
- removeTable.add(entry);
- }
+ return null;
}
}
}