This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 966be0eebd95c97599e3b05e8ad309c5e4217eeb Author: Claus Ibsen <[email protected]> AuthorDate: Mon Feb 22 09:56:22 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../component/quickfixj/QuickfixjEndpoint.java | 13 +- .../quickfixj/converter/QuickfixjConverters.java | 23 ++++ .../component/quickfixj/QuickfixjConsumerTest.java | 147 --------------------- 3 files changed, 32 insertions(+), 151 deletions(-) diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java index 7725b0d..0f45d55 100644 --- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java +++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java @@ -133,10 +133,15 @@ public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEvent if (this.sessionID == null || isMatching(sessionID)) { for (QuickfixjConsumer consumer : consumers) { Exchange exchange - = QuickfixjConverters.toExchange(this, sessionID, message, eventCategory, getExchangePattern()); - consumer.onExchange(exchange); - if (exchange.getException() != null) { - throw exchange.getException(); + = QuickfixjConverters.toExchange(consumer, sessionID, message, eventCategory, getExchangePattern()); + try { + consumer.onExchange(exchange); + Exception cause = exchange.getException(); + if (cause != null) { + throw cause; + } + } finally { + consumer.releaseExchange(exchange, false); } } } diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java index 426efb4..65304ee 100644 --- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java +++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/converter/QuickfixjConverters.java @@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.UnsupportedEncodingException; +import org.apache.camel.Consumer; import org.apache.camel.Converter; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -143,4 +144,26 @@ public final class QuickfixjConverters { return exchange; } + public static Exchange toExchange( + Consumer consumer, SessionID sessionID, Message message, QuickfixjEventCategory eventCategory, + ExchangePattern exchangePattern) { + Exchange exchange = consumer.createExchange(false); + exchange.setPattern(exchangePattern); + + org.apache.camel.Message camelMessage = exchange.getIn(); + camelMessage.setHeader(EVENT_CATEGORY_KEY, eventCategory); + camelMessage.setHeader(SESSION_ID_KEY, sessionID); + + if (message != null) { + try { + camelMessage.setHeader(MESSAGE_TYPE_KEY, message.getHeader().getString(MsgType.FIELD)); + } catch (FieldNotFound e) { + LOG.warn("Message type field not found in QFJ message: {}, continuing...", message); + } + } + camelMessage.setBody(message); + + return exchange; + } + } diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java deleted file mode 100644 index 4e9360a..0000000 --- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.quickfixj; - -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Processor; -import org.hamcrest.CoreMatchers; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import quickfix.FixVersions; -import quickfix.Message; -import quickfix.Session; -import quickfix.SessionID; -import quickfix.field.BeginString; -import quickfix.field.SenderCompID; -import quickfix.field.TargetCompID; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.ArgumentMatchers.isA; - -public class QuickfixjConsumerTest { - private Exchange mockExchange; - private Processor mockProcessor; - private QuickfixjEndpoint mockEndpoint; - private Message inboundFixMessage; - - @BeforeEach - public void setUp() { - - mockExchange = Mockito.mock(Exchange.class); - org.apache.camel.Message mockCamelMessage = Mockito.mock(org.apache.camel.Message.class); - Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage); - - inboundFixMessage = new Message(); - inboundFixMessage.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44); - inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER"); - inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET"); - Mockito.when(mockCamelMessage.getBody(quickfix.Message.class)).thenReturn(inboundFixMessage); - - mockProcessor = Mockito.mock(Processor.class); - mockEndpoint = Mockito.mock(QuickfixjEndpoint.class); - Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange); - } - - @Test - public void processExchangeOnlyWhenStarted() throws Exception { - QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, mockProcessor); - - assertThat("Consumer should not be automatically started", - consumer.isStarted(), CoreMatchers.is(false)); - - consumer.onExchange(mockExchange); - - // No expected interaction with processor since component is not started - Mockito.verifyNoInteractions(mockProcessor); - - consumer.start(); - Mockito.verify(mockEndpoint).ensureInitialized(); - assertThat(consumer.isStarted(), CoreMatchers.is(true)); - - consumer.onExchange(mockExchange); - - // Second message should be processed - Mockito.verify(mockProcessor).process(isA(Exchange.class)); - } - - @Test - public void setExceptionOnExchange() throws Exception { - QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, mockProcessor); - consumer.start(); - - Throwable exception = new Exception("Throwable for test"); - Mockito.doThrow(exception).when(mockProcessor).process(mockExchange); - - // Simulate a message from the FIX engine - consumer.onExchange(mockExchange); - - Mockito.verify(mockExchange).setException(exception); - } - - @Test - public void setExceptionOnInOutExchange() throws Exception { - org.apache.camel.Message mockCamelOutMessage = Mockito.mock(org.apache.camel.Message.class); - org.apache.camel.Message mockCamelInMessage = Mockito.mock(org.apache.camel.Message.class); - SessionID mockSessionId = Mockito.mock(SessionID.class); - - QuickfixjConsumer consumer = Mockito.spy(new QuickfixjConsumer(mockEndpoint, mockProcessor)); - Mockito.doReturn(null).when(consumer).getSession(mockSessionId); - - Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut); - Mockito.when(mockExchange.hasOut()).thenReturn(true); - Mockito.when(mockExchange.getMessage()).thenReturn(mockCamelOutMessage); - Message outboundFixMessage = new Message(); - Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage); - Mockito.when(mockExchange.getIn()).thenReturn(mockCamelInMessage); - Mockito.when(mockCamelInMessage.getHeader("SessionID", SessionID.class)).thenReturn(mockSessionId); - - consumer.start(); - - // Simulate a message from the FIX engine - consumer.onExchange(mockExchange); - - Mockito.verify(mockExchange).setException(isA(IllegalStateException.class)); - } - - @Test - public void processInOutExchange() throws Exception { - org.apache.camel.Message mockCamelOutMessage = Mockito.mock(org.apache.camel.Message.class); - org.apache.camel.Message mockCamelInMessage = Mockito.mock(org.apache.camel.Message.class); - SessionID mockSessionId = Mockito.mock(SessionID.class); - Session mockSession = Mockito.mock(Session.class); - - QuickfixjConsumer consumer = Mockito.spy(new QuickfixjConsumer(mockEndpoint, mockProcessor)); - Mockito.doReturn(mockSession).when(consumer).getSession(mockSessionId); - Mockito.doReturn(true).when(mockSession).send(isA(Message.class)); - - Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut); - Mockito.when(mockExchange.hasOut()).thenReturn(true); - Mockito.when(mockExchange.getMessage()).thenReturn(mockCamelOutMessage); - Message outboundFixMessage = new Message(); - Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage); - Mockito.when(mockExchange.getIn()).thenReturn(mockCamelInMessage); - Mockito.when(mockCamelInMessage.getHeader("SessionID", SessionID.class)).thenReturn(mockSessionId); - - consumer.start(); - - consumer.onExchange(mockExchange); - Mockito.verify(mockExchange, Mockito.never()).setException(isA(Exception.class)); - Mockito.verify(mockSession).send(outboundFixMessage); - } -}
