This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8a4ffef Added support for using an external JMS broker
new 1abbc79 Merge pull request #83 from
orpiske/add-support-for-remote-jms-broker
8a4ffef is described below
commit 8a4ffef5fa28a96025cd9de2c4aed3d1d3177c58
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Jan 29 18:09:03 2020 +0100
Added support for using an external JMS broker
---
.../apache/camel/kafkaconnector/PropertyUtils.java | 7 +-
.../kafkaconnector/clients/jms/JMSClient.java | 79 ++++++----------------
.../{ArtemisService.java => ArtemisContainer.java} | 27 ++++++--
.../services/jms/ContainerLocalService.java | 60 ++++++++++++++++
.../jms/{JMSService.java => JMSContainer.java} | 28 ++++++--
.../kafkaconnector/services/jms/JMSService.java | 35 ++++++++--
.../services/jms/JMSServiceFactory.java | 10 ++-
...rvice.java => QpidDispatchRouterContainer.java} | 22 +++++-
.../jms/{JMSService.java => RemoteJMSService.java} | 38 ++++++++---
.../sink/jms/CamelSinkJMSITCase.java | 8 +--
.../source/jms/CamelSourceJMSITCase.java | 62 +++++++++--------
11 files changed, 251 insertions(+), 125 deletions(-)
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java
index a18971c..4081b78 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/PropertyUtils.java
@@ -29,11 +29,16 @@ import org.slf4j.LoggerFactory;
public final class PropertyUtils {
private static final Logger LOG =
LoggerFactory.getLogger(PropertyUtils.class);
+ private static Properties properties = new Properties();
private PropertyUtils() {
}
+ public static Properties getProperties() {
+ return properties;
+ }
+
public static void load() {
String fileName = System.getProperty("test.properties");
@@ -44,8 +49,6 @@ public final class PropertyUtils {
}
try (InputStream stream = new FileInputStream(fileName)) {
- Properties properties = new Properties();
-
properties.load(stream);
System.getProperties().putAll(properties);
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java
index 2a24264..4949126 100644
---
a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java
+++
b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/jms/JMSClient.java
@@ -17,7 +17,6 @@
package org.apache.camel.kafkaconnector.clients.jms;
-import java.util.Properties;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -29,9 +28,9 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
import javax.jms.Session;
+import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,21 +40,27 @@ import org.slf4j.LoggerFactory;
public class JMSClient {
private static final Logger LOG = LoggerFactory.getLogger(JMSClient.class);
- private final String url;
private Connection connection;
private Session session;
- private final Function<String, ? extends ConnectionFactory>
connectionFactory;
- private final Function<String, ? extends Queue> destinationFactory;
+ private ConnectionFactory factory;
public JMSClient(Function<String, ? extends ConnectionFactory>
connectionFactory,
- Function<String, ? extends Queue> destinationFactory,
String url) {
- this.connectionFactory = connectionFactory;
- this.destinationFactory = destinationFactory;
- this.url = url;
+ factory = connectionFactory.apply(url);
}
+ public JMSClient(String className, String url) {
+ Class<? extends ConnectionFactory> clazz;
+ try {
+ clazz = (Class<? extends ConnectionFactory>)
Class.forName(className);
+
+ factory = clazz.getConstructor(String.class).newInstance(url);
+ } catch (Exception e) {
+ LOG.error("Unable to create the JMS client classL {}",
e.getMessage(), e);
+ Assertions.fail(e);
+ }
+ }
@SuppressWarnings("UnusedReturnValue")
public static Throwable capturingClose(MessageProducer closeable) {
@@ -113,8 +118,6 @@ public class JMSClient {
LOG.debug("Starting the JMS client");
try {
- final ConnectionFactory factory = connectionFactory.apply(url);
-
LOG.debug("Creating the connection");
connection = factory.createConnection();
LOG.debug("Connection created successfully");
@@ -146,7 +149,14 @@ public class JMSClient {
}
private Destination createDestination(final String destinationName) {
- return destinationFactory.apply(destinationName);
+ try {
+ return session.createQueue(destinationName);
+ } catch (JMSException e) {
+ Assertions.fail(e.getMessage());
+
+ // unreachable
+ return null;
+ }
}
@@ -225,49 +235,4 @@ public class JMSClient {
capturingClose(producer);
}
}
-
- public static JMSClient createClient(String url) {
- String jmsInstanceType =
System.getProperty("jms-service.instance.type");
-
- if (jmsInstanceType == null ||
jmsInstanceType.equals("local-dispatch-router-container")) {
- return new JMSClient(org.apache.qpid.jms.JmsConnectionFactory::new,
- org.apache.qpid.jms.JmsQueue::new, url);
- }
-
- if (jmsInstanceType.equals("local-artemis-container")) {
- return new JMSClient(
- org.apache.activemq.ActiveMQConnectionFactory::new,
- org.apache.activemq.command.ActiveMQQueue::new,
- url);
- }
-
- LOG.error("Invalid JMS instance type: {}. Must be one of
'local-artemis-container' or 'local-dispatch-router-container",
- jmsInstanceType);
- throw new UnsupportedOperationException("Invalid JMS instance type:");
- }
-
- public static Properties getConnectionProperties(String url) {
- Properties properties = new Properties();
-
- String jmsInstanceType =
System.getProperty("jms-service.instance.type");
-
- if (jmsInstanceType == null ||
jmsInstanceType.equals("local-dispatch-router-container")) {
- properties.put("camel.component.sjms2.connection-factory",
"#class:org.apache.qpid.jms.JmsConnectionFactory");
-
properties.put("camel.component.sjms2.connection-factory.remoteURI", url);
-
- return properties;
- }
-
- if (jmsInstanceType.equals("local-artemis-container")) {
- properties.put("camel.component.sjms2.connection-factory",
"#class:org.apache.activemq.ActiveMQConnectionFactory");
-
properties.put("camel.component.sjms2.connection-factory.brokerURL", url);
-
- return properties;
- }
-
- LOG.error("Invalid JMS instance type: {}. Must be one of
'local-artemis-container' or 'local-dispatch-router-container",
- jmsInstanceType);
- throw new UnsupportedOperationException("Invalid JMS instance type:");
- }
-
}
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisService.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisContainer.java
similarity index 85%
rename from
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisService.java
rename to
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisContainer.java
index a1e828f..e9ee3a7 100644
---
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisService.java
+++
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ArtemisContainer.java
@@ -17,21 +17,20 @@
package org.apache.camel.kafkaconnector.services.jms;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.ImageFromDockerfile;
-/**
- * A specialized container that can be used to create Apache Artemis broker
- * instances.
- */
-public class ArtemisService extends JMSService {
+public class ArtemisContainer extends JMSContainer {
private static final int DEFAULT_MQTT_PORT = 1883;
private static final int DEFAULT_AMQP_PORT = 5672;
private static final int DEFAULT_ADMIN_PORT = 8161;
private static final int DEFAULT_ACCEPTOR_PORT = 61616;
- public ArtemisService() {
+ public ArtemisContainer() {
super(new ImageFromDockerfile("apache-artemis:ckc", false)
.withFileFromClasspath("Dockerfile",
"org/apache/camel/kafkaconnector/services/jms/artemis/Dockerfile"));
@@ -115,7 +114,6 @@ public class ArtemisService extends JMSService {
}
-
/**
* Gets the port number used for exchanging messages using the Openwire
protocol
* @return the port number
@@ -132,4 +130,19 @@ public class ArtemisService extends JMSService {
public String getOpenwireEndpoint() {
return String.format("tcp://localhost:%d", getOpenwirePort());
}
+
+ @Override
+ public Properties getConnectionProperties() {
+ Properties properties = new Properties();
+
+ properties.put("camel.component.sjms2.connection-factory",
"#class:org.apache.activemq.ActiveMQConnectionFactory");
+ properties.put("camel.component.sjms2.connection-factory.brokerURL",
getDefaultEndpoint());
+
+ return properties;
+ }
+
+ @Override
+ public JMSClient getClient() {
+ return new
JMSClient(org.apache.activemq.ActiveMQConnectionFactory::new,
getDefaultEndpoint());
+ }
}
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ContainerLocalService.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ContainerLocalService.java
new file mode 100644
index 0000000..93e81c8
--- /dev/null
+++
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/ContainerLocalService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.jms;
+
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A specialized container that can be used to create Apache Artemis broker
+ * instances.
+ */
+public class ContainerLocalService implements JMSService {
+ private static final Logger LOG =
LoggerFactory.getLogger(ContainerLocalService.class);
+
+ private final JMSContainer container;
+
+ public ContainerLocalService(JMSContainer container) {
+ this.container = container;
+
+ container.start();
+ }
+
+ @Override
+ public Properties getConnectionProperties() {
+ return container.getConnectionProperties();
+ }
+
+ @Override
+ public JMSClient getClient() {
+ return container.getClient();
+ }
+
+ @Override
+ public String getDefaultEndpoint() {
+ return container.getDefaultEndpoint();
+ }
+
+ @Override
+ public void initialize() {
+ LOG.info("JMS broker running at address {}",
container.getDefaultEndpoint());
+ }
+}
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSContainer.java
similarity index 58%
copy from
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
copy to
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSContainer.java
index 7480d28..9366373 100644
---
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
+++
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSContainer.java
@@ -17,19 +17,35 @@
package org.apache.camel.kafkaconnector.services.jms;
-import java.util.concurrent.Future;
+import java.util.Properties;
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
-public abstract class JMSService extends GenericContainer {
+public abstract class JMSContainer extends GenericContainer {
- public JMSService(Future<String> image) {
- super(image);
+
+ public JMSContainer(ImageFromDockerfile dockerfile) {
+ super(dockerfile);
}
/**
- * Gets the default endpoint for the JMS service (ie.: amqp://host:port,
or tcp://host:port, etc)
- * @return the endpoint URL as a string in the specific format used by the
service
+ * Gets the connection properties for accessing the service
+ * @return
+ */
+ public abstract Properties getConnectionProperties();
+
+
+ /**
+ * Get a client that can access the container
+ * @return
+ */
+ public abstract JMSClient getClient();
+
+ /**
+ * Gets the end point URL used exchanging messages through the default
acceptor port
+ * @return the end point URL as a string
*/
public abstract String getDefaultEndpoint();
}
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
index 7480d28..99a5abd 100644
---
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
+++
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
@@ -17,19 +17,40 @@
package org.apache.camel.kafkaconnector.services.jms;
-import java.util.concurrent.Future;
+import java.util.Properties;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
-public abstract class JMSService extends GenericContainer {
+public interface JMSService extends BeforeAllCallback {
- public JMSService(Future<String> image) {
- super(image);
- }
+ /**
+ * Gets the connection properties for accessing the service
+ * @return
+ */
+ Properties getConnectionProperties();
+
+ /**
+ * Get the appropriate client for the service
+ * @return
+ */
+ JMSClient getClient();
/**
* Gets the default endpoint for the JMS service (ie.: amqp://host:port,
or tcp://host:port, etc)
* @return the endpoint URL as a string in the specific format used by the
service
*/
- public abstract String getDefaultEndpoint();
+ String getDefaultEndpoint();
+
+ /**
+ * Perform any initialization necessary
+ */
+ void initialize();
+
+
+ @Override
+ default void beforeAll(ExtensionContext extensionContext) throws Exception
{
+ initialize();
+ }
}
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java
index aa18498..0855010 100644
---
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java
+++
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSServiceFactory.java
@@ -30,14 +30,18 @@ public final class JMSServiceFactory {
String jmsInstanceType =
System.getProperty("jms-service.instance.type");
if (jmsInstanceType == null ||
jmsInstanceType.equals("local-dispatch-router-container")) {
- return new QpidDispatchRouterService();
+ return new ContainerLocalService(new
QpidDispatchRouterContainer());
}
if (jmsInstanceType.equals("local-artemis-container")) {
- return new ArtemisService();
+ return new ContainerLocalService(new ArtemisContainer());
}
- LOG.error("Invalid JMS instance type: {}. Must be one of
'local-artemis-container' or 'local-dispatch-router-container",
+ if (jmsInstanceType.equals("remote")) {
+ return new RemoteJMSService();
+ }
+
+ LOG.error("Invalid JMS instance type: {}. Must be one of 'remote',
'local-artemis-container' or 'local-dispatch-router-container",
jmsInstanceType);
throw new UnsupportedOperationException("Invalid JMS instance type:");
}
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterService.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterContainer.java
similarity index 73%
rename from
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterService.java
rename to
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterContainer.java
index 5eaebdb..1d93d21 100644
---
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterService.java
+++
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/QpidDispatchRouterContainer.java
@@ -17,14 +17,17 @@
package org.apache.camel.kafkaconnector.services.jms;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.ImageFromDockerfile;
-public class QpidDispatchRouterService extends JMSService {
+public class QpidDispatchRouterContainer extends JMSContainer {
private static final int DEFAULT_AMQP_PORT = 5672;
- public QpidDispatchRouterService() {
+ public QpidDispatchRouterContainer() {
super(new ImageFromDockerfile("qpid-dispatch:ckc", false)
.withFileFromClasspath("Dockerfile",
"org/apache/camel/kafkaconnector/services/jms/qpid-dispatch-router/Dockerfile"));
@@ -56,4 +59,19 @@ public class QpidDispatchRouterService extends JMSService {
public String getDefaultEndpoint() {
return getAMQPEndpoint();
}
+
+ @Override
+ public Properties getConnectionProperties() {
+ Properties properties = new Properties();
+
+ properties.put("camel.component.sjms2.connection-factory",
"#class:org.apache.qpid.jms.JmsConnectionFactory");
+ properties.put("camel.component.sjms2.connection-factory.remoteURI",
getDefaultEndpoint());
+
+ return properties;
+ }
+
+ @Override
+ public JMSClient getClient() {
+ return new JMSClient(org.apache.qpid.jms.JmsConnectionFactory::new,
getDefaultEndpoint());
+ }
}
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/RemoteJMSService.java
similarity index 51%
copy from
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
copy to
tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/RemoteJMSService.java
index 7480d28..c0f88f0 100644
---
a/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/JMSService.java
+++
b/tests/src/test/java/org/apache/camel/kafkaconnector/services/jms/RemoteJMSService.java
@@ -17,19 +17,37 @@
package org.apache.camel.kafkaconnector.services.jms;
-import java.util.concurrent.Future;
+import java.util.Properties;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.camel.kafkaconnector.PropertyUtils;
+import org.apache.camel.kafkaconnector.clients.jms.JMSClient;
-public abstract class JMSService extends GenericContainer {
+public class RemoteJMSService implements JMSService {
- public JMSService(Future<String> image) {
- super(image);
+
+ @Override
+ public void initialize() {
+ // NO-OP
+ }
+
+ @Override
+ public Properties getConnectionProperties() {
+ return PropertyUtils.getProperties();
+ }
+
+ @Override
+ public String getDefaultEndpoint() {
+ return System.getProperty("jms.broker.address");
}
- /**
- * Gets the default endpoint for the JMS service (ie.: amqp://host:port,
or tcp://host:port, etc)
- * @return the endpoint URL as a string in the specific format used by the
service
- */
- public abstract String getDefaultEndpoint();
+ @Override
+ public JMSClient getClient() {
+ String tmpConnectionFactory =
System.getProperty("camel.component.sjms2.connection-factory");
+
+ String connectionFactory = tmpConnectionFactory.replace("#class:", "");
+
+ return new JMSClient(connectionFactory, getDefaultEndpoint());
+
+
+ }
}
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java
index 899349c..f14c801 100644
---
a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java
+++
b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/jms/CamelSinkJMSITCase.java
@@ -37,9 +37,9 @@ import
org.apache.camel.kafkaconnector.services.jms.JMSServiceFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import static org.junit.Assert.fail;
@@ -53,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class CamelSinkJMSITCase extends AbstractKafkaTest {
private static final Logger LOG =
LoggerFactory.getLogger(CamelSinkJMSITCase.class);
- @Container
+ @RegisterExtension
public JMSService jmsService = JMSServiceFactory.createService();
private int received;
@@ -89,7 +89,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
@Timeout(90)
public void testBasicSendReceive() {
try {
- Properties connectionProperties =
JMSClient.getConnectionProperties(jmsService.getDefaultEndpoint());
+ Properties connectionProperties =
jmsService.getConnectionProperties();
ConnectorPropertyFactory testProperties = new
CamelJMSPropertyFactory(1,
TestCommon.getDefaultTestTopic(this.getClass()),
@@ -128,7 +128,7 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
JMSClient jmsClient = null;
try {
- jmsClient =
JMSClient.createClient(jmsService.getDefaultEndpoint());
+ jmsClient = jmsService.getClient();
jmsClient.start();
diff --git
a/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java
b/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java
index 6e76879..91e4f39 100644
---
a/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java
+++
b/tests/src/test/java/org/apache/camel/kafkaconnector/source/jms/CamelSourceJMSITCase.java
@@ -19,6 +19,8 @@ package org.apache.camel.kafkaconnector.source.jms;
import java.util.Properties;
+import javax.jms.JMSException;
+
import org.apache.camel.kafkaconnector.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.TestCommon;
@@ -27,12 +29,11 @@ import
org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.services.jms.JMSService;
import org.apache.camel.kafkaconnector.services.jms.JMSServiceFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -47,17 +48,12 @@ import static org.junit.jupiter.api.Assertions.fail;
public class CamelSourceJMSITCase extends AbstractKafkaTest {
private static final Logger LOG =
LoggerFactory.getLogger(CamelSourceJMSITCase.class);
- @Container
+ @RegisterExtension
public JMSService jmsService = JMSServiceFactory.createService();
private int received;
private final int expect = 10;
- @BeforeEach
- public void setUp() {
- LOG.info("JMS service running at {}", jmsService.getDefaultEndpoint());
- }
-
private boolean checkRecord(ConsumerRecord<String, String> record) {
LOG.debug("Received: {}", record.value());
received++;
@@ -69,24 +65,39 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest
{
return true;
}
+ private void produceMessages(String queue, String baseText) {
+ JMSClient jmsProducer = null;
+
+ try {
+ jmsProducer = jmsService.getClient();
+
+ jmsProducer.start();
+ for (int i = 0; i < expect; i++) {
+ jmsProducer.send(queue, baseText + " " + i);
+ }
+ } catch (JMSException e) {
+ LOG.error("JMS exception trying to send messages to the queue:
{}", e.getMessage(), e);
+ fail(e.getMessage());
+ } catch (Exception e) {
+ LOG.error("Failed to send messages to the queue: {}",
e.getMessage(), e);
+ fail(e.getMessage());
+ } finally {
+ jmsProducer.stop();
+ }
+ }
+
@Test
@Timeout(90)
public void testBasicSendReceive() {
try {
- Properties connectionProperties =
JMSClient.getConnectionProperties(jmsService.getDefaultEndpoint());
+ Properties connectionProperties =
jmsService.getConnectionProperties();
ConnectorPropertyFactory testProperties = new
CamelJMSPropertyFactory(1,
TestCommon.getDefaultTestTopic(this.getClass()),
TestCommon.DEFAULT_JMS_QUEUE, connectionProperties);
getKafkaConnectService().initializeConnector(testProperties);
- JMSClient jmsProducer =
JMSClient.createClient(jmsService.getDefaultEndpoint());
-
- jmsProducer.start();
- for (int i = 0; i < expect; i++) {
- jmsProducer.send(TestCommon.DEFAULT_JMS_QUEUE, "Test message "
+ i);
- }
- jmsProducer.stop();
+ produceMessages(TestCommon.DEFAULT_JMS_QUEUE, "Test string
message");
LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new
KafkaClient<>(getKafkaService().getBootstrapServers());
@@ -98,28 +109,25 @@ public class CamelSourceJMSITCase extends
AbstractKafkaTest {
LOG.error("JMS test failed: {}", e.getMessage(), e);
fail(e.getMessage());
}
-
}
+
+
@Test
@Timeout(90)
public void testIntSendReceive() {
try {
- Properties connectionProperties =
JMSClient.getConnectionProperties(jmsService.getDefaultEndpoint());
+ final String jmsQueueName = "testIntSendReceive";
+
+ Properties connectionProperties =
jmsService.getConnectionProperties();
ConnectorPropertyFactory testProperties = new
CamelJMSPropertyFactory(1,
- TestCommon.getDefaultTestTopic(this.getClass()) +
"testIntSendReceive",
- "testIntSendReceive", connectionProperties);
+ TestCommon.getDefaultTestTopic(this.getClass()) +
jmsQueueName,
+ jmsQueueName, connectionProperties);
getKafkaConnectService().initializeConnector(testProperties);
- JMSClient jmsProducer =
JMSClient.createClient(jmsService.getDefaultEndpoint());
-
- jmsProducer.start();
- for (int i = 0; i < expect; i++) {
- jmsProducer.send("testIntSendReceive", i);
- }
- jmsProducer.stop();
+ produceMessages(jmsQueueName, "Test string message");
LOG.debug("Creating the consumer ...");
KafkaClient<String, String> kafkaClient = new
KafkaClient<>(getKafkaService().getBootstrapServers());