Author: veithen
Date: Sat Aug 23 10:41:44 2008
New Revision: 688383

URL: http://svn.apache.org/viewvc?rev=688383&view=rev
Log:
Merged duplicate code in MailTransportListener and VFSTransportListener into 
their base class AbstractPollingTransportListener. This should improve 
maintainability and make it easier to solve SYNAPSE-434.

Added:
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
Modified:
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/PollTableEntry.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
    
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java

Added: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java?rev=688383&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
 (added)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollTableEntry.java
 Sat Aug 23 10:41:44 2008
@@ -0,0 +1,83 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.transport.base;
+
+import org.apache.axis2.addressing.EndpointReference;
+
+public abstract class AbstractPollTableEntry {
+    // Outcome codes of the most recent poll/scan; values for lastPollState (see setLastPollState)
+    public static final int SUCCSESSFUL = 0;
+    public static final int WITH_ERRORS = 1;
+    public static final int FAILED      = 2;
+    public static final int NONE        = 3;
+
+    /** Name of the Axis2 service this poll table entry belongs to */
+    private String serviceName;
+    /** Time (System.currentTimeMillis() scale) after which the entry is due for its next poll */
+    private long nextPollTime;
+    /** Time (System.currentTimeMillis() scale) at which the last poll was performed */
+    private long lastPollTime;
+    /** duration in ms between successive polls; added to "now" to compute nextPollTime */
+    private long pollInterval;
+    /** Outcome code of the last poll. NOTE(review): no initializer, so starts at 0 (SUCCSESSFUL), not NONE */
+    private int lastPollState;
+    
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    void setServiceName(String serviceName) { // package-private: assigned by the polling listener in this package
+        this.serviceName = serviceName;
+    }
+    
+    public abstract EndpointReference getEndpointReference(); // EPR for this entry; built by each transport's subclass
+
+    public long getNextPollTime() {
+        return nextPollTime;
+    }
+
+    public void setNextPollTime(long nextPollTime) {
+        this.nextPollTime = nextPollTime;
+    }
+
+    public long getLastPollTime() {
+        return lastPollTime;
+    }
+
+    public void setLastPollTime(long lastPollTime) {
+        this.lastPollTime = lastPollTime;
+    }
+
+    public long getPollInterval() {
+        return pollInterval;
+    }
+
+    public void setPollInterval(long pollInterval) {
+        this.pollInterval = pollInterval;
+    }
+
+    public int getLastPollState() {
+        return lastPollState;
+    }
+
+    public void setLastPollState(int lastPollState) {
+        this.lastPollState = lastPollState;
+    }
+}

Modified: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java?rev=688383&r1=688382&r2=688383&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
 (original)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
 Sat Aug 23 10:41:44 2008
@@ -18,26 +18,23 @@
 */
 package org.apache.synapse.transport.base;
 
+import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.Parameter;
 import org.apache.axis2.description.TransportInDescription;
 import org.apache.axis2.AxisFault;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.TimerTask;
 import java.util.Timer;
 import java.util.Map;
 import java.util.HashMap;
 
-public abstract class AbstractPollingTransportListener extends 
AbstractTransportListener {
+public abstract class AbstractPollingTransportListener<T extends 
AbstractPollTableEntry>
+        extends AbstractTransportListener {
 
-    /** the parameter in the services.xml that specifies the poll interval for 
a service */
-    public static final String TRANSPORT_POLL_INTERVAL = 
"transport.PollInterval";
-    /** the default poll interval */
-    public static final int DEFAULT_POLL_INTERVAL = 5 * 60; // 5 mins by 
default
-
-    /** default interval in ms before polls */
-    protected int pollInterval = DEFAULT_POLL_INTERVAL;
     /** The main timer. */
     protected Timer timer;
     /** is a poll already executing? */
@@ -46,6 +43,10 @@
     protected final Object pollLock = new Object();
     /** a map that keeps track of services to the timer tasks created for them */
     protected Map serviceToTimerTaskMap = new HashMap();
+    /** Keep the list of endpoints and poll durations */
+    private final List<T> pollTable = new ArrayList<T>();
+    /** Keep the list of removed pollTable entries */
+    private final List<T> removeTable = new ArrayList<T>();
 
     @Override
     public void init(ConfigurationContext cfgCtx,
@@ -124,38 +125,92 @@
         }
     }
 
-    public void onPoll() {}
+    public void onPoll() {
+        if (!removeTable.isEmpty()) {
+            pollTable.removeAll(removeTable);
+        }
 
-    protected void startListeningForService(AxisService service) {
+        for (T entry : pollTable) {
+            long startTime = System.currentTimeMillis();
+            if (startTime > entry.getNextPollTime()) {
+                poll(entry);
+            }
+        }
+    }
+    
+    protected abstract void poll(T entry);
 
-        if (service.getName().startsWith("__")) {
-            return;
+    /**
+     * method to log a failure to the log file and to update the last poll status and time
+     * @param msg text for the log message
+     * @param e optional exception encountered or null
+     * @param entry the PollTableEntry
+     */
+    protected void processFailure(String msg, Exception e, T entry) {
+        if (e == null) {
+            log.error(msg);
+        } else {
+            log.error(msg, e);
         }
+        long now = System.currentTimeMillis();
+        entry.setLastPollState(AbstractPollTableEntry.FAILED);
+        entry.setLastPollTime(now);
+        entry.setNextPollTime(now + entry.getPollInterval());
+    }
+
+    @Override
+    protected void startListeningForService(AxisService service) {
 
-        Parameter param = service.getParameter(TRANSPORT_POLL_INTERVAL);
-        long pollInterval = DEFAULT_POLL_INTERVAL;
+        Parameter param = 
service.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
+        long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
         if (param != null && param.getValue() instanceof String) {
             try {
                 pollInterval = Integer.parseInt(param.getValue().toString());
             } catch (NumberFormatException e) {
                 log.error("Invalid poll interval : " + param.getValue() + " 
for service : " +
-                    service.getName() + " Using defaults", e);
-                disableTransportForService(service);
+                    service.getName() + " default to : "
+                        + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000) + 
"sec", e);
             }
         }
-        schedulePoll(service, pollInterval);
-    }
-
-    protected void stopListeningForService(AxisService service) {
-        cancelPoll(service);
+        
+        T entry = createPollTableEntry(service);
+        if (entry == null) {
+            disableTransportForService(service);
+        } else {
+            entry.setServiceName(service.getName());
+            schedulePoll(service, pollInterval);
+            pollTable.add(entry);
+        }
     }
+    
+    protected abstract T createPollTableEntry(AxisService service);
 
-    public int getPollInterval() {
-        return pollInterval;
+    /**
+     * Get the EPR for the given service, taken from the first poll table entry that matches
+     * (entry name equal to serviceName, or serviceName starting with the entry name plus ".")
+     * @param serviceName service name to look up
+     * @param ip          ignored
+     * @return single-element array holding the matching entry's EPR, or null if none matches
+     * @throws AxisFault not used
+     */
+    public EndpointReference[] getEPRsForService(String serviceName, String 
ip) throws AxisFault {
+        for (T entry : pollTable) {
+            if (entry.getServiceName().equals(serviceName) ||
+                    serviceName.startsWith(entry.getServiceName() + ".")) {
+                return new EndpointReference[]{ entry.getEndpointReference() };
+            }
+        }
+        return null;
+    }
 
-    public void setPollInterval(int pollInterval) {
-        this.pollInterval = pollInterval;
+    @Override
+    protected void stopListeningForService(AxisService service) {
+        for (T entry : pollTable) {
+            if (service.getName().equals(entry.getServiceName())) {
+                cancelPoll(service);
+                removeTable.add(entry);
+            }
+        }
     }
 
     // -- jmx/management methods--

Modified: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java?rev=688383&r1=688382&r2=688383&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java
 (original)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java
 Sat Aug 23 10:41:44 2008
@@ -52,17 +52,12 @@
  * (e.g. with imap). When checking for new mail, the transport ignores 
messages already flaged as
  * SEEN and DELETED
  */
-public class MailTransportListener extends AbstractPollingTransportListener
+public class MailTransportListener extends 
AbstractPollingTransportListener<PollTableEntry>
     implements ManagementSupport {
 
     public static final String DELETE = "DELETE";
     public static final String MOVE = "MOVE";
 
-    /** Keep the list of email accounts and poll durations */
-    private final List<PollTableEntry> pollTable = new 
ArrayList<PollTableEntry>();
-    /** Keep the list of removed pollTable entries */
-    private final List<PollTableEntry> removeTable = new 
ArrayList<PollTableEntry>();
-
     /**
      * Initializes the Mail transport
      *
@@ -70,26 +65,15 @@
      * @param trpInDesc the POP3 transport in description from the axis2.xml
      * @throws AxisFault on error
      */
+    @Override
     public void init(ConfigurationContext cfgCtx, TransportInDescription 
trpInDesc)
         throws AxisFault {
         super.init(cfgCtx, trpInDesc);
     }
 
-    /**
-     * On a poller tick, iterate over the list of mail accounts and check if
-     * it is time to scan the contents for new files
-     */
-    public void onPoll() {
-        if (!removeTable.isEmpty()) {
-            pollTable.removeAll(removeTable);
-        }
-
-        for (PollTableEntry entry : pollTable) {
-            long startTime = System.currentTimeMillis();
-            if (startTime > entry.getNextPollTime()) {
-                checkMail(entry, entry.getEmailAddress());
-            }
-        }
+    @Override
+    protected void poll(PollTableEntry entry) {
+        checkMail(entry, entry.getEmailAddress());
     }
 
     /**
@@ -445,58 +429,8 @@
         }
     }
 
-
-    /**
-     * method to log a failure to the log file and to update the last poll 
status and time
-     *
-     * @param msg   text for the log message
-     * @param e     optiona exception encountered or null
-     * @param entry the PollTableEntry
-     */
-    private void processFailure(String msg, Exception e, PollTableEntry entry) 
{
-        if (e == null) {
-            log.error(msg);
-        } else {
-            log.error(msg, e);
-        }
-        long now = System.currentTimeMillis();
-        entry.setLastPollState(PollTableEntry.FAILED);
-        entry.setLastPollTime(now);
-        entry.setNextPollTime(now + entry.getPollInterval());
-    }
-
-
-    /**
-     * Get the EPR for the given service over the Mail transport
-     *
-     * @param serviceName service name
-     * @param ip          ignored
-     * @return the EPR for the service
-     * @throws AxisFault not used
-     */
-    public EndpointReference[] getEPRsForService(String serviceName, String 
ip) throws AxisFault {
-        for (PollTableEntry entry : pollTable) {
-            if (entry.getServiceName().equals(serviceName) ||
-                    serviceName.startsWith(entry.getServiceName() + ".")) {
-                return new EndpointReference[]{new EndpointReference(
-                        MailConstants.TRANSPORT_PREFIX + 
entry.getEmailAddress())};
-            }
-        }
-        return null;
-    }
-
-    protected void startListeningForService(AxisService service) {
-
-        Parameter param = 
service.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
-        long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
-        if (param != null && param.getValue() instanceof String) {
-            try {
-                pollInterval = Integer.parseInt(param.getValue().toString());
-            } catch (NumberFormatException e) {
-                log.error("Invalid poll interval : " + param.getValue() + " 
for service : " +
-                    service.getName() + " default to : " + 
(BaseConstants.DEFAULT_POLL_INTERVAL / 1000) + "sec", e);
-            }
-        }
+    @Override
+    protected PollTableEntry createPollTableEntry(AxisService service) {
 
         PollTableEntry entry = new PollTableEntry();
         try {
@@ -558,30 +492,19 @@
             if (strReconnectTimeout != null)
                 
entry.setReconnectTimeout(Integer.parseInt(strReconnectTimeout) * 1000);
 
-            entry.setServiceName(service.getName());
-            schedulePoll(service, pollInterval);
-            pollTable.add(entry);
-
+            return entry;
+            
         } catch (AxisFault axisFault) {
             String msg = "Error configuring the Mail transport for Service : " 
+
                 service.getName() + " :: " + axisFault.getMessage();
             log.warn(msg);
-            disableTransportForService(service);
+            return null;
         } catch (AddressException e) {
             String msg = "Error configuring the Mail transport for Service : " 
+
                 " Invalid email address specified by '" + 
MailConstants.TRANSPORT_MAIL_ADDRESS +
                 "'parameter for service : " + service.getName() + " :: " + 
e.getMessage();
             log.warn(msg);
-            disableTransportForService(service);
-        }
-    }
-
-    protected void stopListeningForService(AxisService service) {
-        for (PollTableEntry entry : pollTable) {
-            if (service.getName().equals(entry.getServiceName())) {
-                cancelPoll(service);
-                removeTable.add(entry);
-            }
+            return null;
         }
     }
 }

Modified: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/PollTableEntry.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/PollTableEntry.java?rev=688383&r1=688382&r2=688383&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/PollTableEntry.java
 (original)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/PollTableEntry.java
 Sat Aug 23 10:41:44 2008
@@ -19,31 +19,28 @@
 
 package org.apache.synapse.transport.mail;
 
-import javax.mail.Session;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.AddressException;
-import java.util.List;
 import java.util.ArrayList;
-import java.util.StringTokenizer;
+import java.util.List;
 import java.util.Properties;
+import java.util.StringTokenizer;
+
+import javax.mail.Session;
+import javax.mail.internet.AddressException;
+import javax.mail.internet.InternetAddress;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.synapse.transport.base.AbstractPollTableEntry;
 
 /**
  * Holds information about an entry in the VFS transport poll table used by the
  * VFS Transport Listener
  */
-public class PollTableEntry {
-
-    // status of last mail check
-    public static final int SUCCSESSFUL = 0;
-    public static final int FAILED      = 2;
-    public static final int NONE        = 3;
+public class PollTableEntry extends AbstractPollTableEntry {
 
     // operation after mail check
     public static final int DELETE = 0;
     public static final int MOVE   = 1;
 
-    /** Axis2 service name */
-    private String serviceName;
     /** The email address mapped to the service */
     private InternetAddress emailAddress = null;
 
@@ -66,18 +63,9 @@
     private InternetAddress replyAddress = null;
 
     /** list of mail headers to be preserved into the Axis2 message as 
transport headers */
-    private List preserveHeaders = null;
+    private List<String> preserveHeaders = null;
     /** list of mail headers to be removed from the Axis2 message transport 
headers */
-    private List removeHeaders = null;
-
-    /** last poll performed at */
-    private long lastPollTime;
-    /** duration in ms between successive polls */
-    private long pollInterval;
-    /** next poll time */
-    private long nextPollTime;
-    /** state of the last poll */
-    private int lastPollState;
+    private List<String> removeHeaders = null;
 
     /** action to take after a successful poll */
     private int actionAfterProcess = DELETE;
@@ -92,6 +80,10 @@
     private int maxRetryCount;
     private long reconnectTimeout;
 
+    @Override
+    public EndpointReference getEndpointReference() {
+        return new EndpointReference(MailConstants.TRANSPORT_PREFIX + 
emailAddress);
+    }
 
     public InternetAddress getEmailAddress() {
         return emailAddress;
@@ -101,14 +93,6 @@
         this.emailAddress = new InternetAddress(emailAddress);
     }
 
-    public String getServiceName() {
-        return serviceName;
-    }
-
-    public void setServiceName(String serviceName) {
-        this.serviceName = serviceName;
-    }
-
     public String getUserName() {
         return userName;
     }
@@ -141,38 +125,6 @@
         this.contentType = contentType;
     }
 
-    public long getLastPollTime() {
-        return lastPollTime;
-    }
-
-    public void setLastPollTime(long lastPollTime) {
-        this.lastPollTime = lastPollTime;
-    }
-
-    public long getPollInterval() {
-        return pollInterval;
-    }
-
-    public void setPollInterval(long pollInterval) {
-        this.pollInterval = pollInterval;
-    }
-
-    public long getNextPollTime() {
-        return nextPollTime;
-    }
-
-    public void setNextPollTime(long nextPollTime) {
-        this.nextPollTime = nextPollTime;
-    }
-
-    public int getLastPollState() {
-        return lastPollState;
-    }
-
-    public void setLastPollState(int lastPollState) {
-        this.lastPollState = lastPollState;
-    }
-
     public int getActionAfterProcess() {
         return actionAfterProcess;
     }
@@ -270,7 +222,7 @@
     public void addPreserveHeaders(String headerList) {
         if (headerList == null) return;
         StringTokenizer st = new StringTokenizer(headerList, " ,");
-        preserveHeaders = new ArrayList();
+        preserveHeaders = new ArrayList<String>();
         while (st.hasMoreTokens()) {
             String token = st.nextToken();
             if (token.length() != 0) {
@@ -282,7 +234,7 @@
     public void addRemoveHeaders(String headerList) {
         if (headerList == null) return;
         StringTokenizer st = new StringTokenizer(headerList, " ,");
-        removeHeaders = new ArrayList();
+        removeHeaders = new ArrayList<String>();
         while (st.hasMoreTokens()) {
             String token = st.nextToken();
             if (token.length() != 0) {

Modified: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java?rev=688383&r1=688382&r2=688383&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
 (original)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/PollTableEntry.java
 Sat Aug 23 10:41:44 2008
@@ -18,27 +18,21 @@
 */
 package org.apache.synapse.transport.vfs;
 
-import java.text.SimpleDateFormat;
 import java.text.DateFormat;
 
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.synapse.transport.base.AbstractPollTableEntry;
+
 /**
  * Holds information about an entry in the VFS transport poll table used by the
  * VFS Transport Listener
  */
-public class PollTableEntry {
-
-    // status of last scan
-    public static final int SUCCSESSFUL = 0;
-    public static final int WITH_ERRORS = 1;
-    public static final int FAILED      = 2;
-    public static final int NONE        = 3;
+public class PollTableEntry extends AbstractPollTableEntry {
 
     // operation after scan
     public static final int DELETE = 0;
     public static final int MOVE   = 1;
 
-    /** Axis2 service name */
-    private String serviceName;
     /** File or Directory to scan */
     private String fileURI;
     /** file name pattern for a directory or compressed file entry */
@@ -46,14 +40,6 @@
     /** Content-Type to use for the message */
     private String contentType;
 
-    /** last poll performed at */
-    private long lastPollTime;
-    /** duration in ms between successive polls */
-    private long pollInterval;
-    /** next poll time */
-    private long nextPollTime;
-    /** state of the last poll */
-    private int lastPollState;
     /** action to take after a successful poll */
     private int actionAfterProcess = DELETE;
     /** action to take after a poll with errors */
@@ -73,12 +59,9 @@
     private int maxRetryCount;
     private long reconnectTimeout;
 
-    public String getServiceName() {
-        return serviceName;
-    }
-
-    public void setServiceName(String serviceName) {
-        this.serviceName = serviceName;
+    @Override
+    public EndpointReference getEndpointReference() {
+        return new EndpointReference("vfs:" + fileURI);
     }
 
     public String getFileURI() {
@@ -109,38 +92,6 @@
         this.contentType = contentType;
     }
 
-    public long getLastPollTime() {
-        return lastPollTime;
-    }
-
-    public void setLastPollTime(long lastPollTime) {
-        this.lastPollTime = lastPollTime;
-    }
-
-    public long getPollInterval() {
-        return pollInterval;
-    }
-
-    public void setPollInterval(long pollInterval) {
-        this.pollInterval = pollInterval;
-    }
-
-    public long getNextPollTime() {
-        return nextPollTime;
-    }
-
-    public void setNextPollTime(long nextPollTime) {
-        this.nextPollTime = nextPollTime;
-    }
-
-    public int getLastPollState() {
-        return lastPollState;
-    }
-
-    public void setLastPollState(int lastPollState) {
-        this.lastPollState = lastPollState;
-    }
-
     public int getActionAfterProcess() {
         return actionAfterProcess;
     }

Modified: 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java?rev=688383&r1=688382&r2=688383&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
 (original)
+++ 
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/vfs/VFSTransportListener.java
 Sat Aug 23 10:41:44 2008
@@ -24,7 +24,6 @@
 import org.apache.synapse.transport.base.ManagementSupport;
 import org.apache.synapse.transport.base.ParamUtils;
 import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.description.*;
@@ -98,7 +97,7 @@
  * ftp://ftpuser:[EMAIL PROTECTED]/somefile.csv?passive=true
  * ftp://vfs:[EMAIL PROTECTED]/somepath/somefile.xml?passive=true
  */
-public class VFSTransportListener extends AbstractPollingTransportListener 
+public class VFSTransportListener extends 
AbstractPollingTransportListener<PollTableEntry> 
     implements ManagementSupport {
 
     public static final String TRANSPORT_NAME = "vfs";
@@ -106,10 +105,6 @@
     public static final String DELETE = "DELETE";
     public static final String MOVE = "MOVE";
 
-    /** Keep the list of directories/files and poll durations */
-    private final List<PollTableEntry> pollTable = new 
ArrayList<PollTableEntry>();
-    /** Keep the list of removed pollTable entries */
-    private final List<PollTableEntry> removeTable = new 
ArrayList<PollTableEntry>();
     /** The VFS file system manager */
     private FileSystemManager fsManager = null;
 
@@ -119,6 +114,7 @@
      * @param trpInDesc the VFS transport in description from the axis2.xml
      * @throws AxisFault on error
      */
+    @Override
     public void init(ConfigurationContext cfgCtx, TransportInDescription 
trpInDesc)
         throws AxisFault {
         super.init(cfgCtx, trpInDesc);
@@ -132,21 +128,9 @@
         }
     }
 
-    /**
-     * On a poller tick, iterate over the list of directories/files and check 
if
-     * it is time to scan the contents for new files
-     */
-    public void onPoll() {
-        if (!removeTable.isEmpty()) {
-            pollTable.removeAll(removeTable);
-        }
-        
-        for (PollTableEntry entry : pollTable) {
-            long startTime = System.currentTimeMillis();
-            if (startTime > entry.getNextPollTime()) {
-                scanFileOrDirectory(entry, entry.getFileURI());
-            }
-        }
+    @Override
+    protected void poll(PollTableEntry entry) {
+        scanFileOrDirectory(entry, entry.getFileURI());
     }
 
     /**
@@ -483,56 +467,8 @@
         }
     }
 
-    /**
-     * method to log a failure to the log file and to update the last poll 
status and time
-     * @param msg text for the log message
-     * @param e optiona exception encountered or null
-     * @param entry the PollTableEntry
-     */
-    private void processFailure(String msg, Exception e, PollTableEntry entry) 
{
-        if (e == null) {
-            log.error(msg);
-        } else {
-            log.error(msg, e);
-        }
-        long now = System.currentTimeMillis();
-        entry.setLastPollState(PollTableEntry.FAILED);
-        entry.setLastPollTime(now);
-        entry.setNextPollTime(now + entry.getPollInterval());
-    }
-
-    /**
-     * Get the EPR for the given service over the VFS transport
-     * vfs:uri (@see http://jakarta.apache.org/commons/vfs/filesystems.html 
for the URI formats)
-     * @param serviceName service name
-     * @param ip          ignored
-     * @return the EPR for the service
-     * @throws AxisFault not used
-     */
-    public EndpointReference[] getEPRsForService(String serviceName, String 
ip) throws AxisFault {
-        for (PollTableEntry entry : pollTable) {
-            if (entry.getServiceName().equals(serviceName)
-                    || serviceName.startsWith(entry.getServiceName() + ".")) {
-                return new EndpointReference[]{new EndpointReference("vfs:" + 
entry.getFileURI())};
-            }
-        }
-        return null;
-    }
-
-    protected void startListeningForService(AxisService service) {
-
-        Parameter param = 
service.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL);
-        long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL;
-        if (param != null && param.getValue() instanceof String) {
-            try {
-                pollInterval = Integer.parseInt(param.getValue().toString());
-            } catch (NumberFormatException e) {
-                log.error("Invalid poll interval : " + param.getValue() + " 
for service : " +
-                    service.getName() + " default to : "
-                        + (BaseConstants.DEFAULT_POLL_INTERVAL/1000) + "sec", 
e);
-                disableTransportForService(service);
-            }
-        }
+    @Override
+    protected PollTableEntry createPollTableEntry(AxisService service) {
 
         PollTableEntry entry = new PollTableEntry();
         try {
@@ -582,26 +518,13 @@
             if(strReconnectTimeout != null)
               entry.setReconnectTimeout(Integer.parseInt(strReconnectTimeout) 
* 1000);
             
-            entry.setServiceName(service.getName());
-            schedulePoll(service, pollInterval);            
-            pollTable.add(entry);
-
+            return entry;
+            
         } catch (AxisFault axisFault) {
             String msg = "Error configuring the File/VFS transport for Service 
: " +
                 service.getName() + " :: " + axisFault.getMessage();
             log.warn(msg);
-            disableTransportForService(service);
-        }
-    }
-
-    protected void stopListeningForService(AxisService service) {
-        Iterator iter = pollTable.iterator();
-        while (iter.hasNext()) {
-            PollTableEntry entry = (PollTableEntry) iter.next();
-            if (service.getName().equals(entry.getServiceName())) {
-                cancelPoll(service);
-                removeTable.add(entry);
-            }
+            return null;
         }
     }
 }


Reply via email to