This is an automated email from the ASF dual-hosted git repository.
zhfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new b570562218 Fix #3656 Improve camel-quarkus-paho-mqtt5 test coverage
(#3709)
b570562218 is described below
commit b570562218cfaefa8a2bbee7ef106e35ebeb6113
Author: Amos Feng <[email protected]>
AuthorDate: Mon Apr 11 14:36:33 2022 +0800
Fix #3656 Improve camel-quarkus-paho-mqtt5 test coverage (#3709)
---
.../paho/mqtt5/deployment/PahoMqtt5Processor.java | 22 +++-
integration-tests/paho-mqtt5/pom.xml | 39 +++++++
.../quarkus/component/paho/mqtt5/it/Counter.java | 36 +++++++
.../component/paho/mqtt5/it/PahoMqtt5Resource.java | 120 ++++++++++++++++++++-
.../component/paho/mqtt5/it/PahoMqtt5Route.java | 57 ++++++++++
.../src/main/resources/application.properties | 17 +++
.../src/main/resources/clientkeystore.jks | Bin 0 -> 2263 bytes
.../paho/mqtt5/it/InjectPahoContainer.java | 28 +++++
.../mqtt5/it/PahoMqtt5ReconnectAfterFailureIT.java | 24 +++++
...ava => PahoMqtt5ReconnectAfterFailureTest.java} | 51 ++++++---
.../component/paho/mqtt5/it/PahoMqtt5Test.java | 37 ++++++-
.../paho/mqtt5/it/PahoMqtt5TestResource.java | 67 ++++++++++--
.../component/paho/mqtt5/it/ReconnectProfile.java | 37 +++++++
.../paho-mqtt5/src/test/resources/certs/ca.key | 30 ++++++
.../paho-mqtt5/src/test/resources/certs/ca.pem | 22 ++++
.../paho-mqtt5/src/test/resources/certs/server.key | 27 +++++
.../paho-mqtt5/src/test/resources/certs/server.pem | 20 ++++
.../paho-mqtt5/src/test/resources/mosquitto.conf | 14 +++
.../paho-mqtt5/src/test/resources/password.conf | 17 +++
19 files changed, 634 insertions(+), 31 deletions(-)
diff --git
a/extensions/paho-mqtt5/deployment/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/deployment/PahoMqtt5Processor.java
b/extensions/paho-mqtt5/deployment/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/deployment/PahoMqtt5Processor.java
index 17fc9f2db7..6e8eac5756 100644
---
a/extensions/paho-mqtt5/deployment/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/deployment/PahoMqtt5Processor.java
+++
b/extensions/paho-mqtt5/deployment/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/deployment/PahoMqtt5Processor.java
@@ -16,11 +16,18 @@
*/
package org.apache.camel.quarkus.component.paho.mqtt5.deployment;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.ResourceBundle;
+
+import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import
io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBundleBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem;
+import org.eclipse.paho.mqttv5.client.internal.ResourceBundleCatalog;
import org.eclipse.paho.mqttv5.client.internal.SSLNetworkModuleFactory;
import org.eclipse.paho.mqttv5.client.internal.TCPNetworkModuleFactory;
import org.eclipse.paho.mqttv5.client.logging.JSR47Logger;
@@ -38,8 +45,14 @@ class PahoMqtt5Processor {
}
@BuildStep
- ReflectiveClassBuildItem registerReflectiveClasses() {
- return new ReflectiveClassBuildItem(false, false, JSR47Logger.class);
+ void registerReflectiveClasses(BuildProducer<ReflectiveClassBuildItem> p) {
+ p.produce(new ReflectiveClassBuildItem(false, false,
JSR47Logger.class));
+ p.produce(new ReflectiveClassBuildItem(false, false,
ResourceBundleCatalog.class));
+ p.produce(new ReflectiveClassBuildItem(false, false,
ResourceBundle.class));
+ p.produce(new ReflectiveClassBuildItem(false, false, FileLock.class));
+ p.produce(new ReflectiveClassBuildItem(true, false,
FileChannel.class));
+ p.produce(new ReflectiveClassBuildItem(true, false,
RandomAccessFile.class));
+ p.produce(new ReflectiveClassBuildItem(true, false,
"sun.nio.ch.FileLockImpl"));
}
@BuildStep
@@ -53,8 +66,9 @@ class PahoMqtt5Processor {
}
@BuildStep()
- NativeImageResourceBundleBuildItem hapiMessages() {
- return new
NativeImageResourceBundleBuildItem("org.eclipse.paho.mqttv5.client.internal.nls.logcat");
+ void
registerResourceBundle(BuildProducer<NativeImageResourceBundleBuildItem> p) {
+ p.produce(new
NativeImageResourceBundleBuildItem("org.eclipse.paho.mqttv5.client.internal.nls.logcat"));
+ p.produce(new
NativeImageResourceBundleBuildItem("org.eclipse.paho.mqttv5.common.nls.messages"));
}
}
diff --git a/integration-tests/paho-mqtt5/pom.xml
b/integration-tests/paho-mqtt5/pom.xml
index f1f3558290..f803d81374 100644
--- a/integration-tests/paho-mqtt5/pom.xml
+++ b/integration-tests/paho-mqtt5/pom.xml
@@ -35,6 +35,14 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-paho-mqtt5</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-mock</artifactId>
+ </dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
@@ -67,6 +75,11 @@
<artifactId>quarkus-junit4-mock</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-integration-test-support</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
@@ -106,6 +119,32 @@
</activation>
<dependencies>
<!-- The following dependencies guarantee that this module is
built after them. You can update them by running `mvn process-resources
-Pformat -N` from the source tree root directory -->
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-mock-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-paho-mqtt5-deployment</artifactId>
diff --git
a/integration-tests/paho-mqtt5/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/it/Counter.java
b/integration-tests/paho-mqtt5/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/it/Counter.java
new file mode 100644
index 0000000000..637f8a99d3
--- /dev/null
+++
b/integration-tests/paho-mqtt5/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/it/Counter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.paho.mqtt5.it;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class Counter {
+ CountDownLatch counter = new CountDownLatch(1);
+
+ public void countDown() {
+ counter.countDown();
+ }
+
+ public void await(int timeout, TimeUnit unit) throws Exception {
+ counter.await(timeout, unit);
+ }
+}
diff --git
a/integration-tests/paho-mqtt5/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Resource.java
b/integration-tests/paho-mqtt5/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Resource.java
index 14eda053ac..84afe889fc 100644
---
a/integration-tests/paho-mqtt5/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Resource.java
+++
b/integration-tests/paho-mqtt5/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Resource.java
@@ -16,41 +16,67 @@
*/
package org.apache.camel.quarkus.component.paho.mqtt5.it;
+import java.io.InputStream;
import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants;
+import org.apache.camel.spi.RouteController;
import org.eclipse.microprofile.config.ConfigProvider;
@Path("/paho-mqtt5")
@ApplicationScoped
public class PahoMqtt5Resource {
+ @Inject
+ CamelContext context;
+
+ @Inject
+ Counter counter;
+
@Inject
ProducerTemplate producerTemplate;
@Inject
ConsumerTemplate consumerTemplate;
+ private final String keystore = "clientkeystore.jks";
+ private final String password = "quarkus";
+
@Path("/{protocol}/{queueName}")
@GET
@Produces(MediaType.TEXT_PLAIN)
public String consumePahoMessage(
@PathParam("protocol") String protocol,
@PathParam("queueName") String queueName) {
- return consumerTemplate.receiveBody("paho-mqtt5:" + queueName +
"?brokerUrl=" + brokerUrl(protocol), 5000,
+ if ("ssl".equals(protocol)) {
+ setKeyStore(keystore, password);
+ }
+ String result = consumerTemplate.receiveBody("paho-mqtt5:" + queueName
+ "?brokerUrl=" + brokerUrl(protocol), 5000,
String.class);
+ if ("ssl".equals(protocol)) {
+ removeKeyStore(keystore);
+ }
+ return result;
}
@Path("/{protocol}/{queueName}")
@@ -60,7 +86,72 @@ public class PahoMqtt5Resource {
@PathParam("protocol") String protocol,
@PathParam("queueName") String queueName,
String message) throws Exception {
- producerTemplate.sendBody("paho-mqtt5:" + queueName +
"?retained=true&brokerUrl=" + brokerUrl(protocol), message);
+ if ("ssl".equals(protocol)) {
+ setKeyStore(keystore, password);
+ }
+ try {
+ producerTemplate.sendBody("paho-mqtt5:" + queueName +
"?retained=true&brokerUrl=" + brokerUrl(protocol), message);
+ } finally {
+ if ("ssl".equals(protocol)) {
+ removeKeyStore(keystore);
+ }
+ }
+ return Response.created(new URI("https://camel.apache.org/")).build();
+ }
+
+ @Path("/override/{queueName}")
+ @POST
+ @Consumes(MediaType.TEXT_PLAIN)
+ public Response overrideQueueName(
+ @PathParam("queueName") String queueName,
+ String message) throws Exception {
+
producerTemplate.sendBodyAndHeader("paho-mqtt5:test?retained=true&brokerUrl=" +
brokerUrl("tcp"), message,
+ PahoMqtt5Constants.CAMEL_PAHO_OVERRIDE_TOPIC, queueName);
+ return Response.created(new URI("https://camel.apache.org/")).build();
+ }
+
+ @Path("/readThenWriteWithFilePersistenceShouldSucceed")
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public String
readThenWriteWithFilePersistenceShouldSucceed(@QueryParam("message") String
message) throws Exception {
+ producerTemplate.sendBody(
+
"paho-mqtt5:withFilePersistence?retained=true&persistence=FILE&brokerUrl="
+ + brokerUrl("tcp"),
+ message);
+ return consumerTemplate.receiveBody(
+ "paho-mqtt5:withFilePersistence?persistence=FILE&brokerUrl=" +
brokerUrl("tcp"),
+ 5000,
+ String.class);
+ }
+
+ @Path("/routeStatus/{id}")
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public String routeStatus(@PathParam("id") String routeId,
+ @QueryParam("waitForContainerStarted") @DefaultValue("false")
boolean wait) throws Exception {
+ RouteController routeController = context.getRouteController();
+ if (wait) {
+ counter.await(30, TimeUnit.SECONDS);
+ }
+ return routeController.getRouteStatus(routeId).name();
+ }
+
+ @Path("/mock")
+ @POST
+ @Produces(MediaType.TEXT_PLAIN)
+ public String mock(String message) throws Exception {
+ MockEndpoint endpoint = context.getEndpoint("mock:test",
MockEndpoint.class);
+ endpoint.expectedBodiesReceived(message);
+
+ endpoint.assertIsSatisfied();
+ return "OK";
+ }
+
+ @Path("/send")
+ @POST
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response send(String message) throws Exception {
+ producerTemplate.sendBody("direct:test", message);
return Response.created(new URI("https://camel.apache.org/")).build();
}
@@ -68,4 +159,29 @@ public class PahoMqtt5Resource {
return ConfigProvider.getConfig().getValue("paho5.broker." + protocol
+ ".url", String.class);
}
+ private void setKeyStore(String keystore, String password) {
+ InputStream in =
Thread.currentThread().getContextClassLoader().getResourceAsStream(keystore);
+
+ try {
+ Files.copy(in, Paths.get(keystore));
+ } catch (Exception e) {
+ }
+
+ System.setProperty("javax.net.ssl.keyStore", keystore);
+ System.setProperty("javax.net.ssl.keyStorePassword", password);
+ System.setProperty("javax.net.ssl.trustStore", keystore);
+ System.setProperty("javax.net.ssl.trustStorePassword", password);
+ }
+
+ private void removeKeyStore(String keystore) {
+ try {
+ Files.delete(Paths.get(keystore));
+ } catch (Exception e) {
+ }
+
+ System.clearProperty("javax.net.ssl.keyStore");
+ System.clearProperty("javax.net.ssl.keyStorePassword");
+ System.clearProperty("javax.net.ssl.trustStore");
+ System.clearProperty("javax.net.ssl.trustStorePassword");
+ }
}
diff --git
a/integration-tests/paho-mqtt5/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Route.java
b/integration-tests/paho-mqtt5/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Route.java
new file mode 100644
index 0000000000..3b88dcf9a4
--- /dev/null
+++
b/integration-tests/paho-mqtt5/src/main/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Route.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.paho.mqtt5.it;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import org.apache.camel.Route;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.support.RoutePolicySupport;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+@ApplicationScoped
+public class PahoMqtt5Route extends RouteBuilder {
+ public static final String TESTING_ROUTE_ID = "testingRoute";
+
+ @Inject
+ Counter counter;
+
+ @Override
+ public void configure() throws Exception {
+ SupervisingRouteController supervising =
getCamelContext().getRouteController().supervising();
+ supervising.setBackOffDelay(200);
+ supervising.setIncludeRoutes("paho-mqtt5:*");
+
+
from("direct:test").to("paho-mqtt5:queue?lazyStartProducer=true&brokerUrl=" +
brokerUrl("tcp"));
+ from("paho-mqtt5:queue?brokerUrl=" + brokerUrl("tcp"))
+ .id(TESTING_ROUTE_ID)
+ .routePolicy(new RoutePolicySupport() {
+ @Override
+ public void onStart(Route route) {
+ counter.countDown();
+ }
+ })
+ .to("mock:test");
+ }
+
+ private String brokerUrl(String protocol) {
+ return ConfigProvider.getConfig().getValue("paho5.broker." + protocol
+ ".url", String.class);
+ }
+}
diff --git
a/integration-tests/paho-mqtt5/src/main/resources/application.properties
b/integration-tests/paho-mqtt5/src/main/resources/application.properties
new file mode 100644
index 0000000000..7db61c14ab
--- /dev/null
+++ b/integration-tests/paho-mqtt5/src/main/resources/application.properties
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+quarkus.native.resources.includes=*.jks
diff --git a/integration-tests/paho-mqtt5/src/main/resources/clientkeystore.jks
b/integration-tests/paho-mqtt5/src/main/resources/clientkeystore.jks
new file mode 100644
index 0000000000..51643f16d0
Binary files /dev/null and
b/integration-tests/paho-mqtt5/src/main/resources/clientkeystore.jks differ
diff --git
a/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/InjectPahoContainer.java
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/InjectPahoContainer.java
new file mode 100644
index 0000000000..c0608ea79e
--- /dev/null
+++
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/InjectPahoContainer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.paho.mqtt5.it;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ ElementType.ANNOTATION_TYPE, ElementType.METHOD, ElementType.FIELD })
+@Retention(RetentionPolicy.RUNTIME)
+public @interface InjectPahoContainer {
+}
diff --git
a/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5ReconnectAfterFailureIT.java
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5ReconnectAfterFailureIT.java
new file mode 100644
index 0000000000..1fc040c6ee
--- /dev/null
+++
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5ReconnectAfterFailureIT.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.paho.mqtt5.it;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+
+@QuarkusIntegrationTest
+public class PahoMqtt5ReconnectAfterFailureIT extends
PahoMqtt5ReconnectAfterFailureTest {
+}
diff --git
a/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Test.java
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5ReconnectAfterFailureTest.java
similarity index 52%
copy from
integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Test.java
copy to
integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5ReconnectAfterFailureTest.java
index 25ffaf2ecb..227aa45ec3 100644
---
a/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Test.java
+++
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5ReconnectAfterFailureTest.java
@@ -14,39 +14,58 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.camel.quarkus.component.paho.mqtt5.it;
-import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.TestProfile;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
-import static org.hamcrest.core.Is.is;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.is;
@QuarkusTest
-@QuarkusTestResource(PahoMqtt5TestResource.class)
-class PahoMqtt5Test {
+@TestProfile(ReconnectProfile.class)
+public class PahoMqtt5ReconnectAfterFailureTest {
+
+ @InjectPahoContainer
+ GenericContainer container;
- @ParameterizedTest
- @ValueSource(strings = { "tcp", "ws" })
- public void sendReceive(String protocol) {
- String message = "Hello Camel Quarkus " + protocol;
+ @Test
+ public void test() throws Exception {
+ String msg = "msg";
RestAssured.given()
.contentType(ContentType.TEXT)
- .body(message)
- .post("/paho-mqtt5/tcp/{queueName}", protocol + "-test-queue")
+ .get("/paho-mqtt5/routeStatus/" +
PahoMqtt5Route.TESTING_ROUTE_ID)
.then()
- .statusCode(201);
+ .statusCode(200)
+ .body(is("Stopped"));
+
+ container.start();
RestAssured.given()
.contentType(ContentType.TEXT)
- .get("/paho-mqtt5/tcp/{queueName}", protocol + "-test-queue")
+ .get("/paho-mqtt5/routeStatus/" +
PahoMqtt5Route.TESTING_ROUTE_ID + "?waitForContainerStarted=true")
.then()
.statusCode(200)
- .body(is(message));
- }
+ .body(anyOf(is("Started"), is("Starting")));
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body(msg)
+ .post("/paho-mqtt5/send")
+ .then()
+ .statusCode(201);
+
+ RestAssured.given()
+ .body(msg)
+ .post("/paho-mqtt5/mock")
+ .then()
+ .statusCode(200)
+ .body(is("OK"));
+ }
}
diff --git
a/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Test.java
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Test.java
index 25ffaf2ecb..528c0b3ac5 100644
---
a/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Test.java
+++
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5Test.java
@@ -20,6 +20,7 @@ import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -30,23 +31,53 @@ import static org.hamcrest.core.Is.is;
class PahoMqtt5Test {
@ParameterizedTest
- @ValueSource(strings = { "tcp", "ws" })
+ @ValueSource(strings = { "tcp", "ssl", "ws" })
public void sendReceive(String protocol) {
String message = "Hello Camel Quarkus " + protocol;
RestAssured.given()
.contentType(ContentType.TEXT)
.body(message)
- .post("/paho-mqtt5/tcp/{queueName}", protocol + "-test-queue")
+ .post("/paho-mqtt5/{protocol}/{queueName}", protocol, protocol
+ "-test-queue")
.then()
.statusCode(201);
RestAssured.given()
.contentType(ContentType.TEXT)
- .get("/paho-mqtt5/tcp/{queueName}", protocol + "-test-queue")
+ .get("/paho-mqtt5/{protocol}/{queueName}", protocol, protocol
+ "-test-queue")
.then()
.statusCode(200)
.body(is(message));
}
+ @Test
+ public void overrideTopic() {
+ String message = "Hello Camel Quarkus Override Topic";
+ String queue = "myoverride";
+
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body(message)
+ .post("/paho-mqtt5/override/" + queue)
+ .then()
+ .statusCode(201);
+
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .get("/paho-mqtt5/tcp/" + queue)
+ .then()
+ .statusCode(200)
+ .body(is(message));
+ }
+
+ @Test
+ public void readThenWriteWithFilePersistenceShouldSucceed() {
+ String message = "readThenWriteWithFilePersistenceShouldSucceed
message content: 762e6af1-3ec7-40e0-9271-0c98a1001728";
+ RestAssured.given()
+ .queryParam("message", message)
+
.get("/paho-mqtt5/readThenWriteWithFilePersistenceShouldSucceed")
+ .then()
+ .statusCode(200)
+ .body(is(message));
+ }
}
diff --git
a/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5TestResource.java
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5TestResource.java
index 6f8298b73b..59581c7815 100644
---
a/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5TestResource.java
+++
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/PahoMqtt5TestResource.java
@@ -16,12 +16,16 @@
*/
package org.apache.camel.quarkus.component.paho.mqtt5.it;
+import java.util.HashMap;
import java.util.Map;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import org.apache.camel.quarkus.test.AvailablePortFinder;
import org.apache.camel.util.CollectionHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
@@ -32,25 +36,76 @@ public class PahoMqtt5TestResource implements
QuarkusTestResourceLifecycleManage
private static final Logger LOGGER =
LoggerFactory.getLogger(PahoMqtt5TestResource.class);
private static final String IMAGE = "eclipse-mosquitto:1.6.12";
private static final int TCP_PORT = 1883;
+ private static final int SSL_PORT = 8883;
+ private static final int WS_PORT = 9001;
+ private static final String MQTT_USERNAME = "quarkus";
+ private static final String MQTT_PASSWORD = "quarkus";
private GenericContainer<?> container;
+ private boolean useFixedPort = false;
+ private boolean startContainer = true;
+
+ @Override
+ public void init(Map<String, String> initArgs) {
+ initArgs.forEach((name, value) -> {
+ if (name.equals("useFixedPort")) {
+ useFixedPort = Boolean.parseBoolean(value);
+ } else if (name.equals("startContainer")) {
+ startContainer = Boolean.parseBoolean(value);
+ }
+ });
+ }
+
+ @Override
+ public void inject(TestInjector testInjector) {
+ testInjector.injectIntoFields(container,
+ new
TestInjector.AnnotatedAndMatchesType(InjectPahoContainer.class,
GenericContainer.class));
+ }
@Override
public Map<String, String> start() {
LOGGER.info(TestcontainersConfiguration.getInstance().toString());
try {
- container = new GenericContainer<>(IMAGE)
- .withExposedPorts(TCP_PORT)
- .withLogConsumer(new Slf4jLogConsumer(LOGGER))
+ Map<String, String> result = new HashMap<>();
+
+ if (useFixedPort) {
+ int port = AvailablePortFinder.getNextAvailable();
+
+ container = new FixedHostPortGenericContainer<>(IMAGE)
+ .withFixedExposedPort(port, TCP_PORT);
+
+ result = CollectionHelper.mapOf(
+ "paho5.broker.tcp.url", "tcp://localhost:" + port);
+ } else {
+ container = new GenericContainer<>(IMAGE)
+ .withExposedPorts(TCP_PORT, WS_PORT, SSL_PORT)
+ .withClasspathResourceMapping("mosquitto.conf",
"/mosquitto/config/mosquitto.conf", BindMode.READ_ONLY)
+ .withClasspathResourceMapping("password.conf",
"/etc/mosquitto/password", BindMode.READ_ONLY)
+ .withClasspathResourceMapping("certs/ca.pem",
"/etc/mosquitto/certs/ca.pem", BindMode.READ_ONLY)
+ .withClasspathResourceMapping("certs/server.pem",
"/etc/mosquitto/certs/server.pem", BindMode.READ_ONLY)
+ .withClasspathResourceMapping("certs/server.key",
"/etc/mosquitto/certs/server.key",
+ BindMode.READ_ONLY);
+ }
+
+ container.withLogConsumer(new Slf4jLogConsumer(LOGGER))
.waitingFor(Wait.forLogMessage(".* mosquitto version .*
running", 1))
.waitingFor(Wait.forListeningPort());
- container.start();
+ if (startContainer) {
+ container.start();
+ }
- return CollectionHelper.mapOf(
- "paho5.broker.tcp.url",
String.format("tcp://localhost:%d", container.getMappedPort(TCP_PORT)));
+ if (!useFixedPort) {
+ result = CollectionHelper.mapOf(
+ "camel.component.paho-mqtt5.username", MQTT_USERNAME,
+ "camel.component.paho-mqtt5.password", MQTT_PASSWORD,
+ "paho5.broker.tcp.url",
String.format("tcp://localhost:%d", container.getMappedPort(TCP_PORT)),
+ "paho5.broker.ssl.url",
String.format("ssl://localhost:%d", container.getMappedPort(SSL_PORT)),
+ "paho5.broker.ws.url",
String.format("ws://localhost:%d", container.getMappedPort(WS_PORT)));
+ }
+ return result;
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/ReconnectProfile.java
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/ReconnectProfile.java
new file mode 100644
index 0000000000..c26a87a267
--- /dev/null
+++
b/integration-tests/paho-mqtt5/src/test/java/org/apache/camel/quarkus/component/paho/mqtt5/it/ReconnectProfile.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.paho.mqtt5.it;
+
+import java.util.List;
+
+import io.quarkus.test.junit.QuarkusTestProfile;
+import org.apache.camel.util.CollectionHelper;
+
+public class ReconnectProfile implements QuarkusTestProfile {
+ @Override
+ public List<TestResourceEntry> testResources() {
+ return List.of(new TestResourceEntry(PahoMqtt5TestResource.class,
CollectionHelper.mapOf(
+ "useFixedPort", "true",
+ "startContainer", "false")));
+ }
+
+ @Override
+ public boolean disableGlobalTestResources() {
+ return true;
+ }
+}
diff --git a/integration-tests/paho-mqtt5/src/test/resources/certs/ca.key
b/integration-tests/paho-mqtt5/src/test/resources/certs/ca.key
new file mode 100644
index 0000000000..cd4cae6986
--- /dev/null
+++ b/integration-tests/paho-mqtt5/src/test/resources/certs/ca.key
@@ -0,0 +1,30 @@
+-----BEGIN RSA PRIVATE KEY-----
+Proc-Type: 4,ENCRYPTED
+DEK-Info: DES-EDE3-CBC,82C51D0B8CD84E74
+
+ParQZs7HrK6hAcDEY3OyqN2PAFEDwfpeV/43XYNr2b+tdG4TqnQ+liMiDPOJ3nSR
+luNu9Y57N/EPtjfVPCj46fNPtuYotWwxUO1Nq9fZ8Y3JjAGE0f77bDAduMIIH6aL
+oubriZ44Uav3TqBhAc7H7pdmdthuSX9wGHQ3FHWHoxS+zkTe01Lp8OYFP3aJggpR
+gFfZF+S+rUKRV1AnMBV6Eytfu5magM5qTqZ10/R0VtuN3b0vlh72WhIakXnf7orG
+P9/tRY3jl4z/ZB9q62FN6xlJSKZKXYdPQ8uYpK7z278ZGfoSsmLhXc5yAg4wnCxa
+FmsirKvmohhDnrCEuIk47RtV7S6XbYQAcDnmwYNY5qvCDTWxaCVzxJe4WcJyHaq1
+MWj/SQn8sIQcTA0XPGh/fzJ6LLJgpWV+P9T3hgBRYUoLIGd+zxxaOd+KxOdHmvsU
+sqV8v4O4KmYWKYIgaamiUHb/ahc/YKnzj9toTi3dYuZbOY6EnBvMX0+VfckGG86f
+Xn+st7/ZtylJN2O1o+BKMYwqOKhZOTzzqgLZeDuHa7AyCAyBdh7+Bw5ZHVpFrUcp
+ciNpNS8ywlgEibdIvBl/uIs/IL8uKvL7fZu6tsR/nbuhG/0THnhZtDILP04v75Oh
+h5eN/JU8NqxuykfpP6C+KjerrR/+ExEJJ8wivEDRYF4vzY9+vjNdo7+Yh4yRSwgr
+FxML9bC4CpR8zwvhhBmD6SzDvoKqM+vFnQr+g1/dFku7RCd/GeXTVK9tBJIr3974
+8BJ0j1wmJJyOaKIX+nwX/ptLiM9e9Yj7RkcOX7+V1QI9rs2f5LalXY8mulkg3tVK
+axz49F7G5vV8ODLVqXoPZqZGNtEA9J84FGkfvAb2VqlAd1Bmv7Pwv/w3i5ldRxq4
+mCVY9Pm7VEi5qunV4fBHAHQNLMy899Cu/OGXEb0ZZ+JqgKnXKXyhPHBouTDI4mZ2
+81AqS0u3/11J82SCgtwICApWCpu1T9LWYt67kZ3hteI4Vy9j9d251eCArD/HoOJe
+TCcfBAnNt2z2Ji0hKAqQQld3AV9800CAFifhAPb0/3KB48itNfhNHNIjFgym0puc
+M8VaF5oBS2g36tay9lGEiSs9u1lojRoaFB4avL4oCdFXgAWNkrO0dtZhhj65q3HP
+i0GRhCTKzQdIp6forBdMrz/oFMr117MsSdnXUd+78NKiczZMBYaoZOonmKQ1gkwe
+hePqXy+QcOowPaNOdq+T7QpBX3C7tr52lnqQ26zvaspnDOkcYZZ27JjwM619JDvh
+uAB7q8iySt985KKafN2rIY/5mVE7Nl/06m0lqzpd6aju5jmf8pUYcG2sdV509KQg
+6ZRDZELizd2T/QwhGPKSAQBfF9CgqgWUBSeMtX1gbIShbf7hycPj1SmPMGzUUn+3
+hZ1Qtou2GdtHNQj1jSlHi8YVX7qM8tfAGDZnQ+30Dx0a03uKuieN/h6VACb9fkOI
+TErii3pNuW5M/p4DPSXwW1cwQFpxs2FTZKyn7QBBPSSxymEK3mirZjFiKhtKxj/C
+E64mtbZxdPH9tzO2WiqfHW1St7XmP1FshDfaQsOOMmq0l0giluc9Lg==
+-----END RSA PRIVATE KEY-----
diff --git a/integration-tests/paho-mqtt5/src/test/resources/certs/ca.pem
b/integration-tests/paho-mqtt5/src/test/resources/certs/ca.pem
new file mode 100644
index 0000000000..bc257a93eb
--- /dev/null
+++ b/integration-tests/paho-mqtt5/src/test/resources/certs/ca.pem
@@ -0,0 +1,22 @@
+-----BEGIN CERTIFICATE-----
+MIIDqzCCApOgAwIBAgIUU/uMzfEhx21GXTOErSxGfoqt7qUwDQYJKoZIhvcNAQEL
+BQAwZTELMAkGA1UEBhMCQ04xEDAOBgNVBAgMB0JlaWppbmcxFTATBgNVBAcMDERl
+ZmF1bHQgQ2l0eTEcMBoGA1UECgwTRGVmYXVsdCBDb21wYW55IEx0ZDEPMA0GA1UE
+AwwGdGVzdENBMB4XDTIyMDQwMTA1NDEwMloXDTI3MDMzMTA1NDEwMlowZTELMAkG
+A1UEBhMCQ04xEDAOBgNVBAgMB0JlaWppbmcxFTATBgNVBAcMDERlZmF1bHQgQ2l0
+eTEcMBoGA1UECgwTRGVmYXVsdCBDb21wYW55IEx0ZDEPMA0GA1UEAwwGdGVzdENB
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyfqvVSox1n+5OBhCZC1C
+OKpLVKEIalvxOQ5ezDcAeqUXfNJloP+7YeAylpOEiuxVlCvy4UScxUb98isr8Q0W
+3hNIc1O/FNJfnOPEJoaJCGb0Uq0Mq4CaZonkN8/2nolPIC8yg2b2/mIa4ZJhH94x
+KNzSiKfBMEqv8f3iGi8RhHrcMZAMosbIWZ1fIRhciJDa1xvsSFZz/gOq/D+YC/ST
+MjgTm7q1HXugHDtTFNw0XO3iY7co13fZBJjGu8AqW1K8CFZEBoJlsTByvkkEYzf4
+OMjwKO6v9dc4spRY32v7Nt8zrTz4Hg4g2QrvJWacYfi3eHkoMTdJFMVGE6COo3dS
+VwIDAQABo1MwUTAdBgNVHQ4EFgQUbkVV4nft2UZ3SqcaZqftJFleKtIwHwYDVR0j
+BBgwFoAUbkVV4nft2UZ3SqcaZqftJFleKtIwDwYDVR0TAQH/BAUwAwEB/zANBgkq
+hkiG9w0BAQsFAAOCAQEAVnGMYzl1xX5UHSTpGwpu+twd0GntIw8eSBZkLHp2GJdG
+Xh1uozTq9UTwT7F/2zCcOFhCzTZi18cK0sHBb53yS6gK5HEarZvUyqFqmXNljJxi
+I2sw640O1rAygM6lpth5Kt7038lr5MgFccU0/2AKbQUyW7tUwupf4nQACh+1xQ3J
+m+jF050eJQTtApnRuUU+XF7ITYPANJPofa9rrfbP76LQk94De3PJP5Ijll+bgjs/
+7H1hm5PFaTF3zj86i77O7Ru63T8BajolosVVWcUSmtxa/zMRLNVSLncQd40bRl1i
+mxfKR5assb31p+U/c1NH5yLtburJcB3GPyMdsmeO0w==
+-----END CERTIFICATE-----
diff --git a/integration-tests/paho-mqtt5/src/test/resources/certs/server.key
b/integration-tests/paho-mqtt5/src/test/resources/certs/server.key
new file mode 100644
index 0000000000..4dd056694e
--- /dev/null
+++ b/integration-tests/paho-mqtt5/src/test/resources/certs/server.key
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAw/UF6GOP/oy8vlzPzgRdtaoLeYnGW+DGYnbdv3qY4LWpI+BE
+MqprVUC69jvgNzwgRXmudx1DM68u1HWvXR7yA2hHPJqshlPplqeRarX7i9QwrdTg
+FimEuw3BKeYjgqE9ddtYyDTcVF6mncRB6t4S9VrkuVBlIukt0lfZVs2JuaNKpRpc
+/mQ04DcdA8QrEXTPDzpirCTC61p2OsyldhmkbvS9UpV4ur6lkCS4KJ8VZLGdJeSk
+TGI7qOe6WJS859ukmgZjnaHuEcFzLcb0ywDubsO1q+dNaKyD8ju3GFJi2LnFKwlk
+N7w+b/zq8XmAIKvp6q2AvG/jpgmVmSSrKfwSsQIDAQABAoIBAQCr/4Fz/RAC6h0x
+Kd3sgsCOF/eCcn/9XUpEWvlAfKd7dXhE2TCDGDql1e9E+kFPuiLJWIjeXH0D/Jwq
+ODH4lpTukLUWaN1N+pFpfyOQerOUhGdF7TfTvBWY6fXGhQ5eNpDvxdwjvuI12+57
+RCxnrw0M9v8T+ZEMF4f0vM8z9a0Rls097a246yAn040SjMJm4+FKWLB2/KXvh/nl
+xOFk81TKl6f7/Sx+GKCOSLasdAwp5G/8uyzNH0i2q9yPkw/OR3QutntBphBq+RGH
+pEBwshQNOwAwKlkIYiYv+KEZiL3JAZdPiV5p9HOIKtaIjzGZOFJqv/fZJLJDJZHu
+FUwO8u1RAoGBAOVX6rW8Nnf3jh1ZqZu6Qr2bHT8p41k57m5airBqlqzFZesSwErO
+X2utli11j+thZSuFG+pH70MTH2f3Me4AhOVO2wrllzo8ygjymjTWYm0217g5dYWy
+NLqhRlD2FEKdXpfTIUYc52cbYEG4/q9L2MKBws/S6cKJZN5le8LsNrVHAoGBANq7
+s4JGEsgzEhx1IwdygqdZHkiE4plmT1i4ufogw1jLl4hN8ajF6pAHuF7ZiJDlQ+QR
+M+tFrC0XEg7c23M9LgrnXRMUbKvjTsN3OnTZCKZ/U7Gimnfu4j6Lwu13l3AZLjWv
+6cOx+B3bO31caa5rbsrCJNYTiFJQtn6Kuq5eR9RHAoGANbS/1u0LSE6lbgQYLsqQ
+ZHxVffweLD/fsOHtBmD/hdf4BPNwKlRnjfypZV3ZZQJ6wZU7M0LDKc7plNwTSiu+
+8z/jFYssPcwMd8nwCJ2HkRG9tHtoJPXVWTr5D26A0hSuGRms+hE8sy16/gkHQx+c
+0/e1GhvG9/Jat6XGNKBCM3MCgYB9jVRCqNpLFBWHRC1xLueUC3F4it5O0w0kjhQi
+YQAATgyS7rqR99jmB6hquU7MbO0FUmKM35cu4pk48sj8Yte05ozkUMr39yfUxvFE
+9PckKt3tjro/sV0oLa0cBZNlgu6lXm4+KD+VU6vYD12SApS7yai/QML1DONTy2nQ
+gMmWLQKBgA3Nb5bFkb6NPHtb7JhJLB2/veuhOGN3yEEs1GFjLF5sgR8Fq+R84tWM
+tGE9mqHGkhvrTlgAdY8DRZjcBg0VimUnXNCRn5wXmhcFs44zsprdr8+2Srcit6n4
+j/6HocZNXy46lBAFY7PYUYDfDSbyFRMfx3l34lV+tc9iM8HaWfev
+-----END RSA PRIVATE KEY-----
diff --git a/integration-tests/paho-mqtt5/src/test/resources/certs/server.pem
b/integration-tests/paho-mqtt5/src/test/resources/certs/server.pem
new file mode 100644
index 0000000000..ddad4073b4
--- /dev/null
+++ b/integration-tests/paho-mqtt5/src/test/resources/certs/server.pem
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDVDCCAjwCFD0B2OCkiJZYwb3trDFCX7mQ9wc1MA0GCSqGSIb3DQEBCwUAMGUx
+CzAJBgNVBAYTAkNOMRAwDgYDVQQIDAdCZWlqaW5nMRUwEwYDVQQHDAxEZWZhdWx0
+IENpdHkxHDAaBgNVBAoME0RlZmF1bHQgQ29tcGFueSBMdGQxDzANBgNVBAMMBnRl
+c3RDQTAeFw0yMjA0MDExMTQwMTBaFw0zMjAzMjkxMTQwMTBaMGgxCzAJBgNVBAYT
+AkNOMRAwDgYDVQQIDAdCZWlqaW5nMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAa
+BgNVBAoME0RlZmF1bHQgQ29tcGFueSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDCC
+ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMP1Behjj/6MvL5cz84EXbWq
+C3mJxlvgxmJ23b96mOC1qSPgRDKqa1VAuvY74Dc8IEV5rncdQzOvLtR1r10e8gNo
+RzyarIZT6ZankWq1+4vUMK3U4BYphLsNwSnmI4KhPXXbWMg03FRepp3EQereEvVa
+5LlQZSLpLdJX2VbNibmjSqUaXP5kNOA3HQPEKxF0zw86YqwkwutadjrMpXYZpG70
+vVKVeLq+pZAkuCifFWSxnSXkpExiO6jnuliUvOfbpJoGY52h7hHBcy3G9MsA7m7D
+tavnTWisg/I7txhSYti5xSsJZDe8Pm/86vF5gCCr6eqtgLxv46YJlZkkqyn8ErEC
+AwEAATANBgkqhkiG9w0BAQsFAAOCAQEAHFNHzAEW5x2A6dSoxopTR/SB1ffGnhfs
+6mkMBFppYYCYwJyWSgnWYsBB/m2xakyCpup1mPT6AAb8NiS2c0R9dEQsEtgwwc/j
+6KihqXSVBmux7pvMc6vLwGVYlHH7oAodS+ZPQUSs0wii++NzugRsHwwIdvGpUbVg
+Tc8VAiDHYG7z88l/m5zWsKMQp8FK1PyhTL0eDGK8eCfuWTfM2lf7ITuAjhKXLbPY
+/vHTp13/+IfuAREcl73iXm57epIgG715T6J3+jdxyqzKdkx2vkEo3dUWusu7WR73
+PsYsjXPTOI2/zUkkiVqwwIiVOLR9E332aOxeqZaqLBrYze9PHB1lPg==
+-----END CERTIFICATE-----
diff --git a/integration-tests/paho-mqtt5/src/test/resources/mosquitto.conf
b/integration-tests/paho-mqtt5/src/test/resources/mosquitto.conf
new file mode 100644
index 0000000000..921dc8fc27
--- /dev/null
+++ b/integration-tests/paho-mqtt5/src/test/resources/mosquitto.conf
@@ -0,0 +1,14 @@
+password_file /etc/mosquitto/password
+allow_anonymous false
+
+listener 1883
+protocol mqtt
+
+listener 8883
+protocol mqtt
+cafile /etc/mosquitto/certs/ca.pem
+certfile /etc/mosquitto/certs/server.pem
+keyfile /etc/mosquitto/certs/server.key
+
+listener 9001
+protocol websockets
diff --git a/integration-tests/paho-mqtt5/src/test/resources/password.conf
b/integration-tests/paho-mqtt5/src/test/resources/password.conf
new file mode 100644
index 0000000000..822b87dc60
--- /dev/null
+++ b/integration-tests/paho-mqtt5/src/test/resources/password.conf
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+quarkus:$6$lEzlzdx4trOUKmNr$3qsGL/iTjrQpI5KZt15fh1gS4cetBtbeWANXOIVD+JFcMbuJy0Kh+pdcXIF9m2diq+Ge8sTU9Cb4aU4Ak2To/g==