Author: veithen Date: Sun Jun 14 15:02:31 2009 New Revision: 784570 URL: http://svn.apache.org/viewvc?rev=784570&view=rev Log: Refactoring so that the logic that AbstractPollingTransportListener and the other AbstractTransportListener implementations have in common is moved to a single base class.
Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollTableEntry.java webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListenerEx.java webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/ProtocolEndpoint.java webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramEndpoint.java webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/PollTableEntry.java webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/Endpoint.java webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPListener.java Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollTableEntry.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollTableEntry.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollTableEntry.java (original) +++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollTableEntry.java Sun Jun 14 15:02:31 2009 @@ -21,7 +21,15 @@ import java.util.TimerTask; +import org.apache.axis2.AxisFault; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterInclude; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + public abstract class AbstractPollTableEntry extends ProtocolEndpoint { + private static final Log log = LogFactory.getLog(AbstractPollTableEntry.class); + // status of last scan public static final int SUCCSESSFUL = 0; public static final int WITH_ERRORS = 1; @@ -82,4 +90,27 @@ public void setConcurrentPollingAllowed(boolean concurrentPollingAllowed) { this.concurrentPollingAllowed = concurrentPollingAllowed; } + + @Override + public boolean loadConfiguration(ParameterInclude params) throws AxisFault { + Parameter param = params.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL); + pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL; + if (param != null && param.getValue() instanceof String) { + String s = (String)param.getValue(); + int multiplier; + if (s.endsWith("ms")) { + s = s.substring(0, s.length()-2); + multiplier = 1; + } else { + multiplier = 1000; + } + try { + pollInterval = Integer.parseInt(s) * multiplier; + } catch (NumberFormatException e) { + log.error("Invalid poll interval : " + param.getValue() + ", default to : " + + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000) + "sec", e); + } + } + return true; + } } Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java (original) +++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java Sun Jun 14 15:02:31 2009 @@ -18,11 +18,7 @@ */ package org.apache.axis2.transport.base; -import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.description.Parameter; -import org.apache.axis2.description.ParameterInclude; import org.apache.axis2.description.TransportInDescription; import org.apache.axis2.AxisFault; @@ -32,12 +28,12 @@ import java.util.Timer; public abstract class AbstractPollingTransportListener<T extends AbstractPollTableEntry> - extends AbstractTransportListener { + extends AbstractTransportListenerEx<T> { /** The main timer. */ private Timer timer; /** Keep the list of endpoints and poll durations */ - private final List<T> pollTable = new ArrayList<T>(); + private final List<T> endpoints = new ArrayList<T>(); @Override public void init(ConfigurationContext cfgCtx, @@ -45,29 +41,10 @@ timer = new Timer("PollTimer"); super.init(cfgCtx, transportIn); - T entry = createPollTableEntry(transportIn); - if (entry != null) { - entry.setPollInterval(getPollInterval(transportIn)); - schedulePoll(entry); - pollTable.add(entry); - } } @Override public void destroy() { - // Explicitly cancel all polls not predispatched to services. All other polls will - // be canceled by stopListeningForService. 
Pay attention to the fact the cancelPoll - // modifies pollTable. - List<T> entriesToCancel = new ArrayList<T>(); - for (T entry : pollTable) { - if (entry.getService() == null) { - entriesToCancel.add(entry); - } - } - for (T entry : entriesToCancel) { - cancelPoll(entry); - } - super.destroy(); timer.cancel(); timer = null; @@ -110,12 +87,17 @@ } } - private void cancelPoll(T entry) { - synchronized (entry) { - entry.timerTask.cancel(); - entry.canceled = true; + @Override + protected void startEndpoint(T endpoint) throws AxisFault { + schedulePoll(endpoint); + } + + @Override + protected void stopEndpoint(T endpoint) { + synchronized (endpoint) { + endpoint.timerTask.cancel(); + endpoint.canceled = true; } - pollTable.remove(entry); } protected abstract void poll(T entry); @@ -149,86 +131,6 @@ onPollCompletion(entry); } - private long getPollInterval(ParameterInclude params) { - Parameter param = params.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL); - long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL; - if (param != null && param.getValue() instanceof String) { - String s = (String)param.getValue(); - int multiplier; - if (s.endsWith("ms")) { - s = s.substring(0, s.length()-2); - multiplier = 1; - } else { - multiplier = 1000; - } - try { - pollInterval = Integer.parseInt(s) * multiplier; - } catch (NumberFormatException e) { - log.error("Invalid poll interval : " + param.getValue() + ", default to : " - + (BaseConstants.DEFAULT_POLL_INTERVAL / 1000) + "sec", e); - } - } - return pollInterval; - } - - @Override - protected void startListeningForService(AxisService service) throws AxisFault { - T entry = createPollTableEntry(service); - if (entry == null) { - throw new AxisFault("The service has no configuration for the transport"); - } - entry.setService(service); - entry.setPollInterval(getPollInterval(service)); - schedulePoll(entry); - pollTable.add(entry); - } - - /** - * Create a poll table entry based on the provided parameters. 
- * If no relevant parameters are found, the implementation should - * return null. An exception should only be thrown if there is an - * error or inconsistency in the parameters. - * - * @param params The source of the parameters to construct the - * poll table entry. If the parameters were defined on - * a service, this will be an {@link AxisService} - * instance. - * @return - */ - protected abstract T createPollTableEntry(ParameterInclude params) throws AxisFault; - - /** - * Get the EPR for the given service - * - * @param serviceName service name - * @param ip ignored - * @return the EPR for the service - * @throws AxisFault not used - */ - public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { - for (T entry : pollTable) { - AxisService service = entry.getService(); - if (service != null) { - String candidateName = service.getName(); - if (candidateName.equals(serviceName) || - serviceName.startsWith(candidateName + ".")) { - return entry.getEndpointReferences(ip); - } - } - } - return null; - } - - @Override - protected void stopListeningForService(AxisService service) { - for (T entry : pollTable) { - if (service == entry.getService()) { - cancelPoll(entry); - break; - } - } - } - // -- jmx/management methods-- /** * Pause the listener - Stop accepting/processing new messages, but continues processing existing Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListenerEx.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListenerEx.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListenerEx.java (original) +++ 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListenerEx.java Sun Jun 14 15:02:31 2009 @@ -18,14 +18,16 @@ */ package org.apache.axis2.transport.base; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.List; import org.apache.axis2.AxisFault; import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.TransportInDescription; /** * Partial implementation of {@link AbstractTransportListener} with a higher level @@ -41,7 +43,39 @@ extends AbstractTransportListener { /** A Map of service name to the protocol endpoints */ - private Map<String,E> endpoints = new HashMap<String,E>(); + private List<E> endpoints = new ArrayList<E>(); + + @Override + public void init(ConfigurationContext cfgCtx, + TransportInDescription transportIn) throws AxisFault { + + super.init(cfgCtx, transportIn); + + // Create endpoint configured at transport level (if available) + E endpoint = createEndpoint(); + if (endpoint.loadConfiguration(transportIn)) { + startEndpoint(endpoint); + endpoints.add(endpoint); + } + } + + @Override + public void destroy() { + // Explicitly stop all endpoints not predispatched to services. All other endpoints will + // be stopped by stopListeningForService. 
+ List<E> endpointsToStop = new ArrayList<E>(); + for (E endpoint : endpoints) { + if (endpoint.getService() == null) { + endpointsToStop.add(endpoint); + } + } + for (E endpoint : endpointsToStop) { + stopEndpoint(endpoint); + endpoints.remove(endpoint); + } + + super.destroy(); + } @Override public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { @@ -53,16 +87,19 @@ if (serviceName.indexOf('.') != -1) { serviceName = serviceName.substring(0, serviceName.indexOf('.')); } - E endpoint = endpoints.get(serviceName); - if (endpoint != null) { - return endpoint.getEndpointReferences(ip); - } else { - return null; + for (E endpoint : endpoints) { + AxisService service = endpoint.getService(); + if (service != null) { + if (service.getName().equals(serviceName)) { + return endpoint.getEndpointReferences(ip); + } + } } + return null; } public final Collection<E> getEndpoints() { - return Collections.unmodifiableCollection(endpoints.values()); + return Collections.unmodifiableCollection(endpoints); } protected abstract E createEndpoint(); @@ -71,22 +108,28 @@ protected final void startListeningForService(AxisService service) throws AxisFault { E endpoint = createEndpoint(); endpoint.setService(service); - configureAndStartEndpoint(endpoint, service); - endpoints.put(service.getName(), endpoint); + if (endpoint.loadConfiguration(service)) { + startEndpoint(endpoint); + endpoints.add(endpoint); + } else { + throw new AxisFault("Service doesn't have configuration information for transport " + + getTransportName()); + } } - protected abstract void configureAndStartEndpoint(E endpoint, AxisService service) throws AxisFault; + protected abstract void startEndpoint(E endpoint) throws AxisFault; @Override protected final void stopListeningForService(AxisService service) { - E endpoint = endpoints.get(service.getName()); - if (endpoint != null) { - stopEndpoint(endpoint); - endpoints.remove(service.getName()); - } else { - log.error("Unable to 
stop service : " + service.getName() + - " - unable to find the corresponding protocol endpoint"); + for (E endpoint : endpoints) { + if (service == endpoint.getService()) { + stopEndpoint(endpoint); + endpoints.remove(endpoint); + return; + } } + log.error("Unable to stop service : " + service.getName() + + " - unable to find the corresponding protocol endpoint"); } protected abstract void stopEndpoint(E endpoint); Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/ProtocolEndpoint.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/ProtocolEndpoint.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/ProtocolEndpoint.java (original) +++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/ProtocolEndpoint.java Sun Jun 14 15:02:31 2009 @@ -21,6 +21,7 @@ import org.apache.axis2.AxisFault; import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.ParameterInclude; /** * Describes a protocol specific endpoint. This might be a TCP/UDP port, a mail account, @@ -45,6 +46,24 @@ } /** + * Configure the endpoint based on the provided parameters. + * If no relevant parameters are found, the implementation should + * return <code>false</code>. An exception should only be thrown if there is an + * error or inconsistency in the parameters. + * + * @param params The source of the parameters to construct the + * poll table entry. If the parameters are defined on + * a service, this will be an {@link AxisService} + * instance. 
+ * @return <code>true</code> if the parameters contained the required configuration + * information and the endpoint has been configured, <code>false</code> if + * the no configuration for the endpoint is present in the parameters + * @throws AxisFault if configuration information is present, but there is an + * error or inconsistency in the parameters + */ + public abstract boolean loadConfiguration(ParameterInclude params) throws AxisFault; + + /** * Get the endpoint references for this protocol endpoint. * * @param ip The host name or IP address of the local host. The implementation should use Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java (original) +++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/AbstractDatagramTransportListener.java Sun Jun 14 15:02:31 2009 @@ -23,10 +23,8 @@ import org.apache.axis2.AxisFault; import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.description.AxisService; import org.apache.axis2.description.TransportInDescription; import org.apache.axis2.transport.base.AbstractTransportListenerEx; -import org.apache.axis2.transport.base.ParamUtils; public abstract class AbstractDatagramTransportListener<E extends DatagramEndpoint> extends AbstractTransportListenerEx<E> { @@ -57,12 +55,17 @@ } @Override - protected void configureAndStartEndpoint(E endpoint, AxisService service) throws 
AxisFault { + protected final E createEndpoint() { + E endpoint = doCreateEndpoint(); endpoint.setListener(this); - endpoint.setContentType(ParamUtils.getRequiredParam( - service, "transport." + getTransportName() + ".contentType")); endpoint.setMetrics(metrics); - + return endpoint; + } + + protected abstract E doCreateEndpoint(); + + @Override + protected void startEndpoint(E endpoint) throws AxisFault { try { dispatcher.addEndpoint(endpoint); } catch (IOException ex) { @@ -72,7 +75,7 @@ if (log.isDebugEnabled()) { log.debug("Started listening on endpoint " + endpoint.getEndpointReferences(defaultIp)[0] + " [contentType=" + endpoint.getContentType() - + "; service=" + service.getName() + "]"); + + "; service=" + endpoint.getServiceName() + "]"); } } Modified: webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramEndpoint.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramEndpoint.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramEndpoint.java (original) +++ webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/datagram/DatagramEndpoint.java Sun Jun 14 15:02:31 2009 @@ -18,7 +18,10 @@ */ package org.apache.axis2.transport.base.datagram; +import org.apache.axis2.AxisFault; +import org.apache.axis2.description.ParameterInclude; import org.apache.axis2.transport.base.MetricsCollector; +import org.apache.axis2.transport.base.ParamUtils; import org.apache.axis2.transport.base.ProtocolEndpoint; /** @@ -46,10 +49,6 @@ return contentType; } - public void setContentType(String contentType) { - this.contentType = contentType; - } - public MetricsCollector 
getMetrics() { return metrics; } @@ -57,4 +56,11 @@ public void setMetrics(MetricsCollector metrics) { this.metrics = metrics; } + + @Override + public boolean loadConfiguration(ParameterInclude params) throws AxisFault { + contentType = ParamUtils.getRequiredParam( + params, "transport." + listener.getTransportName() + ".contentType"); + return true; + } } Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java (original) +++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java Sun Jun 14 15:02:31 2009 @@ -15,10 +15,20 @@ */ package org.apache.axis2.transport.jms; +import org.apache.axis2.AxisFault; +import org.apache.axis2.description.AxisService; import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterInclude; +import org.apache.axis2.transport.base.BaseConstants; import org.apache.axis2.transport.base.ProtocolEndpoint; +import org.apache.axis2.transport.base.threads.WorkerPool; +import org.apache.axis2.transport.jms.ctype.ContentTypeRuleFactory; import org.apache.axis2.transport.jms.ctype.ContentTypeRuleSet; +import org.apache.axis2.transport.jms.ctype.MessageTypeRule; +import org.apache.axis2.transport.jms.ctype.PropertyRule; import org.apache.axis2.addressing.EndpointReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.util.List; import java.util.ArrayList; @@ -26,6 +36,8 @@ import java.util.Set; import java.util.HashSet; +import 
javax.jms.BytesMessage; +import javax.jms.TextMessage; import javax.naming.Context; /** @@ -34,6 +46,11 @@ * into Axis2. */ public class JMSEndpoint extends ProtocolEndpoint { + private static final Log log = LogFactory.getLog(JMSEndpoint.class); + + private final JMSListener listener; + private final WorkerPool workerPool; + private JMSConnectionFactory cf; private String jndiDestinationName; private int destinationType = JMSConstants.GENERIC; @@ -41,15 +58,16 @@ private ContentTypeRuleSet contentTypeRuleSet; private ServiceTaskManager serviceTaskManager; - public String getJndiDestinationName() { - return jndiDestinationName; + public JMSEndpoint(JMSListener listener, WorkerPool workerPool) { + this.listener = listener; + this.workerPool = workerPool; } - public void setJndiDestinationName(String destinationJNDIName) { - this.jndiDestinationName = destinationJNDIName; + public String getJndiDestinationName() { + return jndiDestinationName; } - public void setDestinationType(String destinationType) { + private void setDestinationType(String destinationType) { if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) { this.destinationType = JMSConstants.TOPIC; } else if (JMSConstants.DESTINATION_TYPE_QUEUE.equalsIgnoreCase(destinationType)) { @@ -64,7 +82,7 @@ return endpointReferences.toArray(new EndpointReference[endpointReferences.size()]); } - public void computeEPRs() { + private void computeEPRs() { List<EndpointReference> eprs = new ArrayList<EndpointReference>(); for (Object o : getService().getParameters()) { Parameter p = (Parameter) o; @@ -130,18 +148,10 @@ return contentTypeRuleSet; } - public void setContentTypeRuleSet(ContentTypeRuleSet contentTypeRuleSet) { - this.contentTypeRuleSet = contentTypeRuleSet; - } - public JMSConnectionFactory getCf() { return cf; } - public void setCf(JMSConnectionFactory cf) { - this.cf = cf; - } - public ServiceTaskManager getServiceTaskManager() { return serviceTaskManager; } @@ -149,4 +159,58 @@ 
public void setServiceTaskManager(ServiceTaskManager serviceTaskManager) { this.serviceTaskManager = serviceTaskManager; } + + @Override + public boolean loadConfiguration(ParameterInclude params) throws AxisFault { + // We only support endpoints configured at service level + if (!(params instanceof AxisService)) { + return false; + } + + AxisService service = (AxisService)params; + + cf = listener.getConnectionFactory(service); + if (cf == null) { + return false; + } + + Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION); + if (destParam != null) { + jndiDestinationName = (String)destParam.getValue(); + } else { + // Assume that the JNDI destination name is the same as the service name + jndiDestinationName = service.getName(); + } + + Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE); + if (destTypeParam != null) { + String paramValue = (String) destTypeParam.getValue(); + if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) || + JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) { + setDestinationType(paramValue); + } else { + throw new AxisFault("Invalid destinaton type value " + paramValue); + } + } else { + log.debug("JMS destination type not given. 
default queue"); + destinationType = JMSConstants.QUEUE; + } + + Parameter contentTypeParam = service.getParameter(JMSConstants.CONTENT_TYPE_PARAM); + if (contentTypeParam == null) { + contentTypeRuleSet = new ContentTypeRuleSet(); + contentTypeRuleSet.addRule(new PropertyRule(BaseConstants.CONTENT_TYPE)); + contentTypeRuleSet.addRule(new MessageTypeRule(BytesMessage.class, "application/octet-stream")); + contentTypeRuleSet.addRule(new MessageTypeRule(TextMessage.class, "text/plain")); + } else { + contentTypeRuleSet = ContentTypeRuleFactory.parse(contentTypeParam); + } + + computeEPRs(); // compute service EPR and keep for later use + + serviceTaskManager = ServiceTaskManagerFactory.createTaskManagerForService(cf, service, workerPool); + serviceTaskManager.setJmsMessageReceiver(new JMSMessageReceiver(listener, cf, this)); + + return true; + } } Modified: webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java (original) +++ webservices/commons/trunk/modules/transport/modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java Sun Jun 14 15:02:31 2009 @@ -27,13 +27,6 @@ import org.apache.axis2.transport.base.event.TransportErrorListener; import org.apache.axis2.transport.base.event.TransportErrorSource; import org.apache.axis2.transport.base.event.TransportErrorSourceSupport; -import org.apache.axis2.transport.jms.ctype.ContentTypeRuleFactory; -import org.apache.axis2.transport.jms.ctype.ContentTypeRuleSet; -import org.apache.axis2.transport.jms.ctype.MessageTypeRule; -import 
org.apache.axis2.transport.jms.ctype.PropertyRule; - -import javax.jms.BytesMessage; -import javax.jms.TextMessage; /** * The revamped JMS Transport listener implementation. Creates {@link ServiceTaskManager} instances @@ -78,7 +71,7 @@ @Override protected JMSEndpoint createEndpoint() { - return new JMSEndpoint(); + return new JMSEndpoint(this, workerPool); } /** @@ -87,54 +80,10 @@ * @param service the Axis service for which to listen for messages */ @Override - protected void configureAndStartEndpoint(JMSEndpoint endpoint, AxisService service) throws AxisFault { - JMSConnectionFactory cf = getConnectionFactory(service); - if (cf == null) { - throw new AxisFault("The service doesn't specify a JMS connection factory or refers " + - "to an invalid factory."); - } - - endpoint.setCf(cf); - - Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION); - if (destParam != null) { - endpoint.setJndiDestinationName((String)destParam.getValue()); - } else { - // Assume that the JNDI destination name is the same as the service name - endpoint.setJndiDestinationName(service.getName()); - } - - Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE); - if (destTypeParam != null) { - String paramValue = (String) destTypeParam.getValue(); - if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) || - JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) { - endpoint.setDestinationType(paramValue); - } else { - throw new AxisFault("Invalid destinaton type value " + paramValue); - } - } else { - log.debug("JMS destination type not given. 
default queue"); - endpoint.setDestinationType(JMSConstants.DESTINATION_TYPE_QUEUE); - } - - Parameter contentTypeParam = service.getParameter(JMSConstants.CONTENT_TYPE_PARAM); - if (contentTypeParam == null) { - ContentTypeRuleSet contentTypeRuleSet = new ContentTypeRuleSet(); - contentTypeRuleSet.addRule(new PropertyRule(BaseConstants.CONTENT_TYPE)); - contentTypeRuleSet.addRule(new MessageTypeRule(BytesMessage.class, "application/octet-stream")); - contentTypeRuleSet.addRule(new MessageTypeRule(TextMessage.class, "text/plain")); - endpoint.setContentTypeRuleSet(contentTypeRuleSet); - } else { - endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam)); - } - - endpoint.computeEPRs(); // compute service EPR and keep for later use + protected void startEndpoint(JMSEndpoint endpoint) throws AxisFault { + ServiceTaskManager stm = endpoint.getServiceTaskManager(); - ServiceTaskManager stm = ServiceTaskManagerFactory.createTaskManagerForService(cf, service, workerPool); - stm.setJmsMessageReceiver(new JMSMessageReceiver(this, cf, endpoint)); stm.start(); - endpoint.setServiceTaskManager(stm); for (int i=0; i<3; i++) { if (stm.getActiveTaskCount() > 0) { Modified: webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java (original) +++ webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java Sun Jun 14 15:02:31 2009 @@ -737,109 +737,8 @@ } @Override - protected PollTableEntry 
createPollTableEntry(ParameterInclude paramIncl) throws AxisFault { - String address = - ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_ADDRESS); - if (address == null) { - return null; - } else { - PollTableEntry entry = new PollTableEntry(); - try { - entry.setEmailAddress(address); - } catch (AddressException e) { - throw new AxisFault("Invalid email address specified by '" + - MailConstants.TRANSPORT_MAIL_ADDRESS + "' parameter :: " + e.getMessage()); - } - - List<Parameter> params = paramIncl.getParameters(); - Properties props = new Properties(); - for (Parameter p : params) { - if (p.getName().startsWith("mail.")) { - props.setProperty(p.getName(), (String) p.getValue()); - } - - if (MailConstants.MAIL_POP3_USERNAME.equals(p.getName()) || - MailConstants.MAIL_IMAP_USERNAME.equals(p.getName())) { - entry.setUserName((String) p.getValue()); - } - if (MailConstants.MAIL_POP3_PASSWORD.equals(p.getName()) || - MailConstants.MAIL_IMAP_PASSWORD.equals(p.getName())) { - entry.setPassword((String) p.getValue()); - } - if (MailConstants.TRANSPORT_MAIL_PROTOCOL.equals(p.getName())) { - entry.setProtocol((String) p.getValue()); - } - } - - Session session = Session.getInstance(props, null); - MailUtils.setupLogging(session, log, paramIncl); - entry.setSession(session); - - entry.setContentType( - ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_CONTENT_TYPE)); - try { - entry.setReplyAddress( - ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_REPLY_ADDRESS)); - } catch (AddressException e) { - throw new AxisFault("Invalid email address specified by '" + - MailConstants.TRANSPORT_MAIL_REPLY_ADDRESS + "' parameter :: " + - e.getMessage()); - } - - entry.setFolder( - ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_FOLDER)); - - entry.addPreserveHeaders( - ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_PRESERVE_HEADERS)); - entry.addRemoveHeaders( - 
ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_REMOVE_HEADERS)); - - String option = ParamUtils.getOptionalParam( - paramIncl, MailConstants.TRANSPORT_MAIL_ACTION_AFTER_PROCESS); - entry.setActionAfterProcess( - MOVE.equals(option) ? PollTableEntry.MOVE : PollTableEntry.DELETE); - option = ParamUtils.getOptionalParam( - paramIncl, MailConstants.TRANSPORT_MAIL_ACTION_AFTER_FAILURE); - entry.setActionAfterFailure( - MOVE.equals(option) ? PollTableEntry.MOVE : PollTableEntry.DELETE); - - String moveFolderAfterProcess = ParamUtils.getOptionalParam( - paramIncl, MailConstants.TRANSPORT_MAIL_MOVE_AFTER_PROCESS); - entry.setMoveAfterProcess(moveFolderAfterProcess); - String modeFolderAfterFailure = ParamUtils.getOptionalParam( - paramIncl, MailConstants.TRANSPORT_MAIL_MOVE_AFTER_FAILURE); - entry.setMoveAfterFailure(modeFolderAfterFailure); - - String processInParallel = ParamUtils.getOptionalParam( - paramIncl, MailConstants.TRANSPORT_MAIL_PROCESS_IN_PARALLEL); - if (processInParallel != null) { - entry.setProcessingMailInParallel(Boolean.parseBoolean(processInParallel)); - if (log.isDebugEnabled() && entry.isProcessingMailInParallel()) { - log.debug("Parallel mail processing enabled for : " + address); - } - } - - String pollInParallel = ParamUtils.getOptionalParam( - paramIncl, BaseConstants.TRANSPORT_POLL_IN_PARALLEL); - if (pollInParallel != null) { - entry.setConcurrentPollingAllowed(Boolean.parseBoolean(pollInParallel)); - if (log.isDebugEnabled() && entry.isConcurrentPollingAllowed()) { - log.debug("Concurrent mail polling enabled for : " + address); - } - } - - String strMaxRetryCount = ParamUtils.getOptionalParam( - paramIncl, MailConstants.MAX_RETRY_COUNT); - if (strMaxRetryCount != null) - entry.setMaxRetryCount(Integer.parseInt(strMaxRetryCount)); - - String strReconnectTimeout = ParamUtils.getOptionalParam( - paramIncl, MailConstants.RECONNECT_TIMEOUT); - if (strReconnectTimeout != null) - 
entry.setReconnectTimeout(Integer.parseInt(strReconnectTimeout) * 1000); - - return entry; - } + protected PollTableEntry createEndpoint() { + return new PollTableEntry(log); } public void addErrorListener(TransportErrorListener listener) { Modified: webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/PollTableEntry.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/PollTableEntry.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/PollTableEntry.java (original) +++ webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/PollTableEntry.java Sun Jun 14 15:02:31 2009 @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.StringTokenizer; import java.util.Collections; @@ -28,14 +29,21 @@ import javax.mail.internet.AddressException; import javax.mail.internet.InternetAddress; +import org.apache.axis2.AxisFault; import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterInclude; import org.apache.axis2.transport.base.AbstractPollTableEntry; +import org.apache.axis2.transport.base.BaseConstants; +import org.apache.axis2.transport.base.ParamUtils; +import org.apache.commons.logging.Log; /** * Holds information about an entry in the VFS transport poll table used by the * VFS Transport Listener */ public class PollTableEntry extends AbstractPollTableEntry { + private final Log log; // operation after mail check public static final int DELETE = 0; @@ -85,6 +93,10 @@ private int maxRetryCount; private long reconnectTimeout; + public PollTableEntry(Log log) { + 
this.log = log; + } + @Override public EndpointReference[] getEndpointReferences(String ip) { return new EndpointReference[] { new EndpointReference(MailConstants.TRANSPORT_PREFIX @@ -95,108 +107,54 @@ return emailAddress; } - public void setEmailAddress(String emailAddress) throws AddressException { - this.emailAddress = new InternetAddress(emailAddress); - } - public String getUserName() { return userName; } - public void setUserName(String userName) { - this.userName = userName; - } - public String getPassword() { return password; } - public void setPassword(String password) { - this.password = password; - } - public String getXServicePath() { return xServicePath; } - public void setXServicePath(String xServicePath) { - this.xServicePath = xServicePath; - } - public String getContentType() { return contentType; } - public void setContentType(String contentType) { - this.contentType = contentType; - } - public int getActionAfterProcess() { return actionAfterProcess; } - public void setActionAfterProcess(int actionAfterProcess) { - this.actionAfterProcess = actionAfterProcess; - } - public int getActionAfterFailure() { return actionAfterFailure; } - public void setActionAfterFailure(int actionAfterFailure) { - this.actionAfterFailure = actionAfterFailure; - } - public String getMoveAfterProcess() { return moveAfterProcess; } - public void setMoveAfterProcess(String moveAfterProcess) { - this.moveAfterProcess = moveAfterProcess; - } - public String getMoveAfterFailure() { return moveAfterFailure; } - public void setMoveAfterFailure(String moveAfterFailure) { - this.moveAfterFailure = moveAfterFailure; - } - public int getMaxRetryCount() { return maxRetryCount; } - public void setMaxRetryCount(int maxRetryCount) { - this.maxRetryCount = maxRetryCount; - } - public long getReconnectTimeout() { return reconnectTimeout; } - public void setReconnectTimeout(long reconnectTimeout) { - this.reconnectTimeout = reconnectTimeout; - } - public String getFolder() { return 
folder; } - public void setFolder(String folder) { - this.folder = folder; - } - public InternetAddress getReplyAddress() { return replyAddress; } - public void setReplyAddress(String replyAddress) throws AddressException { - if (replyAddress != null) { - this.replyAddress = new InternetAddress(replyAddress); - } - } - /** * Get the mail store protocol. * This protocol identifier is used in calls to {@link Session#getStore()}. @@ -207,25 +165,11 @@ return protocol; } - /** - * Set the mail store protocol. - * This protocol identifier is used in calls to {@link Session#getStore()}. - * - * @param protocol the mail store protocol - */ - public void setProtocol(String protocol) { - this.protocol = protocol; - } - public Session getSession() { return session; } - public void setSession(Session session) { - this.session = session; - } - - public void addPreserveHeaders(String headerList) { + private void addPreserveHeaders(String headerList) { if (headerList == null) return; StringTokenizer st = new StringTokenizer(headerList, " ,"); preserveHeaders = new ArrayList<String>(); @@ -237,7 +181,7 @@ } } - public void addRemoveHeaders(String headerList) { + private void addRemoveHeaders(String headerList) { if (headerList == null) return; StringTokenizer st = new StringTokenizer(headerList, " ,"); removeHeaders = new ArrayList<String>(); @@ -263,8 +207,111 @@ return processingMailInParallel; } - public void setProcessingMailInParallel(boolean processingMailInParallel) { - this.processingMailInParallel = processingMailInParallel; + @Override + public boolean loadConfiguration(ParameterInclude paramIncl) throws AxisFault { + String address = + ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_ADDRESS); + if (address == null) { + return false; + } else { + try { + emailAddress = new InternetAddress(address); + } catch (AddressException e) { + throw new AxisFault("Invalid email address specified by '" + + MailConstants.TRANSPORT_MAIL_ADDRESS + "' 
parameter :: " + e.getMessage()); + } + + List<Parameter> params = paramIncl.getParameters(); + Properties props = new Properties(); + for (Parameter p : params) { + if (p.getName().startsWith("mail.")) { + props.setProperty(p.getName(), (String) p.getValue()); + } + + if (MailConstants.MAIL_POP3_USERNAME.equals(p.getName()) || + MailConstants.MAIL_IMAP_USERNAME.equals(p.getName())) { + userName = (String) p.getValue(); + } + if (MailConstants.MAIL_POP3_PASSWORD.equals(p.getName()) || + MailConstants.MAIL_IMAP_PASSWORD.equals(p.getName())) { + password = (String) p.getValue(); + } + if (MailConstants.TRANSPORT_MAIL_PROTOCOL.equals(p.getName())) { + protocol = (String) p.getValue(); + } + } + + session = Session.getInstance(props, null); + MailUtils.setupLogging(session, log, paramIncl); + + contentType = + ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_CONTENT_TYPE); + try { + String replyAddress = + ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_REPLY_ADDRESS); + if (replyAddress != null) { + this.replyAddress = new InternetAddress(replyAddress); + } + } catch (AddressException e) { + throw new AxisFault("Invalid email address specified by '" + + MailConstants.TRANSPORT_MAIL_REPLY_ADDRESS + "' parameter :: " + + e.getMessage()); + } + + folder = + ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_FOLDER); + + addPreserveHeaders( + ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_PRESERVE_HEADERS)); + addRemoveHeaders( + ParamUtils.getOptionalParam(paramIncl, MailConstants.TRANSPORT_MAIL_REMOVE_HEADERS)); + + String option = ParamUtils.getOptionalParam( + paramIncl, MailConstants.TRANSPORT_MAIL_ACTION_AFTER_PROCESS); + actionAfterProcess = + MailTransportListener.MOVE.equals(option) ? 
PollTableEntry.MOVE : PollTableEntry.DELETE; + option = ParamUtils.getOptionalParam( + paramIncl, MailConstants.TRANSPORT_MAIL_ACTION_AFTER_FAILURE); + actionAfterFailure = + MailTransportListener.MOVE.equals(option) ? PollTableEntry.MOVE : PollTableEntry.DELETE; + + moveAfterProcess = ParamUtils.getOptionalParam( + paramIncl, MailConstants.TRANSPORT_MAIL_MOVE_AFTER_PROCESS); + moveAfterFailure = ParamUtils.getOptionalParam( + paramIncl, MailConstants.TRANSPORT_MAIL_MOVE_AFTER_FAILURE); + + String processInParallel = ParamUtils.getOptionalParam( + paramIncl, MailConstants.TRANSPORT_MAIL_PROCESS_IN_PARALLEL); + if (processInParallel != null) { + processingMailInParallel = Boolean.parseBoolean(processInParallel); + if (log.isDebugEnabled() && processingMailInParallel) { + log.debug("Parallel mail processing enabled for : " + address); + } + } + + String pollInParallel = ParamUtils.getOptionalParam( + paramIncl, BaseConstants.TRANSPORT_POLL_IN_PARALLEL); + if (pollInParallel != null) { + setConcurrentPollingAllowed(Boolean.parseBoolean(pollInParallel)); + if (log.isDebugEnabled() && isConcurrentPollingAllowed()) { + log.debug("Concurrent mail polling enabled for : " + address); + } + } + + String strMaxRetryCount = ParamUtils.getOptionalParam( + paramIncl, MailConstants.MAX_RETRY_COUNT); + if (strMaxRetryCount != null) { + maxRetryCount = Integer.parseInt(strMaxRetryCount); + } + + String strReconnectTimeout = ParamUtils.getOptionalParam( + paramIncl, MailConstants.RECONNECT_TIMEOUT); + if (strReconnectTimeout != null) { + reconnectTimeout = Integer.parseInt(strReconnectTimeout) * 1000; + } + + return super.loadConfiguration(paramIncl); + } } public synchronized void processingUID(String uid) { Modified: webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/Endpoint.java URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/Endpoint.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/Endpoint.java (original) +++ webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/Endpoint.java Sun Jun 14 15:02:31 2009 @@ -22,6 +22,8 @@ import org.apache.axis2.AxisFault; import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.description.ParameterInclude; +import org.apache.axis2.transport.base.ParamUtils; import org.apache.axis2.transport.base.datagram.DatagramEndpoint; import org.apache.axis2.util.Utils; @@ -36,17 +38,20 @@ return port; } - public void setPort(int port) { - this.port = port; - } - public int getMaxPacketSize() { return maxPacketSize; } - public void setMaxPacketSize(int maxPacketSize) { - this.maxPacketSize = maxPacketSize; - } + @Override + public boolean loadConfiguration(ParameterInclude params) throws AxisFault { + port = ParamUtils.getOptionalParamInt(params, UDPConstants.PORT_KEY, -1); + if (port == -1) { + return false; + } + maxPacketSize = ParamUtils.getOptionalParamInt(params, UDPConstants.MAX_PACKET_SIZE_KEY, + UDPConstants.DEFAULT_MAX_PACKET_SIZE); + return super.loadConfiguration(params); + } @Override public EndpointReference[] getEndpointReferences(String ip) throws AxisFault { Modified: webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPListener.java URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPListener.java?rev=784570&r1=784569&r2=784570&view=diff ============================================================================== --- 
webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPListener.java (original) +++ webservices/commons/trunk/modules/transport/modules/udp/src/main/java/org/apache/axis2/transport/udp/UDPListener.java Sun Jun 14 15:02:31 2009 @@ -20,10 +20,7 @@ import java.io.IOException; -import org.apache.axis2.AxisFault; -import org.apache.axis2.description.AxisService; import org.apache.axis2.transport.base.ManagementSupport; -import org.apache.axis2.transport.base.ParamUtils; import org.apache.axis2.transport.base.datagram.AbstractDatagramTransportListener; import org.apache.axis2.transport.base.datagram.DatagramDispatcherCallback; @@ -54,14 +51,7 @@ } @Override - protected Endpoint createEndpoint() { + protected Endpoint doCreateEndpoint() { return new Endpoint(); } - - @Override - protected void configureAndStartEndpoint(Endpoint endpoint, AxisService service) throws AxisFault { - endpoint.setPort(ParamUtils.getRequiredParamInt(service, UDPConstants.PORT_KEY)); - endpoint.setMaxPacketSize(ParamUtils.getOptionalParamInt(service, UDPConstants.MAX_PACKET_SIZE_KEY, UDPConstants.DEFAULT_MAX_PACKET_SIZE)); - super.configureAndStartEndpoint(endpoint, service); - } }