Author: davsclaus
Date: Thu May 24 05:14:28 2012
New Revision: 1342146

URL: http://svn.apache.org/viewvc?rev=1342146&view=rev
Log:
CAMEL-5247: Send failures with quickfix now causes exception to be thrown. 
Thanks to Marek Strejczek for the patch.

Added:
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/CannotSendException.java
Modified:
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
    
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java

Added: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/CannotSendException.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/CannotSendException.java?rev=1342146&view=auto
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/CannotSendException.java
 (added)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/CannotSendException.java
 Thu May 24 05:14:28 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import org.quickfixj.QFJException;
+
+public class CannotSendException extends QFJException {
+
+    private static final long serialVersionUID = 6424768414275772380L;
+
+    public CannotSendException(final String message) {
+        super(message);
+    }
+
+    public CannotSendException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+}

Modified: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java?rev=1342146&r1=1342145&r2=1342146&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
 Thu May 24 05:14:28 2012
@@ -21,7 +21,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
-
+import org.quickfixj.QFJException;
 import quickfix.MessageUtils;
 import quickfix.Session;
 import quickfix.SessionID;
@@ -45,25 +45,23 @@ public class QuickfixjConsumer extends D
         }
     }
 
-    private void sendOutMessage(Exchange exchange) {
-        try {
-            Message camelMessage = exchange.getOut();
-            quickfix.Message quickfixjMessage = 
camelMessage.getBody(quickfix.Message.class);
-     
-            if (log.isDebugEnabled()) {
-                log.debug("Sending FIX message reply: " + 
quickfixjMessage.toString());
-            }
-            
-            SessionID messageSessionID = 
MessageUtils.getReverseSessionID(exchange.getIn().getBody(quickfix.Message.class));
-            
-            Session session = getSession(messageSessionID);
-            if (session == null) {
-                throw new IllegalStateException("Unknown session: " + 
messageSessionID);
-            }
-            
-            session.send(quickfixjMessage);
-        } catch (Exception e) {
-            exchange.setException(e);
+    private void sendOutMessage(Exchange exchange) throws QFJException {
+        Message camelMessage = exchange.getOut();
+        quickfix.Message quickfixjMessage = 
camelMessage.getBody(quickfix.Message.class);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Sending FIX message reply: " + 
quickfixjMessage.toString());
+        }
+
+        SessionID messageSessionID = 
MessageUtils.getReverseSessionID(exchange.getIn().getBody(quickfix.Message.class));
+
+        Session session = getSession(messageSessionID);
+        if (session == null) {
+            throw new IllegalStateException("Unknown session: " + 
messageSessionID);
+        }
+
+        if (!session.send(quickfixjMessage)) {
+            throw new CannotSendException("Could not send FIX message reply: " 
+ quickfixjMessage.toString());
         }
     }
 

Modified: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java?rev=1342146&r1=1342145&r2=1342146&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
 Thu May 24 05:14:28 2012
@@ -39,43 +39,45 @@ public class QuickfixjProducer extends D
     }
 
     public void process(Exchange exchange) throws Exception {
-        sendMessage(exchange, exchange.getIn());
-    }
-
-    void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage) 
throws InterruptedException {
         try {
-            Message message = camelMessage.getBody(Message.class);
-            log.debug("Sending FIX message: {}", message);
-
-            SessionID messageSessionID = sessionID;
-            if (messageSessionID == null) {
-                messageSessionID = MessageUtils.getSessionID(message);
-            }
-
-            Session session = getSession(messageSessionID);
-            if (session == null) {
-                throw new IllegalStateException("Unknown session: " + 
messageSessionID);
-            }
-
-            Callable<Message> callable = null;
-
-            if (exchange.getPattern().isOutCapable()) {
-                QuickfixjEndpoint endpoint = (QuickfixjEndpoint) getEndpoint();
-                MessageCorrelator messageCorrelator = 
endpoint.getEngine().getMessageCorrelator();
-                callable = messageCorrelator.getReply(endpoint.getSessionID(), 
exchange);
-            }
-
-            session.send(message);
-
-            if (callable != null) {
-                Message reply = callable.call();
-                exchange.getOut().setBody(reply);
-            }
+            sendMessage(exchange, exchange.getIn());
         } catch (Exception e) {
             exchange.setException(e);
         }
     }
 
+    void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage) 
throws Exception {
+        Message message = camelMessage.getBody(Message.class);
+        log.debug("Sending FIX message: {}", message);
+
+        SessionID messageSessionID = sessionID;
+        if (messageSessionID == null) {
+            messageSessionID = MessageUtils.getSessionID(message);
+        }
+
+        Session session = getSession(messageSessionID);
+        if (session == null) {
+            throw new IllegalStateException("Unknown session: " + 
messageSessionID);
+        }
+
+        Callable<Message> callable = null;
+
+        if (exchange.getPattern().isOutCapable()) {
+            QuickfixjEndpoint endpoint = (QuickfixjEndpoint) getEndpoint();
+            MessageCorrelator messageCorrelator = 
endpoint.getEngine().getMessageCorrelator();
+            callable = messageCorrelator.getReply(endpoint.getSessionID(), 
exchange);
+        }
+
+        if (!session.send(message)) {
+            throw new CannotSendException("Cannot send FIX message: " + 
message.toString());
+        }
+
+        if (callable != null) {
+            Message reply = callable.call();
+            exchange.getOut().setBody(reply);
+        }
+    }
+
     Session getSession(SessionID messageSessionID) {
         return Session.lookupSession(messageSessionID);
     }

Modified: 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java?rev=1342146&r1=1342145&r2=1342146&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
 Thu May 24 05:14:28 2012
@@ -86,12 +86,12 @@ public class QuickfixjProducerTest {
         
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
         Mockito.doThrow(new 
TestException()).when(mockSession).send(Mockito.isA(Message.class));
 
-        producer.process(mockExchange);    
+        producer.process(mockExchange);
         
Mockito.verify(mockExchange).setException(Matchers.isA(TestException.class));
     }
     
     @Test
-    public void processInOnlyExchange() throws Exception {        
+    public void processInOnlyExchangeSuccess() throws Exception {
         Session mockSession = 
Mockito.spy(TestSupport.createSession(sessionID));
         
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
         
Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class));
@@ -101,9 +101,21 @@ public class QuickfixjProducerTest {
         Mockito.verify(mockExchange, 
Mockito.never()).setException(Matchers.isA(IllegalStateException.class));
         Mockito.verify(mockSession).send(inboundFixMessage);
     }
+    
+    @Test
+    public void processInOnlyExchangeSendUnsuccessful() throws Exception {
+        Session mockSession = 
Mockito.spy(TestSupport.createSession(sessionID));
+        
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+        
Mockito.doReturn(false).when(mockSession).send(Mockito.isA(Message.class));
+
+        producer.process(mockExchange);
+        
+        Mockito.verify(mockSession).send(inboundFixMessage);
+        
Mockito.verify(mockExchange).setException(Matchers.isA(CannotSendException.class));
+    }    
 
     @Test
-    public void processInOutExchange() throws Exception {
+    public void processInOutExchangeSuccess() throws Exception {
         
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
         
Mockito.when(mockExchange.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY)).thenReturn(
             new MessagePredicate(sessionID, MsgType.EMAIL));
@@ -143,4 +155,46 @@ public class QuickfixjProducerTest {
         Mockito.verify(mockSession).send(inboundFixMessage);
         Mockito.verify(mockOutboundCamelMessage).setBody(outboundFixMessage);
     }
+    
+    @Test
+    public void processInOutExchangeSendUnsuccessful() throws Exception {
+        
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
+        
Mockito.when(mockExchange.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY)).thenReturn(
+            new MessagePredicate(sessionID, MsgType.EMAIL));
+        Mockito.when(mockExchange.getProperty(
+            QuickfixjProducer.CORRELATION_TIMEOUT_KEY,
+            1000L, Long.class)).thenReturn(5000L);
+                
+        org.apache.camel.Message mockOutboundCamelMessage = 
Mockito.mock(org.apache.camel.Message.class);
+        
Mockito.when(mockExchange.getOut()).thenReturn(mockOutboundCamelMessage);
+                
+        final Message outboundFixMessage = new Email();
+        outboundFixMessage.getHeader().setString(SenderCompID.FIELD, "TARGET");
+        outboundFixMessage.getHeader().setString(TargetCompID.FIELD, "SENDER");
+        
+        Session mockSession = 
Mockito.spy(TestSupport.createSession(sessionID));
+        
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+        Mockito.doAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocation) throws 
Throwable {
+                new Timer().schedule(new TimerTask() {                
+                    @Override
+                    public void run() {
+                        try {
+                            
quickfixjEngine.getMessageCorrelator().onEvent(QuickfixjEventCategory.AppMessageReceived,
 sessionID, outboundFixMessage);
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }, 10);
+                return false;
+            }            
+        }).when(mockSession).send(Mockito.isA(Message.class));
+
+        producer.process(mockExchange);
+        
+        Mockito.verify(mockOutboundCamelMessage, 
Mockito.never()).setBody(Mockito.isA(Message.class));
+        Mockito.verify(mockSession).send(inboundFixMessage);
+        
Mockito.verify(mockExchange).setException(Matchers.isA(CannotSendException.class));
+    }    
 }


Reply via email to