Hiranya, if you can make the worker pool configurable, that would be very valuable... you may have a look at the nhttp transport thread pool, which can be configured via the nhttp.properties file.
Thanks, Ruwan On Tue, May 12, 2009 at 1:30 PM, <[email protected]> wrote: > Author: hiranya > Date: Tue May 12 08:00:27 2009 > New Revision: 773818 > > URL: http://svn.apache.org/viewvc?rev=773818&view=rev > Log: > Enhancements and code cleanup in the FIX transport: > * FIX sender now has its own worker pool and hence does not rely on the FIX > listener any more. Therefore listener and sender can be enabled individually > * Made FIXSessionFactory a singleton to effectively share session data > among the listener and the sender > * Cleanup logic for initiators during sender shutdown > * Minor bug fix at FIXSessionFactory for a bug which prevented the sample > 259 and similar scenarios from operating properly > > Modified: > > > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java > > > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java > > > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java > > > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java > > Modified: > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java > URL: > http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff > > ============================================================================== > --- > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java > (original) > +++ > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java > Tue May 12 08:00:27 2009 > @@ -27,15 +27,12 
@@ > public class FIXApplicationFactory { > > private ConfigurationContext cfgCtx; > - private WorkerPool workerPool; > - > - public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool > workerPool) { > > + public FIXApplicationFactory(ConfigurationContext cfgCtx) { > this.cfgCtx = cfgCtx; > - this.workerPool = workerPool; > } > > - public Application getFIXApplication(AxisService service, boolean > acceptor) { > + public Application getFIXApplication(AxisService service, WorkerPool > workerPool, boolean acceptor) { > return new FIXIncomingMessageHandler(cfgCtx, workerPool, service, > acceptor); > } > } > \ No newline at end of file > > Modified: > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java > URL: > http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff > > ============================================================================== > --- > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java > (original) > +++ > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java > Tue May 12 08:00:27 2009 > @@ -23,6 +23,7 @@ > import org.apache.axis2.description.AxisService; > import org.apache.axis2.description.Parameter; > import org.apache.axis2.transport.base.BaseUtils; > +import org.apache.axis2.transport.base.threads.WorkerPool; > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > import quickfix.*; > @@ -65,16 +66,29 @@ > /** A Map containing all the FIX applications created for initiators, > keyed by FIX EPR */ > private Map<String, Application> applicationStore; > /** An ApplicationFactory handles creating FIX Applications > (FIXIncomingMessageHandler Objects) */ > - private 
FIXApplicationFactory applicationFactory; > + private static FIXApplicationFactory applicationFactory = null; > + > + private WorkerPool listenerThreadPool; > + private WorkerPool senderThreadPool; > > private Log log; > > - public FIXSessionFactory(FIXApplicationFactory applicationFactory) { > - this.applicationFactory = applicationFactory; > + private static FIXSessionFactory INSTANCE = new FIXSessionFactory(); > + > + public static FIXSessionFactory getInstance(FIXApplicationFactory af) > { > + if (applicationFactory == null) { > + applicationFactory = af; > + } > + return INSTANCE; > + } > + > + private FIXSessionFactory() { > this.log = LogFactory.getLog(this.getClass()); > this.acceptorStore = new HashMap<String,Acceptor>(); > this.initiatorStore = new HashMap<String, Initiator>(); > this.applicationStore = new HashMap<String, Application>(); > + this.listenerThreadPool = null; > + this.senderThreadPool = null; > } > > /** > @@ -101,7 +115,7 @@ > MessageFactory messageFactory = new > DefaultMessageFactory(); > quickfix.LogFactory logFactory = getLogFactory(service, > settings, true); > //Get a new FIX Application > - Application messageHandler = > applicationFactory.getFIXApplication(service, true); > + Application messageHandler = > applicationFactory.getFIXApplication(service, listenerThreadPool, true); > //Create a new FIX Acceptor > Acceptor acceptor = new SocketAcceptor( > messageHandler, > @@ -174,7 +188,7 @@ > MessageStoreFactory storeFactory = getMessageStoreFactory(service, > settings, false); > MessageFactory messageFactory = new DefaultMessageFactory(); > //Get a new FIX application > - Application messageHandler = > applicationFactory.getFIXApplication(service, false); > + Application messageHandler = > applicationFactory.getFIXApplication(service, senderThreadPool, false); > > try { > //Create a new FIX initiator > @@ -216,7 +230,7 @@ > MessageFactory messageFactory = new > DefaultMessageFactory(); > quickfix.LogFactory logFactory = 
getLogFactory(service, > settings, true); > //Get a new FIX Application > - Application messageHandler = > applicationFactory.getFIXApplication(service, false); > + Application messageHandler = > applicationFactory.getFIXApplication(service, senderThreadPool, false); > > Initiator initiator = new SocketInitiator( > messageHandler, > @@ -246,10 +260,10 @@ > } > > } else { > - String msg = "The " + > FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " + > - "not specified. Unable to initialize the initiator > session at this stage."; > - log.info(msg); > - throw new AxisFault(msg); > + // FIX initiator session is not configured > + // It could be intentional - So not an error (we don't need > initiators at all times) > + log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM > + " parameter is " + > + "not specified. Unable to initialize the initiator > session at this stage."); > } > } > > @@ -276,6 +290,24 @@ > } > > /** > + * Stops all the FIX initiators created so far and cleans up all the > mappings > + * related to them > + */ > + public void disposeFIXInitiators() { > + boolean debugEnabled = log.isDebugEnabled(); > + > + for (String key : initiatorStore.keySet()) { > + initiatorStore.get(key).stop(); > + if (debugEnabled) { > + log.debug("FIX initiator to the EPR " + key + " stopped"); > + } > + } > + > + initiatorStore.clear(); > + applicationStore.clear(); > + } > + > + /** > * Returns an array of Strings representing EPRs for the specified > service > * > * @param serviceName the name of the service > @@ -444,6 +476,14 @@ > } > return app; > } > + > + public void setListenerThreadPool(WorkerPool listenerThreadPool) { > + this.listenerThreadPool = listenerThreadPool; > + } > + > + public void setSenderThreadPool(WorkerPool senderThreadPool) { > + this.senderThreadPool = senderThreadPool; > + } > } > > > > Modified: > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java 
> URL: > http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff > > ============================================================================== > --- > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java > (original) > +++ > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java > Tue May 12 08:00:27 2009 > @@ -60,14 +60,8 @@ > TransportInDescription trpInDesc) throws AxisFault { > > super.init(cfgCtx, trpInDesc); > - //initialize the FIXSessionFactory > - fixSessionFactory = new FIXSessionFactory( > - new FIXApplicationFactory(this.cfgCtx, this.workerPool)); > - FIXTransportSender sender = (FIXTransportSender) cfgCtx. > - > > getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender(); > - if (sender != null) { > - sender.setSessionFactory(fixSessionFactory); > - } > + fixSessionFactory = FIXSessionFactory.getInstance(new > FIXApplicationFactory(cfgCtx)); > + fixSessionFactory.setListenerThreadPool(this.workerPool); > log.info("FIX transport listener initialized..."); > } > > > Modified: > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java > URL: > http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff > > ============================================================================== > --- > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java > (original) > +++ > synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java > Tue May 
12 08:00:27 2009 > @@ -28,6 +28,8 @@ > import org.apache.axis2.transport.OutTransportInfo; > import org.apache.axis2.transport.base.AbstractTransportSender; > import org.apache.axis2.transport.base.BaseUtils; > +import org.apache.axis2.transport.base.threads.WorkerPool; > +import org.apache.axis2.transport.base.threads.WorkerPoolFactory; > import org.apache.commons.logging.LogFactory; > import quickfix.*; > import quickfix.field.*; > @@ -51,17 +53,12 @@ > > private FIXSessionFactory sessionFactory; > private FIXOutgoingMessageHandler messageSender; > + private WorkerPool workerPool; > > public FIXTransportSender() { > this.log = LogFactory.getLog(this.getClass()); > } > > - > - public void setSessionFactory(FIXSessionFactory sessionFactory) { > - this.sessionFactory = sessionFactory; > - this.messageSender.setSessionFactory(sessionFactory); > - } > - > /** > * @param cfgCtx the axis2 configuration context > * @param transportOut the Out Transport description > @@ -69,10 +66,25 @@ > */ > public void init(ConfigurationContext cfgCtx, TransportOutDescription > transportOut) throws AxisFault { > super.init(cfgCtx, transportOut); > + this.sessionFactory = FIXSessionFactory.getInstance(new > FIXApplicationFactory(cfgCtx)); > + this.workerPool = WorkerPoolFactory.getWorkerPool( > + 10, 20, 5, -1, "FIX Sender Worker thread > group", "FIX-Worker"); > + this.sessionFactory.setSenderThreadPool(this.workerPool); > messageSender = new FIXOutgoingMessageHandler(); > + messageSender.setSessionFactory(this.sessionFactory); > log.info("FIX transport sender initialized..."); > } > > + public void stop() { > + try { > + this.workerPool.shutdown(10000); > + } catch (InterruptedException e) { > + log.warn("Thread interrupted while waiting for worker pool to > shut down"); > + } > + sessionFactory.disposeFIXInitiators(); > + super.stop(); > + } > + > /** > * Performs the actual sending of the message. 
> * > > > -- Ruwan Linton Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb WSO2 Inc.; http://wso2.org email: [email protected]; cell: +94 77 341 3097 blog: http://ruwansblog.blogspot.com
