Author: hiranya
Date: Mon Apr 18 06:00:45 2011
New Revision: 1094238
URL: http://svn.apache.org/viewvc?rev=1094238&view=rev
Log:
Implementing SYNAPSE-668 and some minor bug fixes in the
FIXOutgoingMessageHandler
Added:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXConstants.java
Mon Apr 18 06:00:45 2011
@@ -54,6 +54,7 @@ public class FIXConstants {
public static final long DEFAULT_HEART_BT_INT_VALUE = 30;
public static final String DEFAULT_START_TIME_VALUE = "00:00:00";
public static final String DEFAULT_END_TIME_VALUE = "00:00:00";
+ public static final int DEFAULT_COUNTER_UPPER_LIMIT = 1000000000;
public static final String HEART_BY_INT = "HeartBtInt";
public static final String BEGIN_STRING = "BeginString";
@@ -102,4 +103,11 @@ public class FIXConstants {
public static final String FIX_DROP_EXTRA_RESPONSES =
"transport.fix.DropExtraResponses";
+ public static final String FIX_ACCEPTOR_EVENT_HANDLER =
"transport.fix.AcceptorSessionEventHandler";
+ public static final String FIX_INITIATOR_EVENT_HANDLER =
"transport.fix.InitiatorSessionEventHandler";
+
+ //--------------------------- Message level properties
-----------------------------------
+
+ public static final String FIX_IGNORE_ORDER = "transport.fix.IgnoreOrder";
+
}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXIncomingMessageHandler.java
Mon Apr 18 06:00:45 2011
@@ -41,11 +41,12 @@ import quickfix.field.SenderCompID;
import quickfix.field.TargetCompID;
import javax.xml.namespace.QName;
-import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* FIXIncomingMessageHandler is responsible for handling all incoming FIX
messages. This is where the
@@ -63,11 +64,12 @@ public class FIXIncomingMessageHandler i
/** A boolean value indicating the type of the FIX application */
private boolean acceptor;
/** A Map of counters with one counter per session */
- private Map<SessionID, Integer> countersMap;
+ private Map<SessionID, AtomicInteger> countersMap;
private Queue<MessageContext> outgoingMessages;
- private boolean allNewApproach;
- private boolean dropExtraResponses;
+ private boolean allNewApproach = true;
+ private boolean dropExtraResponses = false;
private Semaphore semaphore;
+ private SessionEventHandler eventHandler;
public FIXIncomingMessageHandler(ConfigurationContext cfgCtx, WorkerPool
workerPool,
AxisService service, boolean acceptor) {
@@ -76,25 +78,43 @@ public class FIXIncomingMessageHandler i
this.service = service;
this.log = LogFactory.getLog(this.getClass());
this.acceptor = acceptor;
- countersMap = new HashMap<SessionID, Integer>();
+ countersMap = new ConcurrentHashMap<SessionID, AtomicInteger>();
outgoingMessages = new LinkedBlockingQueue<MessageContext>();
semaphore = new Semaphore(0);
getResponseHandlingApproach();
+
+ Parameter eventHandlerParam;
+ if (acceptor) {
+ eventHandlerParam =
service.getParameter(FIXConstants.FIX_ACCEPTOR_EVENT_HANDLER);
+ } else {
+ eventHandlerParam =
service.getParameter(FIXConstants.FIX_INITIATOR_EVENT_HANDLER);
+ }
+
+ if (eventHandlerParam != null && eventHandlerParam.getValue() != null
&&
+ !"".equals(eventHandlerParam.getValue())) {
+ try {
+ Class clazz = getClass().getClassLoader().loadClass(
+ (String) eventHandlerParam.getValue());
+ eventHandler = (SessionEventHandler) clazz.newInstance();
+ } catch (ClassNotFoundException e) {
+ log.error("Unable to find the session event handler class: " +
+ eventHandlerParam.getValue(), e);
+ } catch (Exception e) {
+ log.error("Error while initializing the session event handler
class: " +
+ eventHandlerParam.getValue(), e);
+ }
+ }
}
private void getResponseHandlingApproach() {
Parameter param =
service.getParameter(FIXConstants.FIX_RESPONSE_HANDLER_APPROACH);
if (param != null && "false".equals(param.getValue().toString())) {
allNewApproach = false;
- } else {
- allNewApproach = true;
}
Parameter dropResponsesParam =
service.getParameter(FIXConstants.FIX_DROP_EXTRA_RESPONSES);
if (dropResponsesParam != null &&
"true".equals(dropResponsesParam.getValue().toString())) {
dropExtraResponses = true;
- } else {
- dropExtraResponses = false;
}
}
@@ -119,12 +139,15 @@ public class FIXIncomingMessageHandler i
* Sessions exist whether or not a counter party is connected to it. As
soon
* as a session is created, the application can begin sending messages to
it. If no one
* is logged on, the messages will be sent at the time a connection is
- * established with the counterparty.
+ * established with the counter party.
*
* @param sessionID QuickFIX session ID
*/
public void onCreate(SessionID sessionID) {
log.info("New FIX session created: " + sessionID.toString());
+ if (eventHandler != null) {
+ eventHandler.onCreate(sessionID);
+ }
}
/**
@@ -136,9 +159,15 @@ public class FIXIncomingMessageHandler i
* @param sessionID QuickFIX session ID
*/
public void onLogon(SessionID sessionID) {
- countersMap.put(sessionID, 0);
+ if (!countersMap.containsKey(sessionID)) {
+ countersMap.put(sessionID, new AtomicInteger(0));
+ }
log.info("FIX session logged on: " + sessionID.toString());
semaphore.release();
+
+ if (eventHandler != null) {
+ eventHandler.onLogon(sessionID);
+ }
}
/**
@@ -149,11 +178,14 @@ public class FIXIncomingMessageHandler i
* @param sessionID QuickFIX session ID
*/
public void onLogout(SessionID sessionID) {
- countersMap.put(sessionID, 0);
FIXTransportSender trpSender = (FIXTransportSender)
cfgCtx.getAxisConfiguration().
getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
trpSender.logOutIncomingSession(sessionID);
log.info("FIX session logged out: " + sessionID.toString());
+
+ if (eventHandler != null) {
+ eventHandler.onLogout(sessionID);
+ }
}
/**
@@ -182,6 +214,10 @@ public class FIXIncomingMessageHandler i
log.trace("Message: " + message.toString());
}
}
+
+ if (eventHandler != null) {
+ eventHandler.toAdmin(message, sessionID);
+ }
}
/**
@@ -209,11 +245,15 @@ public class FIXIncomingMessageHandler i
log.trace("Message: " + message.toString());
}
}
+
+ if (eventHandler != null) {
+ eventHandler.fromAdmin(message, sessionID);
+ }
}
/**
* This is a callback for application messages that are being sent to a
- * counterparty.
+ * counter party.
*
* @param message QuickFIX message
* @param sessionID QuickFIX session ID
@@ -236,6 +276,10 @@ public class FIXIncomingMessageHandler i
log.trace("Message: " + message.toString());
}
}
+
+ if (eventHandler != null) {
+ eventHandler.toApp(message, sessionID);
+ }
}
/**
@@ -264,10 +308,12 @@ public class FIXIncomingMessageHandler i
}
}
- int counter = countersMap.get(sessionID);
- counter++;
- countersMap.put(sessionID, counter);
-
+ AtomicInteger atomicCounter = countersMap.get(sessionID);
+ int counter = atomicCounter.incrementAndGet();
+ boolean rolled =
atomicCounter.compareAndSet(FIXConstants.DEFAULT_COUNTER_UPPER_LIMIT, 0);
+ if (rolled && log.isDebugEnabled()) {
+ log.debug("Incoming request counter rolled over for the session: "
+ sessionID);
+ }
workerPool.execute(new FIXWorkerThread(message, sessionID, counter));
}
@@ -288,7 +334,12 @@ public class FIXIncomingMessageHandler i
}
private void handleIncomingRequest() {
- //Create message context for the incmong message
+ if (log.isDebugEnabled()) {
+ log.debug("Source session: " + sessionID + " - Received
message with sequence " +
+ "number " + counter);
+ }
+
+ //Create message context for the incoming message
AbstractTransportListener trpListener =
(AbstractTransportListener) cfgCtx.getAxisConfiguration().
getTransportIn(FIXConstants.TRANSPORT_NAME).getReceiver();
@@ -311,16 +362,12 @@ public class FIXIncomingMessageHandler i
msgCtx.setAxisMessage(operation.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
msgCtx.setSoapAction("urn:" +
operation.getName().getLocalPart());
}
- } else {
- log.warn("Service information not available for the FIX
message processor");
- return;
}
String fixApplication = FIXConstants.FIX_INITIATOR;
if (acceptor) {
fixApplication = FIXConstants.FIX_ACCEPTOR;
- }
- else {
+ } else {
msgCtx.setProperty("synapse.isresponse", true);
}
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXOutgoingMessageHandler.java
Mon Apr 18 06:00:45 2011
@@ -20,28 +20,32 @@
package org.apache.synapse.transport.fix;
import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SessionNotFound;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* FIXOutgoingMessageHandler makes sure that messages are delivered in the
order they were received by
- * a FIX acceptor. In case the message arrived over a different transport
srill this class will try to
+ * a FIX acceptor. In case the message arrived over a different transport
still this class will try to
* put the messages in correct order based on the counter value of the message.
*/
public class FIXOutgoingMessageHandler {
+ private static final Log log =
LogFactory.getLog(FIXOutgoingMessageHandler.class);
+
private Map<String, Integer> countersMap;
private Map<String, Map<Integer,Object[]>> messagesMap;
private FIXSessionFactory sessionFactory;
public FIXOutgoingMessageHandler() {
- countersMap = new HashMap<String, Integer>();
- messagesMap = new HashMap<String, Map<Integer,Object[]>>();
+ countersMap = new ConcurrentHashMap<String, Integer>();
+ messagesMap = new ConcurrentHashMap<String, Map<Integer,Object[]>>();
}
public void setSessionFactory(FIXSessionFactory sessionFactory) {
@@ -64,35 +68,43 @@ public class FIXOutgoingMessageHandler {
public synchronized void sendMessage(Message message, SessionID
targetSession, String sourceSession,
int counter, MessageContext msgCtx, String
targetEPR) throws SessionNotFound {
- if (sourceSession != null && counter != -1) {
+ boolean ignoreOrder =
"true".equals(msgCtx.getProperty(FIXConstants.FIX_IGNORE_ORDER));
+ if (sourceSession != null && counter != -1 && !ignoreOrder) {
int expectedValue;
if (countersMap.containsKey(sourceSession)) {
expectedValue = countersMap.get(sourceSession);
- }
- else {
+ } else {
//create new entries in the respective Maps
//counter starts at 1
countersMap.put(sourceSession, 1);
- messagesMap.put(sourceSession, new
HashMap<Integer,Object[]>());
+ messagesMap.put(sourceSession, new
ConcurrentHashMap<Integer,Object[]>());
expectedValue = 1;
}
if (expectedValue == counter) {
sendToTarget(msgCtx, targetEPR, message, targetSession);
- countersMap.put(sourceSession, expectedValue++);
+ if (FIXConstants.DEFAULT_COUNTER_UPPER_LIMIT == expectedValue)
{
+ if (log.isDebugEnabled()) {
+ log.debug("Outgoing request counter rolled over for
the session: " +
+ sourceSession + " (from " + expectedValue +
")");
+ }
+ expectedValue = 1;
+ }
+ countersMap.put(sourceSession, ++expectedValue);
sendQueuedMessages(expectedValue, sourceSession);
}
else {
+ if (log.isDebugEnabled()) {
+ log.debug("Source session: " + sourceSession + " -
Expected sequence number (" +
+ expectedValue + ") does not match with the actual
sequence number (" +
+ counter + "). Holding the message back for later
delivery.");
+ }
+
//save the message to be sent later...
Map<Integer,Object[]> messages =
messagesMap.get(sourceSession);
- Object[] obj = new Object[4];
- obj[0] = message;
- obj[1] = targetSession;
- obj[2] = msgCtx;
- obj[3] = targetEPR;
+ Object[] obj = new Object[] { message, targetSession, msgCtx,
targetEPR } ;
messages.put(counter, obj);
- messagesMap.put(sourceSession, messages);
}
}
else {
@@ -144,11 +156,23 @@ public class FIXOutgoingMessageHandler {
String targetEPR = null;
if (obj[2] != null) {
msgCtx = (MessageContext) obj[2];
- targetEPR = obj[3].toString();
+ targetEPR = (String) obj[3];
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Source session: " + session + " - Sending the
previously queued message " +
+ "with the sequence number: " + expectedValue);
}
sendToTarget(msgCtx, targetEPR, message, sessionID);
messages.remove(expectedValue);
- obj = messages.get(expectedValue++);
+ if (FIXConstants.DEFAULT_COUNTER_UPPER_LIMIT == expectedValue) {
+ if (log.isDebugEnabled()) {
+ log.debug("Outgoing request counter rolled over for the
session: " + session +
+ " (from " + expectedValue + ")");
+ }
+ expectedValue = 1;
+ }
+ obj = messages.get(++expectedValue);
}
messagesMap.put(session, messages);
countersMap.put(session, expectedValue);
@@ -163,6 +187,12 @@ public class FIXOutgoingMessageHandler {
if (obj != null) {
Message message = (Message) obj[0];
SessionID sessionID = (SessionID) obj[1];
+
+ if (log.isDebugEnabled()) {
+ log.debug("Source session: " + session + " - Flushing
the previously queued " +
+ "message with the sequence number: " +
expectedValue);
+ }
+
try {
Session.sendToTarget(message, sessionID);
} catch (SessionNotFound ignore) { }
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
Mon Apr 18 06:00:45 2011
@@ -98,7 +98,7 @@ public class FIXSessionFactory {
* acceptorStore keyed by the service name and start it.
*
* @param service the AxisService
- * @return true if the acceptor was created and started properly and false
otherwise
+ * @return true if the acceptor is successfully initialized and false
otherwise
* @throws AxisFault if the acceptor cannot be created
*/
public boolean createFIXAcceptor(AxisService service) throws AxisFault {
@@ -175,7 +175,7 @@ public class FIXSessionFactory {
try {
settings = new SessionSettings(fixConfigStream);
} catch (ConfigError e) {
- throw new AxisFault("Error in the specified FIX configuration
for the initiaotr. " +
+ throw new AxisFault("Error in the specified FIX configuration
for the initiator. " +
"Unable to initialize a FIX session for the service " +
service.getName(), e);
}
@@ -349,7 +349,7 @@ public class FIXSessionFactory {
* @param serviceName the name of the AxisService
* @return a FIX Acceptor for the service
*/
- public Acceptor getAccepter(String serviceName) {
+ public Acceptor getAcceptor(String serviceName) {
return acceptorStore.get(serviceName);
}
@@ -361,7 +361,7 @@ public class FIXSessionFactory {
*/
public Initiator getInitiator(String fixEPR) {
return initiatorStore.get(fixEPR);
- }
+ }
/**
* Get the FIX configuration URL from the services.xml.
@@ -391,8 +391,8 @@ public class FIXSessionFactory {
} catch (IOException e) {
log.error("Error while reading from the URL " +
fixConfigURLValue, e);
}
- } else if (log.isDebugEnabled()) {
- log.debug("FIX configuration URL is not specified for the service
" + service.getName());
+ } else {
+ log.info("FIX configuration URL is not specified for the service "
+ service.getName());
}
return fixConfigStream;
@@ -506,8 +506,7 @@ public class FIXSessionFactory {
jmxExporter.setRegistrationBehavior(JmxExporter.REGISTRATION_IGNORE_EXISTING);
jmxExporter.export(connector);
} catch (JMException e) {
- log.error("Error while initializing JMX support for the FIX
sessions in " +
- "service: " + service, e);
+ log.error("Error while initializing JMX support for the service: "
+ service, e);
}
}
}
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
Mon Apr 18 06:00:45 2011
@@ -71,7 +71,7 @@ public class FIXTransportListener extend
*
* @param service the service for which to listen for messages
*/
- protected void startListeningForService(AxisService service) {
+ public void startListeningForService(AxisService service) {
try {
boolean acceptorCreated =
fixSessionFactory.createFIXAcceptor(service);
boolean initiatorCreated =
fixSessionFactory.createFIXInitiator(service);
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
Mon Apr 18 06:00:45 2011
@@ -46,7 +46,7 @@ import java.util.Map;
* <p/>
* This transport sender implementation does not support forwarding FIX
messages to sessions with
* different BeginString values.When it performs a message forwarding it makes
sure the forwarding
- * takes place according to the conditions specified in the 'Thirs Party
Routing' section in the
+ * takes place according to the conditions specified in the 'Third Party
Routing' section in the
* FIX protocol specification.
*/
public class FIXTransportSender extends AbstractTransportSender {
@@ -108,7 +108,7 @@ public class FIXTransportSender extends
try {
fixMessage = FIXUtils.getInstance().createFIXMessage(msgCtx);
} catch (IOException e) {
- handleException("Exception occured while creating the FIX message
from SOAP Envelope", e);
+ handleException("Exception occurred while creating the FIX message
from SOAP Envelope", e);
}
if (FIXConstants.FIX_ACCEPTOR.equals(fixApplication)) {
@@ -118,11 +118,12 @@ public class FIXTransportSender extends
sendUsingEPR(targetEPR, serviceName, fixMessage,
sourceSession, counter, msgCtx);
} else if (outTransportInfo != null && outTransportInfo instanceof
FIXOutTransportInfo) {
//Send the message back to the sender
- sendUsingTrpOutInfo(outTransportInfo, serviceName, fixMessage,
sourceSession, counter);
+ sendUsingTrpOutInfo(outTransportInfo, serviceName, fixMessage,
+ sourceSession, counter, msgCtx);
}
} else if (FIXConstants.FIX_INITIATOR.equals(fixApplication)) {
- if (sendUsingAcceptorSession(serviceName, fixMessage,
sourceSession, counter)) {
+ if (sendUsingAcceptorSession(serviceName, fixMessage,
sourceSession, counter, msgCtx)) {
return;
} else if (targetEPR != null) {
sendUsingEPR(targetEPR, serviceName, fixMessage,
sourceSession, counter, msgCtx);
@@ -135,12 +136,12 @@ public class FIXTransportSender extends
if (targetEPR != null) {
sendUsingEPR(targetEPR, serviceName, fixMessage,
sourceSession, counter, msgCtx);
} else {
- sendUsingAcceptorSession(serviceName, fixMessage,
sourceSession, counter);
+ sendUsingAcceptorSession(serviceName, fixMessage,
sourceSession, counter, msgCtx);
}
}
}
- private boolean isTargetVald(Map<String, String> fieldValues, SessionID
targetSession,
+ private boolean isTargetValid(Map<String, String> fieldValues, SessionID
targetSession,
boolean beginStrValidation) {
String beginString = fieldValues.get(FIXConstants.BEGIN_STRING);
@@ -239,7 +240,7 @@ public class FIXTransportSender extends
/**
* Puts DeliverToX fields in the message to enable the message to be
forwarded at the destination.
- * This method retireves the parameters from the services.xml and put them
in the message as
+ * This method retrieves the parameters from the services.xml and put them
in the message as
* DeliverToX fields. Should be used when a response message has to
forwarded at the destination.
*
* @param message the FIX message to be forwarded
@@ -322,12 +323,14 @@ public class FIXTransportSender extends
* @param fixMessage the FIX message to be sent
* @param srcSession String uniquely identifying the incoming session
* @param counter application level sequence number of the message
- * @param serviceName name of the AxisSerivce for the message
+ * @param serviceName name of the AxisService for the message
+ * @param msgCtx Axis2 MessageContext
* @return boolean value indicating the result
* @throws AxisFault on error
*/
private boolean sendUsingTrpOutInfo(OutTransportInfo trpOutInfo, String
serviceName,
- Message fixMessage, String srcSession,
int counter) throws AxisFault {
+ Message fixMessage, String srcSession,
int counter,
+ MessageContext msgCtx) throws
AxisFault {
FIXOutTransportInfo fixOut = (FIXOutTransportInfo) trpOutInfo;
SessionID sessionID = fixOut.getSessionID();
@@ -356,7 +359,7 @@ public class FIXTransportSender extends
}
try {
- messageSender.sendMessage(fixMessage, sessionID, srcSession,
counter, null, null);
+ messageSender.sendMessage(fixMessage, sessionID, srcSession,
counter, msgCtx, null);
return true;
} catch (SessionNotFound e) {
log.error("Error while sending the FIX message. Session " +
sessionID.toString() + " does" +
@@ -366,22 +369,23 @@ public class FIXTransportSender extends
}
/**
- * Send the message using a session in the aaceptor side
+ * Send the message using a session in the acceptor side
*
* @param serviceName the service of the message
* @param fixMessage the FIX message to be sent
* @param srcSession String uniquely identifying the incoming session
* @param counter the application level sequence number of the message
+ * @param msgCtx Axi2 MessageContext
* @return boolean value indicating the result
* @throws AxisFault on error
*/
private boolean sendUsingAcceptorSession(String serviceName, Message
fixMessage, String srcSession,
- int counter) throws AxisFault {
+ int counter, MessageContext
msgCtx) throws AxisFault {
Map<String, String> fieldValues =
FIXUtils.getMessageForwardingParameters(fixMessage);
String deliverToCompID =
fieldValues.get(FIXConstants.DELIVER_TO_COMP_ID);
- Acceptor acceptor = sessionFactory.getAccepter(serviceName);
+ Acceptor acceptor = sessionFactory.getAcceptor(serviceName);
SessionID sessionID = null;
AxisService service =
cfgCtx.getAxisConfiguration().getService(serviceName);
@@ -390,14 +394,14 @@ public class FIXTransportSender extends
ArrayList<SessionID> sessions = acceptor.getSessions();
if (sessions.size() == 1) {
sessionID = sessions.get(0);
- if (deliverToCompID != null && !isTargetVald(fieldValues,
sessionID, isValidationOn(service))) {
+ if (deliverToCompID != null && !isTargetValid(fieldValues,
sessionID, isValidationOn(service))) {
sessionID = null;
}
} else if (sessions.size() > 1 && deliverToCompID != null) {
- for (int i = 0; i < sessions.size(); i++) {
- sessionID = sessions.get(i);
- if (isTargetVald(fieldValues, sessionID,
isValidationOn(service))) {
+ for (SessionID session : sessions) {
+ sessionID = session;
+ if (isTargetValid(fieldValues, sessionID,
isValidationOn(service))) {
break;
}
}
@@ -407,7 +411,8 @@ public class FIXTransportSender extends
if (sessionID != null) {
//Found a valid session. Now forward the message...
FIXOutTransportInfo fixOutInfo = new
FIXOutTransportInfo(sessionID);
- return sendUsingTrpOutInfo(fixOutInfo, serviceName, fixMessage,
srcSession, counter);
+ return sendUsingTrpOutInfo(fixOutInfo, serviceName, fixMessage,
+ srcSession, counter, msgCtx);
}
return false;
}
Modified:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java?rev=1094238&r1=1094237&r2=1094238&view=diff
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java
(original)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXUtils.java
Mon Apr 18 06:00:45 2011
@@ -213,7 +213,7 @@ public class FIXUtils {
List<Group> groupList = message.getGroups(groupKey);
Iterator<Group> groupIterator = groupList.iterator();
- while(groupIterator.hasNext()) {
+ while (groupIterator.hasNext()) {
Group msgGroup = groupIterator.next();
OMElement groupField =
soapFactory.createOMElement(FIXConstants.FIX_GROUP, null);
// rec. call the method to process the repeating groups
Added:
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java?rev=1094238&view=auto
==============================================================================
---
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java
(added)
+++
synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/SessionEventHandler.java
Mon Apr 18 06:00:45 2011
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.synapse.transport.fix;
+
+public interface SessionEventHandler {
+
+ void onCreate(quickfix.SessionID sessionID);
+
+ void onLogon(quickfix.SessionID sessionID);
+
+ void onLogout(quickfix.SessionID sessionID);
+
+ void toAdmin(quickfix.Message message, quickfix.SessionID sessionID);
+
+ void fromAdmin(quickfix.Message message, quickfix.SessionID sessionID);
+
+ void toApp(quickfix.Message message, quickfix.SessionID sessionID);
+
+}