Author: deepal Date: Fri May 18 01:54:37 2007 New Revision: 539326 URL: http://svn.apache.org/viewvc?view=rev&rev=539326 Log: improved SMTP transport , now the sync model work using callback and no queues are used. -need to call the operation cleanup when the MEP complete
Modified: webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/description/OutInAxisOperation.java webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/Constants.java webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/EMailSender.java webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/MailWorker.java webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/MailWorkerManager.java webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/SimpleMailListener.java webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/SynchronousMailListener.java Modified: webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/description/OutInAxisOperation.java URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/description/OutInAxisOperation.java?view=diff&rev=539326&r1=539325&r2=539326 ============================================================================== --- webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/description/OutInAxisOperation.java (original) +++ webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/description/OutInAxisOperation.java Fri May 18 01:54:37 2007 @@ -74,6 +74,7 @@ } else { mep.put(MESSAGE_LABEL_IN_VALUE, msgContext); opContext.setComplete(true); + opContext.cleanup(); } } Modified: webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/Constants.java URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/Constants.java?view=diff&rev=539326&r1=539325&r2=539326 ============================================================================== --- webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/Constants.java (original) +++ webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/Constants.java Fri May 18 01:54:37 2007 @@ -81,4 +81,5 @@ public static final String IN_REPLY_TO = "In-Reply-To"; public static final String MAILTO = "mailto"; public static final String MAPPING_TABLE = "mappingTable"; + public static final String CALLBACK_TABLE = "callbackTable"; } Modified: webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/EMailSender.java URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/EMailSender.java?view=diff&rev=539326&r1=539325&r2=539326 ============================================================================== --- webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/EMailSender.java (original) +++ webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/EMailSender.java Fri May 18 01:54:37 2007 @@ -17,12 +17,12 @@ package org.apache.axis2.transport.mail; -import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; import org.apache.axiom.attachments.ByteArrayDataSource; import org.apache.axiom.om.OMOutputFormat; import org.apache.axiom.soap.SOAP11Constants; import org.apache.axiom.soap.SOAP12Constants; import org.apache.axis2.AxisFault; +import org.apache.axis2.i18n.Messages; import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.client.Options; import org.apache.axis2.context.ConfigurationContext; @@ -228,46 +228,36 @@ private void sendReceive(MessageContext msgContext, String msgId) throws AxisFault { storeMessageContext(msgContext, msgId); - ConfigurationContext cc = msgContext.getConfigurationContext(); - - SimpleMailListener simpleMailListener; - + //While sysncmial listner .not complete Options options = msgContext.getOptions(); + long outInMilliSeconds = options.getTimeOutInMilliSeconds(); + SynchronousMailListener synchronousMailListener = null; if (!options.isUseSeparateListener() && !msgContext.isServerSide()) { - Object obj = cc.getProperty(Constants.MAIL_SYNC); - - if (obj == null) { - SynchronousMailListener synchronousMailListener = - new SynchronousMailListener(options.getTimeOutInMilliSeconds(), new LinkedBlockingQueue()); - cc.setProperty(Constants.MAIL_SYNC, synchronousMailListener); - - simpleMailListener = synchronousMailListener.sendReceive(msgContext, msgId); - - TransportInDescription transportIn = msgContext.getConfigurationContext() - .getAxisConfiguration().getTransportIn(org.apache.axis2.Constants.TRANSPORT_MAIL); - - Object mailPOP3Obj= msgContext.getProperty(Constants.MAIL_POP3); - if (mailPOP3Obj != null) { - simpleMailListener.initFromRuntime((Properties) obj, msgContext); - } else { - simpleMailListener.init(msgContext.getConfigurationContext(), transportIn); + if(!cc.getListenerManager().isListenerRunning(Constants.MAILTO)){ + TransportInDescription mailTo= + cc.getAxisConfiguration().getTransportIn(Constants.MAILTO); + if(mailTo==null){ + throw new AxisFault("Could not found transport for " +Constants.MAILTO ); } - msgContext.getConfigurationContext().getThreadPool().execute(simpleMailListener); - - simpleMailListener.start(); - log.info("Simple Mail Listener started for the first time and response received"); - - - } else { - SynchronousMailListener synchronousMailListener = (SynchronousMailListener)obj; - synchronousMailListener.sendReceive(msgContext,msgId).start(); - log.info("Simple mail listener started and response received"); - + cc.getListenerManager().addListener(mailTo,false); } + Hashtable callBackTable = (Hashtable) cc.getProperty(Constants.CALLBACK_TABLE); + if(callBackTable!=null){ + synchronousMailListener = + new SynchronousMailListener(messageContext, outInMilliSeconds); + callBackTable.put(msgId,synchronousMailListener); + } + while(!synchronousMailListener.isComplete()){ + try { + Thread.sleep(6000); + } catch (InterruptedException e) { + throw new AxisFault(e); + } + } + callBackTable.remove(msgId); } - } private void storeMessageContext(MessageContext msgContext, String msgId) { Modified: webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/MailWorker.java URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/MailWorker.java?view=diff&rev=539326&r1=539325&r2=539326 ============================================================================== --- webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/MailWorker.java (original) +++ webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/MailWorker.java Fri May 18 01:54:37 2007 @@ -25,17 +25,15 @@ public class MailWorker implements Runnable { private ConfigurationContext configContext = null; - private LinkedBlockingQueue messageQueue; + private MessageContext messageContext; /** * Constructor for MailWorker * - * @param messageQueue - * @param reg */ - public MailWorker(ConfigurationContext reg, LinkedBlockingQueue messageQueue) { + public MailWorker(ConfigurationContext reg, MessageContext messageContext) { this.configContext = reg; - this.messageQueue = messageQueue; + this.messageContext = messageContext; } /** @@ -43,27 +41,23 @@ */ public void run() { AxisEngine engine = new AxisEngine(configContext); - MessageContext msgContext = null; // create and initialize a message context - while (true) { - try { - msgContext = (MessageContext) messageQueue.take(); - if (msgContext.getEnvelope().getBody().hasFault()) { - engine.receiveFault(msgContext); - } else { - engine.receive(msgContext); - } + try { + if (messageContext.getEnvelope().getBody().hasFault()) { + engine.receiveFault(messageContext); + } else { + engine.receive(messageContext); + } - } catch (Exception e) { - try { - if (msgContext != null) { - MessageContext faultContext = - MessageContextBuilder.createFaultMessageContext(msgContext, e); - engine.sendFault(faultContext); - } - } catch (Exception e1) { - // Ignore errors that would possibly happen this catch + } catch (Exception e) { + try { + if (messageContext != null&&!messageContext.isServerSide()) { + MessageContext faultContext = + MessageContextBuilder.createFaultMessageContext(messageContext, e); + engine.sendFault(faultContext); } + } catch (Exception e1) { + // Ignore errors that would possibly happen this catch } } Modified: webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/MailWorkerManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/MailWorkerManager.java?view=diff&rev=539326&r1=539325&r2=539326 ============================================================================== --- webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/MailWorkerManager.java (original) +++ webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/MailWorkerManager.java Fri May 18 01:54:37 2007 @@ -43,7 +43,7 @@ public void start() throws AxisFault { for (int i = 0; i < poolSize; i++) { - workerPool.execute(new MailWorker(configurationContext, messageQueue)); +// workerPool.execute(new MailWorker(configurationContext, messageQueue)); } } } Modified: webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/SimpleMailListener.java URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/SimpleMailListener.java?view=diff&rev=539326&r1=539325&r2=539326 ============================================================================== --- webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/SimpleMailListener.java (original) +++ webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/SimpleMailListener.java Fri May 18 01:54:37 2007 @@ -18,8 +18,6 @@ package org.apache.axis2.transport.mail; import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; -import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; -import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import org.apache.axiom.soap.SOAP12Constants; import org.apache.axiom.soap.SOAPEnvelope; @@ -35,7 +33,6 @@ import org.apache.axis2.transport.TransportListener; import org.apache.axis2.transport.TransportUtils; import org.apache.axis2.util.Utils; -import org.apache.axis2.util.threadpool.DefaultThreadFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -70,7 +67,7 @@ /*This hold properties for pop3 or impa server connection*/ private Properties pop3Properties = new Properties(); - private EmailReceiver receiver = null; + private final EmailReceiver receiver ; /** * Time has been put from best guest. Let the default be 3 mins. @@ -80,24 +77,8 @@ */ private int listenerWaitInterval = 1000 * 60 * 3; - private ExecutorService workerPool; - - private static final int WORKERS_MAX_THREADS = 5; - private static final long WORKER_KEEP_ALIVE = 60L; - private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; - - private LinkedBlockingQueue messageQueue; - public SimpleMailListener() { - } - - /** - * This constructor will be used in when Mail simulate the request/response - * - * @param messageQueue - */ - public SimpleMailListener(LinkedBlockingQueue messageQueue) { - this.messageQueue = messageQueue; + receiver = new EmailReceiver(); } public void init(ConfigurationContext configurationContext, TransportInDescription transportIn) @@ -162,7 +143,6 @@ urlName = new URLName(protocol, host, Integer.parseInt(port), "", user, password); } - receiver = new EmailReceiver(); receiver.setPop3Properties(pop3Properties); receiver.setUrlName(urlName); Object obj = configurationContext. @@ -172,6 +152,13 @@ org.apache.axis2.transport.mail.Constants.MAPPING_TABLE, new Hashtable()); } + Object callBackTable = configurationContext. + getProperty(org.apache.axis2.transport.mail.Constants.CALLBACK_TABLE); + if (callBackTable == null) { + configurationContext.setProperty( + org.apache.axis2.transport.mail.Constants.CALLBACK_TABLE, new Hashtable()); + } + } @@ -214,7 +201,6 @@ urlName = new URLName(protocol, host, Integer.parseInt(port), "", user, password); } - receiver = new EmailReceiver(); receiver.setPop3Properties(pop3Properties); receiver.setUrlName(urlName); Object obj = configurationContext. @@ -223,6 +209,12 @@ configurationContext.setProperty( org.apache.axis2.transport.mail.Constants.MAPPING_TABLE, new Hashtable()); } + Object callBackTable = configurationContext. + getProperty(org.apache.axis2.transport.mail.Constants.CALLBACK_TABLE); + if (callBackTable == null) { + configurationContext.setProperty( + org.apache.axis2.transport.mail.Constants.CALLBACK_TABLE, new Hashtable()); + } } /** @@ -291,14 +283,16 @@ MimeMessage msg = (MimeMessage) msgs[i]; try { MessageContext mc = createMessageContextToMailWorker(msg); - if (mc != null) { - messageQueue.add(mc); + if(mc==null){ + continue; } + msg.setFlag(Flags.Flag.DELETED, true); + MailWorker worker = new MailWorker(configurationContext,mc); + this.configurationContext.getThreadPool().execute(worker); } catch (Exception e) { log.error("Error in SimpleMailListener - processing mail", e); } finally { // delete mail in any case - msg.setFlag(Flags.Flag.DELETED, true); } } } @@ -363,13 +357,16 @@ msgContext.setProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO, transportInfo); buildSOAPEnvelope(msg, msgContext); - fillMessageContextFromAvaiableData(msgContext,inReplyTo); + if(!fillMessageContextFromAvaiableData(msgContext,inReplyTo)){ + return null; + } } return msgContext; } - private void fillMessageContextFromAvaiableData(MessageContext msgContext , String messageID) throws AxisFault{ - Hashtable mappingTable = (Hashtable) msgContext.getConfigurationContext(). + private boolean fillMessageContextFromAvaiableData(MessageContext msgContext , + String messageID) throws AxisFault{ + Hashtable mappingTable = (Hashtable) configurationContext. getProperty(org.apache.axis2.transport.mail.Constants.MAPPING_TABLE); if(mappingTable!=null&&messageID!=null){ @@ -388,6 +385,16 @@ } } } + Hashtable callBackTable = (Hashtable) configurationContext.getProperty( + org.apache.axis2.transport.mail.Constants.CALLBACK_TABLE); + if(messageID!=null&&callBackTable!=null){ + SynchronousMailListener listener = (SynchronousMailListener) callBackTable.get(messageID); + if(listener!=null){ + listener.setInMessageContext(msgContext); + return false; + } + } + return true; } private void buildSOAPEnvelope(MimeMessage msg, MessageContext msgContext) @@ -496,21 +503,7 @@ * Start this listener */ public void start() throws AxisFault { - workerPool = new ThreadPoolExecutor(1, - WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT, - new LinkedBlockingQueue(), - new DefaultThreadFactory( - new ThreadGroup("Mail Worker thread group"), - "MailWorker")); - - messageQueue = new LinkedBlockingQueue(); - this.configurationContext.getThreadPool().execute(this); - - MailWorkerManager mailWorkerManager = new MailWorkerManager(configurationContext, - messageQueue, workerPool, - WORKERS_MAX_THREADS); - mailWorkerManager.start(); } /** @@ -518,10 +511,7 @@ * <p/> */ public void stop() { - running = true; - if (!workerPool.isShutdown()) { - workerPool.shutdown(); - } + running = false; log.info("Stopping the mail listner"); } @@ -533,12 +523,13 @@ public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { return new EndpointReference[]{ new EndpointReference(Constants.TRANSPORT_MAIL + ":" + replyTo + "?" + - configurationContext.getServiceContextPath() + "/" + - serviceName), - new EndpointReference(Constants.TRANSPORT_MAIL + ":" + replyTo + "?" + org.apache.axis2.transport.mail.Constants.X_SERVICE_PATH + "=" + configurationContext.getServiceContextPath() + "/" + - serviceName)}; + serviceName), + new EndpointReference(Constants.TRANSPORT_MAIL + ":" + replyTo + "?" + + configurationContext.getServiceContextPath() + "/" + + serviceName) + }; } @@ -548,9 +539,5 @@ public void destroy() { this.configurationContext = null; - } - - public LinkedBlockingQueue getLinkedBlockingQueue() { - return messageQueue; } } Modified: webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/SynchronousMailListener.java URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/SynchronousMailListener.java?view=diff&rev=539326&r1=539325&r2=539326 ============================================================================== --- webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/SynchronousMailListener.java (original) +++ webservices/axis2/branches/java/1_2/modules/kernel/src/org/apache/axis2/transport/mail/SynchronousMailListener.java Fri May 18 01:54:37 2007 @@ -15,91 +15,68 @@ */ package org.apache.axis2.transport.mail; -import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; import org.apache.axis2.AxisFault; import org.apache.axis2.context.MessageContext; -import org.apache.axis2.context.ContextFactory; import org.apache.axis2.context.OperationContext; -import org.apache.axis2.description.TransportInDescription; +import org.apache.axis2.description.AxisOperation; +import org.apache.axis2.description.AxisMessage; import org.apache.axis2.wsdl.WSDLConstants; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.util.Properties; -/* - * - */ - public class SynchronousMailListener { private static Log log = LogFactory.getLog(SynchronousMailListener.class); + private boolean complete = false; + //To store out going messageconext + private MessageContext outMessageContext; + private MessageContext inMessageContext; private long timeoutInMilliseconds = -1; - private LinkedBlockingQueue queue; - public SynchronousMailListener(long timeoutInMilliseconds, LinkedBlockingQueue queue) { + + public SynchronousMailListener(MessageContext outMessageContext, + long timeoutInMilliseconds) { + this.outMessageContext = outMessageContext; this.timeoutInMilliseconds = timeoutInMilliseconds; - this.queue = queue; } - - public SimpleMailListener sendReceive(final MessageContext msgContext, final String msgId) throws AxisFault { - /** - * This will be bloked invocation - */ - return new SimpleMailListener(queue) { - public void start() throws AxisFault { - long timeStatus; - while (true) { - long startTime = System.currentTimeMillis(); - try { - MessageContext msgCtx = (MessageContext) getLinkedBlockingQueue().take(); - MailBasedOutTransportInfo transportInfo = (MailBasedOutTransportInfo) msgCtx - .getProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO); - if (transportInfo.getInReplyTo() == null) { - String error = EMailSender.class.getName() + " Coudn't simulate request/response without In-Reply-To Mail header"; - log.error(error); - throw new AxisFault(error); - } - if (transportInfo.getInReplyTo().equals(msgId)) { - OperationContext operationContext = msgContext.getOperationContext(); - MessageContext messageContext = operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE); - //FIXME - if (messageContext == null) { - if (!operationContext.isComplete()) { - messageContext = ContextFactory.createMessageContext(msgContext.getConfigurationContext()); - messageContext.setOperationContext(operationContext); - messageContext.setServiceContext(msgContext.getServiceContext()); - msgContext.getOperationContext().addMessageContext(messageContext); - messageContext.setEnvelope(msgCtx.getEnvelope()); - } - } else { - messageContext.setEnvelope(msgCtx.getEnvelope()); - } - log.info(SynchronousMailListener.class.getName() + " found the required message."); - break; - } - getLinkedBlockingQueue().put(msgCtx); - - } catch (InterruptedException e) { - log.warn(e); - throw new AxisFault(e); - } - long endTime = System.currentTimeMillis(); - timeStatus = endTime - startTime; - if (timeoutInMilliseconds != -1 && timeStatus > timeoutInMilliseconds) { - /*TODO What should be the best default value for timeoutInMilliseconds ?*/ - /*log.info(SynchronousMailListener.class.getName() + " timeout"); - break;*/ - } - - } - - + public void setInMessageContext(MessageContext inMessageContext) throws AxisFault{ + OperationContext operationContext = outMessageContext.getOperationContext(); + MessageContext msgCtx = + operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE); + if(msgCtx==null){ + inMessageContext.setOperationContext(operationContext); + inMessageContext.setServiceContext(outMessageContext.getServiceContext()); + if(!operationContext.isComplete()){ + operationContext.addMessageContext(inMessageContext); } - }; - - + AxisOperation axisOp = operationContext.getAxisOperation(); + //TODO need to handle fault case as well , + //TODO need to check whether the message contains fault , if so we need to get the fault message + AxisMessage inMessage = axisOp.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE); + inMessageContext.setAxisMessage(inMessage); + inMessageContext.setServerSide(false); + } else { + msgCtx.setOperationContext(operationContext); + msgCtx.setServiceContext(outMessageContext.getServiceContext()); + AxisOperation axisOp = operationContext.getAxisOperation(); + AxisMessage inMessage = axisOp.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE); + msgCtx.setAxisMessage(inMessage); + msgCtx.setTransportIn(inMessageContext.getTransportIn()); + msgCtx.setTransportOut(inMessageContext.getTransportOut()); + msgCtx.setServerSide(false); + msgCtx.setProperty(org.apache.axis2.transport.mail.Constants.CONTENT_TYPE, + inMessageContext.getProperty(org.apache.axis2.transport.mail.Constants.CONTENT_TYPE)); + msgCtx.setIncomingTransportName(org.apache.axis2.Constants.TRANSPORT_MAIL); + msgCtx.setEnvelope(inMessageContext.getEnvelope()); + if(!operationContext.isComplete()){ + operationContext.addMessageContext(msgCtx); + } + } + this.inMessageContext = inMessageContext; + log.info(" SynchronousMailListener found the required message."); + complete = true; } public long getTimeoutInMilliseconds() { @@ -108,5 +85,13 @@ public void setTimeoutInMilliseconds(long timeoutInMilliseconds) { this.timeoutInMilliseconds = timeoutInMilliseconds; + } + + public boolean isComplete() { + return complete; + } + + public MessageContext getInMessageContext() { + return inMessageContext; } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]