I would prefer to keep this in an optional properties file, so that the
file need not be present at all, as is the case with the NIO HTTP transport.

This is because thread pool tuning is not something users should normally
need to do. If we expose these settings as parameters in axis2.xml, people
may assume that they are meant to be changed routinely.

So I think the properties file approach is better than the parameter
approach.
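
To make the idea concrete, here is a rough sketch of how an optional
per-transport properties file could be read, falling back to defaults when
the file is absent. The class name WorkerPoolSettings and the worker_pool.*
keys are made up for illustration and are not existing Axis2 or Synapse
names. The defaults are the four numbers currently hard-coded in the FIX
sender's WorkerPoolFactory.getWorkerPool call (10, 20, 5, -1), which I am
assuming mean core size, max size, keep-alive and queue length.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/** Hypothetical settings holder; not part of Axis2, Synapse or WS-Commons. */
final class WorkerPoolSettings {

    // Defaults taken from the values hard-coded for the FIX sender in r773818.
    static final int DEFAULT_CORE = 10;
    static final int DEFAULT_MAX = 20;
    static final int DEFAULT_KEEP_ALIVE = 5;
    static final int DEFAULT_QUEUE_LENGTH = -1;

    final int core;
    final int max;
    final int keepAlive;
    final int queueLength;

    private WorkerPoolSettings(int core, int max, int keepAlive, int queueLength) {
        this.core = core;
        this.max = max;
        this.keepAlive = keepAlive;
        this.queueLength = queueLength;
    }

    /**
     * Looks for "<transportName>.properties" on the classpath
     * (e.g. mail.properties). A missing or unreadable file is not an
     * error: the defaults are used instead.
     */
    static WorkerPoolSettings load(String transportName) {
        Properties props = new Properties();
        String resource = "/" + transportName + ".properties";
        try (InputStream in = WorkerPoolSettings.class.getResourceAsStream(resource)) {
            if (in != null) {
                props.load(in);
            }
        } catch (IOException e) {
            // Unreadable file: keep whatever was loaded and fall back to defaults.
        }
        return fromProperties(props);
    }

    /** Builds settings from properties; the key names are assumptions. */
    static WorkerPoolSettings fromProperties(Properties props) {
        return new WorkerPoolSettings(
                intProp(props, "worker_pool.core", DEFAULT_CORE),
                intProp(props, "worker_pool.max", DEFAULT_MAX),
                intProp(props, "worker_pool.keep_alive_sec", DEFAULT_KEEP_ALIVE),
                intProp(props, "worker_pool.queue_length", DEFAULT_QUEUE_LENGTH));
    }

    // Returns the default when the key is missing or is not a valid integer.
    private static int intProp(Properties props, String key, int defaultValue) {
        String value = props.getProperty(key);
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }
}
```

With something like this, a transport would call
WorkerPoolSettings.load("mail") at init time and pass the four values on to
the pool factory, and users who never create the file would see no change
in behaviour.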

Thanks,
Ruwan

On Wed, May 13, 2009 at 4:12 PM, Hiranya Jayathilaka
<[email protected]> wrote:

>
>
> On Wed, May 13, 2009 at 2:41 PM, Andreas Veithen <
> [email protected]> wrote:
>
>> We definitely need to make the WorkerPool created by
>> AbstractTransportListener configurable. The question is whether this
>> should be done using a property file or using parameters in
>> TransportInDescription (as with all other configuration settings).
>> What are the arguments in favor of doing it in a separate property
>> file?
>
>
> Nothing significant, I guess. It might save us a few bytes of memory,
> since with a file-based approach we don't have to keep those parameters
> in the configuration context. But that's not a big motivating factor
> IMO. Maybe the only plus point worth considering is that it's
> consistent with the existing approach for configuring thread pools. We
> currently use nhttp.properties to configure the HTTP-NIO thread pool.
>
> But I'm ok with using transport parameters to do this.
>
> Thanks,
> Hiranya
>
>
>>
>> Andreas
>>
>> On Wed, May 13, 2009 at 08:31, Hiranya Jayathilaka <[email protected]>
>> wrote:
>> >
>> >
>> > On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <[email protected]>
>> > wrote:
>> >>
>> >> I think we need to make that configurable as well. The current
>> >> hard-coded setting will work in 98% of cases, but there can be
>> >> scenarios where it requires tuning.
>> >>
>> >> Can we do this in a way that lets us configure them per transport?
>> >
>> > One simple solution would be to read a transport-specific
>> > configuration file in AbstractTransportListener#init(). The init
>> > method gets a TransportInDescription object as an argument, and from
>> > that we can retrieve the transport name to construct a file name
>> > unique to a given transport (e.g. mail.properties, vfs.properties).
>> > This approach has the benefit that it doesn't require changes to any
>> > of the actual transport implementations. Everything is taken care of
>> > by the abstract class.
>> >
>> > However, this class now belongs to the WS-Commons transports project,
>> > so the enhancement should be made there.
>> >
>> > Thanks,
>> > Hiranya
>> >
>> >
>> >>
>> >> Thanks,
>> >> Ruwan
>> >>
>> >> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
>> >> <[email protected]> wrote:
>> >>>
>> >>> Hi Ruwan,
>> >>>
>> >>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <[email protected]>
>> >>> wrote:
>> >>>>
>> >>>> Hiranya,
>> >>>>
>> >>>> If you can make the worker pool configurable, that would be very
>> >>>> valuable. You may want to look at the nhttp transport thread pool,
>> >>>> which is configurable via the nhttp.properties file.
>> >>>
>> >>> Currently the FIX sender initializes the WorkerPool in a manner
>> >>> similar to the AbstractTransportListener. The WorkerPool in
>> >>> AbstractTransportListener is used by several transports (JMS, Mail,
>> >>> etc.) via inheritance. The FIX listener also makes use of the same
>> >>> thread pool. Do we have any plans to make that thread pool
>> >>> configurable too? Otherwise I don't think it makes much sense to
>> >>> make only the FIX sender's thread pool configurable.
>> >>>
>> >>> Thanks,
>> >>> Hiranya
>> >>>
>> >>>>
>> >>>>
>> >>>> Thanks,
>> >>>> Ruwan
>> >>>>
>> >>>> On Tue, May 12, 2009 at 1:30 PM, <[email protected]> wrote:
>> >>>>>
>> >>>>> Author: hiranya
>> >>>>> Date: Tue May 12 08:00:27 2009
>> >>>>> New Revision: 773818
>> >>>>>
>> >>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>> >>>>> Log:
>> >>>>> Enhancements and code cleanup in the FIX transport:
>> >>>>> * FIX sender now has its own worker pool and hence does not rely on
>> the
>> >>>>> FIX listener any more. Therefore listener and sender can be enabled
>> >>>>> individually
>> >>>>> * Made FIXSessionFactory a singleton to effectively share session
>> data
>> >>>>> among the listener and the sender
>> >>>>> * Cleanup logic for initiators during sender shutdown
>> >>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the
>> >>>>> sample 259 and similar scenarios from operating properly
>> >>>>>
>> >>>>> Modified:
>> >>>>>
>> >>>>>
>>  
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> >>>>>
>> >>>>>
>>  
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> >>>>>
>> >>>>>
>>  
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> >>>>>
>> >>>>>
>>  
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> >>>>>
>> >>>>> Modified:
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> >>>>> URL:
>> >>>>>
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>> >>>>>
>> >>>>>
>> ==============================================================================
>> >>>>> ---
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> >>>>> (original)
>> >>>>> +++
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> >>>>> Tue May 12 08:00:27 2009
>> >>>>> @@ -27,15 +27,12 @@
>> >>>>>  public class FIXApplicationFactory {
>> >>>>>
>> >>>>>     private ConfigurationContext cfgCtx;
>> >>>>> -    private WorkerPool workerPool;
>> >>>>> -
>> >>>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx,
>> >>>>> WorkerPool workerPool) {
>> >>>>>
>> >>>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>> >>>>>         this.cfgCtx = cfgCtx;
>> >>>>> -        this.workerPool = workerPool;
>> >>>>>     }
>> >>>>>
>> >>>>> -    public Application getFIXApplication(AxisService service,
>> boolean
>> >>>>> acceptor) {
>> >>>>> +    public Application getFIXApplication(AxisService service,
>> >>>>> WorkerPool workerPool, boolean acceptor) {
>> >>>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool,
>> >>>>> service, acceptor);
>> >>>>>     }
>> >>>>>  }
>> >>>>> \ No newline at end of file
>> >>>>>
>> >>>>> Modified:
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> >>>>> URL:
>> >>>>>
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>> >>>>>
>> >>>>>
>> ==============================================================================
>> >>>>> ---
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> >>>>> (original)
>> >>>>> +++
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> >>>>> Tue May 12 08:00:27 2009
>> >>>>> @@ -23,6 +23,7 @@
>> >>>>>  import org.apache.axis2.description.AxisService;
>> >>>>>  import org.apache.axis2.description.Parameter;
>> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>> >>>>>  import org.apache.commons.logging.Log;
>> >>>>>  import org.apache.commons.logging.LogFactory;
>> >>>>>  import quickfix.*;
>> >>>>> @@ -65,16 +66,29 @@
>> >>>>>     /** A Map containing all the FIX applications created for
>> >>>>> initiators, keyed by FIX EPR */
>> >>>>>     private Map<String, Application> applicationStore;
>> >>>>>     /** An ApplicationFactory handles creating FIX Applications
>> >>>>> (FIXIncomingMessageHandler Objects) */
>> >>>>> -    private FIXApplicationFactory applicationFactory;
>> >>>>> +    private static FIXApplicationFactory applicationFactory = null;
>> >>>>> +
>> >>>>> +    private WorkerPool listenerThreadPool;
>> >>>>> +    private WorkerPool senderThreadPool;
>> >>>>>
>> >>>>>     private Log log;
>> >>>>>
>> >>>>> -    public FIXSessionFactory(FIXApplicationFactory
>> applicationFactory)
>> >>>>> {
>> >>>>> -        this.applicationFactory = applicationFactory;
>> >>>>> +    private static FIXSessionFactory INSTANCE = new
>> >>>>> FIXSessionFactory();
>> >>>>> +
>> >>>>> +    public static FIXSessionFactory
>> getInstance(FIXApplicationFactory
>> >>>>> af) {
>> >>>>> +        if (applicationFactory == null) {
>> >>>>> +            applicationFactory = af;
>> >>>>> +        }
>> >>>>> +        return INSTANCE;
>> >>>>> +    }
>> >>>>> +
>> >>>>> +    private FIXSessionFactory() {
>> >>>>>         this.log = LogFactory.getLog(this.getClass());
>> >>>>>         this.acceptorStore = new HashMap<String,Acceptor>();
>> >>>>>         this.initiatorStore = new HashMap<String, Initiator>();
>> >>>>>         this.applicationStore = new HashMap<String, Application>();
>> >>>>> +        this.listenerThreadPool = null;
>> >>>>> +        this.senderThreadPool = null;
>> >>>>>     }
>> >>>>>
>> >>>>>     /**
>> >>>>> @@ -101,7 +115,7 @@
>> >>>>>                 MessageFactory messageFactory = new
>> >>>>> DefaultMessageFactory();
>> >>>>>                 quickfix.LogFactory logFactory =
>> getLogFactory(service,
>> >>>>> settings, true);
>> >>>>>                 //Get a new FIX Application
>> >>>>> -                Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, true);
>> >>>>> +                Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, listenerThreadPool,
>> true);
>> >>>>>                 //Create a new FIX Acceptor
>> >>>>>                 Acceptor acceptor = new SocketAcceptor(
>> >>>>>                         messageHandler,
>> >>>>> @@ -174,7 +188,7 @@
>> >>>>>         MessageStoreFactory storeFactory =
>> >>>>> getMessageStoreFactory(service, settings, false);
>> >>>>>         MessageFactory messageFactory = new DefaultMessageFactory();
>> >>>>>         //Get a new FIX application
>> >>>>> -        Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, false);
>> >>>>> +        Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool,
>> false);
>> >>>>>
>> >>>>>         try {
>> >>>>>            //Create a new FIX initiator
>> >>>>> @@ -216,7 +230,7 @@
>> >>>>>                 MessageFactory messageFactory = new
>> >>>>> DefaultMessageFactory();
>> >>>>>                 quickfix.LogFactory logFactory =
>> getLogFactory(service,
>> >>>>> settings, true);
>> >>>>>                 //Get a new FIX Application
>> >>>>> -                Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, false);
>> >>>>> +                Application messageHandler =
>> >>>>> applicationFactory.getFIXApplication(service, senderThreadPool,
>> false);
>> >>>>>
>> >>>>>                 Initiator initiator = new SocketInitiator(
>> >>>>>                     messageHandler,
>> >>>>> @@ -246,10 +260,10 @@
>> >>>>>             }
>> >>>>>
>> >>>>>         } else {
>> >>>>> -            String msg = "The " +
>> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>> >>>>> -                    "not specified. Unable to initialize the
>> initiator
>> >>>>> session at this stage.";
>> >>>>> -            log.info(msg);
>> >>>>> -            throw new AxisFault(msg);
>> >>>>> +            // FIX initiator session is not configured
>> >>>>> +            // It could be intentional - So not an error (we don't
>> >>>>> need initiators at all times)
>> >>>>> +            log.info("The " +
>> >>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>> >>>>> +                    "not specified. Unable to initialize the
>> initiator
>> >>>>> session at this stage.");
>> >>>>>         }
>> >>>>>     }
>> >>>>>
>> >>>>> @@ -276,6 +290,24 @@
>> >>>>>     }
>> >>>>>
>> >>>>>     /**
>> >>>>> +     * Stops all the FIX initiators created so far and cleans up
>> all
>> >>>>> the mappings
>> >>>>> +     * related to them
>> >>>>> +     */
>> >>>>> +    public void disposeFIXInitiators() {
>> >>>>> +        boolean debugEnabled = log.isDebugEnabled();
>> >>>>> +
>> >>>>> +        for (String key : initiatorStore.keySet()) {
>> >>>>> +            initiatorStore.get(key).stop();
>> >>>>> +            if (debugEnabled) {
>> >>>>> +                log.debug("FIX initiator to the EPR " + key + "
>> >>>>> stopped");
>> >>>>> +            }
>> >>>>> +        }
>> >>>>> +
>> >>>>> +        initiatorStore.clear();
>> >>>>> +        applicationStore.clear();
>> >>>>> +    }
>> >>>>> +
>> >>>>> +    /**
>> >>>>>      * Returns an array of Strings representing EPRs for the
>> specified
>> >>>>> service
>> >>>>>      *
>> >>>>>      * @param serviceName the name of the service
>> >>>>> @@ -444,6 +476,14 @@
>> >>>>>         }
>> >>>>>         return app;
>> >>>>>     }
>> >>>>> +
>> >>>>> +    public void setListenerThreadPool(WorkerPool
>> listenerThreadPool) {
>> >>>>> +        this.listenerThreadPool = listenerThreadPool;
>> >>>>> +    }
>> >>>>> +
>> >>>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>> >>>>> +        this.senderThreadPool = senderThreadPool;
>> >>>>> +    }
>> >>>>>  }
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> Modified:
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> >>>>> URL:
>> >>>>>
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>> >>>>>
>> >>>>>
>> ==============================================================================
>> >>>>> ---
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> >>>>> (original)
>> >>>>> +++
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> >>>>> Tue May 12 08:00:27 2009
>> >>>>> @@ -60,14 +60,8 @@
>> >>>>>                      TransportInDescription trpInDesc) throws
>> AxisFault
>> >>>>> {
>> >>>>>
>> >>>>>         super.init(cfgCtx, trpInDesc);
>> >>>>> -        //initialize the FIXSessionFactory
>> >>>>> -        fixSessionFactory = new FIXSessionFactory(
>> >>>>> -                new FIXApplicationFactory(this.cfgCtx,
>> >>>>> this.workerPool));
>> >>>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>> >>>>> -
>> >>>>>
>>  
>> getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>> >>>>> -        if (sender != null) {
>> >>>>> -            sender.setSessionFactory(fixSessionFactory);
>> >>>>> -        }
>> >>>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
>> >>>>> FIXApplicationFactory(cfgCtx));
>> >>>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>> >>>>>         log.info("FIX transport listener initialized...");
>> >>>>>     }
>> >>>>>
>> >>>>>
>> >>>>> Modified:
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> >>>>> URL:
>> >>>>>
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>> >>>>>
>> >>>>>
>> ==============================================================================
>> >>>>> ---
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> >>>>> (original)
>> >>>>> +++
>> >>>>>
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> >>>>> Tue May 12 08:00:27 2009
>> >>>>> @@ -28,6 +28,8 @@
>> >>>>>  import org.apache.axis2.transport.OutTransportInfo;
>> >>>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>> >>>>>  import org.apache.commons.logging.LogFactory;
>> >>>>>  import quickfix.*;
>> >>>>>  import quickfix.field.*;
>> >>>>> @@ -51,17 +53,12 @@
>> >>>>>
>> >>>>>     private FIXSessionFactory sessionFactory;
>> >>>>>     private FIXOutgoingMessageHandler messageSender;
>> >>>>> +    private WorkerPool workerPool;
>> >>>>>
>> >>>>>     public FIXTransportSender() {
>> >>>>>         this.log = LogFactory.getLog(this.getClass());
>> >>>>>     }
>> >>>>>
>> >>>>> -
>> >>>>> -    public void setSessionFactory(FIXSessionFactory sessionFactory)
>> {
>> >>>>> -        this.sessionFactory = sessionFactory;
>> >>>>> -        this.messageSender.setSessionFactory(sessionFactory);
>> >>>>> -    }
>> >>>>> -
>> >>>>>     /**
>> >>>>>      * @param cfgCtx       the axis2 configuration context
>> >>>>>      * @param transportOut the Out Transport description
>> >>>>> @@ -69,10 +66,25 @@
>> >>>>>      */
>> >>>>>     public void init(ConfigurationContext cfgCtx,
>> >>>>> TransportOutDescription transportOut) throws AxisFault {
>> >>>>>         super.init(cfgCtx, transportOut);
>> >>>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
>> >>>>> FIXApplicationFactory(cfgCtx));
>> >>>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>> >>>>> +                            10, 20, 5, -1, "FIX Sender Worker
>> thread
>> >>>>> group", "FIX-Worker");
>> >>>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>> >>>>>         messageSender = new FIXOutgoingMessageHandler();
>> >>>>> +        messageSender.setSessionFactory(this.sessionFactory);
>> >>>>>         log.info("FIX transport sender initialized...");
>> >>>>>     }
>> >>>>>
>> >>>>> +    public void stop() {
>> >>>>> +        try {
>> >>>>> +            this.workerPool.shutdown(10000);
>> >>>>> +        } catch (InterruptedException e) {
>> >>>>> +            log.warn("Thread interrupted while waiting for worker
>> pool
>> >>>>> to shut down");
>> >>>>> +        }
>> >>>>> +        sessionFactory.disposeFIXInitiators();
>> >>>>> +        super.stop();
>> >>>>> +    }
>> >>>>> +
>> >>>>>     /**
>> >>>>>      * Performs the actual sending of the message.
>> >>>>>      *
>> >>>>>
>> >>>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>> Ruwan Linton
>> >>>> Senior Software Engineer & Product Manager; WSO2 ESB;
>> >>>> http://wso2.org/esb
>> >>>> WSO2 Inc.; http://wso2.org
>> >>>> email: [email protected]; cell: +94 77 341 3097
>> >>>> blog: http://ruwansblog.blogspot.com
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Hiranya Jayathilaka
>> >>> Software Engineer;
>> >>> WSO2 Inc.;  http://wso2.org
>> >>> E-mail: [email protected];  Mobile: +94 77 633 3491
>> >>> Blog: http://techfeast-hiranya.blogspot.com
>> >>
>> >>
>> >>
>> >> --
>> >> Ruwan Linton
>> >> Senior Software Engineer & Product Manager; WSO2 ESB;
>> http://wso2.org/esb
>> >> WSO2 Inc.; http://wso2.org
>> >> email: [email protected]; cell: +94 77 341 3097
>> >> blog: http://ruwansblog.blogspot.com
>> >
>> >
>> >
>> > --
>> > Hiranya Jayathilaka
>> > Software Engineer;
>> > WSO2 Inc.;  http://wso2.org
>> > E-mail: [email protected];  Mobile: +94 77 633 3491
>> > Blog: http://techfeast-hiranya.blogspot.com
>> >
>>
>>
>>
>
>
> --
> Hiranya Jayathilaka
> Software Engineer;
> WSO2 Inc.;  http://wso2.org
> E-mail: [email protected];  Mobile: +94 77 633 3491
> Blog: http://techfeast-hiranya.blogspot.com
>



-- 
Ruwan Linton
Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
WSO2 Inc.; http://wso2.org
email: [email protected]; cell: +94 77 341 3097
blog: http://ruwansblog.blogspot.com
