Author: hiranya
Date: Tue May 12 02:49:52 2009
New Revision: 35558
URL: http://wso2.org/svn/browse/wso2?view=rev&revision=35558

Log:
Enhancements and code cleanup in the FIX transport (these changes are already 
performed on the Synapse trunk)

* Made fix session factory a singleton
* gave transport sender a worker pool
* minor bug fixes to get samples the samples operational

Modified:
   
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
   
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
   
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
   
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java

Modified: 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
URL: 
http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=35558&r1=35557&r2=35558&view=diff
==============================================================================
--- 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
       (original)
+++ 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
       Tue May 12 02:49:52 2009
@@ -27,15 +27,12 @@
 public class FIXApplicationFactory {
 
     private ConfigurationContext cfgCtx;
-    private WorkerPool workerPool;
-
-    public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool 
workerPool) {
 
+    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
         this.cfgCtx = cfgCtx;
-        this.workerPool = workerPool;
     }
 
-    public Application getFIXApplication(AxisService service, boolean 
acceptor) {
+    public Application getFIXApplication(AxisService service, WorkerPool 
workerPool, boolean acceptor) {
         return new FIXIncomingMessageHandler(cfgCtx, workerPool, service, 
acceptor);
     }
 }
\ No newline at end of file

Modified: 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
URL: 
http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=35558&r1=35557&r2=35558&view=diff
==============================================================================
--- 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
   (original)
+++ 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
   Tue May 12 02:49:52 2009
@@ -23,6 +23,7 @@
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.Parameter;
 import org.apache.axis2.transport.base.BaseUtils;
+import org.apache.axis2.transport.base.threads.WorkerPool;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import quickfix.*;
@@ -65,16 +66,29 @@
     /** A Map containing all the FIX applications created for initiators, 
keyed by FIX EPR */
     private Map<String, Application> applicationStore;
     /** An ApplicationFactory handles creating FIX Applications 
(FIXIncomingMessageHandler Objects) */
-    private FIXApplicationFactory applicationFactory;
+    private static FIXApplicationFactory applicationFactory = null;
+
+    private WorkerPool listenerThreadPool;
+    private WorkerPool senderThreadPool;
 
     private Log log;
 
-    public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
-        this.applicationFactory = applicationFactory;
+    private static FIXSessionFactory INSTANCE = new FIXSessionFactory();
+
+    public static FIXSessionFactory getInstance(FIXApplicationFactory af) {
+        if (applicationFactory == null) {
+            applicationFactory = af;
+        }
+        return INSTANCE;   
+    }
+
+    private FIXSessionFactory() {
         this.log = LogFactory.getLog(this.getClass());
         this.acceptorStore = new HashMap<String,Acceptor>();
         this.initiatorStore = new HashMap<String, Initiator>();
         this.applicationStore = new HashMap<String, Application>();
+        this.listenerThreadPool = null;
+        this.senderThreadPool = null;
     }
 
     /**
@@ -101,7 +115,7 @@
                 MessageFactory messageFactory = new DefaultMessageFactory();
                 quickfix.LogFactory logFactory = getLogFactory(service, 
settings, true);
                 //Get a new FIX Application
-                Application messageHandler = 
applicationFactory.getFIXApplication(service, true);
+                Application messageHandler = 
applicationFactory.getFIXApplication(service, listenerThreadPool, true);
                 //Create a new FIX Acceptor
                 Acceptor acceptor = new SocketAcceptor(
                         messageHandler,
@@ -174,7 +188,7 @@
         MessageStoreFactory storeFactory = getMessageStoreFactory(service, 
settings, false);
         MessageFactory messageFactory = new DefaultMessageFactory();
         //Get a new FIX application
-        Application messageHandler = 
applicationFactory.getFIXApplication(service, false);
+        Application messageHandler = 
applicationFactory.getFIXApplication(service, senderThreadPool, false);
 
         try {
            //Create a new FIX initiator
@@ -216,7 +230,7 @@
                 MessageFactory messageFactory = new DefaultMessageFactory();
                 quickfix.LogFactory logFactory = getLogFactory(service, 
settings, true);
                 //Get a new FIX Application
-                Application messageHandler = 
applicationFactory.getFIXApplication(service, false);
+                Application messageHandler = 
applicationFactory.getFIXApplication(service, senderThreadPool, false);
 
                 Initiator initiator = new SocketInitiator(
                     messageHandler,
@@ -246,10 +260,10 @@
             }
 
         } else {
-            String msg = "The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM 
+ " parameter is " +
-                    "not specified. Unable to initialize the initiator session 
at this stage.";
-            log.info(msg);
-            throw new AxisFault(msg);
+            // FIX initiator session is not configured
+            // It could be intentional - So not an error (we don't need 
initiators at all times)
+            log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " 
parameter is " +
+                    "not specified. Unable to initialize the initiator session 
at this stage.");
         }
     }
 
@@ -276,6 +290,24 @@
     }
 
     /**
+     * Stops all the FIX initiators created so far and cleans up all the 
mappings
+     * related to them
+     */
+    public void disposeFIXInitiators() {
+        boolean debugEnabled = log.isDebugEnabled();
+
+        for (String key : initiatorStore.keySet()) {
+            initiatorStore.get(key).stop();
+            if (debugEnabled) {
+                log.debug("FIX initiator to the EPR " + key + " stopped");
+            }
+        }
+
+        initiatorStore.clear();
+        applicationStore.clear();
+    }
+
+    /**
      * Returns an array of Strings representing EPRs for the specified service
      *
      * @param serviceName the name of the service
@@ -444,6 +476,14 @@
         }
         return app;
     }
+
+    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
+        this.listenerThreadPool = listenerThreadPool;
+    }
+
+    public void setSenderThreadPool(WorkerPool senderThreadPool) {
+        this.senderThreadPool = senderThreadPool;
+    }
 }
 
 

Modified: 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
URL: 
http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=35558&r1=35557&r2=35558&view=diff
==============================================================================
--- 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
        (original)
+++ 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
        Tue May 12 02:49:52 2009
@@ -60,14 +60,8 @@
                      TransportInDescription trpInDesc) throws AxisFault {
 
         super.init(cfgCtx, trpInDesc);
-        //initialize the FIXSessionFactory
-        fixSessionFactory = new FIXSessionFactory(
-                new FIXApplicationFactory(this.cfgCtx, this.workerPool));
-        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
-                
getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
-        if (sender != null) {
-            sender.setSessionFactory(fixSessionFactory);
-        }
+        fixSessionFactory = FIXSessionFactory.getInstance(new 
FIXApplicationFactory(cfgCtx));
+        fixSessionFactory.setListenerThreadPool(this.workerPool);
         log.info("FIX transport listener initialized...");
     }
 
@@ -121,4 +115,4 @@
         }
         throw new AxisFault("Unable to get EPRs for the service " + 
serviceName);
     }
-}
\ No newline at end of file
+}

Modified: 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
URL: 
http://wso2.org/svn/browse/wso2/branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=35558&r1=35557&r2=35558&view=diff
==============================================================================
--- 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
  (original)
+++ 
branches/synapse/1.3-wso2v1/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
  Tue May 12 02:49:52 2009
@@ -28,6 +28,8 @@
 import org.apache.axis2.transport.OutTransportInfo;
 import org.apache.axis2.transport.base.AbstractTransportSender;
 import org.apache.axis2.transport.base.BaseUtils;
+import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
 import org.apache.commons.logging.LogFactory;
 import quickfix.*;
 import quickfix.field.*;
@@ -51,17 +53,12 @@
 
     private FIXSessionFactory sessionFactory;
     private FIXOutgoingMessageHandler messageSender;
+    private WorkerPool workerPool;
 
     public FIXTransportSender() {
         this.log = LogFactory.getLog(this.getClass());
     }
 
-
-    public void setSessionFactory(FIXSessionFactory sessionFactory) {
-        this.sessionFactory = sessionFactory;
-        this.messageSender.setSessionFactory(sessionFactory);
-    }
-
     /**
      * @param cfgCtx       the axis2 configuration context
      * @param transportOut the Out Transport description
@@ -69,10 +66,25 @@
      */
     public void init(ConfigurationContext cfgCtx, TransportOutDescription 
transportOut) throws AxisFault {
         super.init(cfgCtx, transportOut);
+        this.sessionFactory = FIXSessionFactory.getInstance(new 
FIXApplicationFactory(cfgCtx));
+        this.workerPool = WorkerPoolFactory.getWorkerPool(
+                            10, 20, 5, -1, "FIX Sender Worker thread group", 
"FIX-Worker");
+        this.sessionFactory.setSenderThreadPool(this.workerPool);
         messageSender = new FIXOutgoingMessageHandler();
+        messageSender.setSessionFactory(this.sessionFactory);
         log.info("FIX transport sender initialized...");
     }
 
+    public void stop() {
+        try {
+            this.workerPool.shutdown(10000);
+        } catch (InterruptedException e) {
+            log.warn("Thread interrupted while waiting for worker pool to shut 
down");
+        }
+        sessionFactory.disposeFIXInitiators();
+        super.stop();
+    }
+
     /**
      * Performs the actual sending of the message.
      *
@@ -421,4 +433,4 @@
         messageSender.cleanUpMessages(sessionID.toString());
     }
 
-}
\ No newline at end of file
+}

_______________________________________________
Esb-java-dev mailing list
[email protected]
https://wso2.org/cgi-bin/mailman/listinfo/esb-java-dev

Reply via email to