I think we need to make that configurable as well. The currently hard-coded
settings will work in 98% of cases, but there can be scenarios where they
need tuning.

Can we do this in such a way that the settings can be configured per transport?
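
To make the idea concrete, here is a rough sketch of per-transport pool
settings read from a properties file. Everything in it is an assumption for
illustration: the TransportPoolConfig class, the property key names
(for example "fix.worker_pool_size_core") and the interpretation of the
hard-coded values 10, 20, 5, -1 from the commit as core size, max size,
keep-alive seconds and queue length. It uses a plain JDK ThreadPoolExecutor
rather than the Axis2 WorkerPool so that it stands alone.

```java
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for per-transport worker pool settings. */
final class TransportPoolConfig {
    final int core;
    final int max;
    final int keepAliveSeconds;
    final int queueLength; // zero or negative is treated as unbounded

    TransportPoolConfig(int core, int max, int keepAliveSeconds, int queueLength) {
        this.core = core;
        // Guard against a misconfigured max that is smaller than core.
        this.max = Math.max(core, max);
        this.keepAliveSeconds = keepAliveSeconds;
        this.queueLength = queueLength;
    }

    /**
     * Reads "<transport>.<key>" entries, falling back to the values that
     * are hard-coded today (assumed meaning: core, max, keep-alive, queue).
     */
    static TransportPoolConfig fromProperties(Properties props, String transport) {
        String prefix = transport + ".";
        return new TransportPoolConfig(
                intProp(props, prefix + "worker_pool_size_core", 10),
                intProp(props, prefix + "worker_pool_size_max", 20),
                intProp(props, prefix + "worker_thread_keepalive_sec", 5),
                intProp(props, prefix + "worker_pool_queue_length", -1));
    }

    /** Returns the integer value for key, or defaultValue if missing or invalid. */
    private static int intProp(Properties props, String key, int defaultValue) {
        String value = props.getProperty(key);
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    /** Builds a JDK executor from these settings. */
    ThreadPoolExecutor newExecutor() {
        LinkedBlockingQueue<Runnable> queue = queueLength <= 0
                ? new LinkedBlockingQueue<Runnable>()
                : new LinkedBlockingQueue<Runnable>(queueLength);
        return new ThreadPoolExecutor(core, max, keepAliveSeconds, TimeUnit.SECONDS, queue);
    }
}
```

With something like this, each transport would look up its own prefix
("fix", "jms", "mail") and any key that is not set would keep the current
default, so existing deployments would behave as before.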

Thanks,
Ruwan

On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
<[email protected]>wrote:

> Hi Ruwan,
>
> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <[email protected]>wrote:
>
>> Hiranya,
>>
>> If you can make the worker pool configurable, that would be very
>> valuable. You may have a look at the nhttp transport thread pool, which
>> is configurable via the nhttp.properties file.
>
>
> Currently the FIX sender initializes the WorkerPool in a manner similar to
> the AbstractTransportListener. The WorkerPool in AbstractTransportListener
> is used by several transports (JMS, Mail etc) via inheritance. FIX listener
> also makes use of the same thread pool. Do we have any plans to make that
> thread pool configurable too? Otherwise I don't think it makes much sense
> just to make the FIX sender's thread pool configurable.
>
> Thanks,
> Hiranya
>
>
>>
>> Thanks,
>> Ruwan
>>
>> On Tue, May 12, 2009 at 1:30 PM, <[email protected]> wrote:
>>
>>> Author: hiranya
>>> Date: Tue May 12 08:00:27 2009
>>> New Revision: 773818
>>>
>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>>> Log:
>>> Enhancements and code cleanup in the FIX transport:
>>> * FIX sender now has its own worker pool and hence does not rely on the
>>> FIX listener any more. Therefore listener and sender can be enabled
>>> individually
>>> * Made FIXSessionFactory a singleton to effectively share session data
>>> among the listener and the sender
>>> * Cleanup logic for initiators during sender shutdown
>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the sample
>>> 259 and similar scenarios from operating properly
>>>
>>> Modified:
>>>
>>>  
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>
>>>  
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>
>>>  
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>
>>>  
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>
>>> Modified:
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> URL:
>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>
>>> ==============================================================================
>>> ---
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> (original)
>>> +++
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>> Tue May 12 08:00:27 2009
>>> @@ -27,15 +27,12 @@
>>>  public class FIXApplicationFactory {
>>>
>>>     private ConfigurationContext cfgCtx;
>>> -    private WorkerPool workerPool;
>>> -
>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool
>>> workerPool) {
>>>
>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>>>         this.cfgCtx = cfgCtx;
>>> -        this.workerPool = workerPool;
>>>     }
>>>
>>> -    public Application getFIXApplication(AxisService service, boolean
>>> acceptor) {
>>> +    public Application getFIXApplication(AxisService service, WorkerPool
>>> workerPool, boolean acceptor) {
>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool, service,
>>> acceptor);
>>>     }
>>>  }
>>> \ No newline at end of file
>>>
>>> Modified:
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> URL:
>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>
>>> ==============================================================================
>>> ---
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> (original)
>>> +++
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>> Tue May 12 08:00:27 2009
>>> @@ -23,6 +23,7 @@
>>>  import org.apache.axis2.description.AxisService;
>>>  import org.apache.axis2.description.Parameter;
>>>  import org.apache.axis2.transport.base.BaseUtils;
>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>  import org.apache.commons.logging.Log;
>>>  import org.apache.commons.logging.LogFactory;
>>>  import quickfix.*;
>>> @@ -65,16 +66,29 @@
>>>     /** A Map containing all the FIX applications created for initiators,
>>> keyed by FIX EPR */
>>>     private Map<String, Application> applicationStore;
>>>     /** An ApplicationFactory handles creating FIX Applications
>>> (FIXIncomingMessageHandler Objects) */
>>> -    private FIXApplicationFactory applicationFactory;
>>> +    private static FIXApplicationFactory applicationFactory = null;
>>> +
>>> +    private WorkerPool listenerThreadPool;
>>> +    private WorkerPool senderThreadPool;
>>>
>>>     private Log log;
>>>
>>> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
>>> -        this.applicationFactory = applicationFactory;
>>> +    private static FIXSessionFactory INSTANCE = new FIXSessionFactory();
>>> +
>>> +    public static FIXSessionFactory getInstance(FIXApplicationFactory
>>> af) {
>>> +        if (applicationFactory == null) {
>>> +            applicationFactory = af;
>>> +        }
>>> +        return INSTANCE;
>>> +    }
>>> +
>>> +    private FIXSessionFactory() {
>>>         this.log = LogFactory.getLog(this.getClass());
>>>         this.acceptorStore = new HashMap<String,Acceptor>();
>>>         this.initiatorStore = new HashMap<String, Initiator>();
>>>         this.applicationStore = new HashMap<String, Application>();
>>> +        this.listenerThreadPool = null;
>>> +        this.senderThreadPool = null;
>>>     }
>>>
>>>     /**
>>> @@ -101,7 +115,7 @@
>>>                 MessageFactory messageFactory = new
>>> DefaultMessageFactory();
>>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>>> settings, true);
>>>                 //Get a new FIX Application
>>> -                Application messageHandler =
>>> applicationFactory.getFIXApplication(service, true);
>>> +                Application messageHandler =
>>> applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>>>                 //Create a new FIX Acceptor
>>>                 Acceptor acceptor = new SocketAcceptor(
>>>                         messageHandler,
>>> @@ -174,7 +188,7 @@
>>>         MessageStoreFactory storeFactory =
>>> getMessageStoreFactory(service, settings, false);
>>>         MessageFactory messageFactory = new DefaultMessageFactory();
>>>         //Get a new FIX application
>>> -        Application messageHandler =
>>> applicationFactory.getFIXApplication(service, false);
>>> +        Application messageHandler =
>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>
>>>         try {
>>>            //Create a new FIX initiator
>>> @@ -216,7 +230,7 @@
>>>                 MessageFactory messageFactory = new
>>> DefaultMessageFactory();
>>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>>> settings, true);
>>>                 //Get a new FIX Application
>>> -                Application messageHandler =
>>> applicationFactory.getFIXApplication(service, false);
>>> +                Application messageHandler =
>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>
>>>                 Initiator initiator = new SocketInitiator(
>>>                     messageHandler,
>>> @@ -246,10 +260,10 @@
>>>             }
>>>
>>>         } else {
>>> -            String msg = "The " +
>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>> -                    "not specified. Unable to initialize the initiator
>>> session at this stage.";
>>> -            log.info(msg);
>>> -            throw new AxisFault(msg);
>>> +            // FIX initiator session is not configured
>>> +            // It could be intentional - So not an error (we don't need
>>> initiators at all times)
>>> +            log.info("The " +
>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>> +                    "not specified. Unable to initialize the initiator
>>> session at this stage.");
>>>         }
>>>     }
>>>
>>> @@ -276,6 +290,24 @@
>>>     }
>>>
>>>     /**
>>> +     * Stops all the FIX initiators created so far and cleans up all the
>>> mappings
>>> +     * related to them
>>> +     */
>>> +    public void disposeFIXInitiators() {
>>> +        boolean debugEnabled = log.isDebugEnabled();
>>> +
>>> +        for (String key : initiatorStore.keySet()) {
>>> +            initiatorStore.get(key).stop();
>>> +            if (debugEnabled) {
>>> +                log.debug("FIX initiator to the EPR " + key + "
>>> stopped");
>>> +            }
>>> +        }
>>> +
>>> +        initiatorStore.clear();
>>> +        applicationStore.clear();
>>> +    }
>>> +
>>> +    /**
>>>      * Returns an array of Strings representing EPRs for the specified
>>> service
>>>      *
>>>      * @param serviceName the name of the service
>>> @@ -444,6 +476,14 @@
>>>         }
>>>         return app;
>>>     }
>>> +
>>> +    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
>>> +        this.listenerThreadPool = listenerThreadPool;
>>> +    }
>>> +
>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>>> +        this.senderThreadPool = senderThreadPool;
>>> +    }
>>>  }
>>>
>>>
>>>
>>> Modified:
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> URL:
>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>>>
>>> ==============================================================================
>>> ---
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> (original)
>>> +++
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>> Tue May 12 08:00:27 2009
>>> @@ -60,14 +60,8 @@
>>>                      TransportInDescription trpInDesc) throws AxisFault {
>>>
>>>         super.init(cfgCtx, trpInDesc);
>>> -        //initialize the FIXSessionFactory
>>> -        fixSessionFactory = new FIXSessionFactory(
>>> -                new FIXApplicationFactory(this.cfgCtx,
>>> this.workerPool));
>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>>> -
>>>  
>>> getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>>> -        if (sender != null) {
>>> -            sender.setSessionFactory(fixSessionFactory);
>>> -        }
>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
>>> FIXApplicationFactory(cfgCtx));
>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>>>         log.info("FIX transport listener initialized...");
>>>     }
>>>
>>>
>>> Modified:
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> URL:
>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>>>
>>> ==============================================================================
>>> ---
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> (original)
>>> +++
>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>> Tue May 12 08:00:27 2009
>>> @@ -28,6 +28,8 @@
>>>  import org.apache.axis2.transport.OutTransportInfo;
>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>>>  import org.apache.axis2.transport.base.BaseUtils;
>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>>>  import org.apache.commons.logging.LogFactory;
>>>  import quickfix.*;
>>>  import quickfix.field.*;
>>> @@ -51,17 +53,12 @@
>>>
>>>     private FIXSessionFactory sessionFactory;
>>>     private FIXOutgoingMessageHandler messageSender;
>>> +    private WorkerPool workerPool;
>>>
>>>     public FIXTransportSender() {
>>>         this.log = LogFactory.getLog(this.getClass());
>>>     }
>>>
>>> -
>>> -    public void setSessionFactory(FIXSessionFactory sessionFactory) {
>>> -        this.sessionFactory = sessionFactory;
>>> -        this.messageSender.setSessionFactory(sessionFactory);
>>> -    }
>>> -
>>>     /**
>>>      * @param cfgCtx       the axis2 configuration context
>>>      * @param transportOut the Out Transport description
>>> @@ -69,10 +66,25 @@
>>>      */
>>>     public void init(ConfigurationContext cfgCtx, TransportOutDescription
>>> transportOut) throws AxisFault {
>>>         super.init(cfgCtx, transportOut);
>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
>>> FIXApplicationFactory(cfgCtx));
>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>>> +                            10, 20, 5, -1, "FIX Sender Worker thread
>>> group", "FIX-Worker");
>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>>>         messageSender = new FIXOutgoingMessageHandler();
>>> +        messageSender.setSessionFactory(this.sessionFactory);
>>>         log.info("FIX transport sender initialized...");
>>>     }
>>>
>>> +    public void stop() {
>>> +        try {
>>> +            this.workerPool.shutdown(10000);
>>> +        } catch (InterruptedException e) {
>>> +            log.warn("Thread interrupted while waiting for worker pool
>>> to shut down");
>>> +        }
>>> +        sessionFactory.disposeFIXInitiators();
>>> +        super.stop();
>>> +    }
>>> +
>>>     /**
>>>      * Performs the actual sending of the message.
>>>      *
>>>
>>>
>>>
>>
>>
>> --
>> Ruwan Linton
>> Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
>> WSO2 Inc.; http://wso2.org
>> email: [email protected]; cell: +94 77 341 3097
>> blog: http://ruwansblog.blogspot.com
>>
>
>
>
> --
> Hiranya Jayathilaka
> Software Engineer;
> WSO2 Inc.;  http://wso2.org
> E-mail: [email protected];  Mobile: +94 77 633 3491
> Blog: http://techfeast-hiranya.blogspot.com
>



-- 
Ruwan Linton
Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
WSO2 Inc.; http://wso2.org
email: [email protected]; cell: +94 77 341 3097
blog: http://ruwansblog.blogspot.com
