Author: hiranya Date: Tue May 12 02:49:52 2009 New Revision: 35558 URL: http://wso2.org/svn/browse/wso2?view=rev&revision=35558
Log: Enhancements and code cleanup in the FIX transport (these changes are already performed on the Synapse trunk) * Made fix session factory a singleton * gave transport sender a worker pool * minor bug fixes to get samples the samples operational Modified: branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java Modified: branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=35558&r1=35557&r2=35558&view=diff ============================================================================== --- branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java (original) +++ branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java Tue May 12 02:49:52 2009 @@ -27,15 +27,12 @@ public class FIXApplicationFactory { private ConfigurationContext cfgCtx; - private WorkerPool workerPool; - - public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool workerPool) { + public FIXApplicationFactory(ConfigurationContext cfgCtx) { this.cfgCtx = cfgCtx; - this.workerPool = workerPool; } - public Application getFIXApplication(AxisService service, boolean acceptor) { + public Application getFIXApplication(AxisService service, WorkerPool workerPool, boolean acceptor) { return new FIXIncomingMessageHandler(cfgCtx, workerPool, service, acceptor); } } \ No newline at end of file Modified: branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=35558&r1=35557&r2=35558&view=diff ============================================================================== --- branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java (original) +++ branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java Tue May 12 02:49:52 2009 @@ -23,6 +23,7 @@ import org.apache.axis2.description.AxisService; import org.apache.axis2.description.Parameter; import org.apache.axis2.transport.base.BaseUtils; +import org.apache.axis2.transport.base.threads.WorkerPool; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import quickfix.*; @@ -65,16 +66,29 @@ /** A Map containing all the FIX applications created for initiators, keyed by FIX EPR */ private Map<String, Application> applicationStore; /** An ApplicationFactory handles creating FIX Applications (FIXIncomingMessageHandler Objects) */ - private FIXApplicationFactory applicationFactory; + private static FIXApplicationFactory applicationFactory = null; + + private WorkerPool listenerThreadPool; + private WorkerPool senderThreadPool; private Log log; - public FIXSessionFactory(FIXApplicationFactory applicationFactory) { - this.applicationFactory = applicationFactory; + private static FIXSessionFactory INSTANCE = new FIXSessionFactory(); + + public static FIXSessionFactory getInstance(FIXApplicationFactory af) { + if (applicationFactory == null) { + applicationFactory = af; + } + return INSTANCE; + } + + private FIXSessionFactory() { this.log = LogFactory.getLog(this.getClass()); this.acceptorStore = new HashMap<String,Acceptor>(); this.initiatorStore = new HashMap<String, Initiator>(); this.applicationStore = new HashMap<String, Application>(); + this.listenerThreadPool = null; + this.senderThreadPool = null; } /** @@ -101,7 +115,7 @@ MessageFactory messageFactory = new DefaultMessageFactory(); quickfix.LogFactory logFactory = getLogFactory(service, settings, true); //Get a new FIX Application - Application messageHandler = applicationFactory.getFIXApplication(service, true); + Application messageHandler = applicationFactory.getFIXApplication(service, listenerThreadPool, true); //Create a new FIX Acceptor Acceptor acceptor = new SocketAcceptor( messageHandler, @@ -174,7 +188,7 @@ MessageStoreFactory storeFactory = getMessageStoreFactory(service, settings, false); MessageFactory messageFactory = new DefaultMessageFactory(); //Get a new FIX application - Application messageHandler = applicationFactory.getFIXApplication(service, false); + Application messageHandler = applicationFactory.getFIXApplication(service, senderThreadPool, false); try { //Create a new FIX initiator @@ -216,7 +230,7 @@ MessageFactory messageFactory = new DefaultMessageFactory(); quickfix.LogFactory logFactory = getLogFactory(service, settings, true); //Get a new FIX Application - Application messageHandler = applicationFactory.getFIXApplication(service, false); + Application messageHandler = applicationFactory.getFIXApplication(service, senderThreadPool, false); Initiator initiator = new SocketInitiator( messageHandler, @@ -246,10 +260,10 @@ } } else { - String msg = "The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " + - "not specified. Unable to initialize the initiator session at this stage."; - log.info(msg); - throw new AxisFault(msg); + // FIX initiator session is not configured + // It could be intentional - So not an error (we don't need initiators at all times) + log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " + + "not specified. Unable to initialize the initiator session at this stage."); } } @@ -276,6 +290,24 @@ } /** + * Stops all the FIX initiators created so far and cleans up all the mappings + * related to them + */ + public void disposeFIXInitiators() { + boolean debugEnabled = log.isDebugEnabled(); + + for (String key : initiatorStore.keySet()) { + initiatorStore.get(key).stop(); + if (debugEnabled) { + log.debug("FIX initiator to the EPR " + key + " stopped"); + } + } + + initiatorStore.clear(); + applicationStore.clear(); + } + + /** * Returns an array of Strings representing EPRs for the specified service * * @param serviceName the name of the service @@ -444,6 +476,14 @@ } return app; } + + public void setListenerThreadPool(WorkerPool listenerThreadPool) { + this.listenerThreadPool = listenerThreadPool; + } + + public void setSenderThreadPool(WorkerPool senderThreadPool) { + this.senderThreadPool = senderThreadPool; + } } Modified: branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=35558&r1=35557&r2=35558&view=diff ============================================================================== --- branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java (original) +++ branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java Tue May 12 02:49:52 2009 @@ -60,14 +60,8 @@ TransportInDescription trpInDesc) throws AxisFault { super.init(cfgCtx, trpInDesc); - //initialize the FIXSessionFactory - fixSessionFactory = new FIXSessionFactory( - new FIXApplicationFactory(this.cfgCtx, this.workerPool)); - FIXTransportSender sender = (FIXTransportSender) cfgCtx. - getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender(); - if (sender != null) { - sender.setSessionFactory(fixSessionFactory); - } + fixSessionFactory = FIXSessionFactory.getInstance(new FIXApplicationFactory(cfgCtx)); + fixSessionFactory.setListenerThreadPool(this.workerPool); log.info("FIX transport listener initialized..."); } @@ -121,4 +115,4 @@ } throw new AxisFault("Unable to get EPRs for the service " + serviceName); } -} \ No newline at end of file +} Modified: branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java URL: http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=35558&r1=35557&r2=35558&view=diff ============================================================================== --- branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java (original) +++ branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java Tue May 12 02:49:52 2009 @@ -28,6 +28,8 @@ import org.apache.axis2.transport.OutTransportInfo; import org.apache.axis2.transport.base.AbstractTransportSender; import org.apache.axis2.transport.base.BaseUtils; +import org.apache.axis2.transport.base.threads.WorkerPool; +import org.apache.axis2.transport.base.threads.WorkerPoolFactory; import org.apache.commons.logging.LogFactory; import quickfix.*; import quickfix.field.*; @@ -51,17 +53,12 @@ private FIXSessionFactory sessionFactory; private FIXOutgoingMessageHandler messageSender; + private WorkerPool workerPool; public FIXTransportSender() { this.log = LogFactory.getLog(this.getClass()); } - - public void setSessionFactory(FIXSessionFactory sessionFactory) { - this.sessionFactory = sessionFactory; - this.messageSender.setSessionFactory(sessionFactory); - } - /** * @param cfgCtx the axis2 configuration context * @param transportOut the Out Transport description @@ -69,10 +66,25 @@ */ public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault { super.init(cfgCtx, transportOut); + this.sessionFactory = FIXSessionFactory.getInstance(new FIXApplicationFactory(cfgCtx)); + this.workerPool = WorkerPoolFactory.getWorkerPool( + 10, 20, 5, -1, "FIX Sender Worker thread group", "FIX-Worker"); + this.sessionFactory.setSenderThreadPool(this.workerPool); messageSender = new FIXOutgoingMessageHandler(); + messageSender.setSessionFactory(this.sessionFactory); log.info("FIX transport sender initialized..."); } + public void stop() { + try { + this.workerPool.shutdown(10000); + } catch (InterruptedException e) { + log.warn("Thread interrupted while waiting for worker pool to shut down"); + } + sessionFactory.disposeFIXInitiators(); + super.stop(); + } + /** * Performs the actual sending of the message. * @@ -421,4 +433,4 @@ messageSender.cleanUpMessages(sessionID.toString()); } -} \ No newline at end of file +} _______________________________________________ Esb-java-dev mailing list [email protected] https://wso2.org/cgi-bin/mailman/listinfo/esb-java-dev
