This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit b7510cbded3a82378b7f689e3c5506fc8bfabc25
Author: Andrea Tarocchi <[email protected]>
AuthorDate: Fri Oct 2 22:17:27 2020 +0200

    Added RabbitMQ itests.
---
 tests/itests-rabbitmq/pom.xml                      |  72 ++++++++
 .../rabbitmq/clients/RabbitMQClient.java           | 184 +++++++++++++++++++++
 .../rabbitmq/services/ConnectionProperties.java    |  25 +++
 .../services/RabbitMQLocalContainerService.java    |  73 ++++++++
 .../rabbitmq/services/RabbitMQRemoteService.java   |  44 +++++
 .../rabbitmq/services/RabbitMQService.java         |  60 +++++++
 .../rabbitmq/services/RabbitMQServiceFactory.java  |  45 +++++
 .../sink/CamelRabbitMQPropertyFactory.java         |  76 +++++++++
 .../rabbitmq/sink/RabbitMQSinkITCase.java          | 145 ++++++++++++++++
 .../source/CamelRabbitMQPropertyFactory.java       |  76 +++++++++
 .../rabbitmq/source/RabbitMQSourceITCase.java      | 109 ++++++++++++
 tests/pom.xml                                      |   1 +
 12 files changed, 910 insertions(+)

diff --git a/tests/itests-rabbitmq/pom.xml b/tests/itests-rabbitmq/pom.xml
new file mode 100644
index 0000000..4365c68
--- /dev/null
+++ b/tests/itests-rabbitmq/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.6.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-rabbitmq</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: RabbitMQ</name>
+
+    <properties>
+<!--        <jmx.port>9010</jmx.port>-->
+<!--        <rmi.server>localhost</rmi.server>-->
+<!--        <jvm.user.settings />-->
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-rabbitmq</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>rabbitmq</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+<!--            <plugin>-->
+<!--                <groupId>org.apache.maven.plugins</groupId>-->
+<!--                <artifactId>maven-failsafe-plugin</artifactId>-->
+<!--                <configuration>-->
+<!--                    <argLine>${common.failsafe.args} ${jvm.user.settings} 
-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false  
-Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.local.only=false 
-Djava.rmi.server.hostname=${rmi.server} 
-Dcom.sun.management.jmxremote.rmi.port=${jmx.port}</argLine>-->
+<!--                    <skipTests>${skipIntegrationTests}</skipTests>-->
+<!--                </configuration>-->
+<!--            </plugin>-->
+        </plugins>
+    </build>
+
+
+</project>
\ No newline at end of file
diff --git 
a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/clients/RabbitMQClient.java
 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/clients/RabbitMQClient.java
new file mode 100644
index 0000000..19da657
--- /dev/null
+++ 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/clients/RabbitMQClient.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.rabbitmq.clients;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.MessageProperties;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A basic RabbitMQ client
+ */
+public class RabbitMQClient {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQClient.class);
+    private static final String DEFAULT_EXCHANGE_TYPE = "direct";
+
+    private Connection connection;
+    private Channel channel;
+
+    private ConnectionFactory factory;
+
+    public RabbitMQClient(String uri) {
+        factory = new ConnectionFactory();
+        try {
+            factory.setUri(uri);
+        } catch (Exception e) {
+            LOG.error("Unable to create the RabbitMQ client {}", 
e.getMessage(), e);
+            Assertions.fail(e);
+        }
+    }
+
+    private static void capturingClose(Closeable closeable, String 
closableDescription) {
+        LOG.debug("Closing the " + closableDescription);
+
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (Throwable t) {
+                LOG.warn("Error closing the {}: {}", closableDescription, 
t.getMessage(), t);
+            }
+        }
+    }
+
+    private static void capturingClose(AutoCloseable closeable, String 
closableDescription) {
+        LOG.debug("Closing the " + closableDescription);
+
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (Throwable t) {
+                LOG.warn("Error closing the {}: {}", closableDescription, 
t.getMessage(), t);
+            }
+        }
+    }
+
+    public void start() throws Exception {
+        LOG.debug("Starting the RabbitMQ client");
+
+        try {
+            LOG.debug("Creating the connection");
+            connection = factory.newConnection();
+            LOG.debug("Connection created successfully");
+
+            LOG.debug("Creating the Channel");
+            channel = connection.createChannel();
+            LOG.debug("Channel created successfully");
+        } catch (Throwable t) {
+            LOG.trace("Something wrong happened while initializing the 
RabbitMQ client: {}", t.getMessage(), t);
+
+            capturingClose(connection, "connection");
+            throw t;
+        }
+    }
+
+    public void stop() {
+        try {
+            LOG.debug("Stopping the channel");
+            capturingClose(channel, "channel");
+
+            LOG.debug("Stopping the RabbitMQ connection");
+            capturingClose(connection, "connection");
+        } finally {
+            channel = null;
+            connection = null;
+        }
+    }
+
+    public AMQP.Queue.DeclareOk createQueue(final String queueName) {
+        try {
+            start();
+            return channel.queueDeclare(queueName, true, false, false, null);
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+
+            // unreachable
+            return null;
+        } finally {
+            stop();
+        }
+    }
+
+    public AMQP.Exchange.DeclareOk createExchange(final String exchangeName) {
+        return createExchange(exchangeName, DEFAULT_EXCHANGE_TYPE);
+    }
+
+    public AMQP.Exchange.DeclareOk createExchange(final String exchangeName, 
final String exchangeType) {
+        try {
+            start();
+            return channel.exchangeDeclare(exchangeName, exchangeType);
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+
+            // unreachable
+            return null;
+        } finally {
+            stop();
+        }
+    }
+
+    public AMQP.Queue.BindOk bindExchangeToQueue(final String exchangeName, 
final String queueName) {
+        try {
+            start();
+            return channel.queueBind(exchangeName, exchangeName, "");
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+
+            // unreachable
+            return null;
+        } finally {
+            stop();
+        }
+    }
+
+    /**
+     * Sends data to a RabbitMQ queue
+     *
+     * @param queue the queue to send data to
+     * @param data  the (string) data to send
+     * @throws IOException
+     */
+    public void send(final String queue, final String data) {
+        try {
+            start();
+            channel.basicPublish("", queue, 
MessageProperties.PERSISTENT_TEXT_PLAIN, data.getBytes(StandardCharsets.UTF_8));
+        } catch (Exception e) {
+            Assertions.fail(e.getMessage());
+        } finally {
+            stop();
+        }
+    }
+
+    /**
+     * Receives data from a JMS queue or topic
+     *
+     * @param queue     the queue or topic to receive data from
+     * @param deliverCallback the callback used to test each received messages
+     */
+    public void receive(final String queue, DeliverCallback deliverCallback) 
throws Exception {
+        channel.basicConsume(queue, true, deliverCallback, consumerTag -> { });
+    }
+}
diff --git 
a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
new file mode 100644
index 0000000..15b7f8d
--- /dev/null
+++ 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/ConnectionProperties.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+public interface ConnectionProperties {
+    String username();
+    String password();
+    String hostname();
+    int port();
+}
diff --git 
a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
new file mode 100644
index 0000000..ee7a827
--- /dev/null
+++ 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQLocalContainerService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.RabbitMQContainer;
+
+public class RabbitMQLocalContainerService implements RabbitMQService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQLocalContainerService.class);
+
+    private final RabbitMQContainer container = new 
RabbitMQContainer("rabbitmq:3.8-management");
+
+    public RabbitMQLocalContainerService() {
+        container.start();
+    }
+
+    @Override
+    public ConnectionProperties connectionProperties() {
+        return new ConnectionProperties() {
+            @Override
+            public String username() {
+                return container.getAdminUsername();
+            }
+
+            @Override
+            public String password() {
+                return container.getAdminPassword();
+            }
+
+            @Override
+            public String hostname() {
+                return container.getHost();
+            }
+
+            @Override
+            public int port() {
+                return container.getAmqpPort();
+            }
+        };
+    }
+
+    @Override
+    public RabbitMQClient getClient() {
+        return new RabbitMQClient(container.getAmqpUrl());
+    }
+
+    @Override
+    public void initialize() {
+        LOG.info("RabbitMQ container running on {}", container.getAmqpUrl());
+    }
+
+    @Override
+    public void shutdown() {
+        container.stop();
+    }
+}
diff --git 
a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
new file mode 100644
index 0000000..74d2a48
--- /dev/null
+++ 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQRemoteService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
+
+public class RabbitMQRemoteService implements RabbitMQService {
+
+
+    @Override
+    public ConnectionProperties connectionProperties() {
+        return null;
+    }
+
+    @Override
+    public RabbitMQClient getClient() {
+        return null;
+    }
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+}
diff --git 
a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
new file mode 100644
index 0000000..d026ba2
--- /dev/null
+++ 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface RabbitMQService extends BeforeAllCallback, AfterAllCallback {
+
+
+    /**
+     * The connection properties for the service
+     * @return
+     */
+    ConnectionProperties connectionProperties();
+
+    /**
+     * Get the appropriate client for the service
+     * @return
+     */
+    RabbitMQClient getClient();
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown();
+    }
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception 
{
+        initialize();
+    }
+}
diff --git 
a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
new file mode 100644
index 0000000..50013dd
--- /dev/null
+++ 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/services/RabbitMQServiceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class RabbitMQServiceFactory {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQServiceFactory.class);
+
+    private RabbitMQServiceFactory() {
+
+    }
+
+    public static RabbitMQService createService() {
+        String instanceType = System.getProperty("rabbitmq.instance.type");
+
+        if (instanceType == null || 
instanceType.equals("local-rabbitmq-container")) {
+            return new RabbitMQLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new RabbitMQRemoteService();
+        }
+
+        LOG.error("rabbit-mq instance must be one of 
'local-rabbitmq-container' or 'remote");
+        throw new UnsupportedOperationException(String.format("Invalid 
rabbitmq instance type: %s", instanceType));
+
+    }
+}
diff --git 
a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/CamelRabbitMQPropertyFactory.java
 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/CamelRabbitMQPropertyFactory.java
new file mode 100644
index 0000000..0c8e467
--- /dev/null
+++ 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/CamelRabbitMQPropertyFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+public class CamelRabbitMQPropertyFactory extends 
SinkConnectorPropertyFactory<CamelRabbitMQPropertyFactory> {
+    public CamelRabbitMQPropertyFactory withHostname(String value) {
+        return setProperty("camel.component.rabbitmq.hostname", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPortNumber(int value) {
+        return setProperty("camel.component.rabbitmq.portNumber", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withUsername(String value) {
+        return setProperty("camel.component.rabbitmq.username", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPassword(String value) {
+        return setProperty("camel.component.rabbitmq.password", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeName(String value) {
+        return setProperty("camel.source.path.exchangeName", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeType(String value) {
+        return setProperty("camel.source.endpoint.exchangeType", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withAutoDelete(boolean value) {
+        return setProperty("camel.source.endpoint.autoDelete", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withQueue(String value) {
+        return setProperty("camel.source.endpoint.queue", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withRoutingKey(String value) {
+        return setProperty("camel.source.endpoint.routingKey", value);
+    }
+
+    public EndpointUrlBuilder<CamelRabbitMQPropertyFactory> withUrl(String 
exchangeName) {
+        String sourceUrl = String.format("rabbitmq://%s", exchangeName);
+
+        return new EndpointUrlBuilder<>(this::withSinkUrl, sourceUrl);
+    }
+
+    public static CamelRabbitMQPropertyFactory basic() {
+        return new CamelRabbitMQPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelRabbitmqSinkConnector")
+                    
.withConnectorClass("org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSinkConnector")
+                    
.withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    
.withValueConverterClass("org.apache.kafka.connect.converters.ByteArrayConverter");
+    
+    }
+
+}
diff --git 
a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
new file mode 100644
index 0000000..80b1606
--- /dev/null
+++ 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/sink/RabbitMQSinkITCase.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.rabbitmq.sink;
+
+import java.io.UnsupportedEncodingException;
+import java.util.concurrent.CountDownLatch;
+
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
+import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQService;
+import 
org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Testcontainers
+public class RabbitMQSinkITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static RabbitMQService rabbitmqService = 
RabbitMQServiceFactory.createService();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSinkITCase.class);
+    private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import";
+
+    private RabbitMQClient rabbitMQClient;
+    private int received;
+    private final int expect = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-rabbitmq-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        received = 0;
+        rabbitMQClient = rabbitmqService.getClient();
+    }
+
+    private boolean checkRecord(Delivery rabbitMQDelivery) {
+        try {
+            String message = new String(rabbitMQDelivery.getBody(), "UTF-8");
+            LOG.debug("Received: {}", message);
+
+            received++;
+
+            if (received == expect) {
+                return false;
+            }
+
+            return true;
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("Failed to read message: {}", e.getMessage(), e);
+            fail("Failed to read message: " + e.getMessage());
+            return false;
+        }
+    }
+
+    private void runBasicStringTest(ConnectorPropertyFactory 
connectorPropertyFactory) throws Exception {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        LOG.debug("Creating the consumer ...");
+        rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
+        try {
+            rabbitMQClient.start();
+            consumeRabbitMQMessages(latch);
+
+            KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < expect; i++) {
+                
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test 
message " + i);
+            }
+
+            LOG.debug("Created the consumer ... About to receive messages");
+
+            latch.await();
+            assertEquals(received, expect, "Didn't process the expected amount 
of messages: " + received + " != " + expect);
+        } finally {
+            rabbitMQClient.stop();
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testSource() throws Exception {
+        ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
+                .basic()
+                .withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withUrl("")
+                .append("username", 
rabbitmqService.connectionProperties().username())
+                .append("password", 
rabbitmqService.connectionProperties().password())
+                .append("autoDelete", "false")
+                .append("queue", DEFAULT_RABBITMQ_QUEUE)
+                .append("RoutingKey", DEFAULT_RABBITMQ_QUEUE)
+                .append("skipExchangeDeclare", "true")
+                .append("skipQueueBind", "true")
+                .append("hostname", 
rabbitmqService.connectionProperties().hostname())
+                .append("portNumber", 
rabbitmqService.connectionProperties().port())
+                .buildUrl();
+
+        runBasicStringTest(factory);
+    }
+
+    private void consumeRabbitMQMessages(CountDownLatch latch) {
+        DeliverCallback deliveryCallback = (consumerTag, delivery) -> {
+            if (!this.checkRecord(delivery)) {
+                latch.countDown();
+            }
+        };
+        try {
+            rabbitMQClient.receive(DEFAULT_RABBITMQ_QUEUE, deliveryCallback);
+        } catch (Exception e) {
+            LOG.error("RabbitMQ test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+}
diff --git 
a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java
 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java
new file mode 100644
index 0000000..1548b2e
--- /dev/null
+++ 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.rabbitmq.source;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory;
+
+public class CamelRabbitMQPropertyFactory extends 
SourceConnectorPropertyFactory<CamelRabbitMQPropertyFactory> {
+    public CamelRabbitMQPropertyFactory withHostname(String value) {
+        return setProperty("camel.component.rabbitmq.hostname", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPortNumber(int value) {
+        return setProperty("camel.component.rabbitmq.portNumber", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withUsername(String value) {
+        return setProperty("camel.component.rabbitmq.username", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withPassword(String value) {
+        return setProperty("camel.component.rabbitmq.password", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeName(String value) {
+        return setProperty("camel.source.path.exchangeName", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withExchangeType(String value) {
+        return setProperty("camel.source.endpoint.exchangeType", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withAutoDelete(boolean value) {
+        return setProperty("camel.source.endpoint.autoDelete", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withQueue(String value) {
+        return setProperty("camel.source.endpoint.queue", value);
+    }
+
+    public CamelRabbitMQPropertyFactory withRoutingKey(String value) {
+        return setProperty("camel.source.endpoint.routingKey", value);
+    }
+
+    public EndpointUrlBuilder<CamelRabbitMQPropertyFactory> withUrl(String 
exchangeName) {
+        String sourceUrl = String.format("rabbitmq://%s", exchangeName);
+
+        return new EndpointUrlBuilder<>(this::withSourceUrl, sourceUrl);
+    }
+
+    public static CamelRabbitMQPropertyFactory basic() {
+        return new CamelRabbitMQPropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelRabbitmqSourceConnector")
+                    
.withConnectorClass("org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector")
+                    
.withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    
.withValueConverterClass("org.apache.kafka.connect.converters.ByteArrayConverter");
+    
+    }
+
+}
diff --git 
a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
new file mode 100644
index 0000000..73a75e3
--- /dev/null
+++ 
b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.rabbitmq.source;
+
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient;
+import org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQService;
+import 
org.apache.camel.kafkaconnector.rabbitmq.services.RabbitMQServiceFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Testcontainers
+public class RabbitMQSourceITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static RabbitMQService rabbitmqService = 
RabbitMQServiceFactory.createService();
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQSourceITCase.class);
+    private static final String DEFAULT_RABBITMQ_QUEUE = "Q.test.kafka.import";
+
+    private RabbitMQClient rabbitMQClient;
+    private int received;
+    private final int expect = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-rabbitmq-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        received = 0;
+        rabbitMQClient = rabbitmqService.getClient();
+    }
+
+    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
+        LOG.debug("Received: {}", record.value());
+        received++;
+
+        if (received == expect) {
+            return false;
+        }
+
+        return true;
+    }
+
+    public void runBasicStringTest(ConnectorPropertyFactory 
connectorPropertyFactory) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 
1);
+        rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE);
+
+        for (int i = 0; i < expect; i++) {
+            rabbitMQClient.send(DEFAULT_RABBITMQ_QUEUE, "Test string message");
+        }
+
+        LOG.debug("Creating the kafka consumer ...");
+        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
+        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), 
this::checkRecord);
+        LOG.debug("Created the kafka consumer ...");
+
+        assertEquals(received, expect, "Didn't process the expected amount of 
messages");
+    }
+
+    @Test
+    @Timeout(90)
+    public void testSource() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory factory = CamelRabbitMQPropertyFactory
+                .basic()
+                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
+                .withUrl("")
+                .append("username", 
rabbitmqService.connectionProperties().username())
+                .append("password", 
rabbitmqService.connectionProperties().password())
+                .append("autoDelete", "false")
+                .append("queue", DEFAULT_RABBITMQ_QUEUE)
+                .append("skipExchangeDeclare", "true")
+                .append("skipQueueBind", "true")
+                .append("hostname", 
rabbitmqService.connectionProperties().hostname())
+                .append("portNumber", 
rabbitmqService.connectionProperties().port())
+                .buildUrl();
+
+        runBasicStringTest(factory);
+    }
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index d52caed..c95de51 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -59,6 +59,7 @@
         <module>itests-azure-common</module>
         <module>itests-azure-storage-queue</module>
         <module>perf-tests-rabbitmq</module>
+        <module>itests-rabbitmq</module>
     </modules>
 
 

Reply via email to