I prefer this to be in an optional properties file, where it is not a must to have this properties file, as in the nio http transport case.
This is because the thread pool configuration shouldn't be a normal configuration that the users are doing. If we put these configurations as a parameter in the axis2.xml that might lead people to think that it is a frequently configurable parameter. So I think the properties file approach is better than the parameter approach. Thanks, Ruwan On Wed, May 13, 2009 at 4:12 PM, Hiranya Jayathilaka <[email protected]>wrote: > > > On Wed, May 13, 2009 at 2:41 PM, Andreas Veithen < > [email protected]> wrote: > >> We definitely need to make the WorkerPool created by >> AbstractTransportListener configurable. The question whether this >> should be done using a property file or using parameters in >> TransportInDescription (as with all other configuration settings). >> What are the arguments in favor of doing it in a separate property >> file? > > > Nothing significant I guess. It might probably save us a few bytes of > memory since with a file based approach we don't really have to save those > parameters in the configuration context. But that's not a big motivating > facator IMO. May be the only plus point worth considering is that it's just > consistent with the existing approach for configuring thread pools. We > currently use nhttp.properties to configure the HTTP-NIO thread pool. > > But I'm ok with using transport parameters to do this. > > Thanks, > Hiranya > > >> >> Andreas >> >> On Wed, May 13, 2009 at 08:31, Hiranya Jayathilaka <[email protected]> >> wrote: >> > >> > >> > On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <[email protected]> >> > wrote: >> >> >> >> I think we need to make that configurable as well.... currently hard >> >> codded setting will work in 98% of the cases, but there can be a >> scenario >> >> where it requires a tune up. >> >> >> >> Can we do this in a manner that we can configure them per transport. >> > >> > One simple solution would be to read a transport specific configuration >> file >> > at AbstractTransportListener#init(). The init method gets a >> > TransportInDescription object as an argument and from that we can >> retrieve >> > the transport name to construct a file name unique to a given transport >> (eg: >> > mail.properties, vfs.properties). This approach has the benefit that it >> > doesn't require changes to any of the actual transport implementations. >> > Everything is taken care of by the abstract class. >> > >> > However this class now belongs to the WS-Commons transports project. So >> the >> > enhancement should be made there. >> > >> > Thanks, >> > Hiranya >> > >> > >> >> >> >> Thanks, >> >> Ruwan >> >> >> >> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka >> >> <[email protected]> wrote: >> >>> >> >>> Hi Ruwan, >> >>> >> >>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton < >> [email protected]> >> >>> wrote: >> >>>> >> >>>> Hiranya, >> >>>> >> >>>> If you can make the worker pool configurable that would be of much >> >>>> importance... you may have a look at the nhttp transport thread pool, >> which >> >>>> can be configurable via the nhttp.properties file. >> >>> >> >>> Currently the FIX sender initializes the WorkerPool in a manner >> similar >> >>> to the AbstractTransportListener. The WorkerPool in >> >>> AbstractTransportListener is used by several transports (JMS, Mail >> etc) via >> >>> inheritance. FIX listener also makes use of the same thread pool. Do >> we have >> >>> any plans to make that thread pool configurable too? Othrewise I don't >> think >> >>> it mkes much sense just to make the FIX sender's thread pool >> configurable. >> >>> >> >>> Thanks, >> >>> Hiranya >> >>> >> >>>> >> >>>> >> >>>> Thanks, >> >>>> Ruwan >> >>>> >> >>>> On Tue, May 12, 2009 at 1:30 PM, <[email protected]> wrote: >> >>>>> >> >>>>> Author: hiranya >> >>>>> Date: Tue May 12 08:00:27 2009 >> >>>>> New Revision: 773818 >> >>>>> >> >>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev >> >>>>> Log: >> >>>>> Enhancements and code cleanup in the FIX transport: >> >>>>> * FIX sender now has its own worker pool and hence does not rely on >> the >> >>>>> FIX listener any more. Therefore listener and sender can be enabled >> >>>>> individually >> >>>>> * Made FIXSessionFactory a singleton to effectively share session >> data >> >>>>> among the listener and the sender >> >>>>> * Cleanup logic for initiators during sender shutdown >> >>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the >> >>>>> sample 259 and similar scenarios from operating properly >> >>>>> >> >>>>> Modified: >> >>>>> >> >>>>> >> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java >> >>>>> >> >>>>> >> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java >> >>>>> >> >>>>> >> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java >> >>>>> >> >>>>> >> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java >> >>>>> >> >>>>> Modified: >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java >> >>>>> URL: >> >>>>> >> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff >> >>>>> >> >>>>> >> ============================================================================== >> >>>>> --- >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java >> >>>>> (original) >> >>>>> +++ >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java >> >>>>> Tue May 12 08:00:27 2009 >> >>>>> @@ -27,15 +27,12 @@ >> >>>>> public class FIXApplicationFactory { >> >>>>> >> >>>>> private ConfigurationContext cfgCtx; >> >>>>> - private WorkerPool workerPool; >> >>>>> - >> >>>>> - public FIXApplicationFactory(ConfigurationContext cfgCtx, >> >>>>> WorkerPool workerPool) { >> >>>>> >> >>>>> + public FIXApplicationFactory(ConfigurationContext cfgCtx) { >> >>>>> this.cfgCtx = cfgCtx; >> >>>>> - this.workerPool = workerPool; >> >>>>> } >> >>>>> >> >>>>> - public Application getFIXApplication(AxisService service, >> boolean >> >>>>> acceptor) { >> >>>>> + public Application getFIXApplication(AxisService service, >> >>>>> WorkerPool workerPool, boolean acceptor) { >> >>>>> return new FIXIncomingMessageHandler(cfgCtx, workerPool, >> >>>>> service, acceptor); >> >>>>> } >> >>>>> } >> >>>>> \ No newline at end of file >> >>>>> >> >>>>> Modified: >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java >> >>>>> URL: >> >>>>> >> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff >> >>>>> >> >>>>> >> ============================================================================== >> >>>>> --- >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java >> >>>>> (original) >> >>>>> +++ >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java >> >>>>> Tue May 12 08:00:27 2009 >> >>>>> @@ -23,6 +23,7 @@ >> >>>>> import org.apache.axis2.description.AxisService; >> >>>>> import org.apache.axis2.description.Parameter; >> >>>>> import org.apache.axis2.transport.base.BaseUtils; >> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool; >> >>>>> import org.apache.commons.logging.Log; >> >>>>> import org.apache.commons.logging.LogFactory; >> >>>>> import quickfix.*; >> >>>>> @@ -65,16 +66,29 @@ >> >>>>> /** A Map containing all the FIX applications created for >> >>>>> initiators, keyed by FIX EPR */ >> >>>>> private Map<String, Application> applicationStore; >> >>>>> /** An ApplicationFactory handles creating FIX Applications >> >>>>> (FIXIncomingMessageHandler Objects) */ >> >>>>> - private FIXApplicationFactory applicationFactory; >> >>>>> + private static FIXApplicationFactory applicationFactory = null; >> >>>>> + >> >>>>> + private WorkerPool listenerThreadPool; >> >>>>> + private WorkerPool senderThreadPool; >> >>>>> >> >>>>> private Log log; >> >>>>> >> >>>>> - public FIXSessionFactory(FIXApplicationFactory >> applicationFactory) >> >>>>> { >> >>>>> - this.applicationFactory = applicationFactory; >> >>>>> + private static FIXSessionFactory INSTANCE = new >> >>>>> FIXSessionFactory(); >> >>>>> + >> >>>>> + public static FIXSessionFactory >> getInstance(FIXApplicationFactory >> >>>>> af) { >> >>>>> + if (applicationFactory == null) { >> >>>>> + applicationFactory = af; >> >>>>> + } >> >>>>> + return INSTANCE; >> >>>>> + } >> >>>>> + >> >>>>> + private FIXSessionFactory() { >> >>>>> this.log = LogFactory.getLog(this.getClass()); >> >>>>> this.acceptorStore = new HashMap<String,Acceptor>(); >> >>>>> this.initiatorStore = new HashMap<String, Initiator>(); >> >>>>> this.applicationStore = new HashMap<String, Application>(); >> >>>>> + this.listenerThreadPool = null; >> >>>>> + this.senderThreadPool = null; >> >>>>> } >> >>>>> >> >>>>> /** >> >>>>> @@ -101,7 +115,7 @@ >> >>>>> MessageFactory messageFactory = new >> >>>>> DefaultMessageFactory(); >> >>>>> quickfix.LogFactory logFactory = >> getLogFactory(service, >> >>>>> settings, true); >> >>>>> //Get a new FIX Application >> >>>>> - Application messageHandler = >> >>>>> applicationFactory.getFIXApplication(service, true); >> >>>>> + Application messageHandler = >> >>>>> applicationFactory.getFIXApplication(service, listenerThreadPool, >> true); >> >>>>> //Create a new FIX Acceptor >> >>>>> Acceptor acceptor = new SocketAcceptor( >> >>>>> messageHandler, >> >>>>> @@ -174,7 +188,7 @@ >> >>>>> MessageStoreFactory storeFactory = >> >>>>> getMessageStoreFactory(service, settings, false); >> >>>>> MessageFactory messageFactory = new DefaultMessageFactory(); >> >>>>> //Get a new FIX application >> >>>>> - Application messageHandler = >> >>>>> applicationFactory.getFIXApplication(service, false); >> >>>>> + Application messageHandler = >> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool, >> false); >> >>>>> >> >>>>> try { >> >>>>> //Create a new FIX initiator >> >>>>> @@ -216,7 +230,7 @@ >> >>>>> MessageFactory messageFactory = new >> >>>>> DefaultMessageFactory(); >> >>>>> quickfix.LogFactory logFactory = >> getLogFactory(service, >> >>>>> settings, true); >> >>>>> //Get a new FIX Application >> >>>>> - Application messageHandler = >> >>>>> applicationFactory.getFIXApplication(service, false); >> >>>>> + Application messageHandler = >> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool, >> false); >> >>>>> >> >>>>> Initiator initiator = new SocketInitiator( >> >>>>> messageHandler, >> >>>>> @@ -246,10 +260,10 @@ >> >>>>> } >> >>>>> >> >>>>> } else { >> >>>>> - String msg = "The " + >> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " + >> >>>>> - "not specified. Unable to initialize the >> initiator >> >>>>> session at this stage."; >> >>>>> - log.info(msg); >> >>>>> - throw new AxisFault(msg); >> >>>>> + // FIX initiator session is not configured >> >>>>> + // It could be intentional - So not an error (we don't >> >>>>> need initiators at all times) >> >>>>> + log.info("The " + >> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " + >> >>>>> + "not specified. Unable to initialize the >> initiator >> >>>>> session at this stage."); >> >>>>> } >> >>>>> } >> >>>>> >> >>>>> @@ -276,6 +290,24 @@ >> >>>>> } >> >>>>> >> >>>>> /** >> >>>>> + * Stops all the FIX initiators created so far and cleans up >> all >> >>>>> the mappings >> >>>>> + * related to them >> >>>>> + */ >> >>>>> + public void disposeFIXInitiators() { >> >>>>> + boolean debugEnabled = log.isDebugEnabled(); >> >>>>> + >> >>>>> + for (String key : initiatorStore.keySet()) { >> >>>>> + initiatorStore.get(key).stop(); >> >>>>> + if (debugEnabled) { >> >>>>> + log.debug("FIX initiator to the EPR " + key + " >> >>>>> stopped"); >> >>>>> + } >> >>>>> + } >> >>>>> + >> >>>>> + initiatorStore.clear(); >> >>>>> + applicationStore.clear(); >> >>>>> + } >> >>>>> + >> >>>>> + /** >> >>>>> * Returns an array of Strings representing EPRs for the >> specified >> >>>>> service >> >>>>> * >> >>>>> * @param serviceName the name of the service >> >>>>> @@ -444,6 +476,14 @@ >> >>>>> } >> >>>>> return app; >> >>>>> } >> >>>>> + >> >>>>> + public void setListenerThreadPool(WorkerPool >> listenerThreadPool) { >> >>>>> + this.listenerThreadPool = listenerThreadPool; >> >>>>> + } >> >>>>> + >> >>>>> + public void setSenderThreadPool(WorkerPool senderThreadPool) { >> >>>>> + this.senderThreadPool = senderThreadPool; >> >>>>> + } >> >>>>> } >> >>>>> >> >>>>> >> >>>>> >> >>>>> Modified: >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java >> >>>>> URL: >> >>>>> >> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff >> >>>>> >> >>>>> >> ============================================================================== >> >>>>> --- >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java >> >>>>> (original) >> >>>>> +++ >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java >> >>>>> Tue May 12 08:00:27 2009 >> >>>>> @@ -60,14 +60,8 @@ >> >>>>> TransportInDescription trpInDesc) throws >> AxisFault >> >>>>> { >> >>>>> >> >>>>> super.init(cfgCtx, trpInDesc); >> >>>>> - //initialize the FIXSessionFactory >> >>>>> - fixSessionFactory = new FIXSessionFactory( >> >>>>> - new FIXApplicationFactory(this.cfgCtx, >> >>>>> this.workerPool)); >> >>>>> - FIXTransportSender sender = (FIXTransportSender) cfgCtx. >> >>>>> - >> >>>>> >> >> getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender(); >> >>>>> - if (sender != null) { >> >>>>> - sender.setSessionFactory(fixSessionFactory); >> >>>>> - } >> >>>>> + fixSessionFactory = FIXSessionFactory.getInstance(new >> >>>>> FIXApplicationFactory(cfgCtx)); >> >>>>> + fixSessionFactory.setListenerThreadPool(this.workerPool); >> >>>>> log.info("FIX transport listener initialized..."); >> >>>>> } >> >>>>> >> >>>>> >> >>>>> Modified: >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java >> >>>>> URL: >> >>>>> >> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff >> >>>>> >> >>>>> >> ============================================================================== >> >>>>> --- >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java >> >>>>> (original) >> >>>>> +++ >> >>>>> >> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java >> >>>>> Tue May 12 08:00:27 2009 >> >>>>> @@ -28,6 +28,8 @@ >> >>>>> import org.apache.axis2.transport.OutTransportInfo; >> >>>>> import org.apache.axis2.transport.base.AbstractTransportSender; >> >>>>> import org.apache.axis2.transport.base.BaseUtils; >> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool; >> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory; >> >>>>> import org.apache.commons.logging.LogFactory; >> >>>>> import quickfix.*; >> >>>>> import quickfix.field.*; >> >>>>> @@ -51,17 +53,12 @@ >> >>>>> >> >>>>> private FIXSessionFactory sessionFactory; >> >>>>> private FIXOutgoingMessageHandler messageSender; >> >>>>> + private WorkerPool workerPool; >> >>>>> >> >>>>> public FIXTransportSender() { >> >>>>> this.log = LogFactory.getLog(this.getClass()); >> >>>>> } >> >>>>> >> >>>>> - >> >>>>> - public void setSessionFactory(FIXSessionFactory sessionFactory) >> { >> >>>>> - this.sessionFactory = sessionFactory; >> >>>>> - this.messageSender.setSessionFactory(sessionFactory); >> >>>>> - } >> >>>>> - >> >>>>> /** >> >>>>> * @param cfgCtx the axis2 configuration context >> >>>>> * @param transportOut the Out Transport description >> >>>>> @@ -69,10 +66,25 @@ >> >>>>> */ >> >>>>> public void init(ConfigurationContext cfgCtx, >> >>>>> TransportOutDescription transportOut) throws AxisFault { >> >>>>> super.init(cfgCtx, transportOut); >> >>>>> + this.sessionFactory = FIXSessionFactory.getInstance(new >> >>>>> FIXApplicationFactory(cfgCtx)); >> >>>>> + this.workerPool = WorkerPoolFactory.getWorkerPool( >> >>>>> + 10, 20, 5, -1, "FIX Sender Worker >> thread >> >>>>> group", "FIX-Worker"); >> >>>>> + this.sessionFactory.setSenderThreadPool(this.workerPool); >> >>>>> messageSender = new FIXOutgoingMessageHandler(); >> >>>>> + messageSender.setSessionFactory(this.sessionFactory); >> >>>>> log.info("FIX transport sender initialized..."); >> >>>>> } >> >>>>> >> >>>>> + public void stop() { >> >>>>> + try { >> >>>>> + this.workerPool.shutdown(10000); >> >>>>> + } catch (InterruptedException e) { >> >>>>> + log.warn("Thread interrupted while waiting for worker >> pool >> >>>>> to shut down"); >> >>>>> + } >> >>>>> + sessionFactory.disposeFIXInitiators(); >> >>>>> + super.stop(); >> >>>>> + } >> >>>>> + >> >>>>> /** >> >>>>> * Performs the actual sending of the message. >> >>>>> * >> >>>>> >> >>>>> >> >>>> >> >>>> >> >>>> >> >>>> -- >> >>>> Ruwan Linton >> >>>> Senior Software Engineer & Product Manager; WSO2 ESB; >> >>>> http://wso2.org/esb >> >>>> WSO2 Inc.; http://wso2.org >> >>>> email: [email protected]; cell: +94 77 341 3097 >> >>>> blog: http://ruwansblog.blogspot.com >> >>> >> >>> >> >>> >> >>> -- >> >>> Hiranya Jayathilaka >> >>> Software Engineer; >> >>> WSO2 Inc.; http://wso2.org >> >>> E-mail: [email protected]; Mobile: +94 77 633 3491 >> >>> Blog: http://techfeast-hiranya.blogspot.com >> >> >> >> >> >> >> >> -- >> >> Ruwan Linton >> >> Senior Software Engineer & Product Manager; WSO2 ESB; >> http://wso2.org/esb >> >> WSO2 Inc.; http://wso2.org >> >> email: [email protected]; cell: +94 77 341 3097 >> >> blog: http://ruwansblog.blogspot.com >> > >> > >> > >> > -- >> > Hiranya Jayathilaka >> > Software Engineer; >> > WSO2 Inc.; http://wso2.org >> > E-mail: [email protected]; Mobile: +94 77 633 3491 >> > Blog: http://techfeast-hiranya.blogspot.com >> > >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: [email protected] >> For additional commands, e-mail: [email protected] >> >> > > > -- > Hiranya Jayathilaka > Software Engineer; > WSO2 Inc.; http://wso2.org > E-mail: [email protected]; Mobile: +94 77 633 3491 > Blog: http://techfeast-hiranya.blogspot.com > -- Ruwan Linton Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb WSO2 Inc.; http://wso2.org email: [email protected]; cell: +94 77 341 3097 blog: http://ruwansblog.blogspot.com
