I think we need to make that configurable as well. The currently hard-coded settings will work in 98% of cases, but there can be scenarios where they require tuning.
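As a rough sketch of what such tuning could look like (an illustration only, not existing Synapse or Axis2 code): the four numbers hard-coded in the FIX sender in the commit quoted below (10, 20, 5, -1) could be read from a properties file under a per-transport key prefix. The class name `WorkerPoolSettings`, the `fix.sender` prefix, and the key suffixes `t_core`, `t_max`, `alive_sec` and `qlen` are all hypothetical, and reading the four numbers as core size, maximum size, keep-alive seconds and queue length is an assumption.

```java
import java.util.Properties;

/** Hypothetical holder for worker pool sizing; not part of Synapse or Axis2. */
final class WorkerPoolSettings {

    // Defaults copy the values hard-coded in FIXTransportSender.init() in r773818.
    // Their meaning (core, max, keep-alive seconds, queue length) is assumed.
    static final int DEFAULT_CORE = 10;
    static final int DEFAULT_MAX = 20;
    static final int DEFAULT_KEEP_ALIVE_SEC = 5;
    static final int DEFAULT_QUEUE_LENGTH = -1;

    final int core;
    final int max;
    final int keepAliveSec;
    final int queueLength;

    private WorkerPoolSettings(int core, int max, int keepAliveSec, int queueLength) {
        this.core = core;
        this.max = max;
        this.keepAliveSec = keepAliveSec;
        this.queueLength = queueLength;
    }

    /**
     * Reads pool settings for one transport. Keys are built as prefix + "." + suffix,
     * for example "fix.sender.t_core" (hypothetical key names). Missing or
     * non-numeric values fall back to the defaults above.
     */
    static WorkerPoolSettings fromProperties(Properties props, String prefix) {
        return new WorkerPoolSettings(
                readInt(props, prefix + ".t_core", DEFAULT_CORE),
                readInt(props, prefix + ".t_max", DEFAULT_MAX),
                readInt(props, prefix + ".alive_sec", DEFAULT_KEEP_ALIVE_SEC),
                readInt(props, prefix + ".qlen", DEFAULT_QUEUE_LENGTH));
    }

    private static int readInt(Properties props, String key, int fallback) {
        String value = props.getProperty(key);
        if (value == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return fallback; // keep the default rather than fail transport startup
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("fix.sender.t_max", "50"); // only the maximum is overridden
        WorkerPoolSettings s = fromProperties(props, "fix.sender");
        // In the sender, these values would replace the literals passed to
        // WorkerPoolFactory.getWorkerPool(...) in the quoted commit.
        System.out.println(s.core + " " + s.max + " " + s.keepAliveSec + " " + s.queueLength);
    }
}
```

Because the prefix is a parameter, the same reader could serve the pool inherited from AbstractTransportListener with a different prefix per transport, so an untuned transport keeps today's defaults.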
Can we do this in a manner that lets us configure them per transport?

Thanks,
Ruwan

On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka <[email protected]> wrote:

> Hi Ruwan,
>
> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <[email protected]> wrote:
>
>> Hiranya,
>>
>> If you can make the worker pool configurable that would be of much
>> importance... you may have a look at the nhttp transport thread pool, which
>> can be configured via the nhttp.properties file.
>
>
> Currently the FIX sender initializes the WorkerPool in a manner similar to
> the AbstractTransportListener. The WorkerPool in AbstractTransportListener
> is used by several transports (JMS, Mail etc) via inheritance. FIX listener
> also makes use of the same thread pool. Do we have any plans to make that
> thread pool configurable too? Otherwise I don't think it makes much sense
> just to make the FIX sender's thread pool configurable.
>
> Thanks,
> Hiranya
>
>
>>
>> Thanks,
>> Ruwan
>>
>> On Tue, May 12, 2009 at 1:30 PM, <[email protected]> wrote:
>>
>>> Author: hiranya
>>> Date: Tue May 12 08:00:27 2009
>>> New Revision: 773818
>>>
>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>>> Log:
>>> Enhancements and code cleanup in the FIX transport:
>>> * FIX sender now has its own worker pool and hence does not rely on the
>>>   FIX listener any more. Therefore listener and sender can be enabled
>>>   individually
>>> * Made FIXSessionFactory a singleton to effectively share session data
>>>   among the listener and the sender
>>> * Cleanup logic for initiators during sender shutdown
>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the sample
>>>   259 and similar scenarios from operating properly
>>>
>>> Modified:
>>>     synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>     synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>     synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>     synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>
>>> Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>> ==============================================================================
>>> --- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java (original)
>>> +++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java Tue May 12 08:00:27 2009
>>> @@ -27,15 +27,12 @@
>>>  public class FIXApplicationFactory {
>>>
>>>      private ConfigurationContext cfgCtx;
>>> -    private WorkerPool workerPool;
>>> -
>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool workerPool) {
>>>
>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>>>          this.cfgCtx = cfgCtx;
>>> -        this.workerPool = workerPool;
>>>      }
>>>
>>> -    public Application getFIXApplication(AxisService service, boolean acceptor) {
>>> +    public Application getFIXApplication(AxisService service, WorkerPool workerPool, boolean acceptor) {
>>>          return new FIXIncomingMessageHandler(cfgCtx, workerPool, service, acceptor);
>>>      }
>>>  }
>>> \ No newline at end of file
>>>
>>> Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>> ==============================================================================
>>> --- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java (original)
>>> +++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java Tue May 12 08:00:27 2009
>>> @@ -23,6 +23,7 @@
>>>  import org.apache.axis2.description.AxisService;
>>>  import org.apache.axis2.description.Parameter;
>>>  import org.apache.axis2.transport.base.BaseUtils;
>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>  import org.apache.commons.logging.Log;
>>>  import org.apache.commons.logging.LogFactory;
>>>  import quickfix.*;
>>> @@ -65,16 +66,29 @@
>>>      /** A Map containing all the FIX applications created for initiators, keyed by FIX EPR */
>>>      private Map<String, Application> applicationStore;
>>>      /** An ApplicationFactory handles creating FIX Applications (FIXIncomingMessageHandler Objects) */
>>> -    private FIXApplicationFactory applicationFactory;
>>> +    private static FIXApplicationFactory applicationFactory = null;
>>> +
>>> +    private WorkerPool listenerThreadPool;
>>> +    private WorkerPool senderThreadPool;
>>>
>>>      private Log log;
>>>
>>> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
>>> -        this.applicationFactory = applicationFactory;
>>> +    private static FIXSessionFactory INSTANCE = new FIXSessionFactory();
>>> +
>>> +    public static FIXSessionFactory getInstance(FIXApplicationFactory af) {
>>> +        if (applicationFactory == null) {
>>> +            applicationFactory = af;
>>> +        }
>>> +        return INSTANCE;
>>> +    }
>>> +
>>> +    private FIXSessionFactory() {
>>>          this.log = LogFactory.getLog(this.getClass());
>>>          this.acceptorStore = new HashMap<String,Acceptor>();
>>>          this.initiatorStore = new HashMap<String, Initiator>();
>>>          this.applicationStore = new HashMap<String, Application>();
>>> +        this.listenerThreadPool = null;
>>> +        this.senderThreadPool = null;
>>>      }
>>>
>>>      /**
>>> @@ -101,7 +115,7 @@
>>>                  MessageFactory messageFactory = new DefaultMessageFactory();
>>>                  quickfix.LogFactory logFactory = getLogFactory(service, settings, true);
>>>                  //Get a new FIX Application
>>> -                Application messageHandler = applicationFactory.getFIXApplication(service, true);
>>> +                Application messageHandler = applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>>>                  //Create a new FIX Acceptor
>>>                  Acceptor acceptor = new SocketAcceptor(
>>>                          messageHandler,
>>> @@ -174,7 +188,7 @@
>>>          MessageStoreFactory storeFactory = getMessageStoreFactory(service, settings, false);
>>>          MessageFactory messageFactory = new DefaultMessageFactory();
>>>          //Get a new FIX application
>>> -        Application messageHandler = applicationFactory.getFIXApplication(service, false);
>>> +        Application messageHandler = applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>
>>>          try {
>>>              //Create a new FIX initiator
>>> @@ -216,7 +230,7 @@
>>>                  MessageFactory messageFactory = new DefaultMessageFactory();
>>>                  quickfix.LogFactory logFactory = getLogFactory(service, settings, true);
>>>                  //Get a new FIX Application
>>> -                Application messageHandler = applicationFactory.getFIXApplication(service, false);
>>> +                Application messageHandler = applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>
>>>                  Initiator initiator = new SocketInitiator(
>>>                          messageHandler,
>>> @@ -246,10 +260,10 @@
>>>                  }
>>>
>>>              } else {
>>> -                String msg = "The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>> -                        "not specified. Unable to initialize the initiator session at this stage.";
>>> -                log.info(msg);
>>> -                throw new AxisFault(msg);
>>> +                // FIX initiator session is not configured
>>> +                // It could be intentional - So not an error (we don't need initiators at all times)
>>> +                log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>> +                        "not specified. Unable to initialize the initiator session at this stage.");
>>>              }
>>>          }
>>>
>>> @@ -276,6 +290,24 @@
>>>      }
>>>
>>>      /**
>>> +     * Stops all the FIX initiators created so far and cleans up all the mappings
>>> +     * related to them
>>> +     */
>>> +    public void disposeFIXInitiators() {
>>> +        boolean debugEnabled = log.isDebugEnabled();
>>> +
>>> +        for (String key : initiatorStore.keySet()) {
>>> +            initiatorStore.get(key).stop();
>>> +            if (debugEnabled) {
>>> +                log.debug("FIX initiator to the EPR " + key + " stopped");
>>> +            }
>>> +        }
>>> +
>>> +        initiatorStore.clear();
>>> +        applicationStore.clear();
>>> +    }
>>> +
>>> +    /**
>>>       * Returns an array of Strings representing EPRs for the specified service
>>>       *
>>>       * @param serviceName the name of the service
>>> @@ -444,6 +476,14 @@
>>>          }
>>>          return app;
>>>      }
>>> +
>>> +    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
>>> +        this.listenerThreadPool = listenerThreadPool;
>>> +    }
>>> +
>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>>> +        this.senderThreadPool = senderThreadPool;
>>> +    }
>>>  }
>>>
>>>
>>>
>>> Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>>> ==============================================================================
>>> --- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java (original)
>>> +++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java Tue May 12 08:00:27 2009
>>> @@ -60,14 +60,8 @@
>>>              TransportInDescription trpInDesc) throws AxisFault {
>>>
>>>          super.init(cfgCtx, trpInDesc);
>>> -        //initialize the FIXSessionFactory
>>> -        fixSessionFactory = new FIXSessionFactory(
>>> -                new FIXApplicationFactory(this.cfgCtx, this.workerPool));
>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>>> -                getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>>> -        if (sender != null) {
>>> -            sender.setSessionFactory(fixSessionFactory);
>>> -        }
>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new FIXApplicationFactory(cfgCtx));
>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>>>          log.info("FIX transport listener initialized...");
>>>      }
>>>
>>>
>>> Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>>> ==============================================================================
>>> --- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java (original)
>>> +++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java Tue May 12 08:00:27 2009
>>> @@ -28,6 +28,8 @@
>>>  import org.apache.axis2.transport.OutTransportInfo;
>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>>>  import org.apache.axis2.transport.base.BaseUtils;
>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>>>  import org.apache.commons.logging.LogFactory;
>>>  import quickfix.*;
>>>  import quickfix.field.*;
>>> @@ -51,17 +53,12 @@
>>>
>>>      private FIXSessionFactory sessionFactory;
>>>      private FIXOutgoingMessageHandler messageSender;
>>> +    private WorkerPool workerPool;
>>>
>>>      public FIXTransportSender() {
>>>          this.log = LogFactory.getLog(this.getClass());
>>>      }
>>>
>>> -
>>> -    public void setSessionFactory(FIXSessionFactory sessionFactory) {
>>> -        this.sessionFactory = sessionFactory;
>>> -        this.messageSender.setSessionFactory(sessionFactory);
>>> -    }
>>> -
>>>      /**
>>>       * @param cfgCtx the axis2 configuration context
>>>       * @param transportOut the Out Transport description
>>> @@ -69,10 +66,25 @@
>>>       */
>>>      public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
>>>          super.init(cfgCtx, transportOut);
>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new FIXApplicationFactory(cfgCtx));
>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>>> +                10, 20, 5, -1, "FIX Sender Worker thread group", "FIX-Worker");
>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>>>          messageSender = new FIXOutgoingMessageHandler();
>>> +        messageSender.setSessionFactory(this.sessionFactory);
>>>          log.info("FIX transport sender initialized...");
>>>      }
>>>
>>> +    public void stop() {
>>> +        try {
>>> +            this.workerPool.shutdown(10000);
>>> +        } catch (InterruptedException e) {
>>> +            log.warn("Thread interrupted while waiting for worker pool to shut down");
>>> +        }
>>> +        sessionFactory.disposeFIXInitiators();
>>> +        super.stop();
>>> +    }
>>> +
>>>      /**
>>>       * Performs the actual sending of the message.
>>>       *
>>>
>>>
>>>
>>
>>
>> --
>> Ruwan Linton
>> Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
>> WSO2 Inc.; http://wso2.org
>> email: [email protected]; cell: +94 77 341 3097
>> blog: http://ruwansblog.blogspot.com
>>
>
>
>
> --
> Hiranya Jayathilaka
> Software Engineer;
> WSO2 Inc.; http://wso2.org
> E-mail: [email protected]; Mobile: +94 77 633 3491
> Blog: http://techfeast-hiranya.blogspot.com
>

--
Ruwan Linton
Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
WSO2 Inc.; http://wso2.org
email: [email protected]; cell: +94 77 341 3097
blog: http://ruwansblog.blogspot.com
