jmsnell 2002/10/28 21:15:30 Modified: java/src/org/apache/axis/ime/internal NonPersistentMessageExchangeCorrelatorService.java MessageExchangeProvider.java MessageExchangeImpl.java java/src/org/apache/axis/ime MessageExchange.java MessageExchangeCorrelatorService.java Added: java/src/org/apache/axis/ime/internal/util/handler HandlerWrapper.java java/src/org/apache/axis/ime/internal MessageExchangeSendContext.java MessageExchangeSendListener.java FirstComeFirstServeDispatchPolicy.java MessageExchangeReceiveContext.java ReceivedMessageDispatchPolicy.java java/src/org/apache/axis/ime MessageContextListener.java java/src/org/apache/axis/ime/internal/util KeyedBuffer.java WorkerPool.java NonPersistentKeyedBuffer.java Removed: java/src/org/apache/axis/ime/internal/util/handler HandlerWrapper1.java HandlerWrapper2.java java/src/org/apache/axis/ime/internal MessageExchangeProvider1.java MessageWorker.java MessageExchangeProvider2.java NonPersistentMessageChannel.java MessageWorkerGroup.java java/src/org/apache/axis/ime MessageChannel.java MessageExchangeReceiveListener.java MessageExchangeContextListener.java MessageExchangeContext.java Log: Some fairly significant changes here. I went through and simplified the MessageExchange interface and org.apache.axis.ime package and reworked the internal impl quite a bit to improve how the way sending and receiving is done. This still needs to be tested, but everything builds. Revision Changes Path 1.1 xml-axis/java/src/org/apache/axis/ime/internal/util/handler/HandlerWrapper.java Index: HandlerWrapper.java =================================================================== /* * The Apache Software License, Version 1.1 * * * Copyright (c) 2001 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Axis" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. */ package org.apache.axis.ime.internal.util.handler; import org.apache.axis.Handler; import org.apache.axis.MessageContext; import org.apache.axis.ime.MessageExchangeCorrelator; import org.apache.axis.ime.MessageContextListener; import org.apache.axis.ime.MessageExchangeFaultListener; import org.apache.axis.ime.internal.MessageExchangeProvider; import org.apache.axis.ime.internal.MessageExchangeSendContext; import org.apache.axis.ime.internal.MessageExchangeSendListener; import org.apache.axis.ime.internal.ReceivedMessageDispatchPolicy; import org.apache.axis.ime.internal.FirstComeFirstServeDispatchPolicy; /** * Used to wrap synchronous handlers (e.g. Axis 1.0 transports) * * @author James M Snell ([EMAIL PROTECTED]) */ public class HandlerWrapper extends MessageExchangeProvider { private Handler handler; public HandlerWrapper(Handler handler) { this.handler = handler; } /** * @see org.apache.axis.ime.internal.MessageExchangeProvider1#createSendMessageContextListener() */ protected MessageExchangeSendListener getMessageExchangeSendListener() { return new Listener(handler); } protected ReceivedMessageDispatchPolicy getReceivedMessageDispatchPolicy() { return new FirstComeFirstServeDispatchPolicy(RECEIVE, RECEIVE_REQUESTS); } public class Listener implements MessageExchangeSendListener { private Handler handler; public Listener(Handler handler) { this.handler = handler; } /** * @see org.apache.axis.ime.MessageExchangeContextListener#onMessageExchangeContext(MessageExchangeContext) */ public void onSend( MessageExchangeSendContext context) { MessageExchangeFaultListener listener = context.getMessageExchangeFaultListener(); try { MessageContext msgContext = context.getMessageContext(); MessageExchangeCorrelator correlator = context.getMessageExchangeCorrelator(); // should I do init's and cleanup's in here? handler.invoke(msgContext); RECEIVE.put(correlator, context); } catch (Exception exception) { if (listener != null) listener.onFault( context.getMessageExchangeCorrelator(), exception); } } } } 1.3 +3 -4 xml-axis/java/src/org/apache/axis/ime/internal/NonPersistentMessageExchangeCorrelatorService.java Index: NonPersistentMessageExchangeCorrelatorService.java =================================================================== RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/NonPersistentMessageExchangeCorrelatorService.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- NonPersistentMessageExchangeCorrelatorService.java 28 Oct 2002 13:50:40 -0000 1.2 +++ NonPersistentMessageExchangeCorrelatorService.java 29 Oct 2002 05:15:29 -0000 1.3 @@ -55,7 +55,6 @@ package org.apache.axis.ime.internal; -import org.apache.axis.ime.MessageExchangeContext; import org.apache.axis.ime.MessageExchangeCorrelator; import org.apache.axis.ime.MessageExchangeCorrelatorService; @@ -74,7 +73,7 @@ */ public void put( MessageExchangeCorrelator correlator, - MessageExchangeContext context) { + Object context) { synchronized (contexts) { contexts.put(correlator, context); } @@ -83,9 +82,9 @@ /** * @see org.apache.axis.ime.MessageExchangeCorrelatorService#get(MessageExchangeCorrelator) */ - public MessageExchangeContext get(MessageExchangeCorrelator correlator) { + public Object get(MessageExchangeCorrelator correlator) { synchronized (contexts) { - return (MessageExchangeContext) contexts.remove(correlator); + return contexts.remove(correlator); } } 1.3 +123 -11 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeProvider.java Index: MessageExchangeProvider.java =================================================================== RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeProvider.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- MessageExchangeProvider.java 28 Oct 2002 13:50:40 -0000 1.2 +++ MessageExchangeProvider.java 29 Oct 2002 05:15:29 -0000 1.3 @@ -55,38 +55,68 @@ package org.apache.axis.ime.internal; -import org.apache.axis.ime.MessageChannel; +import org.apache.axis.i18n.Messages; +import org.apache.axis.MessageContext; import org.apache.axis.ime.MessageExchange; +import org.apache.axis.ime.MessageContextListener; +import org.apache.axis.ime.MessageExchangeCorrelator; import org.apache.axis.ime.MessageExchangeFactory; +import org.apache.axis.ime.MessageExchangeFaultListener; +import org.apache.axis.ime.internal.util.WorkerPool; +import org.apache.axis.ime.internal.util.KeyedBuffer; +import org.apache.axis.ime.internal.util.NonPersistentKeyedBuffer; /** - * Serves as a base class for MessageExchangeProviders that - * need to thread pooling on send AND receive message - * flows (as opposed to MessageExchangeProvider2 which only - * does thread pooling on send flows). - * * @author James M Snell ([EMAIL PROTECTED]) */ public abstract class MessageExchangeProvider implements MessageExchangeFactory { + public static final long SELECT_TIMEOUT = 1000 * 30; public static final long DEFAULT_THREAD_COUNT = 5; - protected final MessageWorkerGroup WORKERS = new MessageWorkerGroup(); - protected final MessageChannel SEND = new NonPersistentMessageChannel(WORKERS); - protected final MessageChannel RECEIVE = new NonPersistentMessageChannel(WORKERS); + protected final WorkerPool WORKERS = new WorkerPool(); + protected final KeyedBuffer SEND = new NonPersistentKeyedBuffer(WORKERS); + protected final KeyedBuffer RECEIVE = new NonPersistentKeyedBuffer(WORKERS); + protected final KeyedBuffer RECEIVE_REQUESTS = new NonPersistentKeyedBuffer(WORKERS); protected boolean initialized = false; + protected abstract MessageExchangeSendListener getMessageExchangeSendListener(); + + protected abstract ReceivedMessageDispatchPolicy getReceivedMessageDispatchPolicy(); + public MessageExchange createMessageExchange() { - return new MessageExchangeImpl(this, SEND, RECEIVE); + return new MessageExchangeImpl(this); } public void init() { init(DEFAULT_THREAD_COUNT); } - public abstract void init(long THREAD_COUNT); + public void init(long THREAD_COUNT) { + if (initialized) + throw new IllegalStateException(Messages.getMessage("illegalStateException00")); + for (int n = 0; n < THREAD_COUNT; n++) { + WORKERS.addWorker(new MessageSender(WORKERS, SEND, getMessageExchangeSendListener())); + WORKERS.addWorker(new MessageReceiver(WORKERS, RECEIVE, getReceivedMessageDispatchPolicy())); + } + initialized = true; + } + + public void processReceive( + MessageExchangeReceiveContext context) { + RECEIVE_REQUESTS.put( + context.getMessageExchangeCorrelator(), + context); + } + + public void processSend( + MessageExchangeSendContext context) { + SEND.put( + context.getMessageExchangeCorrelator(), + context); + } public void shutdown() { shutdown(false); @@ -108,6 +138,88 @@ public void awaitShutdown(long shutdown) throws InterruptedException { WORKERS.awaitShutdown(shutdown); + } + + + + // -- Worker Classes --- // + public static class MessageReceiver + implements Runnable { + + protected WorkerPool pool; + protected KeyedBuffer channel; + protected ReceivedMessageDispatchPolicy policy; + + protected MessageReceiver( + WorkerPool pool, + KeyedBuffer channel, + ReceivedMessageDispatchPolicy policy) { + this.pool = pool; + this.channel = channel; + this.policy = policy; + } + + /** + * @see java.lang.Runnable#run() + */ + public void run() { + try { + while (!pool.isShuttingDown()) { + MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT); + policy.dispatch(context); + } + } catch (Throwable t) { + // kill the thread if any type of exception occurs. + // don't worry, we'll create another one to replace it + // if we're not currently in the process of shutting down. + // once I get the logging function plugged in, we'll + // log whatever errors do occur + } finally { + pool.workerDone(this); + } + } + + } + + + + public static class MessageSender + implements Runnable { + + protected WorkerPool pool; + protected KeyedBuffer channel; + protected MessageExchangeSendListener listener; + + protected MessageSender( + WorkerPool pool, + KeyedBuffer channel, + MessageExchangeSendListener listener) { + this.pool = pool; + this.channel = channel; + this.listener = listener; + } + + /** + * @see java.lang.Runnable#run() + */ + public void run() { + try { + while (!pool.isShuttingDown()) { + MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT); + if (context != null) + listener.onSend(context); + } + } catch (Throwable t) { + // kill the thread if any type of exception occurs. + // don't worry, we'll create another one to replace it + // if we're not currently in the process of shutting down. + // once I get the logging function plugged in, we'll + // log whatever errors do occur + } finally { + pool.workerDone(this); + } + } + } } 1.3 +112 -160 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeImpl.java Index: MessageExchangeImpl.java =================================================================== RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeImpl.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- MessageExchangeImpl.java 28 Oct 2002 13:50:40 -0000 1.2 +++ MessageExchangeImpl.java 29 Oct 2002 05:15:29 -0000 1.3 @@ -58,16 +58,14 @@ import org.apache.axis.AxisFault; import org.apache.axis.MessageContext; import org.apache.axis.i18n.Messages; -import org.apache.axis.ime.MessageChannel; import org.apache.axis.ime.MessageExchange; import org.apache.axis.ime.MessageExchangeConstants; -import org.apache.axis.ime.MessageExchangeContext; -import org.apache.axis.ime.MessageExchangeContextListener; -import org.apache.axis.ime.MessageExchangeCorrelator; import org.apache.axis.ime.MessageExchangeFaultListener; -import org.apache.axis.ime.MessageExchangeLifecycle; -import org.apache.axis.ime.MessageExchangeReceiveListener; import org.apache.axis.ime.MessageExchangeStatusListener; +import org.apache.axis.ime.MessageExchangeCorrelator; +import org.apache.axis.ime.MessageExchangeCorrelatorService; +import org.apache.axis.ime.MessageContextListener; +import org.apache.axis.ime.MessageExchangeLifecycle; import org.apache.axis.ime.internal.util.uuid.UUIDGenFactory; /** @@ -76,25 +74,19 @@ public class MessageExchangeImpl implements MessageExchange, MessageExchangeLifecycle { + private static final long NO_TIMEOUT = -1; public static final long WORKER_COUNT = 5; public static final long DEFAULT_TIMEOUT = 1000 * 20; - private MessageExchangeProvider provider; - private MessageChannel send; - private MessageChannel receive; - private MessageExchangeReceiveListener receiveListener; - private MessageExchangeStatusListener statusListener; - private MessageExchangeFaultListener faultListener; private MessageWorkerGroup workers = new MessageWorkerGroup(); + private MessageExchangeFaultListener faultListener; + private MessageExchangeStatusListener statusListener; + private MessageExchangeProvider provider; private boolean listening = false; protected Holder holder; public MessageExchangeImpl( - MessageExchangeProvider provider, - MessageChannel sendChannel, - MessageChannel receiveChannel) { - this.send = sendChannel; - this.receive = receiveChannel; + MessageExchangeProvider provider) { } /** @@ -103,6 +95,16 @@ public MessageExchangeCorrelator send( MessageContext context) throws AxisFault { + return send(context,null); // should do default listener + } + + /** + * @see org.apache.axis.ime.MessageExchange#send(MessageContext) + */ + public MessageExchangeCorrelator send( + MessageContext context, + MessageContextListener listener) + throws AxisFault { MessageExchangeCorrelator correlator = (MessageExchangeCorrelator) context.getProperty( MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY); @@ -113,88 +115,64 @@ MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY, correlator); } - MessageExchangeContext meContext = - MessageExchangeContext.newInstance( - correlator, - statusListener, - receiveListener, - faultListener, - context); - send.put(correlator, meContext); + if (listener != null) { + provider.processReceive( + MessageExchangeReceiveContext.newInstance( + correlator, + listener, + faultListener, + statusListener)); + } + provider.processSend( + MessageExchangeSendContext.newInstance( + correlator, + context, + faultListener, + statusListener)); return correlator; } /** - * @see org.apache.axis.ime.MessageExchange#setMessageExchangeStatusListener(MessageExchangeStatusListener) - */ - public void setMessageExchangeStatusListener( - MessageExchangeStatusListener listener) - throws AxisFault { - if (listening) - throw new IllegalStateException(Messages.getMessage("illegalStateException00")); - this.statusListener = listener; - } - - /** - * @see org.apache.axis.ime.MessageExchange#setMessageExchangeReceiveListener(MessageExchangeReceiveListener) + * @see org.apache.axis.ime.MessageExchange#receive() */ - public void setMessageExchangeReceiveListener( - MessageExchangeReceiveListener listener) + public MessageContext receive() throws AxisFault { - if (listening) - throw new IllegalStateException(Messages.getMessage("illegalStateException00")); - this.receiveListener = listener; + return receive(null,NO_TIMEOUT); } /** - * @see org.apache.axis.ime.MessageExchange#setMessageExchangeReceiveListener(MessageExchangeReceiveListener) + * @see org.apache.axis.ime.MessageExchange#receive(long) */ - public void setMessageExchangeFaultListener( - MessageExchangeFaultListener listener) + public MessageContext receive( + long timeout) throws AxisFault { - if (listening) - throw new IllegalStateException(Messages.getMessage("illegalStateException00")); - this.faultListener = listener; + return receive(null,timeout); } /** - * @see org.apache.axis.ime.MessageExchange#cancel(MessageExchangeCorrelator) + * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator) */ - public MessageContext cancel( - MessageExchangeCorrelator correlator) + public MessageContext receive( + MessageExchangeCorrelator correlator) throws AxisFault { - MessageExchangeContext context = send.cancel(correlator); - if (context != null) - return context.getMessageContext(); - else - return null; + return receive(correlator,NO_TIMEOUT); } /** - * @see org.apache.axis.ime.MessageExchange#getReceiveChannel() + * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator,long) */ - public MessageChannel getReceiveChannel() { - return receive; - } - - /** - * @see org.apache.axis.ime.MessageExchange#getSendChannel() - */ - public MessageChannel getSendChannel() { - return send; - } - - - public MessageContext sendAndReceive( - MessageContext context) + public MessageContext receive( + MessageExchangeCorrelator correlator, + long timeout) throws AxisFault { holder = new Holder(); Listener listener = new Listener(holder); - this.setMessageExchangeFaultListener(listener); - this.setMessageExchangeReceiveListener(listener); try { - this.send(context); - holder.waitForNotify(); + this.receive(correlator,listener); + if (timeout != NO_TIMEOUT) + holder.waitForNotify(timeout); + else + holder.waitForNotify(); } catch (InterruptedException ie) { throw AxisFault.makeFault(ie); } @@ -207,17 +185,54 @@ return null; } + /** + * @see org.apache.axis.ime.MessageExchange#receive(MessageContextListener) + */ + public void receive( + MessageContextListener listener) + throws AxisFault { + receive(null,listener); + } + + /** + * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator,MessageContextListener) + */ + public void receive( + MessageExchangeCorrelator correlator, + MessageContextListener listener) + throws AxisFault { + provider.processReceive( + MessageExchangeReceiveContext.newInstance( + correlator, + listener, + faultListener, + statusListener)); + } + + /** + * @see org.apache.axis.ime.MessageExchange#sendAndReceive(MessageContext) + */ + public MessageContext sendAndReceive( + MessageContext context) + throws AxisFault { + return sendAndReceive(context,NO_TIMEOUT); + } + + /** + * @see org.apache.axis.ime.MessageExchange#sendAndReceive(MessageContext,long) + */ public MessageContext sendAndReceive( MessageContext context, long timeout) throws AxisFault { holder = new Holder(); Listener listener = new Listener(holder); - this.setMessageExchangeFaultListener(listener); - this.setMessageExchangeReceiveListener(listener); try { - this.send(context); - holder.waitForNotify(timeout); + this.send(context,listener); + if (timeout != NO_TIMEOUT) + holder.waitForNotify(timeout); + else + holder.waitForNotify(); } catch (InterruptedException ie) { throw AxisFault.makeFault(ie); } @@ -230,44 +245,8 @@ return null; } - /** - * @see org.apache.axis.ime.MessageExchange#startListening() - */ - public void startListening() { - if (provider instanceof MessageExchangeProvider1) - throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00")); - for (int n = 0; n < WORKER_COUNT; n++) { - workers.addWorker(receive, new ReceiverListener()); - } - listening = true; - } - /** - * @see org.apache.axis.ime.MessageExchange#startListening() - */ - public void startListening(MessageExchangeCorrelator correlator) { - throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException01", "Unsupported For Now")); - } - - /** - * @see org.apache.axis.ime.MessageExchange#stopListening() - */ - public void stopListening() { - stopListening(false); - } - - /** - * @see org.apache.axis.ime.MessageExchange#stopListening(boolean) - */ - public void stopListening(boolean force) { - if (provider instanceof MessageExchangeProvider1) - throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00")); - if (!force) - workers.safeShutdown(); - else - workers.shutdown(); - listening = false; - } + // -- Utility Classes --- // private class Holder { private MessageExchangeCorrelator correlator; @@ -305,8 +284,7 @@ } public class Listener - implements MessageExchangeReceiveListener, - MessageExchangeFaultListener { + extends MessageContextListener { protected Holder holder; @@ -335,37 +313,8 @@ } - private class ReceiverListener - implements MessageExchangeContextListener { - /** - * @see org.apache.axis.ime.MessageExchangeContextListener#onMessageExchangeContext(MessageExchangeContext) - */ - public void onMessageExchangeContext( - MessageExchangeContext context) { - - MessageContext msgContext = - context.getMessageContext(); - MessageExchangeCorrelator correlator = - context.getMessageExchangeCorrelator(); - - try { - // there should be code here to see if the message - // contains a fault. if so, the fault listener should - // be invoked - if (msgContext != null && - msgContext.getResponseMessage() != null && - receiveListener != null) { - receiveListener.onReceive(correlator, msgContext); - } - } catch (Exception exception) { - if (faultListener != null) - faultListener.onFault( - correlator, exception); - } - - } - } + // -- MessageExchangeLifecycle Implementation --- // /** * @see org.apache.axis.ime.MessageExchangeLifecycle#awaitShutdown() @@ -404,18 +353,21 @@ provider.shutdown(force); } - /** - * @see org.apache.axis.ime.MessageExchange#receive() - */ - public MessageContext receive() throws AxisFault { - throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00")); + public synchronized void setMessageExchangeFaultListener( + MessageExchangeFaultListener listener) { + this.faultListener = listener; } - - /** - * @see org.apache.axis.ime.MessageExchange#receive(long) - */ - public MessageContext receive(long timeout) throws AxisFault { - throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00")); + + public synchronized MessageExchangeFaultListener getMessageExchangeFaultListener() { + return this.faultListener; + } + + public synchronized void setMessageExchangeStatusListener( + MessageExchangeStatusListener listener) { + this.statusListener = listener; + } + + public synchronized MessageExchangeStatusListener getMessageExchangeStatusListener() { + return this.statusListener; } - } 1.1 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeSendContext.java Index: MessageExchangeSendContext.java =================================================================== /* * The Apache Software License, Version 1.1 * * * Copyright (c) 2001 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Axis" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. */ package org.apache.axis.ime.internal; import org.apache.axis.MessageContext; import org.apache.axis.ime.MessageExchangeCorrelator; import org.apache.axis.ime.MessageExchangeFaultListener; import org.apache.axis.ime.MessageExchangeStatusListener; import java.io.Serializable; /** * Note: the only challenge with making this class serializable * is that org.apache.axis.MessageContext is currently NOT * serializable. MessageContext needs to change in order to * take advantage of persistent Channels and CorrelatorServices * * For thread safety, instances of this class are immutable * * @author James M Snell ([EMAIL PROTECTED]) */ public final class MessageExchangeSendContext implements Serializable { public static MessageExchangeSendContext newInstance( MessageExchangeCorrelator correlator, MessageContext context, MessageExchangeFaultListener faultListener, MessageExchangeStatusListener statusListener) { MessageExchangeSendContext mectx = new MessageExchangeSendContext(); mectx.correlator = correlator; mectx.context = context; mectx.faultListener = faultListener; mectx.statusListener = statusListener; return mectx; } protected MessageExchangeCorrelator correlator; protected MessageExchangeFaultListener faultListener; protected MessageExchangeStatusListener statusListener; protected MessageContext context; protected MessageExchangeSendContext() { } public MessageExchangeCorrelator getMessageExchangeCorrelator() { return this.correlator; } public MessageContext getMessageContext() { return this.context; } public MessageExchangeFaultListener getMessageExchangeFaultListener() { return this.faultListener; } public MessageExchangeStatusListener getMessageExchangeStatusListener() { return this.statusListener; } } 1.1 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeSendListener.java Index: MessageExchangeSendListener.java =================================================================== /* * The Apache Software License, Version 1.1 * * * Copyright (c) 2001 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Axis" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. */ package org.apache.axis.ime.internal; import java.io.Serializable; /** * @author James M Snell ([EMAIL PROTECTED]) */ public interface MessageExchangeSendListener extends Serializable { public void onSend( MessageExchangeSendContext context); } 1.1 xml-axis/java/src/org/apache/axis/ime/internal/FirstComeFirstServeDispatchPolicy.java Index: FirstComeFirstServeDispatchPolicy.java =================================================================== /* * The Apache Software License, Version 1.1 * * * Copyright (c) 2001 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Axis" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. */ package org.apache.axis.ime.internal; import org.apache.axis.MessageContext; import org.apache.axis.ime.MessageExchangeCorrelator; import org.apache.axis.ime.MessageContextListener; import org.apache.axis.ime.MessageExchangeFaultListener; import org.apache.axis.ime.internal.util.KeyedBuffer; /** * @author James M Snell ([EMAIL PROTECTED]) */ public class FirstComeFirstServeDispatchPolicy implements ReceivedMessageDispatchPolicy { protected KeyedBuffer RECEIVE_REQUESTS; protected KeyedBuffer RECEIVE; public FirstComeFirstServeDispatchPolicy( KeyedBuffer RECEIVE, KeyedBuffer RECEIVE_REQUESTS) { this.RECEIVE = RECEIVE; this.RECEIVE_REQUESTS = RECEIVE_REQUESTS; } public void dispatch( MessageExchangeSendContext context) { // 1. Get the correlator // 2. See if there are any receive requests based on the correlator // 3. If there are receive requests for the correlator, deliver to the first one // 4. If there are no receive requests for the correlator, deliver to the first "anonymous" receive request // 5. If there are no receive requests, put the message back on the Queue MessageExchangeReceiveContext receiveContext = null; MessageExchangeCorrelator correlator = context.getMessageExchangeCorrelator(); receiveContext = (MessageExchangeReceiveContext)RECEIVE_REQUESTS.get(correlator); if (receiveContext == null) { receiveContext = (MessageExchangeReceiveContext)RECEIVE_REQUESTS.get(); } if (receiveContext == null) RECEIVE.put(correlator,context); else { MessageExchangeFaultListener faultListener = receiveContext.getMessageExchangeFaultListener(); MessageContextListener contextListener = receiveContext.getMessageContextListener(); MessageContext msgContext = context.getMessageContext(); try { contextListener.onReceive( correlator, msgContext); } catch (Exception exception) { if (faultListener != null) faultListener.onFault( correlator, exception); } } } } 1.1 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeReceiveContext.java Index: MessageExchangeReceiveContext.java =================================================================== package org.apache.axis.ime.internal; import org.apache.axis.ime.MessageContextListener; import org.apache.axis.ime.MessageExchangeFaultListener; import org.apache.axis.ime.MessageExchangeStatusListener; import org.apache.axis.ime.MessageExchangeCorrelator; /** * @author James M Snell ([EMAIL PROTECTED]) */ public class MessageExchangeReceiveContext { public static MessageExchangeReceiveContext newInstance( MessageExchangeCorrelator correlator, MessageContextListener listener, MessageExchangeFaultListener faultListener, MessageExchangeStatusListener statusListener) { MessageExchangeReceiveContext mectx = new MessageExchangeReceiveContext(); mectx.correlator = correlator; mectx.listener = listener; mectx.faultListener = faultListener; mectx.statusListener = statusListener; return mectx; } protected MessageContextListener listener; protected MessageExchangeFaultListener faultListener; protected MessageExchangeStatusListener statusListener; protected MessageExchangeCorrelator correlator; protected MessageExchangeReceiveContext() {} public MessageExchangeCorrelator getMessageExchangeCorrelator() { return this.correlator; } public MessageContextListener getMessageContextListener() { return this.listener; } public MessageExchangeFaultListener getMessageExchangeFaultListener() { return this.faultListener; } public MessageExchangeStatusListener getMessageExchangeStatusListener() { return this.statusListener; } } 1.1 xml-axis/java/src/org/apache/axis/ime/internal/ReceivedMessageDispatchPolicy.java Index: ReceivedMessageDispatchPolicy.java =================================================================== /* * The Apache Software License, Version 1.1 * * * Copyright (c) 2001 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Axis" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. */ package org.apache.axis.ime.internal; /** * @author James M Snell ([EMAIL PROTECTED]) */ public interface ReceivedMessageDispatchPolicy { public void dispatch( MessageExchangeSendContext context); } 1.5 +54 -65 xml-axis/java/src/org/apache/axis/ime/MessageExchange.java Index: MessageExchange.java =================================================================== RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/MessageExchange.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- MessageExchange.java 28 Oct 2002 21:59:45 -0000 1.4 +++ MessageExchange.java 29 Oct 2002 05:15:29 -0000 1.5 @@ -80,22 +80,23 @@ throws AxisFault; /** - * Will attempt to cancel the outbound MessageExchange - * process for a given message context. Returns true if - * an only if the MessageContext was canceled. A false - * response indicates that the MessageContext could not - * be removed from the outbound channel for whatever - * reason. - * @param MessageExchangeCorrelator The correlator for the message being canceled - * @return MessageContext The canceled MessageContext + * Send an outbound message. (Impl's of this method + * need to create a new MessageExchangeCorrelator and + * put it into the MessageContext if one does not already + * exist.) + * @param MessageContext The Axis MessageContext being sent + * @param MessageContextListener The listener to which responses, faults, and status updates should be delivered + * @return MessageExchangeCorrelator The correlator for the sent MessageContext * @throws AxisFault */ - public MessageContext cancel( - MessageExchangeCorrelator correlator) + public MessageExchangeCorrelator send( + MessageContext context, + MessageContextListener listener) throws AxisFault; /** * Waits indefinitely for a message to be received + * (blocking) * @return MessageContext The received MessageContext * @throws AxisFault */ @@ -105,6 +106,7 @@ /** * Waits the specified amount of time for a message to * be received + * (blocking) * @param long The amount of time (ms) to wait * @return MessageContext The received MessageContext * @throws AxisFault @@ -114,29 +116,51 @@ throws AxisFault; /** - * Will instruct the MessageExchange provider to - * wait for a message to be received. + * Waits indefinitely for a message matching the + * specified correlator + * (blocking) + * @param MessageExchangeCorrelator + * @return MessageContext * @throws AxisFault */ - public void startListening() + public MessageContext receive( + MessageExchangeCorrelator correlator) throws AxisFault; /** - * Will instruct the MessageExchange provider to - * wait for a specific MessageExchangeCorrelator - * @param MessageExchangeCorrelator The correlator of the MessageContext to listen for + * Waits the specified amount of time for a message matching the + * specified correlator + * (blocking) + * @param MessageExchangeCorrelator + * @param long timeout + * @returns MessageContext * @throws AxisFault */ - public void startListening( - MessageExchangeCorrelator correlator) + public MessageContext receive( + MessageExchangeCorrelator correlator, + long timeout) throws AxisFault; /** - * Will instruct the MessageExchange provider to - * stop listening + * Registers a listener for receiving messages + * (nonblocking) + * @param MessageContextListener * @throws AxisFault */ - public void stopListening() + public void receive( + MessageContextListener listener) + throws AxisFault; + + /** + * Registers a listener for receiving messages + * (nonblocking) + * @param MessageExchangeCorrelator + * @param MessageContextListener + * @throws AxisFault + */ + public void receive( + MessageExchangeCorrelator correlator, + MessageContextListener listener) throws AxisFault; /** @@ -161,49 +185,14 @@ long timeout) throws AxisFault; - /** - * Allows applications to listen for changes to - * the current disposition of the MessageExchange operation - * (push model) - * @param MessageExchangeStatusListener - * @throws AxisFault - */ - public void setMessageExchangeStatusListener( - MessageExchangeStatusListener listener) - throws AxisFault; - - /** - * Allows applications to listen for inbound messages - * (push model) - * @param MessageExchangeReceiveListener - * @throws AxisFault - */ - public void setMessageExchangeReceiveListener( - MessageExchangeReceiveListener listener) - throws AxisFault; - - /** - * Allows applications to listen for faults/exceptions - * (push model) - * @param MessageExchangeFaultListener - * @throws AxisFault - */ public void setMessageExchangeFaultListener( - MessageExchangeFaultListener listener) - throws AxisFault; - - /** - * Allows MessageExchange consumers low level access - * to the Send message channel - * @return MessageChannel - */ - public MessageChannel getSendChannel(); - - /** - * Allows MessageExchange consumers low level access - * to the Receive message channel - * @return MessageChannel - */ - public MessageChannel getReceiveChannel(); - + MessageExchangeFaultListener listener); + + public MessageExchangeFaultListener getMessageExchangeFaultListener(); + + public void setMessageExchangeStatusListener( + MessageExchangeStatusListener listener); + + public MessageExchangeStatusListener getMessageExchangeStatusListener(); + } 1.4 +2 -2 xml-axis/java/src/org/apache/axis/ime/MessageExchangeCorrelatorService.java Index: MessageExchangeCorrelatorService.java =================================================================== RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/MessageExchangeCorrelatorService.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- MessageExchangeCorrelatorService.java 28 Oct 2002 21:45:59 -0000 1.3 +++ MessageExchangeCorrelatorService.java 29 Oct 2002 05:15:30 -0000 1.4 @@ -61,9 +61,9 @@ public void put( MessageExchangeCorrelator correlator, - MessageExchangeContext context); + Object context); - public MessageExchangeContext get( + public Object get( MessageExchangeCorrelator correlator); } 1.1 xml-axis/java/src/org/apache/axis/ime/MessageContextListener.java Index: MessageContextListener.java =================================================================== /* * The Apache Software License, Version 1.1 * * * Copyright (c) 2001 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Axis" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. */ package org.apache.axis.ime; import java.io.Serializable; import org.apache.axis.MessageContext; /** * @author James M Snell ([EMAIL PROTECTED]) */ public abstract class MessageContextListener implements Serializable { public void onFault( MessageExchangeCorrelator correlator, Throwable exception) {} public void onReceive( MessageExchangeCorrelator correlator, MessageContext context) {} public void onStatus( MessageExchangeCorrelator correlator, MessageExchangeStatus status) {} } 1.1 xml-axis/java/src/org/apache/axis/ime/internal/util/KeyedBuffer.java Index: KeyedBuffer.java =================================================================== /* * The Apache Software License, Version 1.1 * * * Copyright (c) 2001 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Axis" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. */ package org.apache.axis.ime.internal.util; /** * A KeyedBuffer is a low level hybrid FIFO Queue and Keyed map * Each MessageExchange implementation will create at least two * KeyedBuffer's, one for messages being sent, and another for * messages that have been received. * * KeyedBuffers differ from traditional FIFO Queues in that * elements put in are keyed and can be taken out of order. * * Different implementations may allow for variations on * how the KeyedBuffer model is implemented. For instance, * the code will ship with a NonPersistentKeyedBuffer that * will store all contained objects in memory. The fact that * everything is stored in memory means that the buffer is not * fault tolerant. If fault tolerance is required, then a * Persistent KeyedBuffer must be created that persists the * objects somehow. * * @author James M Snell ([EMAIL PROTECTED]) */ public interface KeyedBuffer { /** * Select, but do not remove the next message on the * channel. If one does not exist, return null */ public Object peek(); /** * Put a message onto the channel */ public void put( Object key, Object context); /** * Cancel a message that has been put on the channel. * Unlike select(Object key), this method will not block * and wait for a message with the specified key to be * put onto the MessageChannel. */ public Object cancel( Object key); /** * Select and remove all of the messages currently in * the channel (useful for bulk operations). This * method will not block. It is also not guaranteed * that the Channel will be empty once this operation * returns (it is possible that another thread may * put new MessageContexts into the channel before this * operation completes) */ public Object[] selectAll(); /** * Select and remove the next message in the channel * If a message is not available, wait indefinitely for one */ public Object select() throws InterruptedException; /** * Select and remove the next message in the channel * If a message is not available, wait the specified amount * of time for one */ public Object select( long timeout) throws InterruptedException; /** * Select and remove a specific message in the channel * If the message is not available, wait indefinitely * for one to be available */ public Object select( Object key) throws InterruptedException; /** * Select and remove a specific message in the channel * If the message is not available, wait the specified * amount of time for one */ public Object select( Object key, long timeout) throws InterruptedException; /** * Select and remove the next object in the buffer * (does not wait for a message to be put into the buffer) */ public Object get(); /** * Select and remove the specified object in the buffer * (does not wait for a message to be put into the buffer) */ public Object get(Object key); } 1.1 xml-axis/java/src/org/apache/axis/ime/internal/util/WorkerPool.java Index: WorkerPool.java =================================================================== /* * The Apache Software License, Version 1.1 * * * Copyright (c) 2001 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Axis" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. */ package org.apache.axis.ime.internal.util; import org.apache.axis.i18n.Messages; import java.util.Hashtable; import java.util.Iterator; import java.util.Map; /** * @author James M Snell ([EMAIL PROTECTED]) */ public class WorkerPool { public static final long MAX_THREADS = 100; protected Map threads = new Hashtable(); protected boolean interrupt; protected long threadcount; public boolean _shutdown; /** * Returns true if all workers have been shutdown */ public boolean isShutdown() { synchronized (this) { return _shutdown && threadcount == 0; } } /** * Returns true if all workers are in the process of shutting down */ public boolean isShuttingDown() { synchronized (this) { return _shutdown; } } /** * Returns the total number of currently active workers */ public long getWorkerCount() { synchronized (this) { return threadcount; } } /** * Adds a new worker to the pool */ public void addWorker( Runnable worker) { if (_shutdown || threadcount == MAX_THREADS) throw new IllegalStateException(Messages.getMessage("illegalStateException00")); Thread thread = new Thread(worker); threads.put(worker, thread); threadcount++; thread.start(); } /** * Forcefully interrupt all workers */ public void interruptAll() { synchronized (threads) { for (Iterator i = threads.values().iterator(); i.hasNext();) { Thread t = (Thread) i.next(); t.interrupt(); } } } /** * Forcefully shutdown the pool */ public void shutdown() { synchronized (this) { _shutdown = true; } interruptAll(); } /** * Forcefully shutdown the pool */ public void safeShutdown() { synchronized (this) { _shutdown = true; } } /** * Await shutdown of the worker */ public synchronized void awaitShutdown() throws InterruptedException { if (!_shutdown) throw new IllegalStateException(Messages.getMessage("illegalStateException00")); while (threadcount > 0) wait(); } /** * Await shutdown of the worker */ public synchronized boolean awaitShutdown(long timeout) throws InterruptedException { if (!_shutdown) throw new IllegalStateException(Messages.getMessage("illegalStateException00")); if (threadcount == 0) return true; long waittime = timeout; if (waittime <= 0) return false; long start = System.currentTimeMillis(); for (; ;) { wait(waittime); if (threadcount == 0) return true; waittime = timeout - System.currentTimeMillis(); if (waittime <= 0) return false; } } /** * Used by MessageWorkers to notify the pool that it is done */ public synchronized void workerDone( Runnable worker) { threads.remove(worker); if (--threadcount == 0 && _shutdown) { notifyAll(); } if (!_shutdown) { addWorker(worker); } } } 1.1 xml-axis/java/src/org/apache/axis/ime/internal/util/NonPersistentKeyedBuffer.java Index: NonPersistentKeyedBuffer.java =================================================================== /* * The Apache Software License, Version 1.1 * * * Copyright (c) 2001 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Axis" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. */ package org.apache.axis.ime.internal.util; import org.apache.axis.i18n.Messages; import java.util.Vector; /** * Creates a non-persistent KeyedBuffer. Queued messages * are stored in memory. If the buffer instance is destroyed, * so is the Queue. * * @author James M Snell ([EMAIL PROTECTED]) */ public class NonPersistentKeyedBuffer implements KeyedBuffer { private final KeyedQueue messages = new KeyedQueue(); private WorkerPool WORKERS; public NonPersistentKeyedBuffer( WorkerPool workers) { this.WORKERS = workers; } public Object peek() { KeyedNode node = null; synchronized (messages) { node = messages.peek(); } if (node != null) { return node.value; } else { return null; } } public void put( Object key, Object object) { if (key == null || object == null) throw new IllegalArgumentException(Messages.getMessage("illegalArgumentException00")); synchronized (messages) { messages.put(new KeyedNode(key, object)); messages.notify(); } } public Object cancel(Object key) { if (key == null) throw new IllegalArgumentException(Messages.getMessage("illegalArgumentException00")); Object object = null; synchronized (messages) { KeyedNode node = messages.select(key); // will attempt to find and remove if (node != null) object = node.value; node.key = null; node.value = null; } return object; } public Object[] selectAll() { Vector v = new Vector(); KeyedNode node = null; synchronized (messages) { while ((node = messages.select()) != null) { v.add(node.value); node.key = null; node.value = null; } } Object[] objects = new Object[v.size()]; v.copyInto(objects); return objects; } public Object select() throws InterruptedException { for (; ;) { if (WORKERS.isShuttingDown()) throw new IllegalStateException(Messages.getMessage("illegalStateException00")); KeyedNode node = null; synchronized (messages) { node = messages.select(); } if (node != null) { Object object = node.value; node.key = null; node.value = null; return object; } else { messages.wait(); } } } public Object select(long timeout) throws InterruptedException { for (; ;) { if (WORKERS.isShuttingDown()) throw new IllegalStateException(Messages.getMessage("illegalStateException00")); KeyedNode node = null; synchronized (messages) { node = messages.select(); } if (node != null) { Object object = node.value; node.key = null; node.value = null; return object; } else { messages.wait(timeout); } } } public Object select(Object key) throws InterruptedException { for (; ;) { if (WORKERS.isShuttingDown()) throw new IllegalStateException(Messages.getMessage("illegalStateException00")); KeyedNode node = null; synchronized (messages) { node = messages.select(key); } if (node != null) { Object object = node.value; node.key = null; node.value = null; return object; } else { messages.wait(); } } } public Object select(Object key, long timeout) throws InterruptedException { for (; ;) { if (WORKERS.isShuttingDown()) throw new IllegalStateException(Messages.getMessage("illegalStateException00")); KeyedNode node = null; synchronized (messages) { node = messages.select(key); } if (node != null) { Object object = node.value; node.key = null; node.value = null; return object; } else { messages.wait(timeout); } } } public Object get() { KeyedNode node = null; Object object = null; synchronized (messages) { node = messages.select(); } if (node != null) { object = node.value; node.key = null; node.value = null; } return object; } public Object get(Object key) { KeyedNode node = null; Object object = null; synchronized (messages) { node = messages.select(key); } if (node != null) { object = node.value; node.key = null; node.value = null; } return object; } /// Support Classes /// protected static class KeyedNode { public Object key; public Object value; public KeyedNode next; public KeyedNode() { } public KeyedNode( Object key, Object value) { this.key = key; this.value = value; } public KeyedNode( Object key, Object value, KeyedNode next) { this(key, value); this.next = next; } } protected static class KeyedQueue { protected KeyedNode head; protected KeyedNode last; protected void put(KeyedNode node) { if (last == null) { last = head = node; } else { last = last.next = node; } } protected KeyedNode select() { KeyedNode node = head; if (node != null && (head = node.next) == null) { last = null; } if (node != null) node.next = null; return node; } protected KeyedNode select(Object key) { KeyedNode previous = null; for (KeyedNode node = head; node != null; node = node.next) { if (node.key.equals(key)) { if (previous != null) previous.next = node.next; node.next = null; return node; } previous = node; } return null; } protected KeyedNode peek() { KeyedNode node = head; return node; } } }