This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 966be0eebd95c97599e3b05e8ad309c5e4217eeb
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Feb 22 09:56:22 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../component/quickfixj/QuickfixjEndpoint.java     |  13 +-
 .../quickfixj/converter/QuickfixjConverters.java   |  23 ++++
 .../component/quickfixj/QuickfixjConsumerTest.java | 147 ---------------------
 3 files changed, 32 insertions(+), 151 deletions(-)

diff --git 
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
 
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
index 7725b0d..0f45d55 100644
--- 
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
+++ 
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
@@ -133,10 +133,15 @@ public class QuickfixjEndpoint extends DefaultEndpoint 
implements QuickfixjEvent
         if (this.sessionID == null || isMatching(sessionID)) {
             for (QuickfixjConsumer consumer : consumers) {
                 Exchange exchange
-                        = QuickfixjConverters.toExchange(this, sessionID, 
message, eventCategory, getExchangePattern());
-                consumer.onExchange(exchange);
-                if (exchange.getException() != null) {
-                    throw exchange.getException();
+                        = QuickfixjConverters.toExchange(consumer, sessionID, 
message, eventCategory, getExchangePattern());
+                try {
+                    consumer.onExchange(exchange);
+                    Exception cause = exchange.getException();
+                    if (cause != null) {
+                        throw cause;
+                    }
+                } finally {
+                    consumer.releaseExchange(exchange, false);
                 }
             }
         }
diff --git 
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java
 
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java
index 426efb4..65304ee 100644
--- 
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java
+++ 
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 
+import org.apache.camel.Consumer;
 import org.apache.camel.Converter;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -143,4 +144,26 @@ public final class QuickfixjConverters {
         return exchange;
     }
 
+    public static Exchange toExchange(
+            Consumer consumer, SessionID sessionID, Message message, 
QuickfixjEventCategory eventCategory,
+            ExchangePattern exchangePattern) {
+        Exchange exchange = consumer.createExchange(false);
+        exchange.setPattern(exchangePattern);
+
+        org.apache.camel.Message camelMessage = exchange.getIn();
+        camelMessage.setHeader(EVENT_CATEGORY_KEY, eventCategory);
+        camelMessage.setHeader(SESSION_ID_KEY, sessionID);
+
+        if (message != null) {
+            try {
+                camelMessage.setHeader(MESSAGE_TYPE_KEY, 
message.getHeader().getString(MsgType.FIELD));
+            } catch (FieldNotFound e) {
+                LOG.warn("Message type field not found in QFJ message: {}, 
continuing...", message);
+            }
+        }
+        camelMessage.setBody(message);
+
+        return exchange;
+    }
+
 }
diff --git 
a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
 
b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
deleted file mode 100644
index 4e9360a..0000000
--- 
a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.quickfixj;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
-import org.hamcrest.CoreMatchers;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-import quickfix.FixVersions;
-import quickfix.Message;
-import quickfix.Session;
-import quickfix.SessionID;
-import quickfix.field.BeginString;
-import quickfix.field.SenderCompID;
-import quickfix.field.TargetCompID;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.isA;
-
-public class QuickfixjConsumerTest {
-    private Exchange mockExchange;
-    private Processor mockProcessor;
-    private QuickfixjEndpoint mockEndpoint;
-    private Message inboundFixMessage;
-
-    @BeforeEach
-    public void setUp() {
-
-        mockExchange = Mockito.mock(Exchange.class);
-        org.apache.camel.Message mockCamelMessage = 
Mockito.mock(org.apache.camel.Message.class);
-        Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
-
-        inboundFixMessage = new Message();
-        inboundFixMessage.getHeader().setString(BeginString.FIELD, 
FixVersions.BEGINSTRING_FIX44);
-        inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER");
-        inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET");
-        
Mockito.when(mockCamelMessage.getBody(quickfix.Message.class)).thenReturn(inboundFixMessage);
-
-        mockProcessor = Mockito.mock(Processor.class);
-        mockEndpoint = Mockito.mock(QuickfixjEndpoint.class);
-        
Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange);
-    }
-
-    @Test
-    public void processExchangeOnlyWhenStarted() throws Exception {
-        QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, 
mockProcessor);
-
-        assertThat("Consumer should not be automatically started",
-                consumer.isStarted(), CoreMatchers.is(false));
-
-        consumer.onExchange(mockExchange);
-
-        // No expected interaction with processor since component is not 
started
-        Mockito.verifyNoInteractions(mockProcessor);
-
-        consumer.start();
-        Mockito.verify(mockEndpoint).ensureInitialized();
-        assertThat(consumer.isStarted(), CoreMatchers.is(true));
-
-        consumer.onExchange(mockExchange);
-
-        // Second message should be processed
-        Mockito.verify(mockProcessor).process(isA(Exchange.class));
-    }
-
-    @Test
-    public void setExceptionOnExchange() throws Exception {
-        QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, 
mockProcessor);
-        consumer.start();
-
-        Throwable exception = new Exception("Throwable for test");
-        Mockito.doThrow(exception).when(mockProcessor).process(mockExchange);
-
-        // Simulate a message from the FIX engine
-        consumer.onExchange(mockExchange);
-
-        Mockito.verify(mockExchange).setException(exception);
-    }
-
-    @Test
-    public void setExceptionOnInOutExchange() throws Exception {
-        org.apache.camel.Message mockCamelOutMessage = 
Mockito.mock(org.apache.camel.Message.class);
-        org.apache.camel.Message mockCamelInMessage = 
Mockito.mock(org.apache.camel.Message.class);
-        SessionID mockSessionId = Mockito.mock(SessionID.class);
-
-        QuickfixjConsumer consumer = Mockito.spy(new 
QuickfixjConsumer(mockEndpoint, mockProcessor));
-        Mockito.doReturn(null).when(consumer).getSession(mockSessionId);
-
-        
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
-        Mockito.when(mockExchange.hasOut()).thenReturn(true);
-        
Mockito.when(mockExchange.getMessage()).thenReturn(mockCamelOutMessage);
-        Message outboundFixMessage = new Message();
-        
Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage);
-        Mockito.when(mockExchange.getIn()).thenReturn(mockCamelInMessage);
-        Mockito.when(mockCamelInMessage.getHeader("SessionID", 
SessionID.class)).thenReturn(mockSessionId);
-
-        consumer.start();
-
-        // Simulate a message from the FIX engine
-        consumer.onExchange(mockExchange);
-
-        
Mockito.verify(mockExchange).setException(isA(IllegalStateException.class));
-    }
-
-    @Test
-    public void processInOutExchange() throws Exception {
-        org.apache.camel.Message mockCamelOutMessage = 
Mockito.mock(org.apache.camel.Message.class);
-        org.apache.camel.Message mockCamelInMessage = 
Mockito.mock(org.apache.camel.Message.class);
-        SessionID mockSessionId = Mockito.mock(SessionID.class);
-        Session mockSession = Mockito.mock(Session.class);
-
-        QuickfixjConsumer consumer = Mockito.spy(new 
QuickfixjConsumer(mockEndpoint, mockProcessor));
-        Mockito.doReturn(mockSession).when(consumer).getSession(mockSessionId);
-        Mockito.doReturn(true).when(mockSession).send(isA(Message.class));
-
-        
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
-        Mockito.when(mockExchange.hasOut()).thenReturn(true);
-        
Mockito.when(mockExchange.getMessage()).thenReturn(mockCamelOutMessage);
-        Message outboundFixMessage = new Message();
-        
Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage);
-        Mockito.when(mockExchange.getIn()).thenReturn(mockCamelInMessage);
-        Mockito.when(mockCamelInMessage.getHeader("SessionID", 
SessionID.class)).thenReturn(mockSessionId);
-
-        consumer.start();
-
-        consumer.onExchange(mockExchange);
-        Mockito.verify(mockExchange, 
Mockito.never()).setException(isA(Exception.class));
-        Mockito.verify(mockSession).send(outboundFixMessage);
-    }
-}

Reply via email to