Author: davsclaus
Date: Thu May 24 05:14:28 2012
New Revision: 1342146
URL: http://svn.apache.org/viewvc?rev=1342146&view=rev
Log:
CAMEL-5247: Send failures with quickfix now causes exception to be thrown.
Thanks to Marek Strejczek for the patch.
Added:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/CannotSendException.java
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
Added:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/CannotSendException.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/CannotSendException.java?rev=1342146&view=auto
==============================================================================
---
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/CannotSendException.java
(added)
+++
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/CannotSendException.java
Thu May 24 05:14:28 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import org.quickfixj.QFJException;
+
+public class CannotSendException extends QFJException {
+
+ private static final long serialVersionUID = 6424768414275772380L;
+
+ public CannotSendException(final String message) {
+ super(message);
+ }
+
+ public CannotSendException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+}
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java?rev=1342146&r1=1342145&r2=1342146&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
(original)
+++
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
Thu May 24 05:14:28 2012
@@ -21,7 +21,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
-
+import org.quickfixj.QFJException;
import quickfix.MessageUtils;
import quickfix.Session;
import quickfix.SessionID;
@@ -45,25 +45,23 @@ public class QuickfixjConsumer extends D
}
}
- private void sendOutMessage(Exchange exchange) {
- try {
- Message camelMessage = exchange.getOut();
- quickfix.Message quickfixjMessage =
camelMessage.getBody(quickfix.Message.class);
-
- if (log.isDebugEnabled()) {
- log.debug("Sending FIX message reply: " +
quickfixjMessage.toString());
- }
-
- SessionID messageSessionID =
MessageUtils.getReverseSessionID(exchange.getIn().getBody(quickfix.Message.class));
-
- Session session = getSession(messageSessionID);
- if (session == null) {
- throw new IllegalStateException("Unknown session: " +
messageSessionID);
- }
-
- session.send(quickfixjMessage);
- } catch (Exception e) {
- exchange.setException(e);
+ private void sendOutMessage(Exchange exchange) throws QFJException {
+ Message camelMessage = exchange.getOut();
+ quickfix.Message quickfixjMessage =
camelMessage.getBody(quickfix.Message.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Sending FIX message reply: " +
quickfixjMessage.toString());
+ }
+
+ SessionID messageSessionID =
MessageUtils.getReverseSessionID(exchange.getIn().getBody(quickfix.Message.class));
+
+ Session session = getSession(messageSessionID);
+ if (session == null) {
+ throw new IllegalStateException("Unknown session: " +
messageSessionID);
+ }
+
+ if (!session.send(quickfixjMessage)) {
+ throw new CannotSendException("Could not send FIX message reply: "
+ quickfixjMessage.toString());
}
}
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java?rev=1342146&r1=1342145&r2=1342146&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
(original)
+++
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
Thu May 24 05:14:28 2012
@@ -39,43 +39,45 @@ public class QuickfixjProducer extends D
}
public void process(Exchange exchange) throws Exception {
- sendMessage(exchange, exchange.getIn());
- }
-
- void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage)
throws InterruptedException {
try {
- Message message = camelMessage.getBody(Message.class);
- log.debug("Sending FIX message: {}", message);
-
- SessionID messageSessionID = sessionID;
- if (messageSessionID == null) {
- messageSessionID = MessageUtils.getSessionID(message);
- }
-
- Session session = getSession(messageSessionID);
- if (session == null) {
- throw new IllegalStateException("Unknown session: " +
messageSessionID);
- }
-
- Callable<Message> callable = null;
-
- if (exchange.getPattern().isOutCapable()) {
- QuickfixjEndpoint endpoint = (QuickfixjEndpoint) getEndpoint();
- MessageCorrelator messageCorrelator =
endpoint.getEngine().getMessageCorrelator();
- callable = messageCorrelator.getReply(endpoint.getSessionID(),
exchange);
- }
-
- session.send(message);
-
- if (callable != null) {
- Message reply = callable.call();
- exchange.getOut().setBody(reply);
- }
+ sendMessage(exchange, exchange.getIn());
} catch (Exception e) {
exchange.setException(e);
}
}
+ void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage)
throws Exception {
+ Message message = camelMessage.getBody(Message.class);
+ log.debug("Sending FIX message: {}", message);
+
+ SessionID messageSessionID = sessionID;
+ if (messageSessionID == null) {
+ messageSessionID = MessageUtils.getSessionID(message);
+ }
+
+ Session session = getSession(messageSessionID);
+ if (session == null) {
+ throw new IllegalStateException("Unknown session: " +
messageSessionID);
+ }
+
+ Callable<Message> callable = null;
+
+ if (exchange.getPattern().isOutCapable()) {
+ QuickfixjEndpoint endpoint = (QuickfixjEndpoint) getEndpoint();
+ MessageCorrelator messageCorrelator =
endpoint.getEngine().getMessageCorrelator();
+ callable = messageCorrelator.getReply(endpoint.getSessionID(),
exchange);
+ }
+
+ if (!session.send(message)) {
+ throw new CannotSendException("Cannot send FIX message: " +
message.toString());
+ }
+
+ if (callable != null) {
+ Message reply = callable.call();
+ exchange.getOut().setBody(reply);
+ }
+ }
+
Session getSession(SessionID messageSessionID) {
return Session.lookupSession(messageSessionID);
}
Modified:
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java?rev=1342146&r1=1342145&r2=1342146&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
(original)
+++
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
Thu May 24 05:14:28 2012
@@ -86,12 +86,12 @@ public class QuickfixjProducerTest {
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
Mockito.doThrow(new
TestException()).when(mockSession).send(Mockito.isA(Message.class));
- producer.process(mockExchange);
+ producer.process(mockExchange);
Mockito.verify(mockExchange).setException(Matchers.isA(TestException.class));
}
@Test
- public void processInOnlyExchange() throws Exception {
+ public void processInOnlyExchangeSuccess() throws Exception {
Session mockSession =
Mockito.spy(TestSupport.createSession(sessionID));
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class));
@@ -101,9 +101,21 @@ public class QuickfixjProducerTest {
Mockito.verify(mockExchange,
Mockito.never()).setException(Matchers.isA(IllegalStateException.class));
Mockito.verify(mockSession).send(inboundFixMessage);
}
+
+ @Test
+ public void processInOnlyExchangeSendUnsuccessful() throws Exception {
+ Session mockSession =
Mockito.spy(TestSupport.createSession(sessionID));
+
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+
Mockito.doReturn(false).when(mockSession).send(Mockito.isA(Message.class));
+
+ producer.process(mockExchange);
+
+ Mockito.verify(mockSession).send(inboundFixMessage);
+
Mockito.verify(mockExchange).setException(Matchers.isA(CannotSendException.class));
+ }
@Test
- public void processInOutExchange() throws Exception {
+ public void processInOutExchangeSuccess() throws Exception {
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
Mockito.when(mockExchange.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY)).thenReturn(
new MessagePredicate(sessionID, MsgType.EMAIL));
@@ -143,4 +155,46 @@ public class QuickfixjProducerTest {
Mockito.verify(mockSession).send(inboundFixMessage);
Mockito.verify(mockOutboundCamelMessage).setBody(outboundFixMessage);
}
+
+ @Test
+ public void processInOutExchangeSendUnsuccessful() throws Exception {
+
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
+
Mockito.when(mockExchange.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY)).thenReturn(
+ new MessagePredicate(sessionID, MsgType.EMAIL));
+ Mockito.when(mockExchange.getProperty(
+ QuickfixjProducer.CORRELATION_TIMEOUT_KEY,
+ 1000L, Long.class)).thenReturn(5000L);
+
+ org.apache.camel.Message mockOutboundCamelMessage =
Mockito.mock(org.apache.camel.Message.class);
+
Mockito.when(mockExchange.getOut()).thenReturn(mockOutboundCamelMessage);
+
+ final Message outboundFixMessage = new Email();
+ outboundFixMessage.getHeader().setString(SenderCompID.FIELD, "TARGET");
+ outboundFixMessage.getHeader().setString(TargetCompID.FIELD, "SENDER");
+
+ Session mockSession =
Mockito.spy(TestSupport.createSession(sessionID));
+
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+ Mockito.doAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws
Throwable {
+ new Timer().schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+
quickfixjEngine.getMessageCorrelator().onEvent(QuickfixjEventCategory.AppMessageReceived,
sessionID, outboundFixMessage);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 10);
+ return false;
+ }
+ }).when(mockSession).send(Mockito.isA(Message.class));
+
+ producer.process(mockExchange);
+
+ Mockito.verify(mockOutboundCamelMessage,
Mockito.never()).setBody(Mockito.isA(Message.class));
+ Mockito.verify(mockSession).send(inboundFixMessage);
+
Mockito.verify(mockExchange).setException(Matchers.isA(CannotSendException.class));
+ }
}