Author: hiranya
Date: Tue Jan  3 06:44:07 2012
New Revision: 1226672

URL: http://svn.apache.org/viewvc?rev=1226672&view=rev
Log:
Some improvements and control over how threads are handled by the FIX transport

Modified:
    
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
    
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
    
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java

Modified: 
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java?rev=1226672&r1=1226671&r2=1226672&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
 Tue Jan  3 06:44:07 2012
@@ -107,6 +107,14 @@ public class FIXConstants {
     public static final String FIX_ACCEPTOR_EVENT_HANDLER = 
"transport.fix.AcceptorSessionEventHandler";
     public static final String FIX_INITIATOR_EVENT_HANDLER = 
"transport.fix.InitiatorSessionEventHandler";
 
+    public static final String FIX_PROCESS_SINGLE_THREADED = 
"transport.fix.ProcessSingleThreaded";
+    public static final String FIX_ACCEPTOR_SINGLE_THREADED = 
"transport.fix.AcceptorSingleThreaded";
+    public static final String FIX_INITIATOR_SINGLE_THREADED = 
"transport.fix.InitiatorSingleThreaded";
+
+    public static final String FIX_USE_THREADED_CONNECTORS = 
"transport.fix.UseThreadedConnectors";
+    public static final String FIX_USE_THREADED_ACCEPTOR = 
"transport.fix.UseThreadedAcceptor";
+    public static final String FIX_USE_THREADED_INITIATOR = 
"transport.fix.UseThreadedInitiator";
+
     //--------------------------- Message level properties 
-----------------------------------
 
     public static final String FIX_IGNORE_ORDER = "transport.fix.IgnoreOrder";

Modified: 
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java?rev=1226672&r1=1226671&r2=1226672&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
 Tue Jan  3 06:44:07 2012
@@ -21,6 +21,7 @@ package org.apache.synapse.transport.fix
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
+import org.apache.axis2.util.JavaUtils;
 import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
@@ -70,9 +71,10 @@ public class FIXIncomingMessageHandler i
     private boolean dropExtraResponses = false;
     private Semaphore semaphore;
     private SessionEventHandler eventHandler;
+    private boolean singleThreaded;
 
     public FIXIncomingMessageHandler(ConfigurationContext cfgCtx, WorkerPool 
workerPool,
-                             AxisService service, boolean acceptor) {
+                                     AxisService service, boolean acceptor) {
         this.cfgCtx = cfgCtx;
         this.workerPool = workerPool;
         this.service = service;
@@ -104,6 +106,8 @@ public class FIXIncomingMessageHandler i
                         eventHandlerParam.getValue(), e);
             }
         }
+
+        singleThreaded = isSingleThreaded();
     }
 
     private void getResponseHandlingApproach() {
@@ -118,6 +122,35 @@ public class FIXIncomingMessageHandler i
         }
     }
 
+    /**
+     * Checks the service parameters to decide whether this handler should process
+     * incoming messages in single threaded mode. The result is assigned to the
+     * singleThreaded field during construction; when that field is true, incoming
+     * messages are passed to processMessage directly instead of being submitted to
+     * the worker pool.
+     *
+     * The role specific parameter is checked first (FIX_ACCEPTOR_SINGLE_THREADED when
+     * this handler is an acceptor, FIX_INITIATOR_SINGLE_THREADED otherwise), followed
+     * by FIX_PROCESS_SINGLE_THREADED, which is honoured for both roles. A parameter
+     * only counts when JavaUtils.isTrueExplicitly accepts its value.
+     *
+     * @return true if any applicable parameter is explicitly set to true, false otherwise
+     */
+    private boolean isSingleThreaded() {
+        // Acceptor specific setting; ignored when this handler is an initiator
+        Parameter singleThreadParam = service.getParameter(
+                FIXConstants.FIX_ACCEPTOR_SINGLE_THREADED);
+        if (acceptor && singleThreadParam != null &&
+                JavaUtils.isTrueExplicitly(singleThreadParam.getValue())) {
+            log.info("FIX acceptor for service: " + service.getName() + " is 
single threaded");
+            return true;
+        }
+
+        // Initiator specific setting; ignored when this handler is an acceptor
+        singleThreadParam = service.getParameter(
+                FIXConstants.FIX_INITIATOR_SINGLE_THREADED);
+        if (!acceptor && singleThreadParam != null &&
+                JavaUtils.isTrueExplicitly(singleThreadParam.getValue())) {
+            log.info("FIX initiator for service: " + service.getName() + " is 
single threaded");
+            return true;
+        }
+
+        // Generic setting that applies regardless of the role
+        singleThreadParam = service.getParameter(
+                FIXConstants.FIX_PROCESS_SINGLE_THREADED);
+        if (singleThreadParam != null &&
+                JavaUtils.isTrueExplicitly(singleThreadParam.getValue())) {
+            log.info("FIX processor for service: " + service.getName() + " is 
single threaded");
+            return true;
+        }
+
+        return false;
+    }
+
+
     public void setOutgoingMessageContext(MessageContext msgCtx) {
         if (!allNewApproach) {
             outgoingMessages.offer(msgCtx);
@@ -260,7 +293,7 @@ public class FIXIncomingMessageHandler i
      * @throws DoNotSend This exception aborts message transmission
      */
     public void toApp(Message message, SessionID sessionID) throws DoNotSend {
-          if (log.isDebugEnabled()) {
+        if (log.isDebugEnabled()) {
             StringBuffer sb = new StringBuffer();
             try {
                 sb.append("Sending application level FIX message to 
").append(message.getHeader().getField(new TargetCompID()).getValue());
@@ -314,7 +347,108 @@ public class FIXIncomingMessageHandler i
         if (rolled && log.isDebugEnabled()) {
             log.debug("Incoming request counter rolled over for the session: " 
+ sessionID);
         }
-        workerPool.execute(new FIXWorkerThread(message, sessionID, counter));
+        if (singleThreaded) {
+            processMessage(message, sessionID, counter);
+        } else {
+            workerPool.execute(new FIXWorkerThread(message, sessionID, 
counter));
+        }
+    }
+
+    /**
+     * Routes an incoming FIX message to the request or the response handling path.
+     * This method is invoked directly when the handler is single threaded and from
+     * FIXWorkerThread.run() otherwise.
+     *
+     * The message is handled as a new request when allNewApproach is set or when
+     * this handler belongs to an acceptor. For an initiator, the next entry of
+     * outgoingMessages is polled: if one exists the message is handled as the
+     * response to it; if none exists the message is handled as a new request,
+     * unless dropExtraResponses is set, in which case it is dropped.
+     *
+     * @param message the FIX message to process
+     * @param sessionID the FIX session the message was received on
+     * @param counter the counter value associated with the message; it is passed on
+     *                to the request/response handlers unchanged
+     */
+    public void processMessage(Message message, SessionID sessionID, int 
counter) {
+        if (allNewApproach) {
+            //treat all messages (including responses) as new messages
+            handleIncomingRequest(message, sessionID, counter);
+        } else {
+            if (acceptor) {
+                //treat messages coming from an acceptor as new request 
messages
+                handleIncomingRequest(message, sessionID, counter);
+            } else {
+                MessageContext outMsgCtx = outgoingMessages.poll();
+                if (outMsgCtx != null) {
+                    //handle as a response to an outgoing message
+                    handleIncomingResponse(outMsgCtx, message, sessionID, 
counter);
+                } else if (!dropExtraResponses) {
+                    //handle as a new request message
+                    handleIncomingRequest(message, sessionID, counter);
+                } else {
+                    log.debug("Dropping additional FIX response");
+                }
+            }
+        }
+    }
+
+    /**
+     * Injects a FIX message into the engine as a new incoming request.
+     *
+     * A fresh message context is obtained from the FIX transport listener, tagged
+     * with a FIXOutTransportInfo for the source session and, when a service is
+     * available, bound to that service and to the operation named by the
+     * OPERATION_PARAM service parameter (or the default operation). Messages
+     * received by an initiator are additionally flagged with the
+     * "synapse.isresponse" property. The FIX message is then wrapped in a SOAP
+     * envelope and handed over to the transport listener.
+     *
+     * An AxisFault raised while building the envelope or dispatching the message is
+     * passed to handleException.
+     *
+     * @param message the FIX message that was received
+     * @param sessionID the FIX session the message was received on
+     * @param counter the counter value of the message, used for logging and when
+     *                building the SOAP envelope
+     */
+    private void handleIncomingRequest(Message message, SessionID sessionID, 
int counter) {
+        if (log.isDebugEnabled()) {
+            log.debug("Source session: " + sessionID + " - Received message 
with sequence " +
+                    "number " + counter);
+        }
+
+        //Create message context for the incoming message
+        AbstractTransportListener trpListener = (AbstractTransportListener) 
cfgCtx.getAxisConfiguration().
+                getTransportIn(FIXConstants.TRANSPORT_NAME).getReceiver();
+
+        MessageContext msgCtx = trpListener.createMessageContext();
+        msgCtx.setProperty(Constants.OUT_TRANSPORT_INFO, new 
FIXOutTransportInfo(sessionID));
+
+        if (service != null) {
+            // Set the service for which the message is intended to
+            msgCtx.setAxisService(service);
+            // find the operation for the message, or default to one
+            Parameter operationParam = 
service.getParameter(BaseConstants.OPERATION_PARAM);
+            QName operationQName = (
+                    operationParam != null ?
+                            
BaseUtils.getQNameFromString(operationParam.getValue()) :
+                            BaseConstants.DEFAULT_OPERATION);
+
+            AxisOperation operation = service.getOperation(operationQName);
+            if (operation != null) {
+                msgCtx.setAxisOperation(operation);
+                
msgCtx.setAxisMessage(operation.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
+                msgCtx.setSoapAction("urn:" + 
operation.getName().getLocalPart());
+            }
+        }
+
+        // The transport headers identify which kind of FIX application received the message
+        String fixApplication = FIXConstants.FIX_INITIATOR;
+        if (acceptor) {
+            fixApplication = FIXConstants.FIX_ACCEPTOR;
+        } else {
+            msgCtx.setProperty("synapse.isresponse", true);
+        }
+
+        try {
+            //Put the FIX message in a SOAPEnvelope
+            FIXUtils.getInstance().setSOAPEnvelope(message, counter, 
sessionID.toString(), msgCtx);
+            // NOTE(review): service is null-checked above but dereferenced
+            // unconditionally below via service.getName() - confirm that service
+            // can never be null here
+            trpListener.handleIncomingMessage(
+                    msgCtx,
+                    FIXUtils.getTransportHeaders(service.getName(), 
fixApplication),
+                    null,
+                    FIXConstants.FIX_DEFAULT_CONTENT_TYPE
+            );
+        } catch (AxisFault e) {
+            handleException("Error while processing FIX message", e);
+        }
+    }
+
+    /**
+     * Injects a FIX message into the engine as the response to a previously sent
+     * outgoing message.
+     *
+     * A response message context is derived from the outgoing message context by
+     * the FIX transport sender. The FIX message is wrapped in a SOAP envelope, the
+     * context is marked as server side and the message is handed over to the
+     * transport sender with FIX_INITIATOR transport headers.
+     *
+     * An AxisFault raised while building the envelope or dispatching the message is
+     * passed to handleException.
+     *
+     * @param outMsgCtx the message context of the outgoing message this FIX message
+     *                  is treated as a response to
+     * @param message the FIX message that was received
+     * @param sessionID the FIX session the message was received on
+     * @param counter the counter value of the message, used when building the
+     *                SOAP envelope
+     */
+    private void handleIncomingResponse(MessageContext outMsgCtx, Message 
message,
+                                        SessionID sessionID, int counter) {
+        AbstractTransportSender trpSender = (AbstractTransportSender) 
cfgCtx.getAxisConfiguration().
+                getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
+
+        MessageContext msgCtx = 
trpSender.createResponseMessageContext(outMsgCtx);
+
+        try {
+            //Put the FIX message in a SOAPEnvelope
+            FIXUtils.getInstance().setSOAPEnvelope(message, counter, 
sessionID.toString(), msgCtx);
+            msgCtx.setServerSide(true);
+            trpSender.handleIncomingMessage(
+                    msgCtx,
+                    FIXUtils.getTransportHeaders(service.getName(), 
FIXConstants.FIX_INITIATOR),
+                    null,
+                    FIXConstants.FIX_DEFAULT_CONTENT_TYPE
+            );
+        } catch (AxisFault e) {
+            handleException("Error while processing response FIX message", e);
+        }
     }
 
     /**
@@ -333,105 +467,9 @@ public class FIXIncomingMessageHandler i
             this.counter = counter;
         }
 
-        private void handleIncomingRequest() {
-            if (log.isDebugEnabled()) {
-                log.debug("Source session: " + sessionID + " - Received 
message with sequence " +
-                        "number " + counter);
-            }
-
-            //Create message context for the incoming message
-            AbstractTransportListener trpListener = 
(AbstractTransportListener) cfgCtx.getAxisConfiguration().
-                    getTransportIn(FIXConstants.TRANSPORT_NAME).getReceiver();
-
-            MessageContext msgCtx = trpListener.createMessageContext();
-            msgCtx.setProperty(Constants.OUT_TRANSPORT_INFO, new 
FIXOutTransportInfo(sessionID));
-
-            if (service != null) {
-                // Set the service for which the message is intended to
-                msgCtx.setAxisService(service);
-                // find the operation for the message, or default to one
-                Parameter operationParam = 
service.getParameter(BaseConstants.OPERATION_PARAM);
-                QName operationQName = (
-                    operationParam != null ?
-                        
BaseUtils.getQNameFromString(operationParam.getValue()) :
-                        BaseConstants.DEFAULT_OPERATION);
-
-                AxisOperation operation = service.getOperation(operationQName);
-                if (operation != null) {
-                    msgCtx.setAxisOperation(operation);
-                    
msgCtx.setAxisMessage(operation.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
-                    msgCtx.setSoapAction("urn:" + 
operation.getName().getLocalPart());
-                }
-            }
-
-            String fixApplication = FIXConstants.FIX_INITIATOR;
-            if (acceptor) {
-                fixApplication = FIXConstants.FIX_ACCEPTOR;
-            } else {
-                msgCtx.setProperty("synapse.isresponse", true);
-            }
-
-            try {
-                //Put the FIX message in a SOAPEnvelope
-                FIXUtils.getInstance().setSOAPEnvelope(message, counter, 
sessionID.toString(), msgCtx);
-                trpListener.handleIncomingMessage(
-                        msgCtx,
-                        FIXUtils.getTransportHeaders(service.getName(), 
fixApplication),
-                        null,
-                        FIXConstants.FIX_DEFAULT_CONTENT_TYPE
-                );
-            } catch (AxisFault e) {
-                handleException("Error while processing FIX message", e);
-            }
-        }
-
-        private void handleIncomingResponse(MessageContext outMsgCtx) {
-            AbstractTransportSender trpSender = (AbstractTransportSender) 
cfgCtx.getAxisConfiguration().
-                        
getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
-
-            MessageContext msgCtx = 
trpSender.createResponseMessageContext(outMsgCtx);
-
-            try {
-                //Put the FIX message in a SOAPEnvelope
-                FIXUtils.getInstance().setSOAPEnvelope(message, counter, 
sessionID.toString(), msgCtx);
-                msgCtx.setServerSide(true);
-                trpSender.handleIncomingMessage(
-                        msgCtx,
-                        FIXUtils.getTransportHeaders(service.getName(), 
FIXConstants.FIX_INITIATOR),
-                        null,
-                        FIXConstants.FIX_DEFAULT_CONTENT_TYPE
-                );
-            } catch (AxisFault e) {
-                handleException("Error while processing response FIX message", 
e);
-            }
-        }
-
         public void run() {
-
-            if (allNewApproach) {
-                //treat all messages (including responses) as new messages
-                handleIncomingRequest();
-            }
-            else {
-                if (acceptor) {
-                    //treat messages coming from an acceptor as new request 
messages
-                    handleIncomingRequest();
-                }
-                else {
-                    MessageContext outMsgCtx = outgoingMessages.poll();
-                    if (outMsgCtx != null) {
-                        //handle as a response to an outgoing message
-                        handleIncomingResponse(outMsgCtx);
-                    } else if (!dropExtraResponses) {
-                        //handle as a new request message
-                        handleIncomingRequest();
-                    } else {
-                        log.debug("Dropping additional FIX response");
-                    }
-                }
-            }
+            processMessage(message, sessionID, counter);
         }
-
     }
 
 }

Modified: 
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=1226672&r1=1226671&r2=1226672&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
 (original)
+++ 
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
 Tue Jan  3 06:44:07 2012
@@ -24,6 +24,7 @@ import org.apache.axis2.description.Axis
 import org.apache.axis2.description.Parameter;
 import org.apache.axis2.transport.base.BaseUtils;
 import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.axis2.util.JavaUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.quickfixj.jmx.JmxExporter;
@@ -120,13 +121,24 @@ public class FIXSessionFactory {
                 //Get a new FIX Application
                 Application messageHandler = 
applicationFactory.getFIXApplication(service,
                         listenerThreadPool, true);
+                boolean threadedConnector = useThreadedConnector(service, 
true);
                 //Create a new FIX Acceptor
-                Acceptor acceptor = new SocketAcceptor(
-                        messageHandler,
-                        storeFactory,
-                        settings,
-                        logFactory,
-                        messageFactory);
+                Acceptor acceptor;
+                if (threadedConnector) {
+                    acceptor = new ThreadedSocketAcceptor(
+                            messageHandler,
+                            storeFactory,
+                            settings,
+                            logFactory,
+                            messageFactory);
+                } else {
+                    acceptor = new SocketAcceptor(
+                            messageHandler,
+                            storeFactory,
+                            settings,
+                            logFactory,
+                            messageFactory);
+                }
 
                 acceptor.start();
                 initJMX(acceptor, service.getName());
@@ -197,15 +209,26 @@ public class FIXSessionFactory {
         //Get a new FIX application
         Application messageHandler = 
applicationFactory.getFIXApplication(service,
                 senderThreadPool, false);
+        boolean threadedConnector = useThreadedConnector(service, false);
 
         try {
            //Create a new FIX initiator
-            Initiator initiator = new SocketInitiator(
-                    messageHandler,
-                    storeFactory,
-                    settings,
-                    logFactory,
-                    messageFactory);
+            Initiator initiator;
+            if (threadedConnector) {
+                initiator = new ThreadedSocketInitiator(
+                        messageHandler,
+                        storeFactory,
+                        settings,
+                        logFactory,
+                        messageFactory);
+            } else {
+                initiator = new SocketInitiator(
+                        messageHandler,
+                        storeFactory,
+                        settings,
+                        logFactory,
+                        messageFactory);
+            }
 
             initiator.start();
             initJMX(initiator, service.getName());
@@ -242,12 +265,23 @@ public class FIXSessionFactory {
                 Application messageHandler = 
applicationFactory.getFIXApplication(service,
                         senderThreadPool, false);
 
-                Initiator initiator = new SocketInitiator(
-                    messageHandler,
-                    storeFactory,
-                    settings,
-                    logFactory,
-                    messageFactory);
+                boolean threadedConnector = useThreadedConnector(service, 
false);
+                Initiator initiator;
+                if (threadedConnector) {
+                    initiator = new ThreadedSocketInitiator(
+                            messageHandler,
+                            storeFactory,
+                            settings,
+                            logFactory,
+                            messageFactory);
+                } else {
+                    initiator = new SocketInitiator(
+                            messageHandler,
+                            storeFactory,
+                            settings,
+                            logFactory,
+                            messageFactory);
+                }
 
                 initiator.start();
                 initJMX(initiator, service.getName());
@@ -511,6 +545,25 @@ public class FIXSessionFactory {
             log.error("Error while initializing JMX support for the service: " 
+ service, e);
         }
     }
+
+    /**
+     * Checks the service parameters to decide whether the threaded variant of the
+     * QuickFIX/J connector should be created for the given service. Callers use the
+     * result to choose between ThreadedSocketAcceptor and SocketAcceptor, or between
+     * ThreadedSocketInitiator and SocketInitiator.
+     *
+     * The role specific parameter is checked first (FIX_USE_THREADED_ACCEPTOR for
+     * acceptors, FIX_USE_THREADED_INITIATOR for initiators), followed by
+     * FIX_USE_THREADED_CONNECTORS, which is honoured for both roles. A parameter
+     * only counts when JavaUtils.isTrueExplicitly accepts its value.
+     *
+     * @param service the service whose parameters are inspected
+     * @param acceptor true when the connector being created is an acceptor, false
+     *                 when it is an initiator
+     * @return true if any applicable parameter is explicitly set to true, false otherwise
+     */
+    private boolean useThreadedConnector(AxisService service, boolean 
acceptor) {
+           Parameter param = 
service.getParameter(FIXConstants.FIX_USE_THREADED_ACCEPTOR);
+        if (acceptor && param != null && 
JavaUtils.isTrueExplicitly(param.getValue())) {
+            return true;
+        }
+
+        param = service.getParameter(FIXConstants.FIX_USE_THREADED_INITIATOR);
+        if (!acceptor && param != null && 
JavaUtils.isTrueExplicitly(param.getValue())) {
+            return true;
+        }
+
+        // Generic setting that applies regardless of the role
+        param = service.getParameter(FIXConstants.FIX_USE_THREADED_CONNECTORS);
+        if (param != null && JavaUtils.isTrueExplicitly(param.getValue())) {
+            return true;
+        }
+
+        return false;
+    }
 }
 
 


Reply via email to