Author: hiranya
Date: Tue Jan 3 06:44:07 2012
New Revision: 1226672
URL: http://svn.apache.org/viewvc?rev=1226672&view=rev
Log:
Some improvements to, and more control over, how threads are handled by the FIX transport
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java?rev=1226672&r1=1226671&r2=1226672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java Tue Jan 3 06:44:07 2012
@@ -107,6 +107,14 @@ public class FIXConstants {
public static final String FIX_ACCEPTOR_EVENT_HANDLER =
"transport.fix.AcceptorSessionEventHandler";
public static final String FIX_INITIATOR_EVENT_HANDLER =
"transport.fix.InitiatorSessionEventHandler";
+ public static final String FIX_PROCESS_SINGLE_THREADED =
"transport.fix.ProcessSingleThreaded";
+ public static final String FIX_ACCEPTOR_SINGLE_THREADED =
"transport.fix.AcceptorSingleThreaded";
+ public static final String FIX_INITIATOR_SINGLE_THREADED =
"transport.fix.InitiatorSingleThreaded";
+
+ public static final String FIX_USE_THREADED_CONNECTORS =
"transport.fix.UseThreadedConnectors";
+ public static final String FIX_USE_THREADED_ACCEPTOR =
"transport.fix.UseThreadedAcceptor";
+ public static final String FIX_USE_THREADED_INITIATOR =
"transport.fix.UseThreadedInitiator";
+
//--------------------------- Message level properties -----------------------------------
public static final String FIX_IGNORE_ORDER = "transport.fix.IgnoreOrder";
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java?rev=1226672&r1=1226671&r2=1226672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java Tue Jan 3 06:44:07 2012
@@ -21,6 +21,7 @@ package org.apache.synapse.transport.fix
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
+import org.apache.axis2.util.JavaUtils;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -70,9 +71,10 @@ public class FIXIncomingMessageHandler i
private boolean dropExtraResponses = false;
private Semaphore semaphore;
private SessionEventHandler eventHandler;
+ private boolean singleThreaded;
public FIXIncomingMessageHandler(ConfigurationContext cfgCtx, WorkerPool
workerPool,
- AxisService service, boolean acceptor) {
+ AxisService service, boolean acceptor) {
this.cfgCtx = cfgCtx;
this.workerPool = workerPool;
this.service = service;
@@ -104,6 +106,8 @@ public class FIXIncomingMessageHandler i
eventHandlerParam.getValue(), e);
}
}
+
+ singleThreaded = isSingleThreaded();
}
private void getResponseHandlingApproach() {
@@ -118,6 +122,35 @@ public class FIXIncomingMessageHandler i
}
}
+ private boolean isSingleThreaded() {
+ Parameter singleThreadParam = service.getParameter(
+ FIXConstants.FIX_ACCEPTOR_SINGLE_THREADED);
+ if (acceptor && singleThreadParam != null &&
+ JavaUtils.isTrueExplicitly(singleThreadParam.getValue())) {
+ log.info("FIX acceptor for service: " + service.getName() + " is
single threaded");
+ return true;
+ }
+
+ singleThreadParam = service.getParameter(
+ FIXConstants.FIX_INITIATOR_SINGLE_THREADED);
+ if (!acceptor && singleThreadParam != null &&
+ JavaUtils.isTrueExplicitly(singleThreadParam.getValue())) {
+ log.info("FIX initiator for service: " + service.getName() + " is
single threaded");
+ return true;
+ }
+
+ singleThreadParam = service.getParameter(
+ FIXConstants.FIX_PROCESS_SINGLE_THREADED);
+ if (singleThreadParam != null &&
+ JavaUtils.isTrueExplicitly(singleThreadParam.getValue())) {
+ log.info("FIX processor for service: " + service.getName() + " is
single threaded");
+ return true;
+ }
+
+ return false;
+ }
+
+
public void setOutgoingMessageContext(MessageContext msgCtx) {
if (!allNewApproach) {
outgoingMessages.offer(msgCtx);
@@ -260,7 +293,7 @@ public class FIXIncomingMessageHandler i
* @throws DoNotSend This exception aborts message transmission
*/
public void toApp(Message message, SessionID sessionID) throws DoNotSend {
- if (log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
StringBuffer sb = new StringBuffer();
try {
sb.append("Sending application level FIX message to
").append(message.getHeader().getField(new TargetCompID()).getValue());
@@ -314,7 +347,108 @@ public class FIXIncomingMessageHandler i
if (rolled && log.isDebugEnabled()) {
log.debug("Incoming request counter rolled over for the session: "
+ sessionID);
}
- workerPool.execute(new FIXWorkerThread(message, sessionID, counter));
+ if (singleThreaded) {
+ processMessage(message, sessionID, counter);
+ } else {
+ workerPool.execute(new FIXWorkerThread(message, sessionID,
counter));
+ }
+ }
+
+ public void processMessage(Message message, SessionID sessionID, int
counter) {
+ if (allNewApproach) {
+ //treat all messages (including responses) as new messages
+ handleIncomingRequest(message, sessionID, counter);
+ } else {
+ if (acceptor) {
+ //treat messages coming from an acceptor as new request
messages
+ handleIncomingRequest(message, sessionID, counter);
+ } else {
+ MessageContext outMsgCtx = outgoingMessages.poll();
+ if (outMsgCtx != null) {
+ //handle as a response to an outgoing message
+ handleIncomingResponse(outMsgCtx, message, sessionID,
counter);
+ } else if (!dropExtraResponses) {
+ //handle as a new request message
+ handleIncomingRequest(message, sessionID, counter);
+ } else {
+ log.debug("Dropping additional FIX response");
+ }
+ }
+ }
+ }
+
+ private void handleIncomingRequest(Message message, SessionID sessionID,
int counter) {
+ if (log.isDebugEnabled()) {
+ log.debug("Source session: " + sessionID + " - Received message
with sequence " +
+ "number " + counter);
+ }
+
+ //Create message context for the incoming message
+ AbstractTransportListener trpListener = (AbstractTransportListener)
cfgCtx.getAxisConfiguration().
+ getTransportIn(FIXConstants.TRANSPORT_NAME).getReceiver();
+
+ MessageContext msgCtx = trpListener.createMessageContext();
+ msgCtx.setProperty(Constants.OUT_TRANSPORT_INFO, new
FIXOutTransportInfo(sessionID));
+
+ if (service != null) {
+ // Set the service for which the message is intended to
+ msgCtx.setAxisService(service);
+ // find the operation for the message, or default to one
+ Parameter operationParam =
service.getParameter(BaseConstants.OPERATION_PARAM);
+ QName operationQName = (
+ operationParam != null ?
+
BaseUtils.getQNameFromString(operationParam.getValue()) :
+ BaseConstants.DEFAULT_OPERATION);
+
+ AxisOperation operation = service.getOperation(operationQName);
+ if (operation != null) {
+ msgCtx.setAxisOperation(operation);
+
msgCtx.setAxisMessage(operation.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
+ msgCtx.setSoapAction("urn:" +
operation.getName().getLocalPart());
+ }
+ }
+
+ String fixApplication = FIXConstants.FIX_INITIATOR;
+ if (acceptor) {
+ fixApplication = FIXConstants.FIX_ACCEPTOR;
+ } else {
+ msgCtx.setProperty("synapse.isresponse", true);
+ }
+
+ try {
+ //Put the FIX message in a SOAPEnvelope
+ FIXUtils.getInstance().setSOAPEnvelope(message, counter,
sessionID.toString(), msgCtx);
+ trpListener.handleIncomingMessage(
+ msgCtx,
+ FIXUtils.getTransportHeaders(service.getName(),
fixApplication),
+ null,
+ FIXConstants.FIX_DEFAULT_CONTENT_TYPE
+ );
+ } catch (AxisFault e) {
+ handleException("Error while processing FIX message", e);
+ }
+ }
+
+ private void handleIncomingResponse(MessageContext outMsgCtx, Message
message,
+ SessionID sessionID, int counter) {
+ AbstractTransportSender trpSender = (AbstractTransportSender)
cfgCtx.getAxisConfiguration().
+ getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
+
+ MessageContext msgCtx =
trpSender.createResponseMessageContext(outMsgCtx);
+
+ try {
+ //Put the FIX message in a SOAPEnvelope
+ FIXUtils.getInstance().setSOAPEnvelope(message, counter,
sessionID.toString(), msgCtx);
+ msgCtx.setServerSide(true);
+ trpSender.handleIncomingMessage(
+ msgCtx,
+ FIXUtils.getTransportHeaders(service.getName(),
FIXConstants.FIX_INITIATOR),
+ null,
+ FIXConstants.FIX_DEFAULT_CONTENT_TYPE
+ );
+ } catch (AxisFault e) {
+ handleException("Error while processing response FIX message", e);
+ }
}
/**
@@ -333,105 +467,9 @@ public class FIXIncomingMessageHandler i
this.counter = counter;
}
- private void handleIncomingRequest() {
- if (log.isDebugEnabled()) {
- log.debug("Source session: " + sessionID + " - Received
message with sequence " +
- "number " + counter);
- }
-
- //Create message context for the incoming message
- AbstractTransportListener trpListener =
(AbstractTransportListener) cfgCtx.getAxisConfiguration().
- getTransportIn(FIXConstants.TRANSPORT_NAME).getReceiver();
-
- MessageContext msgCtx = trpListener.createMessageContext();
- msgCtx.setProperty(Constants.OUT_TRANSPORT_INFO, new
FIXOutTransportInfo(sessionID));
-
- if (service != null) {
- // Set the service for which the message is intended to
- msgCtx.setAxisService(service);
- // find the operation for the message, or default to one
- Parameter operationParam =
service.getParameter(BaseConstants.OPERATION_PARAM);
- QName operationQName = (
- operationParam != null ?
-
BaseUtils.getQNameFromString(operationParam.getValue()) :
- BaseConstants.DEFAULT_OPERATION);
-
- AxisOperation operation = service.getOperation(operationQName);
- if (operation != null) {
- msgCtx.setAxisOperation(operation);
-
msgCtx.setAxisMessage(operation.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
- msgCtx.setSoapAction("urn:" +
operation.getName().getLocalPart());
- }
- }
-
- String fixApplication = FIXConstants.FIX_INITIATOR;
- if (acceptor) {
- fixApplication = FIXConstants.FIX_ACCEPTOR;
- } else {
- msgCtx.setProperty("synapse.isresponse", true);
- }
-
- try {
- //Put the FIX message in a SOAPEnvelope
- FIXUtils.getInstance().setSOAPEnvelope(message, counter,
sessionID.toString(), msgCtx);
- trpListener.handleIncomingMessage(
- msgCtx,
- FIXUtils.getTransportHeaders(service.getName(),
fixApplication),
- null,
- FIXConstants.FIX_DEFAULT_CONTENT_TYPE
- );
- } catch (AxisFault e) {
- handleException("Error while processing FIX message", e);
- }
- }
-
- private void handleIncomingResponse(MessageContext outMsgCtx) {
- AbstractTransportSender trpSender = (AbstractTransportSender)
cfgCtx.getAxisConfiguration().
-
getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
-
- MessageContext msgCtx =
trpSender.createResponseMessageContext(outMsgCtx);
-
- try {
- //Put the FIX message in a SOAPEnvelope
- FIXUtils.getInstance().setSOAPEnvelope(message, counter,
sessionID.toString(), msgCtx);
- msgCtx.setServerSide(true);
- trpSender.handleIncomingMessage(
- msgCtx,
- FIXUtils.getTransportHeaders(service.getName(),
FIXConstants.FIX_INITIATOR),
- null,
- FIXConstants.FIX_DEFAULT_CONTENT_TYPE
- );
- } catch (AxisFault e) {
- handleException("Error while processing response FIX message",
e);
- }
- }
-
public void run() {
-
- if (allNewApproach) {
- //treat all messages (including responses) as new messages
- handleIncomingRequest();
- }
- else {
- if (acceptor) {
- //treat messages coming from an acceptor as new request
messages
- handleIncomingRequest();
- }
- else {
- MessageContext outMsgCtx = outgoingMessages.poll();
- if (outMsgCtx != null) {
- //handle as a response to an outgoing message
- handleIncomingResponse(outMsgCtx);
- } else if (!dropExtraResponses) {
- //handle as a new request message
- handleIncomingRequest();
- } else {
- log.debug("Dropping additional FIX response");
- }
- }
- }
+ processMessage(message, sessionID, counter);
}
-
}
}
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=1226672&r1=1226671&r2=1226672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java (original)
+++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java Tue Jan 3 06:44:07 2012
@@ -24,6 +24,7 @@ import org.apache.axis2.description.Axis
import org.apache.axis2.description.Parameter;
import org.apache.axis2.transport.base.BaseUtils;
import org.apache.axis2.transport.base.threads.WorkerPool;
+import org.apache.axis2.util.JavaUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quickfixj.jmx.JmxExporter;
@@ -120,13 +121,24 @@ public class FIXSessionFactory {
//Get a new FIX Application
Application messageHandler =
applicationFactory.getFIXApplication(service,
listenerThreadPool, true);
+ boolean threadedConnector = useThreadedConnector(service,
true);
//Create a new FIX Acceptor
- Acceptor acceptor = new SocketAcceptor(
- messageHandler,
- storeFactory,
- settings,
- logFactory,
- messageFactory);
+ Acceptor acceptor;
+ if (threadedConnector) {
+ acceptor = new ThreadedSocketAcceptor(
+ messageHandler,
+ storeFactory,
+ settings,
+ logFactory,
+ messageFactory);
+ } else {
+ acceptor = new SocketAcceptor(
+ messageHandler,
+ storeFactory,
+ settings,
+ logFactory,
+ messageFactory);
+ }
acceptor.start();
initJMX(acceptor, service.getName());
@@ -197,15 +209,26 @@ public class FIXSessionFactory {
//Get a new FIX application
Application messageHandler =
applicationFactory.getFIXApplication(service,
senderThreadPool, false);
+ boolean threadedConnector = useThreadedConnector(service, false);
try {
//Create a new FIX initiator
- Initiator initiator = new SocketInitiator(
- messageHandler,
- storeFactory,
- settings,
- logFactory,
- messageFactory);
+ Initiator initiator;
+ if (threadedConnector) {
+ initiator = new ThreadedSocketInitiator(
+ messageHandler,
+ storeFactory,
+ settings,
+ logFactory,
+ messageFactory);
+ } else {
+ initiator = new SocketInitiator(
+ messageHandler,
+ storeFactory,
+ settings,
+ logFactory,
+ messageFactory);
+ }
initiator.start();
initJMX(initiator, service.getName());
@@ -242,12 +265,23 @@ public class FIXSessionFactory {
Application messageHandler =
applicationFactory.getFIXApplication(service,
senderThreadPool, false);
- Initiator initiator = new SocketInitiator(
- messageHandler,
- storeFactory,
- settings,
- logFactory,
- messageFactory);
+ boolean threadedConnector = useThreadedConnector(service,
false);
+ Initiator initiator;
+ if (threadedConnector) {
+ initiator = new ThreadedSocketInitiator(
+ messageHandler,
+ storeFactory,
+ settings,
+ logFactory,
+ messageFactory);
+ } else {
+ initiator = new SocketInitiator(
+ messageHandler,
+ storeFactory,
+ settings,
+ logFactory,
+ messageFactory);
+ }
initiator.start();
initJMX(initiator, service.getName());
@@ -511,6 +545,25 @@ public class FIXSessionFactory {
log.error("Error while initializing JMX support for the service: "
+ service, e);
}
}
+
+ private boolean useThreadedConnector(AxisService service, boolean
acceptor) {
+ Parameter param =
service.getParameter(FIXConstants.FIX_USE_THREADED_ACCEPTOR);
+ if (acceptor && param != null &&
JavaUtils.isTrueExplicitly(param.getValue())) {
+ return true;
+ }
+
+ param = service.getParameter(FIXConstants.FIX_USE_THREADED_INITIATOR);
+ if (!acceptor && param != null &&
JavaUtils.isTrueExplicitly(param.getValue())) {
+ return true;
+ }
+
+ param = service.getParameter(FIXConstants.FIX_USE_THREADED_CONNECTORS);
+ if (param != null && JavaUtils.isTrueExplicitly(param.getValue())) {
+ return true;
+ }
+
+ return false;
+ }
}