We definitely need to make the WorkerPool created by AbstractTransportListener configurable. The question is whether this should be done using a property file or using parameters in TransportInDescription (as with all other configuration settings). What are the arguments in favor of doing it in a separate property file?
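
To make the comparison concrete, here is a rough sketch, not a proposal for the actual patch. It assumes the four numbers passed to WorkerPoolFactory.getWorkerPool in the quoted commit (10, 20, 5, -1) are core size, max size, keep-alive seconds and queue length. The class WorkerPoolSettings and the key names "worker.pool.*" are hypothetical and exist neither in Axis2 nor in Synapse. The resolver only needs a key-to-value lookup, so the same code could be fed from a per-transport property file or from transport parameters:

```java
import java.util.Properties;
import java.util.function.Function;

/**
 * Hypothetical helper: resolves worker pool sizing from any key/value source.
 * Class name and key names are illustrative assumptions, not an existing API.
 */
final class WorkerPoolSettings {
    // Defaults mirror the values hard-coded for the FIX sender in r773818
    // (assumed order: core, max, keep-alive seconds, queue length).
    static final int DEFAULT_CORE = 10;
    static final int DEFAULT_MAX = 20;
    static final int DEFAULT_KEEP_ALIVE_SEC = 5;
    static final int DEFAULT_QUEUE_LENGTH = -1;

    final int core;
    final int max;
    final int keepAliveSec;
    final int queueLength;

    private WorkerPoolSettings(int core, int max, int keepAliveSec, int queueLength) {
        this.core = core;
        this.max = max;
        this.keepAliveSec = keepAliveSec;
        this.queueLength = queueLength;
    }

    /** The lookup may be backed by a Properties file or by transport parameters. */
    static WorkerPoolSettings resolve(Function<String, String> lookup) {
        return new WorkerPoolSettings(
                intOrDefault(lookup, "worker.pool.core", DEFAULT_CORE),
                intOrDefault(lookup, "worker.pool.max", DEFAULT_MAX),
                intOrDefault(lookup, "worker.pool.keepalive.sec", DEFAULT_KEEP_ALIVE_SEC),
                intOrDefault(lookup, "worker.pool.queue.length", DEFAULT_QUEUE_LENGTH));
    }

    /** Falls back to the default when the key is missing, blank or not an integer. */
    private static int intOrDefault(Function<String, String> lookup, String key, int fallback) {
        String raw = lookup.apply(key);
        if (raw == null || raw.trim().isEmpty()) {
            return fallback;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a file such as mail.properties; only one key is overridden.
        Properties props = new Properties();
        props.setProperty("worker.pool.core", "25");
        WorkerPoolSettings s = resolve(props::getProperty);
        System.out.println(s.core + " " + s.max + " " + s.keepAliveSec + " " + s.queueLength);
    }
}
```

With a resolver of this shape, the choice between the two options comes down to where the lookup reads from, and the hard-coded values remain the defaults in either case.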

Andreas

On Wed, May 13, 2009 at 08:31, Hiranya Jayathilaka <[email protected]> wrote:
>
>
> On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <[email protected]>
> wrote:
>>
>> I think we need to make that configurable as well.... the current
>> hard-coded setting will work in 98% of the cases, but there can be a
>> scenario where it requires tuning.
>>
>> Can we do this in a manner that we can configure them per transport?
>
> One simple solution would be to read a transport specific configuration file
> at AbstractTransportListener#init(). The init method gets a
> TransportInDescription object as an argument and from that we can retrieve
> the transport name to construct a file name unique to a given transport (e.g.
> mail.properties, vfs.properties). This approach has the benefit that it
> doesn't require changes to any of the actual transport implementations.
> Everything is taken care of by the abstract class.
>
> However this class now belongs to the WS-Commons transports project. So the
> enhancement should be made there.
>
> Thanks,
> Hiranya
>
>
>>
>> Thanks,
>> Ruwan
>>
>> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
>> <[email protected]> wrote:
>>>
>>> Hi Ruwan,
>>>
>>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <[email protected]>
>>> wrote:
>>>>
>>>> Hiranya,
>>>>
>>>> If you can make the worker pool configurable that would be of much
>>>> importance... you may have a look at the nhttp transport thread pool, which
>>>> can be configured via the nhttp.properties file.
>>>
>>> Currently the FIX sender initializes the WorkerPool in a manner similar
>>> to the AbstractTransportListener. The WorkerPool in
>>> AbstractTransportListener is used by several transports (JMS, Mail etc) via
>>> inheritance. FIX listener also makes use of the same thread pool. Do we have
>>> any plans to make that thread pool configurable too? Otherwise I don't think
>>> it makes much sense just to make the FIX sender's thread pool configurable.
>>>
>>> Thanks,
>>> Hiranya
>>>
>>>>
>>>>
>>>> Thanks,
>>>> Ruwan
>>>>
>>>> On Tue, May 12, 2009 at 1:30 PM, <[email protected]> wrote:
>>>>>
>>>>> Author: hiranya
>>>>> Date: Tue May 12 08:00:27 2009
>>>>> New Revision: 773818
>>>>>
>>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>>>>> Log:
>>>>> Enhancements and code cleanup in the FIX transport:
>>>>> * FIX sender now has its own worker pool and hence does not rely on the
>>>>> FIX listener any more. Therefore listener and sender can be enabled
>>>>> individually
>>>>> * Made FIXSessionFactory a singleton to effectively share session data
>>>>> among the listener and the sender
>>>>> * Cleanup logic for initiators during sender shutdown
>>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the
>>>>> sample 259 and similar scenarios from operating properly
>>>>>
>>>>> Modified:
>>>>>
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>>
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>>
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>>
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -27,15 +27,12 @@
>>>>> public class FIXApplicationFactory {
>>>>>
>>>>> private ConfigurationContext cfgCtx;
>>>>> - private WorkerPool workerPool;
>>>>> -
>>>>> - public FIXApplicationFactory(ConfigurationContext cfgCtx,
>>>>> WorkerPool workerPool) {
>>>>>
>>>>> + public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>>>>> this.cfgCtx = cfgCtx;
>>>>> - this.workerPool = workerPool;
>>>>> }
>>>>>
>>>>> - public Application getFIXApplication(AxisService service, boolean
>>>>> acceptor) {
>>>>> + public Application getFIXApplication(AxisService service,
>>>>> WorkerPool workerPool, boolean acceptor) {
>>>>> return new FIXIncomingMessageHandler(cfgCtx, workerPool,
>>>>> service, acceptor);
>>>>> }
>>>>> }
>>>>> \ No newline at end of file
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -23,6 +23,7 @@
>>>>> import org.apache.axis2.description.AxisService;
>>>>> import org.apache.axis2.description.Parameter;
>>>>> import org.apache.axis2.transport.base.BaseUtils;
>>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>>> import org.apache.commons.logging.Log;
>>>>> import org.apache.commons.logging.LogFactory;
>>>>> import quickfix.*;
>>>>> @@ -65,16 +66,29 @@
>>>>> /** A Map containing all the FIX applications created for
>>>>> initiators, keyed by FIX EPR */
>>>>> private Map<String, Application> applicationStore;
>>>>> /** An ApplicationFactory handles creating FIX Applications
>>>>> (FIXIncomingMessageHandler Objects) */
>>>>> - private FIXApplicationFactory applicationFactory;
>>>>> + private static FIXApplicationFactory applicationFactory = null;
>>>>> +
>>>>> + private WorkerPool listenerThreadPool;
>>>>> + private WorkerPool senderThreadPool;
>>>>>
>>>>> private Log log;
>>>>>
>>>>> - public FIXSessionFactory(FIXApplicationFactory applicationFactory)
>>>>> {
>>>>> - this.applicationFactory = applicationFactory;
>>>>> + private static FIXSessionFactory INSTANCE = new
>>>>> FIXSessionFactory();
>>>>> +
>>>>> + public static FIXSessionFactory getInstance(FIXApplicationFactory
>>>>> af) {
>>>>> + if (applicationFactory == null) {
>>>>> + applicationFactory = af;
>>>>> + }
>>>>> + return INSTANCE;
>>>>> + }
>>>>> +
>>>>> + private FIXSessionFactory() {
>>>>> this.log = LogFactory.getLog(this.getClass());
>>>>> this.acceptorStore = new HashMap<String,Acceptor>();
>>>>> this.initiatorStore = new HashMap<String, Initiator>();
>>>>> this.applicationStore = new HashMap<String, Application>();
>>>>> + this.listenerThreadPool = null;
>>>>> + this.senderThreadPool = null;
>>>>> }
>>>>>
>>>>> /**
>>>>> @@ -101,7 +115,7 @@
>>>>> MessageFactory messageFactory = new
>>>>> DefaultMessageFactory();
>>>>> quickfix.LogFactory logFactory = getLogFactory(service,
>>>>> settings, true);
>>>>> //Get a new FIX Application
>>>>> - Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, true);
>>>>> + Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>>>>> //Create a new FIX Acceptor
>>>>> Acceptor acceptor = new SocketAcceptor(
>>>>> messageHandler,
>>>>> @@ -174,7 +188,7 @@
>>>>> MessageStoreFactory storeFactory =
>>>>> getMessageStoreFactory(service, settings, false);
>>>>> MessageFactory messageFactory = new DefaultMessageFactory();
>>>>> //Get a new FIX application
>>>>> - Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, false);
>>>>> + Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>>>
>>>>> try {
>>>>> //Create a new FIX initiator
>>>>> @@ -216,7 +230,7 @@
>>>>> MessageFactory messageFactory = new
>>>>> DefaultMessageFactory();
>>>>> quickfix.LogFactory logFactory = getLogFactory(service,
>>>>> settings, true);
>>>>> //Get a new FIX Application
>>>>> - Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, false);
>>>>> + Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>>>
>>>>> Initiator initiator = new SocketInitiator(
>>>>> messageHandler,
>>>>> @@ -246,10 +260,10 @@
>>>>> }
>>>>>
>>>>> } else {
>>>>> - String msg = "The " +
>>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>>>> - "not specified. Unable to initialize the initiator
>>>>> session at this stage.";
>>>>> - log.info(msg);
>>>>> - throw new AxisFault(msg);
>>>>> + // FIX initiator session is not configured
>>>>> + // It could be intentional - So not an error (we don't
>>>>> need initiators at all times)
>>>>> + log.info("The " +
>>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>>>> + "not specified. Unable to initialize the initiator
>>>>> session at this stage.");
>>>>> }
>>>>> }
>>>>>
>>>>> @@ -276,6 +290,24 @@
>>>>> }
>>>>>
>>>>> /**
>>>>> + * Stops all the FIX initiators created so far and cleans up all
>>>>> the mappings
>>>>> + * related to them
>>>>> + */
>>>>> + public void disposeFIXInitiators() {
>>>>> + boolean debugEnabled = log.isDebugEnabled();
>>>>> +
>>>>> + for (String key : initiatorStore.keySet()) {
>>>>> + initiatorStore.get(key).stop();
>>>>> + if (debugEnabled) {
>>>>> + log.debug("FIX initiator to the EPR " + key + "
>>>>> stopped");
>>>>> + }
>>>>> + }
>>>>> +
>>>>> + initiatorStore.clear();
>>>>> + applicationStore.clear();
>>>>> + }
>>>>> +
>>>>> + /**
>>>>> * Returns an array of Strings representing EPRs for the specified
>>>>> service
>>>>> *
>>>>> * @param serviceName the name of the service
>>>>> @@ -444,6 +476,14 @@
>>>>> }
>>>>> return app;
>>>>> }
>>>>> +
>>>>> + public void setListenerThreadPool(WorkerPool listenerThreadPool) {
>>>>> + this.listenerThreadPool = listenerThreadPool;
>>>>> + }
>>>>> +
>>>>> + public void setSenderThreadPool(WorkerPool senderThreadPool) {
>>>>> + this.senderThreadPool = senderThreadPool;
>>>>> + }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -60,14 +60,8 @@
>>>>> TransportInDescription trpInDesc) throws AxisFault
>>>>> {
>>>>>
>>>>> super.init(cfgCtx, trpInDesc);
>>>>> - //initialize the FIXSessionFactory
>>>>> - fixSessionFactory = new FIXSessionFactory(
>>>>> - new FIXApplicationFactory(this.cfgCtx,
>>>>> this.workerPool));
>>>>> - FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>>>>> -
>>>>> getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>>>>> - if (sender != null) {
>>>>> - sender.setSessionFactory(fixSessionFactory);
>>>>> - }
>>>>> + fixSessionFactory = FIXSessionFactory.getInstance(new
>>>>> FIXApplicationFactory(cfgCtx));
>>>>> + fixSessionFactory.setListenerThreadPool(this.workerPool);
>>>>> log.info("FIX transport listener initialized...");
>>>>> }
>>>>>
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -28,6 +28,8 @@
>>>>> import org.apache.axis2.transport.OutTransportInfo;
>>>>> import org.apache.axis2.transport.base.AbstractTransportSender;
>>>>> import org.apache.axis2.transport.base.BaseUtils;
>>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>>>>> import org.apache.commons.logging.LogFactory;
>>>>> import quickfix.*;
>>>>> import quickfix.field.*;
>>>>> @@ -51,17 +53,12 @@
>>>>>
>>>>> private FIXSessionFactory sessionFactory;
>>>>> private FIXOutgoingMessageHandler messageSender;
>>>>> + private WorkerPool workerPool;
>>>>>
>>>>> public FIXTransportSender() {
>>>>> this.log = LogFactory.getLog(this.getClass());
>>>>> }
>>>>>
>>>>> -
>>>>> - public void setSessionFactory(FIXSessionFactory sessionFactory) {
>>>>> - this.sessionFactory = sessionFactory;
>>>>> - this.messageSender.setSessionFactory(sessionFactory);
>>>>> - }
>>>>> -
>>>>> /**
>>>>> * @param cfgCtx the axis2 configuration context
>>>>> * @param transportOut the Out Transport description
>>>>> @@ -69,10 +66,25 @@
>>>>> */
>>>>> public void init(ConfigurationContext cfgCtx,
>>>>> TransportOutDescription transportOut) throws AxisFault {
>>>>> super.init(cfgCtx, transportOut);
>>>>> + this.sessionFactory = FIXSessionFactory.getInstance(new
>>>>> FIXApplicationFactory(cfgCtx));
>>>>> + this.workerPool = WorkerPoolFactory.getWorkerPool(
>>>>> + 10, 20, 5, -1, "FIX Sender Worker thread
>>>>> group", "FIX-Worker");
>>>>> + this.sessionFactory.setSenderThreadPool(this.workerPool);
>>>>> messageSender = new FIXOutgoingMessageHandler();
>>>>> + messageSender.setSessionFactory(this.sessionFactory);
>>>>> log.info("FIX transport sender initialized...");
>>>>> }
>>>>>
>>>>> + public void stop() {
>>>>> + try {
>>>>> + this.workerPool.shutdown(10000);
>>>>> + } catch (InterruptedException e) {
>>>>> + log.warn("Thread interrupted while waiting for worker pool
>>>>> to shut down");
>>>>> + }
>>>>> + sessionFactory.disposeFIXInitiators();
>>>>> + super.stop();
>>>>> + }
>>>>> +
>>>>> /**
>>>>> * Performs the actual sending of the message.
>>>>> *
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Ruwan Linton
>>>> Senior Software Engineer & Product Manager; WSO2 ESB;
>>>> http://wso2.org/esb
>>>> WSO2 Inc.; http://wso2.org
>>>> email: [email protected]; cell: +94 77 341 3097
>>>> blog: http://ruwansblog.blogspot.com
>>>
>>>
>>>
>>> --
>>> Hiranya Jayathilaka
>>> Software Engineer;
>>> WSO2 Inc.; http://wso2.org
>>> E-mail: [email protected]; Mobile: +94 77 633 3491
>>> Blog: http://techfeast-hiranya.blogspot.com
>>
>>
>>
>> --
>> Ruwan Linton
>> Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
>> WSO2 Inc.; http://wso2.org
>> email: [email protected]; cell: +94 77 341 3097
>> blog: http://ruwansblog.blogspot.com
>
>
>
> --
> Hiranya Jayathilaka
> Software Engineer;
> WSO2 Inc.; http://wso2.org
> E-mail: [email protected]; Mobile: +94 77 633 3491
> Blog: http://techfeast-hiranya.blogspot.com
