This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9bc44e2c8f TLS  Support for MQTT Data Sink  (#3895)
9bc44e2c8f is described below

commit 9bc44e2c8fc9d7660edcdfbf932e18a5f2cd989a
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Thu Nov 20 15:17:02 2025 +0100

    TLS  Support for MQTT Data Sink  (#3895)
    
    Co-authored-by: Philipp Zehnder <[email protected]>
---
 .github/workflows/mvn-integration-test.yml         |  56 ++++++
 pom.xml                                            |   9 +
 .../commons/environment/Environment.java           |   2 +-
 .../iiot/IIoTAdaptersExtensionModuleExport.java    |   3 +-
 .../connect/iiot/adapters/oi4/Oi4Adapter.java      |   6 +-
 .../oi4/migration/Oi4AdapterMigrationV1.java       |  84 ++++++++
 .../strings.en                                     |  15 +-
 .../strings.en                                     |   2 +-
 .../mqtt/MqttConnectorsModuleExport.java           |   8 +-
 .../connectors/mqtt/adapter/MqttProtocol.java      |   6 +-
 .../mqtt/migration/MQTTAdapterMigrationV1.java     |  87 ++++++++
 .../mqtt/migration/MQTTSinkMigrationV1.java        | 124 ++++++++++++
 .../connectors/mqtt/security/SecurityUtils.java    | 191 ++++++++++++++++++
 .../connectors/mqtt/shared/MqttBase.java           | 113 +++++++++++
 .../connectors/mqtt/shared/MqttConfig.java         | 179 +++++++++++++++-
 .../connectors/mqtt/shared/MqttConnectUtils.java   | 224 ++++++++++++++++++++-
 .../connectors/mqtt/shared/MqttConsumer.java       | 101 +++++-----
 .../connectors/mqtt/shared/MqttPublisher.java      |  98 +++++++++
 .../connectors/mqtt/sink/MqttPublisherSink.java    | 170 +++++-----------
 .../connectors/mqtt/sink/common/MqttClient.java    | 165 ---------------
 .../connectors/mqtt/sink/common/MqttOptions.java   | 181 -----------------
 .../connectors/mqtt/sink/common/MqttUtils.java     |  77 -------
 .../documentation.md                               |   7 +-
 .../strings.en                                     |  15 +-
 .../strings.en                                     |  38 ++--
 streampipes-integration-tests/pom.xml              |  63 +++++-
 .../integration/adapters/AdapterTesterBase.java    |  11 +-
 ...ptersTest.java => AdaptersIntegrationTest.java} |  37 +++-
 .../integration/adapters/MQTTPublisherUtils.java   |  61 ++++++
 ...dapterTester.java => MqttAdapterTLSTester.java} |  37 +---
 .../integration/adapters/MqttAdapterTester.java    |  31 +--
 ...est.java => ClientLiveDataIntegrationTest.java} |   2 +-
 .../integration/containers/MosquittoContainer.java |  27 ++-
 .../src/test/resources/cacerts.pfx                 | Bin 0 -> 187040 bytes
 .../src/test/resources/mosquitto copy.conf         |  43 ++++
 .../src/test/resources/mosquitto.conf              |  12 +-
 .../src/test/resources/mosquitto.crt               |  21 ++
 .../src/test/resources/mosquitto.key               |  28 +++
 .../src/test/resources/passwd                      |   1 +
 .../apache/streampipes/sdk/StaticProperties.java   |  15 ++
 40 files changed, 1645 insertions(+), 705 deletions(-)

diff --git a/.github/workflows/mvn-integration-test.yml 
b/.github/workflows/mvn-integration-test.yml
new file mode 100644
index 0000000000..0f95d120f1
--- /dev/null
+++ b/.github/workflows/mvn-integration-test.yml
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+name: run-mvn-integration-tests
+
+on:
+  pull_request:
+
+
+jobs:
+  build_and_integration_test:
+    runs-on: ubuntu-latest 
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v5
+
+      - name: Set up JDK 17
+        uses: actions/setup-java@v5
+        with:
+          distribution: 'temurin'
+          java-version: '17'
+          cache: 'maven'
+      - name: Set up Docker
+        uses: docker/setup-buildx-action@v2
+        with:
+          version: latest
+      - name: Install Maven
+        run: |
+          sudo apt update
+          sudo apt install -y maven
+       # Step 1: Build everything but skip tests
+      - name: Build all StreamPipes modules (skip tests)
+        run: mvn -B clean install -DskipTests
+
+      # Step 2: Run only integration tests (selfsigned)
+      - name: Run Integration Tests (selfsigned)
+        run: |
+          mvn -B test -P selfsigned -pl streampipes-integration-tests 
-Dtest='**/*Test'
+
+      # Step 3: Run only integration tests (keystore)
+      - name: Run Integration Tests (keystore)
+        run: |
+          mvn -B test -P keystore -pl streampipes-integration-tests 
-Dtest='**/*Test'
+ 
diff --git a/pom.xml b/pom.xml
index ccd94266e7..8eead72d48 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1498,6 +1498,15 @@
                         <!-- Exclude GO -->
                         <exclude>**/go.sum</exclude>
 
+                       
+
+          <!-- Resource files for Integration Test-->
+          
<exclude>streampipes-integration-tests/src/test/resources/mosquitto.crt</exclude>
+          <exclude>streampipes-integration-tests/src/test/resources/mosquitto 
copy.conf</exclude>
+          
<exclude>streampipes-integration-tests/src/test/resources/mosquitto.key</exclude>
+          
<exclude>streampipes-integration-tests/src/test/resources/passwd</exclude>
+
+
                     </excludes>
                 </configuration>
             </plugin>
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index 628f03ef40..af604ac00e 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -187,7 +187,7 @@ public interface Environment {
   StringEnvironmentVariable getTruststoreType();
 
   BooleanEnvironmentVariable getAllowSelfSignedCertificates();
-
+  
   IntEnvironmentVariable getPlc4xMaxWaitTimeMs();
 
   IntEnvironmentVariable getPlc4xMaxLeaseTimeMs();
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
index 31c984e0b6..5c9dbf874c 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.connect.iiot;
 
 import org.apache.streampipes.connect.iiot.adapters.oi4.Oi4Adapter;
+import 
org.apache.streampipes.connect.iiot.adapters.oi4.migration.Oi4AdapterMigrationV1;
 import 
org.apache.streampipes.connect.iiot.adapters.simulator.machine.MachineDataSimulatorAdapter;
 import org.apache.streampipes.connect.iiot.protocol.stream.FileReplayAdapter;
 import org.apache.streampipes.connect.iiot.protocol.stream.HttpServerProtocol;
@@ -50,6 +51,6 @@ public class IIoTAdaptersExtensionModuleExport implements 
IExtensionModuleExport
 
   @Override
   public List<IModelMigrator<?, ?>> migrators() {
-    return List.of();
+    return List.of(new Oi4AdapterMigrationV1());
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
index b191a47922..b83a98a249 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
@@ -99,13 +99,13 @@ public class Oi4Adapter implements StreamPipesAdapter {
   public IAdapterConfiguration declareConfig() {
 
     return AdapterConfigurationBuilder
-        .create(ID, 0, Oi4Adapter::new)
+        .create(ID, 1, Oi4Adapter::new)
         .withLocales(Locales.EN)
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .withCategory(AdapterType.Generic, AdapterType.Manufacturing)
         .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
-        .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), 
MqttConnectUtils.getAlternativesOne(),
-            MqttConnectUtils.getAlternativesTwo()
+        .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), 
MqttConnectUtils.getAnonymousAccess(),
+            MqttConnectUtils.getUsernameAccess(), 
MqttConnectUtils.getClientCertAccess()
         )
         .requiredAlternatives(
             Labels.withId(OI4AdapterLabels.LABEL_KEY_SENSOR_DESCRIPTION),
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/migration/Oi4AdapterMigrationV1.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/migration/Oi4AdapterMigrationV1.java
new file mode 100644
index 0000000000..be2281bbd0
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/migration/Oi4AdapterMigrationV1.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.iiot.adapters.oi4.migration;
+
+import org.apache.streampipes.connect.iiot.adapters.oi4.Oi4Adapter;
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import java.util.List;
+
+public class Oi4AdapterMigrationV1 implements IAdapterMigrator {
+
+    @Override
+    public ModelMigratorConfig config() {
+        return new ModelMigratorConfig(
+                Oi4Adapter.ID,
+                SpServiceTagPrefix.ADAPTER,
+                0,
+                1);
+
+    }
+
+    @Override
+    public MigrationResult<AdapterDescription> migrate(AdapterDescription 
element,
+            IStaticPropertyExtractor extractor) throws RuntimeException {
+
+        changeUrlDescription(element);
+
+        accessModeDescription(element);
+
+        migrateSecurity((StaticPropertyAlternatives) 
element.getConfig().get(1));
+
+        return MigrationResult.success(element);
+    }
+
+    private void migrateSecurity(StaticPropertyAlternatives 
securityAlternatives) {
+        migrateGroup(securityAlternatives.getAlternatives());
+    }
+
+    private void changeUrlDescription(AdapterDescription element) {
+        var url = (FreeTextStaticProperty) element.getConfig().get(0);
+        url.setDescription(
+                "Example: tcp://test-server.com:1883 (Protocol required. Port 
required), with TLS ssl://test-server.com:8883 (Protocol required. Port 
required)");
+        element.getConfig().set(0, url);
+    }
+
+    private void accessModeDescription(AdapterDescription element) {
+        var accessmode = (StaticPropertyAlternatives) 
element.getConfig().get(1);
+
+        accessmode.setLabel("User Authentication");
+        accessmode.setDescription(
+                "Choose an authentication method for the user");
+        element.getConfig().set(1, accessmode);
+    }
+
+    private void migrateGroup(List<StaticPropertyAlternative> alternatives) {
+        alternatives.add(MqttConnectUtils.getClientCertAccess());
+
+    }
+
+}
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.adapters.oi4/strings.en
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.adapters.oi4/strings.en
index 5acd27029c..d861e1f675 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.adapters.oi4/strings.en
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.adapters.oi4/strings.en
@@ -19,8 +19,8 @@
 org.apache.streampipes.connect.iiot.adapters.oi4.title=OI4
 org.apache.streampipes.connect.iiot.adapters.oi4.description=Connects to an 
Open Industry 4.0 (OI4)-compatible device via MQTT
 
-access-mode.title=Access Mode
-access-mode.description=Provide user credentials for the MQTT broker
+access-mode.title=User Authentication
+access-mode.description=Choose an authentication method for the user
 
 anonymous-alternative.title=Unauthenticated
 anonymous-alternative.description=
@@ -34,11 +34,20 @@ username-group.description=
 username.title=Username
 username.description=
 
+client-cert-alternative.title= Client Certificate
+client-cert-alternative.description=
+
+clientcert.title=Certificate PEM
+clientcert.description = Public key in PEM format
+
+clientkey.title=Private Key PEM
+clientkey.description=Private key in PEM format
+
 password.title=Password
 password.description=
 
 broker_url.title=Broker URL
-broker_url.description=Example: tcp://test-server.com:1883 (Protocol required. 
Port required)"
+broker_url.description=Example: tcp://test-server.com:1883 (Protocol required. 
Port required), with TLS ssl://test-server.com:8883 (Protocol required. Port 
required)
 
 topic.title=Topic
 topic.description=Example: test/topic
diff --git 
a/streampipes-extensions/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.iiot.adapters.netio.mqtt/strings.en
 
b/streampipes-extensions/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.iiot.adapters.netio.mqtt/strings.en
index 9cd6bb514d..c51d33b164 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.iiot.adapters.netio.mqtt/strings.en
+++ 
b/streampipes-extensions/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.iiot.adapters.netio.mqtt/strings.en
@@ -38,7 +38,7 @@ password.title=Password
 password.description=
 
 broker_url.title=Broker URL
-broker_url.description=Example: tcp://test-server.com:1883 (Protocol required. 
Port required)"
+broker_url.description=Example: tcp://test-server.com:1883 (Protocol required. 
Port required), with TLS ssl://test-server.com:8883 (Protocol required. Port 
required)
 
 topic.title=Topic
 topic.description=Example: test/topic
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/MqttConnectorsModuleExport.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/MqttConnectorsModuleExport.java
index a1c72f0fdf..2f47367fab 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/MqttConnectorsModuleExport.java
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/MqttConnectorsModuleExport.java
@@ -23,9 +23,10 @@ import 
org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
 import org.apache.streampipes.extensions.api.migration.IModelMigrator;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
 import org.apache.streampipes.extensions.connectors.mqtt.adapter.MqttProtocol;
+import 
org.apache.streampipes.extensions.connectors.mqtt.migration.MQTTAdapterMigrationV1;
+import 
org.apache.streampipes.extensions.connectors.mqtt.migration.MQTTSinkMigrationV1;
 import 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink;
 
-import java.util.Collections;
 import java.util.List;
 
 public class MqttConnectorsModuleExport implements IExtensionModuleExport {
@@ -45,6 +46,9 @@ public class MqttConnectorsModuleExport implements 
IExtensionModuleExport {
 
   @Override
   public List<IModelMigrator<?, ?>> migrators() {
-    return Collections.emptyList();
+    return List.of(
+ new MQTTAdapterMigrationV1(),
+ new MQTTSinkMigrationV1()
+    );
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
index af9f02893a..e50a234a1b 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
@@ -65,14 +65,14 @@ public class MqttProtocol implements StreamPipesAdapter {
   @Override
   public IAdapterConfiguration declareConfig() {
     return AdapterConfigurationBuilder
-        .create(ID, 0, MqttProtocol::new)
+        .create(ID, 1, MqttProtocol::new)
         .withSupportedParsers(Parsers.defaultParsers())
         .withLocales(Locales.EN)
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .withCategory(AdapterType.Generic, AdapterType.Manufacturing)
         .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
-        .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), 
MqttConnectUtils.getAlternativesOne(),
-            MqttConnectUtils.getAlternativesTwo())
+        .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), 
MqttConnectUtils.getAnonymousAccess(),
+            MqttConnectUtils.getUsernameAccess(),  
MqttConnectUtils.getClientCertAccess())
         .requiredTextParameter(MqttConnectUtils.getTopicLabel())
         .buildConfiguration();
   }
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTAdapterMigrationV1.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTAdapterMigrationV1.java
new file mode 100644
index 0000000000..1323d5e732
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTAdapterMigrationV1.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.migration;
+
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.mqtt.adapter.MqttProtocol;
+import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import java.util.List;
+
+public class MQTTAdapterMigrationV1 implements IAdapterMigrator {
+
+
+    @Override
+    public ModelMigratorConfig config() {
+        return new ModelMigratorConfig(
+                MqttProtocol.ID,
+                SpServiceTagPrefix.ADAPTER,
+                0,
+                1);
+
+    }
+
+    @Override
+    public MigrationResult<AdapterDescription> migrate(AdapterDescription 
element,
+            IStaticPropertyExtractor extractor) throws RuntimeException { 
+
+        changeUrlDescription(element);
+
+        accessModeDescription(element);
+
+        migrateSecurity((StaticPropertyAlternatives) 
element.getConfig().get(1));
+
+        return MigrationResult.success(element);
+    }
+
+    private void migrateSecurity(StaticPropertyAlternatives 
securityAlternatives) {
+        migrateGroup(securityAlternatives.getAlternatives());
+    }
+
+
+
+    private void changeUrlDescription(AdapterDescription element){
+        var url = (FreeTextStaticProperty) element.getConfig().get(0);
+        url.setDescription(
+                "Example: tcp://test-server.com:1883 (Protocol required. Port 
required), with TLS ssl://test-server.com:8883 (Protocol required. Port 
required)");
+        element.getConfig().set(0, url);
+    }
+
+    private void accessModeDescription(AdapterDescription element){
+        var accessmode = (StaticPropertyAlternatives) 
element.getConfig().get(1);
+
+        accessmode.setLabel("User Authentication");
+        accessmode.setDescription(
+                "Choose an authentication method for the user");
+        element.getConfig().set(1, accessmode);
+    }
+
+    private void migrateGroup(List<StaticPropertyAlternative> alternatives) {
+        alternatives.add(MqttConnectUtils.getClientCertAccess());
+
+    }
+
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTSinkMigrationV1.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTSinkMigrationV1.java
new file mode 100644
index 0000000000..8e790bfd4a
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTSinkMigrationV1.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.migration;
+
+import 
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+import org.apache.streampipes.sdk.StaticProperties;
+
+public class MQTTSinkMigrationV1 implements IDataSinkMigrator {
+
+        @Override
+        public ModelMigratorConfig config() {
+                return new ModelMigratorConfig(
+                                
"org.apache.streampipes.sinks.brokers.jvm.mqtt",
+                                SpServiceTagPrefix.DATA_SINK,
+                                0,
+                                1);
+
+        }
+
+        @Override
+        public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation 
element,
+                        IDataSinkParameterExtractor extractor) throws 
RuntimeException {
+                // Rename the topic property (index 0) to the shared MQTT topic label and internal id
+                var topic = 
migrateTopicToNewNaming(element.getStaticProperties().get(0));
+                // Migrate data from host + port + protocol to a single broker URI, stored at index 0
+                migrateData(element);
+                // Move the topic to index 4, overwriting the old encryption option read by buildBrokerURI
+                element.getStaticProperties().set(4, topic);
+                // Remove the old port property (index 2, as read by buildBrokerURI)
+                element.getStaticProperties().remove(2);
+                // Remove the old host property (index 1, as read by buildBrokerURI)
+                element.getStaticProperties().remove(1);
+                // Rebuild the access-mode alternatives (now at index 1), adding the client certificate option
+                migrateSecurity(element);
+                return MigrationResult.success(element);
+        }
+
+        private StaticProperty migrateTopicToNewNaming(StaticProperty topic) {
+                topic.setLabel(MqttConnectUtils.getTopicLabel().getLabel());
+                
topic.setInternalName(MqttConnectUtils.getTopicLabel().getInternalId());
+                return topic;
+
+        }
+
+        private String buildBrokerURI(DataSinkInvocation element) {
+
+                var host = ((FreeTextStaticProperty) 
element.getStaticProperties().get(1)).getValue();
+                var port = ((FreeTextStaticProperty) 
element.getStaticProperties().get(2)).getValue();
+                var encryptionAlternative = ((OneOfStaticProperty) 
element.getStaticProperties().get(4))
+                                .getOptions();
+                var encryption = "";
+                for (var i = 0; i < encryptionAlternative.size(); i++) {
+                        Option alternative = encryptionAlternative.get(i);
+
+                        if (alternative.isSelected()) {
+                                encryption = alternative.getName();
+                        }
+                }
+                String protocol = "tcp";
+
+                if ("SSL".equalsIgnoreCase(encryption)) {
+                        protocol = "ssl";
+                }
+
+                var brokerUri = protocol + "://" + host + ":" + port;
+                return brokerUri;
+
+        }
+
+        private void migrateData(DataSinkInvocation element) {
+
+                var brokerUri = buildBrokerURI(element);
+
+                var broker = 
StaticProperties.stringFreeTextProperty(MqttConnectUtils.getBrokerUrlLabel(), 
brokerUri);
+
+                element.getStaticProperties().set(0, broker);
+
+        }
+
+        private void migrateSecurity(DataSinkInvocation element) {
+                var oldSecurityAlternatives = (StaticPropertyAlternatives) 
element.getStaticProperties().get(1);
+
+                var securityAlternative = 
StaticProperties.alternatives(MqttConnectUtils.getAccessModeLabel(),
+                                MqttConnectUtils.getAnonymousAccess(),
+                                MqttConnectUtils.getUsernameAccess(), 
MqttConnectUtils.getClientCertAccess());
+                for (var i = 0; i < 
oldSecurityAlternatives.getAlternatives().size(); i++) {
+                        StaticPropertyAlternative alternative = 
oldSecurityAlternatives.getAlternatives().get(i);
+                        if (alternative.getSelected()) {
+                                
securityAlternative.getAlternatives().get(i).setSelected(true);
+                        } else {
+                                
securityAlternative.getAlternatives().get(i).setSelected(false);
+                        }
+                }
+                element.getStaticProperties().set(1, securityAlternative);
+        }
+
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/security/SecurityUtils.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/security/SecurityUtils.java
new file mode 100644
index 0000000000..6834c0513b
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/security/SecurityUtils.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.security;
+
+import org.apache.streampipes.commons.environment.Environments;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.Socket;
+import java.security.KeyFactory;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.Base64;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * TLS helpers for the MQTT connectors: trust and key manager creation, loading
+ * of the configured keystore and parsing of PEM encoded certificates and keys.
+ */
+public class SecurityUtils {
+
+    /** Matches PEM armor lines such as {@code -----BEGIN RSA PRIVATE KEY-----}. */
+    private static final Pattern PEM_ARMOR = Pattern.compile("-----(?:BEGIN|END) [A-Z0-9 ]+-----");
+
+    private static final String PKCS1_RSA_HEADER = "-----BEGIN RSA PRIVATE KEY-----";
+    private static final String PKCS8_HEADER = "-----BEGIN PRIVATE KEY-----";
+
+    /**
+     * Creates trust managers that accept every client and server certificate.
+     * No chain or host validation takes place, so connections built on them
+     * offer no protection against a man-in-the-middle.
+     *
+     * @return a single-element array holding the permissive trust manager
+     */
+    public static TrustManager[] acceptAllCerts() {
+        return new TrustManager[] {
+                new X509TrustManager() {
+                    @Override
+                    public X509Certificate[] getAcceptedIssuers() {
+                        // The X509TrustManager contract asks for a non-null, possibly empty array
+                        return new X509Certificate[0];
+                    }
+
+                    @Override
+                    public void checkClientTrusted(X509Certificate[] certs, String authType) {
+                        // intentionally empty: every client certificate is accepted
+                    }
+
+                    @Override
+                    public void checkServerTrusted(X509Certificate[] certs, String authType) {
+                        // intentionally empty: every server certificate is accepted
+                    }
+                }
+        };
+    }
+
+    /**
+     * Opens a TLS connection with the JVM default socket factory and returns the
+     * first certificate of the chain presented by the peer.
+     *
+     * @param host host name of the server
+     * @param port TLS port of the server
+     * @return the first certificate of the peer chain
+     * @throws IOException if connecting or the TLS handshake fails
+     */
+    public static X509Certificate getServerCertificate(String host, int port)
+            throws NoSuchAlgorithmException, IOException, CertificateException {
+
+        SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
+        try (Socket socket = factory.createSocket(host, port)) {
+            SSLSocket sslSocket = (SSLSocket) socket;
+            sslSocket.startHandshake();
+
+            SSLSession session = sslSocket.getSession();
+            var certChain = session.getPeerCertificates();
+
+            return (X509Certificate) certChain[0];
+        }
+    }
+
+    /**
+     * Loads the keystore whose file name, type and password are taken from the
+     * StreamPipes environment configuration.
+     *
+     * @return the loaded keystore
+     * @throws FileNotFoundException if the configured keystore file cannot be opened
+     */
+    public static KeyStore loadServerKeyStore() throws FileNotFoundException, KeyStoreException, IOException,
+            NoSuchAlgorithmException, CertificateException {
+
+        var env = Environments.getEnvironment();
+        String keystoreFilename = env.getKeystoreFilename().getValueOrDefault();
+        String keystoreType = env.getKeystoreType().getValueOrDefault();
+        String keystorePassword = env.getKeystorePassword().getValueOrDefault();
+        try (FileInputStream keystoreFile = new FileInputStream(keystoreFilename)) {
+            KeyStore keystore = KeyStore.getInstance(keystoreType);
+            keystore.load(keystoreFile, keystorePassword.toCharArray());
+            return keystore;
+        }
+    }
+
+    /**
+     * Builds key managers for client authentication from a PEM certificate and a
+     * PEM private key, using an in-memory PKCS12 keystore.
+     *
+     * @param certPem PEM text of the client certificate
+     * @param keyPem  PEM text of the matching private key
+     * @return key managers exposing the key under the alias {@code client}
+     * @throws Exception if either PEM value cannot be parsed or the keystore cannot be set up
+     */
+    public static KeyManager[] loadClientKeyManagers(String certPem, String keyPem) throws Exception {
+        X509Certificate certificate = parseCertificateFromPem(certPem);
+        PrivateKey privateKey = parsePrivateKeyFromPem(keyPem);
+
+        String password = ""; // no password for in-memory keystore
+        KeyStore keyStore = KeyStore.getInstance("PKCS12");
+        keyStore.load(null, null);
+        keyStore.setKeyEntry("client", privateKey, password.toCharArray(),
+                new java.security.cert.Certificate[]{certificate});
+
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        kmf.init(keyStore, password.toCharArray());
+        return kmf.getKeyManagers();
+    }
+
+    /**
+     * Parses a single PEM encoded X.509 certificate.
+     *
+     * @param pem certificate text including its BEGIN/END CERTIFICATE lines
+     * @return the parsed certificate
+     * @throws Exception if the Base64 body or the certificate itself is invalid
+     */
+    public static X509Certificate parseCertificateFromPem(String pem) throws Exception {
+        String normalized = pem
+                .replace("-----BEGIN CERTIFICATE-----", "")
+                .replace("-----END CERTIFICATE-----", "")
+                .replaceAll("\\s+", "");
+        byte[] decoded = Base64.getDecoder().decode(normalized);
+        CertificateFactory cf = CertificateFactory.getInstance("X.509");
+        return (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(decoded));
+    }
+
+    /**
+     * Parses an unencrypted PEM private key. Supported are PKCS#1 RSA keys
+     * ({@code BEGIN RSA PRIVATE KEY}) and PKCS#8 keys ({@code BEGIN PRIVATE KEY})
+     * holding an RSA or EC key.
+     *
+     * @param pem key text; escaped {@code \n} sequences are turned into line breaks
+     * @return the parsed private key
+     * @throws IllegalArgumentException if {@code pem} is null or has no supported header
+     * @throws Exception                if the key material cannot be decoded
+     */
+    public static PrivateKey parsePrivateKeyFromPem(String pem) throws Exception {
+        if (pem == null) {
+            throw new IllegalArgumentException("Unsupported key format: no PEM content provided");
+        }
+        String normalized = pem.replace("\\n", "\n")
+                .replace("\\r", "")
+                .replace("\r", "")
+                .trim();
+
+        if (normalized.contains(PKCS1_RSA_HEADER)) {
+            return parsePkcs1PrivateKey(normalized);
+        }
+        if (normalized.contains(PKCS8_HEADER)) {
+            return parsePkcs8PrivateKey(normalized);
+        }
+        throw new IllegalArgumentException(
+                "Unsupported key format: expected 'BEGIN RSA PRIVATE KEY' or 'BEGIN PRIVATE KEY'");
+    }
+
+    /**
+     * Parses a PKCS#1 encoded RSA private key by wrapping it into a PKCS#8
+     * structure that the JDK key factory understands.
+     *
+     * @param pem key text, with or without its BEGIN/END RSA PRIVATE KEY lines
+     * @return the parsed RSA private key
+     * @throws Exception if the key material cannot be decoded
+     */
+    public static PrivateKey parsePkcs1PrivateKey(String pem) throws Exception {
+        byte[] pkcs1Bytes = Base64.getMimeDecoder().decode(stripPemArmor(pem));
+        byte[] pkcs8Bytes = convertPkcs1ToPkcs8(pkcs1Bytes);
+
+        PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(pkcs8Bytes);
+        KeyFactory keyFactory = KeyFactory.getInstance("RSA");
+        return keyFactory.generatePrivate(keySpec);
+    }
+
+    /** Parses an unencrypted PKCS#8 key, trying RSA first and EC second. */
+    private static PrivateKey parsePkcs8PrivateKey(String pem) throws Exception {
+        byte[] pkcs8Bytes = Base64.getMimeDecoder().decode(stripPemArmor(pem));
+        PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(pkcs8Bytes);
+        try {
+            return KeyFactory.getInstance("RSA").generatePrivate(keySpec);
+        } catch (java.security.spec.InvalidKeySpecException notAnRsaKey) {
+            // The PKCS#8 PEM header does not name the algorithm, so EC is tried next
+            return KeyFactory.getInstance("EC").generatePrivate(keySpec);
+        }
+    }
+
+    /** Removes all BEGIN/END armor lines and every whitespace character. */
+    private static String stripPemArmor(String pem) {
+        Matcher armor = PEM_ARMOR.matcher(pem);
+        return armor.replaceAll("").replaceAll("\\s+", "");
+    }
+
+    /**
+     * Wraps a PKCS#1 RSA key into a PKCS#8 PrivateKeyInfo:
+     * SEQUENCE { INTEGER 0, AlgorithmIdentifier { rsaEncryption, NULL }, OCTET STRING key }.
+     * Lengths are written in minimal DER form, so the result is valid for any key size.
+     */
+    private static byte[] convertPkcs1ToPkcs8(byte[] pkcs1Bytes) throws IOException {
+        final byte[] versionAndAlgorithm = new byte[] {
+            0x02, 0x01, 0x00,
+            0x30, 0x0d,
+            0x06, 0x09,
+            0x2a, (byte) 0x86, 0x48, (byte) 0x86, (byte) 0xf7, 0x0d, 0x01, 0x01, 0x01,
+            0x05, 0x00
+        };
+
+        byte[] octetStringLength = derLength(pkcs1Bytes.length);
+        // content = version + algorithm + OCTET STRING tag + its length bytes + key bytes
+        int contentLength = versionAndAlgorithm.length + 1 + octetStringLength.length + pkcs1Bytes.length;
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        out.write(0x30);
+        out.write(derLength(contentLength));
+        out.write(versionAndAlgorithm);
+        out.write(0x04);
+        out.write(octetStringLength);
+        out.write(pkcs1Bytes);
+        return out.toByteArray();
+    }
+
+    /** Encodes a non-negative length as DER: short form below 128, otherwise long form. */
+    private static byte[] derLength(int length) {
+        if (length < 0x80) {
+            return new byte[] {(byte) length};
+        }
+        int byteCount = (32 - Integer.numberOfLeadingZeros(length) + 7) / 8;
+        byte[] encoded = new byte[1 + byteCount];
+        encoded[0] = (byte) (0x80 | byteCount);
+        for (int i = byteCount; i > 0; i--) {
+            encoded[i] = (byte) length;
+            length >>>= 8;
+        }
+        return encoded;
+    }
+
+    /**
+     * Creates a trust manager factory of the JVM default algorithm that is
+     * initialised with the given keystore.
+     *
+     * @param keystore keystore holding the trusted certificates
+     * @return the initialised factory
+     * @throws Exception if the factory cannot be created or initialised
+     */
+    public static TrustManagerFactory createTrustManagerFactory(KeyStore keystore) throws Exception {
+        TrustManagerFactory trustManagerFactory =
+                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        trustManagerFactory.init(keystore);
+        return trustManagerFactory;
+    }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
new file mode 100644
index 0000000000..51a9160391
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.shared;
+
+import org.apache.streampipes.commons.environment.Environments;
+import 
org.apache.streampipes.extensions.connectors.mqtt.security.SecurityUtils;
+
+import org.fusesource.mqtt.client.MQTT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+
+/**
+ * Common base of the MQTT consumer and publisher: holds the connection
+ * configuration and creates a client that is set up for credentials and TLS.
+ */
+public class MqttBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MqttBase.class);
+
+    protected final MqttConfig mqttConfig;
+
+    /**
+     * @param mqttConfig connection settings used by {@link #setupMqttClient()}
+     */
+    public MqttBase(MqttConfig mqttConfig) {
+        this.mqttConfig = mqttConfig;
+    }
+
+    /**
+     * Creates an MQTT client for the configured broker. Username and password are
+     * applied when the configuration is marked as authenticated, and an SSL
+     * context is installed when the broker URL uses a TLS scheme.
+     *
+     * @return the configured, not yet connected client
+     * @throws Exception if the broker URL is invalid or the TLS setup fails
+     */
+    protected MQTT setupMqttClient() throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setHost(mqttConfig.getUrl());
+        mqtt.setConnectAttemptsMax(1);
+
+        if (mqttConfig.getAuthenticated()) {
+            mqtt.setUserName(mqttConfig.getUsername());
+            mqtt.setPassword(mqttConfig.getPassword());
+        }
+
+        if (tlsEnabled(new URI(mqttConfig.getUrl()))) {
+            configureTls(mqtt);
+        }
+
+        return mqtt;
+    }
+
+    /** Returns whether the URI scheme is one of the TLS schemes known to {@link MqttConnectUtils}. */
+    private static boolean tlsEnabled(URI brokerUri) {
+        // Single source of truth for the scheme list lives in MqttConnectUtils
+        return MqttConnectUtils.tlsEnabled(brokerUri.getScheme());
+    }
+
+    /**
+     * Installs an SSL context on the client. Server certificates are either all
+     * accepted (when self-signed certificates are allowed in the environment) or
+     * validated against the configured keystore. A configured client certificate
+     * is presented in both cases.
+     */
+    private void configureTls(MQTT mqtt) throws Exception {
+        LOG.info("Configuring TLS for MQTT connection...");
+
+        KeyManager[] keyManagers = null;
+        if (mqttConfig.getClientCertificatePath() != null && mqttConfig.getClientKeyPath() != null) {
+            keyManagers = SecurityUtils.loadClientKeyManagers(
+                    mqttConfig.getClientCertificatePath(),
+                    mqttConfig.getClientKeyPath());
+        }
+
+        var env = Environments.getEnvironment();
+        boolean acceptAllCerts = env.getAllowSelfSignedCertificates().getValueOrDefault();
+
+        SSLContext sslContext = SSLContext.getInstance("TLS");
+        if (acceptAllCerts) {
+            // Server verification is switched off, client authentication must still work
+            LOG.warn("Accepting all certificates...");
+            sslContext.init(keyManagers, SecurityUtils.acceptAllCerts(), new SecureRandom());
+        } else {
+            KeyStore keyStore = loadKeyStore();
+            TrustManagerFactory trustManagerFactory = SecurityUtils.createTrustManagerFactory(keyStore);
+            sslContext.init(keyManagers, trustManagerFactory.getTrustManagers(), new SecureRandom());
+        }
+        mqtt.setSslContext(sslContext);
+    }
+
+    /** Loads the keystore via {@link SecurityUtils}, logging any failure before rethrowing it. */
+    private KeyStore loadKeyStore() throws IOException, NoSuchAlgorithmException, CertificateException,
+            KeyStoreException {
+        try {
+            return SecurityUtils.loadServerKeyStore();
+        } catch (IOException | NoSuchAlgorithmException | CertificateException | KeyStoreException e) {
+            // The throwable is passed as last argument so SLF4J logs it with its stack trace
+            LOG.error("Error loading keystore from file", e);
+            throw e;  // Re-throwing to handle it at the top level
+        }
+    }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConfig.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConfig.java
index 4a3b30a224..25c8a9ae07 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConfig.java
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConfig.java
@@ -17,6 +17,8 @@
  */
 package org.apache.streampipes.extensions.connectors.mqtt.shared;
 
+import org.fusesource.mqtt.client.QoS;
+
 public class MqttConfig {
 
   private Boolean authenticated;
@@ -26,6 +28,23 @@ public class MqttConfig {
   private String username;
   private String password;
 
+  private String clientCertificate = null;
+  private String clientKey = null;
+  private Boolean tls = false;
+
+  private boolean isLastWill = false;
+  private QoS willQoS = QoS.AT_MOST_ONCE;
+  private Boolean willRetain = false;
+  private String willTopic = "";
+  private String willMessage = "";
+  private String mqttProtocolVersion = "3.1";
+  private QoS qos = QoS.AT_MOST_ONCE;
+  private long reconnectDelayMaxInMs = 10000L;
+  private boolean cleanSession = true;
+  private boolean retain = false;
+  private short keepAliveInSec = 60;
+  private String clientId = "";
+
   public MqttConfig(String url, String topic) {
     this.authenticated = false;
     this.url = url;
@@ -39,23 +58,181 @@ public class MqttConfig {
     this.password = password;
   }
 
+  /**
+   * Creates a configuration without username/password authentication that
+   * carries a client certificate and key.
+   *
+   * @param url               broker URL
+   * @param topic             MQTT topic
+   * @param tlsEnabled        value stored as the TLS flag
+   * @param clientCertificate client certificate value
+   * @param clientKey         client key value
+   */
+  public MqttConfig(String url, String topic, Boolean tlsEnabled, String 
clientCertificate, String clientKey) {
+    this.authenticated = false;
+    this.url = url;
+    this.topic = topic;
+    this.tls = tlsEnabled;
+    this.clientCertificate = clientCertificate;
+    this.clientKey = clientKey;
+  }
+
+  /**
+   * Delegates to the (url, topic, username, password) constructor and then
+   * stores the TLS flag.
+   */
+  public MqttConfig(String url, String topic, String username, String 
password, Boolean tlsEnabled) {
+    this(url, topic, username, password);
+    this.tls = tlsEnabled;
+  }
+
+  /**
+   * Delegates to the (url, topic) constructor and then stores the TLS flag.
+   */
+  public MqttConfig(String url, String topic, Boolean tlsEnabled) {
+    this(url, topic);
+    this.tls = tlsEnabled;
+  }
+
   public Boolean getAuthenticated() {
     return authenticated;
   }
 
+  public void setAuthenticated(Boolean authenticated) {
+    this.authenticated = authenticated;
+  }
+
   public String getUrl() {
     return url;
   }
 
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
   public String getTopic() {
     return topic;
   }
 
+  public void setTopic(String topic) {
+    this.topic = topic;
+  }
+
   public String getUsername() {
     return username;
   }
 
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
   public String getPassword() {
     return password;
   }
-}
+  /** Returns the MQTT client id; the field defaults to an empty string. */
+    public String getClientId() {
+    return clientId;
+  }
+
+  /** Sets the MQTT client id. */
+      public void setClientId(String clientId) {
+    this.clientId = clientId;
+  }
+
+  /** Sets the password used together with the username. */
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  /**
+   * Returns the client certificate value. Despite the name this is not used as
+   * a file path: MqttBase hands it to SecurityUtils.loadClientKeyManagers,
+   * which parses it as PEM text.
+   */
+  public String getClientCertificatePath() {
+    return clientCertificate;
+  }
+
+  /** Sets the client certificate value (see {@link #getClientCertificatePath()}). */
+  public void setClientCertificatePath(String clientCertificate) {
+    this.clientCertificate = clientCertificate;
+  }
+
+  /**
+   * Returns the client key value. As with the certificate, MqttBase passes it
+   * on as PEM text rather than opening it as a file.
+   */
+  public String getClientKeyPath() {
+    return clientKey;
+  }
+
+  /** Sets the client key value returned by {@link #getClientKeyPath()}. */
+  public void setClientKey(String clientKey) {
+    this.clientKey = clientKey;
+  }
+
+  public Boolean getTlsEnabled() {
+    return tls;
+  }
+
+  public void setTlsEnabled(Boolean tlsEnabled) {
+    this.tls = tlsEnabled;
+  }
+
+  public boolean isLastWill() {
+    return isLastWill;
+  }
+
+  public void setLastWill(boolean lastWill) {
+    this.isLastWill = lastWill;
+  }
+
+  public QoS getWillQoS() {
+    return willQoS;
+  }
+
+  public void setWillQoS(QoS willQoS) {
+    this.willQoS = willQoS;
+  }
+
+  public Boolean getWillRetain() {
+    return willRetain;
+  }
+
+  public void setWillRetain(Boolean willRetain) {
+    this.willRetain = willRetain;
+  }
+
+  public String getWillTopic() {
+    return willTopic;
+  }
+
+  public void setWillTopic(String willTopic) {
+    this.willTopic = willTopic;
+  }
+
+  public String getWillMessage() {
+    return willMessage;
+  }
+
+  public void setWillMessage(String willMessage) {
+    this.willMessage = willMessage;
+  }
+
+  public String getMqttProtocolVersion() {
+    return mqttProtocolVersion;
+  }
+
+  public void setMqttProtocolVersion(String mqttProtocolVersion) {
+    this.mqttProtocolVersion = mqttProtocolVersion;
+  }
+
+  public QoS getQos() {
+    return qos;
+  }
+
+  public void setQos(QoS qos) {
+    this.qos = qos;
+  }
+
+  public long getReconnectDelayMaxInMs() {
+    return reconnectDelayMaxInMs;
+  }
+
+  public void setReconnectDelayMaxInMs(long reconnectDelayMaxInMs) {
+    this.reconnectDelayMaxInMs = reconnectDelayMaxInMs;
+  }
+
+  public boolean isCleanSession() {
+    return cleanSession;
+  }
+
+  public void setCleanSession(boolean cleanSession) {
+    this.cleanSession = cleanSession;
+  }
+
+  public boolean isRetain() {
+    return retain;
+  }
+
+  public void setRetain(boolean retain) {
+    this.retain = retain;
+  }
+
+  public short getKeepAliveInSec() {
+    return keepAliveInSec;
+  }
+
+  public void setKeepAliveInSec(short keepAliveInSec) {
+    this.keepAliveInSec = keepAliveInSec;
+  }
+}
\ No newline at end of file
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java
index 6b4b4a7e55..b579fc5f16 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java
@@ -18,26 +18,56 @@
 
 package org.apache.streampipes.extensions.connectors.mqtt.shared;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
+import org.apache.streampipes.model.staticproperty.Option;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Label;
 import org.apache.streampipes.sdk.helpers.Labels;
 
+import org.fusesource.mqtt.client.QoS;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
 public class MqttConnectUtils {
 
   /**
    * Keys of user configuration parameters
    */
+  // Adapter
   public static final String ACCESS_MODE = "access-mode";
   public static final String ANONYMOUS_ACCESS = "anonymous-alternative";
   public static final String USERNAME_ACCESS = "username-alternative";
+  public static final String CLIENT_CERT_ACCESS = "client-cert-alternative";
   public static final String USERNAME_GROUP = "username-group";
+  public static final String CLIENT_CERT_GROUP = "client-cert-group";
   public static final String USERNAME = "username";
   public static final String PASSWORD = "password";
+  public static final String CLIENTCERT = "clientcert";
+  public static final String CLIENTKEY = "clientkey";
   public static final String BROKER_URL = "broker_url";
   public static final String TOPIC = "topic";
+  // Publisher
+  public static final String QOS_LEVEL_KEY = "qos-level";
+  public static final String CLEAN_SESSION_KEY = "clean-session";
+  public static final String WILL_RETAIN = "will-retain";
+  public static final String RECONNECT_PERIOD_IN_SEC = "reconnect-period";
+  public static final String WILL_MODE = "lwt-mode";
+  public static final String NO_WILL_ALTERNATIVE = "no-lwt-alternative";
+  public static final String WILL_ALTERNATIVE = "lwt-alternative";
+  public static final String WILL_GROUP = "lwt-group";
+  public static final String WILL_TOPIC = "lwt-topic";
+  public static final String WILL_MESSAGE = "lwt-message";
+  public static final String WILL_QOS = "lwt-qos-level";
+  public static final String RETAIN = "retain";
+  public static final String KEEP_ALIVE_IN_SEC = "keep-alive";
+  public static final String MQTT_COMPLIANT = "mqtt-version-compliant";
 
   public static Label getAccessModeLabel() {
     return Labels.withId(ACCESS_MODE);
@@ -51,17 +81,66 @@ public class MqttConnectUtils {
     return Labels.withId(TOPIC);
   }
 
-  public static StaticPropertyAlternative getAlternativesOne() {
+  /** Label for the QoS level selection, identified by {@link #QOS_LEVEL_KEY}. */
+  public static Label getQosLevelLabel() {
+    return Labels.withId(QOS_LEVEL_KEY);
+  }
+
+  /** Label for the retain selection, identified by {@link #RETAIN}. */
+  public static Label getRetainLabel() {
+    return Labels.withId(RETAIN);
+  }
+
+  /** Label for the clean-session selection, identified by {@link #CLEAN_SESSION_KEY}. */
+  public static Label getCleanSessionLabel() {
+    return Labels.withId(CLEAN_SESSION_KEY);
+  }
+
+  /** Label for the reconnect period, identified by {@link #RECONNECT_PERIOD_IN_SEC}. */
+  public static Label getReconnectPeriodLabel() {
+    return Labels.withId(RECONNECT_PERIOD_IN_SEC);
+  }
+
+  /** Label for the keep-alive interval, identified by {@link #KEEP_ALIVE_IN_SEC}. */
+  public static Label getKeepAliveLabel() {
+    return Labels.withId(KEEP_ALIVE_IN_SEC);
+  }
+
+  /** Label for the MQTT compliance selection, identified by {@link #MQTT_COMPLIANT}. */
+  public static Label getMqttComplient() {
+    return Labels.withId(MQTT_COMPLIANT);
+  }
+
+  /** Label for the last-will mode alternatives, identified by {@link #WILL_MODE}. */
+  public static Label getWillModeLabel() {
+    return Labels.withId(WILL_MODE);
+  }
+
+  /**
+   * Alternative for "no last will"; created with the flag {@code true}.
+   * NOTE(review): the flag presumably marks it as pre-selected - confirm against Alternatives.from.
+   */
+  public static StaticPropertyAlternative getNoWillAlternative() {
+    return Alternatives.from(Labels.withId(NO_WILL_ALTERNATIVE), true);
+  }
+
+  /**
+   * Builds the "last will" alternative: a group with free-text topic and
+   * message properties plus single-value selections for retain and QoS.
+   *
+   * @return the alternative identified by {@link #WILL_ALTERNATIVE}
+   */
+  public static StaticPropertyAlternative getWillAlternative() {
+    List<Option> willRetainOptions = Arrays.asList(
+        new Option("Yes", false),
+        new Option("No", true));
+    List<Option> willQosOptions = Arrays.asList(
+        new Option("0 - at-most-once", true),
+        new Option("1 - at-least-once", false),
+        new Option("2 - exactly-once", false));
+
+    var willGroup = StaticProperties.group(
+        Labels.withId(WILL_GROUP),
+        StaticProperties.stringFreeTextProperty(Labels.withId(WILL_TOPIC)),
+        StaticProperties.stringFreeTextProperty(Labels.withId(WILL_MESSAGE)),
+        StaticProperties.singleValueSelection(Labels.withId(WILL_RETAIN), willRetainOptions),
+        StaticProperties.singleValueSelection(Labels.withId(WILL_QOS), willQosOptions));
+
+    return Alternatives.from(Labels.withId(WILL_ALTERNATIVE), willGroup);
+  }
+
+  public static StaticPropertyAlternative getAnonymousAccess() {
     return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
 
   }
 
-  public static StaticPropertyAlternative getAlternativesOne(boolean selected) 
{
+  public static StaticPropertyAlternative getAnonymousAccess(boolean selected) 
{
     return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS), selected);
 
   }
 
-  public static StaticPropertyAlternative getAlternativesTwo() {
+  public static StaticPropertyAlternative getUsernameAccess() {
     return Alternatives.from(Labels.withId(USERNAME_ACCESS),
         StaticProperties.group(Labels.withId(USERNAME_GROUP),
             StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME)),
@@ -69,13 +148,71 @@ public class MqttConnectUtils {
 
   }
 
+  /**
+   * Options for the QoS level selection; "1 - at-least-once" carries the flag {@code true}.
+   * NOTE(review): the flag presumably marks the pre-selected option - confirm against Option.
+   */
+  public static List<Option> getQOSLevelSelection() {
+    return Arrays.asList(
+        new Option("0 - at-most-once", false),
+        new Option("1 - at-least-once", true),
+        new Option("2 - exactly-once", false));
+  }
+
+  /** Yes/No options for the retain selection; "No" carries the flag {@code true}. */
+  public static List<Option> getRetainSelection() {
+    return Arrays.asList(
+        new Option("Yes", false),
+        new Option("No", true));
+  }
+
+  /** Yes/No options for the clean-session selection; "Yes" carries the flag {@code true}. */
+  public static List<Option> getCleanSessionSelection() {
+    return Arrays.asList(
+        new Option("Yes", true),
+        new Option("No", false));
+  }
+
+  /** Yes/No options for the MQTT compliance selection; "Yes" carries the flag {@code true}. */
+  public static List<Option> getMqttSelection() {
+    return Arrays.asList(
+        new Option("Yes", true),
+        new Option("No", false));
+  }
+
+  /**
+   * Builds the client-certificate access alternative: a group, rendered
+   * non-horizontally, with a free-text property for the certificate and a
+   * secret property for the key.
+   */
+  public static StaticPropertyAlternative getClientCertAccess() {
+    var group = StaticProperties.group(
+        Labels.withId(CLIENT_CERT_GROUP),
+        // NOTE(review): meaning of the two boolean arguments is not visible here - confirm
+        StaticProperties.stringFreeTextProperty(Labels.withId(CLIENTCERT), 
true, false),
+        StaticProperties.secretValue(Labels.withId(CLIENTKEY)));
+    group.setHorizontalRendering(false);
+    return Alternatives.from(Labels.withId(CLIENT_CERT_ACCESS), group);
+
+  }
+
   public static MqttConfig getMqttConfig(IParameterExtractor extractor) {
     return getMqttConfig(extractor, null);
   }
 
+  /**
+   * Extracts the scheme (e.g. {@code tcp}, {@code ssl}) of a broker URI.
+   *
+   * @param brokeruri broker URI, may be {@code null}
+   * @return the scheme, or {@code null} when the input is {@code null}, has no
+   *         scheme or is not a valid URI
+   */
+  public static String getProtocol(String brokeruri) {
+    if (brokeruri == null) {
+      return null;
+    }
+    try {
+      return new URI(brokeruri).getScheme();
+    } catch (URISyntaxException ignored) {
+      // An unparsable URI has no usable scheme; tlsEnabled treats null as "no TLS"
+      return null;
+    }
+  }
+
+  /**
+   * Tells whether a URI scheme denotes a TLS secured MQTT connection.
+   *
+   * @param protocol URI scheme, compared case-insensitively; may be {@code null}
+   * @return {@code true} for {@code ssl}, {@code tls} and {@code mqtts}
+   */
+  public static boolean tlsEnabled(String protocol) {
+    if (protocol == null) {
+      return false;
+    }
+    switch (protocol.toLowerCase()) {
+      case "ssl":
+      case "tls":
+      case "mqtts":
+        return true;
+      default:
+        return false;
+    }
+  }
+
   public static MqttConfig getMqttConfig(IParameterExtractor extractor, String 
topicInput) {
     MqttConfig mqttConfig;
     String brokerUrl = extractor.singleValueParameter(BROKER_URL, 
String.class);
+    String protocol = getProtocol(brokerUrl);
+    boolean tlsEnabled = tlsEnabled(protocol);
 
     String topic;
     if (topicInput == null) {
@@ -88,13 +225,92 @@ public class MqttConnectUtils {
 
     if (selectedAlternative.equals(ANONYMOUS_ACCESS)) {
       mqttConfig = new MqttConfig(brokerUrl, topic);
+      if (tlsEnabled) {
+        mqttConfig = new MqttConfig(brokerUrl, topic, true);
+      }
+    } else if (selectedAlternative.equals(CLIENT_CERT_ACCESS)) {
+
+      String clientcert = extractor.singleValueParameter(CLIENTCERT, 
String.class);
+      String clientkey = extractor.secretValue(CLIENTKEY);
+      // TWO way auth so TLS needs to be enabled
+      mqttConfig = new MqttConfig(brokerUrl, topic, true, clientcert, 
clientkey);
+
     } else {
       String username = extractor.singleValueParameter(USERNAME, String.class);
       String password = extractor.secretValue(PASSWORD);
-      mqttConfig = new MqttConfig(brokerUrl, topic, username, password);
+      if (tlsEnabled) {
+        mqttConfig = new MqttConfig(brokerUrl, topic, username, password, 
true);
+      } else {
+        mqttConfig = new MqttConfig(brokerUrl, topic, username, password);
+      }
     }
 
     return mqttConfig;
   }
 
+  /**
+   * Maps an option label such as {@code "1 - at-least-once"} to its QoS level
+   * by parsing the digits contained in the label.
+   *
+   * @param s option label
+   * @return the matching QoS level
+   * @throws SpRuntimeException if the parsed number is not 0, 1 or 2
+   */
+  public static QoS extractQoSFromString(String s) {
+    final int level = Integer.parseInt(s.replaceAll("\\D+", ""));
+    if (level == 0) {
+      return QoS.AT_MOST_ONCE;
+    }
+    if (level == 1) {
+      return QoS.AT_LEAST_ONCE;
+    }
+    if (level == 2) {
+      return QoS.EXACTLY_ONCE;
+    }
+    throw new SpRuntimeException("Could not retrieve QoS level: QoS " + level);
+  }
+
+
+  /**
+   * Maps the option labels {@code "Yes"} and {@code "No"} to a boolean.
+   *
+   * @param s option label, must not be {@code null}
+   * @return {@code true} for "Yes", {@code false} for "No"
+   * @throws SpRuntimeException for any other value
+   */
+  public static boolean extractBoolean(String s) {
+    if (s.equals("Yes")) {
+      return true;
+    }
+    if (s.equals("No")) {
+      return false;
+    }
+    throw new SpRuntimeException("Could not map string value to boolean: " + s);
+  }
+
+  /**
+   * Converts seconds to milliseconds.
+   *
+   * @param value duration in seconds, must not be {@code null}
+   * @return the duration in milliseconds
+   */
+  public static long fromSecToMs(Long value) {
+    final long seconds = value;
+    return seconds * 1000;
+  }
+
+
+  /**
+   * Reads the complete publisher configuration of the MQTT data sink: the
+   * connection settings from {@code getMqttConfig} plus QoS, a random client
+   * id, reconnect delay, keep-alive, clean-session, retain, protocol version
+   * and, if selected, the last-will settings.
+   *
+   * @param extractor parameter extractor of the sink invocation
+   * @return the populated configuration
+   */
+  public static MqttConfig extractDataSinkParams(IParameterExtractor extractor) {
+    final MqttConfig sinkConfig = getMqttConfig(extractor);
+
+    String qosLabel = extractor.selectedSingleValue(QOS_LEVEL_KEY, String.class);
+    sinkConfig.setQos(extractQoSFromString(qosLabel));
+
+    sinkConfig.setClientId(UUID.randomUUID().toString());
+
+    Long reconnectPeriodInSec = extractor.singleValueParameter(RECONNECT_PERIOD_IN_SEC, Long.class);
+    sinkConfig.setReconnectDelayMaxInMs(fromSecToMs(reconnectPeriodInSec));
+
+    sinkConfig.setKeepAliveInSec(extractor.singleValueParameter(KEEP_ALIVE_IN_SEC, Short.class));
+
+    String cleanSessionLabel = extractor.selectedSingleValue(CLEAN_SESSION_KEY, String.class);
+    sinkConfig.setCleanSession(extractBoolean(cleanSessionLabel));
+
+    String retainLabel = extractor.selectedSingleValue(RETAIN, String.class);
+    sinkConfig.setRetain(extractBoolean(retainLabel));
+
+    // The configuration keeps its default protocol version unless compliance is requested
+    String compliantLabel = extractor.selectedSingleValue(MQTT_COMPLIANT, String.class);
+    if (extractBoolean(compliantLabel)) {
+      sinkConfig.setMqttProtocolVersion("3.1.1");
+    }
+
+    String willMode = extractor.selectedAlternativeInternalId(WILL_MODE);
+    if (!willMode.equals(WILL_ALTERNATIVE)) {
+      return sinkConfig;
+    }
+
+    sinkConfig.setLastWill(true);
+    sinkConfig.setWillTopic(extractor.singleValueParameter(WILL_TOPIC, String.class));
+    sinkConfig.setWillMessage(extractor.singleValueParameter(WILL_MESSAGE, String.class));
+
+    String willQosLabel = extractor.selectedSingleValue(WILL_QOS, String.class);
+    sinkConfig.setWillQoS(extractQoSFromString(willQosLabel));
+
+    String willRetainLabel = extractor.selectedSingleValue(WILL_RETAIN, String.class);
+    sinkConfig.setWillRetain(extractBoolean(willRetainLabel));
+
+    return sinkConfig;
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
index db81aab8ba..19895b80f6 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
@@ -24,63 +24,64 @@ import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class MqttConsumer implements Runnable {
+public class MqttConsumer extends MqttBase implements Runnable {
 
-  private final InternalEventProcessor<byte[]> consumer;
-  private boolean running;
-  private int maxElementsToReceive = -1;
-  private int messageCount = 0;
+    private final InternalEventProcessor<byte[]> consumer;
+    private boolean running;
+    private int maxElementsToReceive = -1;
+    private int messageCount = 0;
 
-  private final MqttConfig mqttConfig;
+    private static final Logger LOG = 
LoggerFactory.getLogger(MqttConsumer.class);
 
-  public MqttConsumer(MqttConfig mqttConfig,
-                      InternalEventProcessor<byte[]> consumer) {
-    this.mqttConfig = mqttConfig;
-    this.consumer = consumer;
-  }
+    public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]> 
consumer) {
+        super(mqttConfig);
+        this.consumer = consumer;
+    }
 
-  public MqttConsumer(MqttConfig mqttConfig,
-                      InternalEventProcessor<byte[]> consumer,
-                      int maxElementsToReceive) {
-    this(mqttConfig, consumer);
-    this.maxElementsToReceive = maxElementsToReceive;
-  }
+    public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]> 
consumer, int maxElementsToReceive) {
+        this(mqttConfig, consumer);
+        this.maxElementsToReceive = maxElementsToReceive;
+    }
 
-  @Override
-  public void run() {
-    this.running = true;
-    MQTT mqtt = new MQTT();
-    try {
-      mqtt.setHost(mqttConfig.getUrl());
-      mqtt.setConnectAttemptsMax(1);
-      if (mqttConfig.getAuthenticated()) {
-        mqtt.setUserName(mqttConfig.getUsername());
-        mqtt.setPassword(mqttConfig.getPassword());
-      }
-      BlockingConnection connection = mqtt.blockingConnection();
-      connection.connect();
-      Topic[] topics = {new Topic(mqttConfig.getTopic(), QoS.AT_LEAST_ONCE)};
-      byte[] qoses = connection.subscribe(topics);
+    /**
+     * Connects to the broker, subscribes to the configured topic and hands
+     * every received payload to the event processor until the consumer is
+     * closed or the message limit is reached.
+     *
+     * @throws RuntimeException if the client setup, the connection, the
+     *                          subscription or the message handling fails
+     */
+    @Override
+    public void run() {
+        this.running = true;
+        BlockingConnection connection = null;
+        try {
+            MQTT mqtt = super.setupMqttClient();
+            connection = mqtt.blockingConnection();
+            connection.connect();
+            subscribeToTopic(connection);
+            processMessages(connection);
+        } catch (Exception e) {
+            LOG.error("Error in MQTT consumer: ", e);
+            throw new RuntimeException(
+                    "Error when receiving data from MQTT", e);
+        } finally {
+            // Release the broker connection on the failure path as well;
+            // a failing disconnect must not hide the original exception.
+            if (connection != null && connection.isConnected()) {
+                try {
+                    connection.disconnect();
+                } catch (Exception e) {
+                    LOG.warn("Could not disconnect from MQTT broker", e);
+                }
+            }
+        }
+    }
 
-      while (running && ((maxElementsToReceive == -1) || (this.messageCount <= 
maxElementsToReceive))) {
-        Message message = connection.receive();
-        byte[] payload = message.getPayload();
-        consumer.onEvent(payload);
-        message.ack();
-        this.messageCount++;
-      }
-      connection.disconnect();
-    } catch (Exception e) {
-      throw new RuntimeException("Error when receiving data from MQTT", e);
+    private void processMessages(BlockingConnection connection) throws 
Exception {
+        while (running && (maxElementsToReceive == -1 || messageCount < 
maxElementsToReceive)) {
+            Message message = connection.receive();
+            byte[] payload = message.getPayload();
+            consumer.onEvent(payload);
+            message.ack();
+            messageCount++;
+        }
     }
-  }
 
-  public void close() {
-    this.running = false;
-  }
+    private void subscribeToTopic(BlockingConnection connection) throws 
Exception {
+        Topic[] topics = { new Topic(super.mqttConfig.getTopic(), 
QoS.AT_LEAST_ONCE) };
+        connection.subscribe(topics);
+    }
 
-  public Integer getMessageCount() {
-    return messageCount;
-  }
-}
+    public void close() {
+        this.running = false;
+    }
+
+    public Integer getMessageCount() {
+        return messageCount;
+    }
+}
\ No newline at end of file
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
new file mode 100644
index 0000000000..b3c8e2d0de
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.shared;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
+import org.apache.streampipes.model.runtime.Event;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.net.URI;
+
+
+/**
+ * Publishes StreamPipes events as JSON payloads to an MQTT broker using the
+ * client configuration extracted from the data sink parameters.
+ */
+public class MqttPublisher extends MqttBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MqttPublisher.class);
+
+  private final MQTT mqtt;
+  // Broker address as configured on the client; only used in error messages.
+  private final URI uri;
+  private BlockingConnection conn;
+
+  /**
+   * Creates the MQTT client from the sink parameters. No connection is
+   * opened until {@link #connect()} is called.
+   *
+   * @param params data sink parameters providing the static properties
+   * @throws SpRuntimeException if the MQTT client cannot be set up
+   */
+  public MqttPublisher(IDataSinkParameters params) {
+    super(MqttConnectUtils.extractDataSinkParams(params.extractor()));
+    MQTT client;
+    try {
+      client = super.setupMqttClient();
+    } catch (Exception e) {
+      LOG.error("Error in MQTT publisher: ", e);
+      throw new SpRuntimeException(
+          "Failed to initialize MQTT Client: " + e.getMessage(), e);
+    }
+    this.mqtt = client;
+    this.uri = client.getHost();
+  }
+
+  /**
+   * Start blocking connection to MQTT broker.
+   *
+   * @throws SpRuntimeException if the connection cannot be established
+   */
+  public void connect() {
+    try {
+      this.conn = mqtt.blockingConnection();
+      this.conn.connect();
+    } catch (Exception e) {
+      // Plain concatenation keeps the message null-safe for the URI.
+      throw new SpRuntimeException("Could not connect to MQTT broker: "
+          + uri + ", " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Publish received event to MQTT broker.
+   *
+   * @param event event to be published
+   * @throws SpRuntimeException if the event cannot be published
+   */
+  public void publish(Event event) {
+    JsonDataFormatDefinition dataFormatDefinition =
+        new JsonDataFormatDefinition();
+    byte[] payload =
+        new String(dataFormatDefinition.fromMap(event.getRaw())).getBytes();
+    try {
+      this.conn.publish(
+          super.mqttConfig.getTopic(),
+          payload,
+          super.mqttConfig.getQos(),
+          super.mqttConfig.isRetain());
+    } catch (Exception e) {
+      throw new SpRuntimeException("Could not publish to MQTT broker: "
+          + uri + ", " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Disconnect from MQTT broker. Does nothing if no connection was opened.
+   *
+   * @throws SpRuntimeException if the broker connection cannot be closed
+   */
+  public void disconnect() {
+    try {
+      // conn stays null when connect() was never called or failed early.
+      if (this.conn != null && this.conn.isConnected()) {
+        this.conn.disconnect();
+      }
+    } catch (Exception e) {
+      throw new SpRuntimeException("Could not disconnect from MQTT broker: "
+          + uri + ", " + e.getMessage(), e);
+    }
+  }
+
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/MqttPublisherSink.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/MqttPublisherSink.java
index 273e23ddf0..f6c509ab6e 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/MqttPublisherSink.java
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/MqttPublisherSink.java
@@ -22,136 +22,74 @@ import 
org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
 import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
 import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-import 
org.apache.streampipes.extensions.connectors.mqtt.sink.common.MqttClient;
+import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttPublisher;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 
-import java.util.Arrays;
-
 public class MqttPublisherSink implements IStreamPipesDataSink {
+    public static final String ID = 
"org.apache.streampipes.sinks.brokers.jvm.mqtt";
+
+    private static final int DEFAULT_MQTT_PORT = 1883;
+    private static final int DEFAULT_RECONNECT_PERIOD = 30;
+    private static final int DEFAULT_KEEP_ALIVE = 30;
 
-  private static final int DEFAULT_MQTT_PORT = 1883;
-  private static final int DEFAULT_RECONNECT_PERIOD = 30;
-  private static final int DEFAULT_KEEP_ALIVE = 30;
 
-  public static final String TOPIC = "topic";
-  public static final String HOST = "host";
-  public static final String PORT = "port";
-  public static final String AUTH_MODE = "auth-mode";
-  public static final String NO_AUTH_ALTERNATIVE = "no-auth-alternative";
-  public static final String AUTH_ALTERNATIVE = "basic-auth-alternative";
-  public static final String USERNAME_GROUP = "username-group";
-  public static final String USERNAME = "username";
-  public static final String PASSWORD = "password";
-  public static final String QOS_LEVEL_KEY = "qos-level";
-  public static final String CLEAN_SESSION_KEY = "clean-session";
-  public static final String WILL_RETAIN = "will-retain";
-  public static final String ENCRYPTION_MODE = "encryption-mode";
-  public static final String RECONNECT_PERIOD_IN_SEC = "reconnect-period";
-  public static final String WILL_MODE = "lwt-mode";
-  public static final String NO_WILL_ALTERNATIVE = "no-lwt-alternative";
-  public static final String WILL_ALTERNATIVE = "lwt-alternative";
-  public static final String WILL_GROUP = "lwt-group";
-  public static final String WILL_TOPIC = "lwt-topic";
-  public static final String WILL_MESSAGE = "lwt-message";
-  public static final String WILL_QOS = "lwt-qos-level";
-  public static final String RETAIN = "retain";
-  public static final String KEEP_ALIVE_IN_SEC = "keep-alive";
-  public static final String MQTT_COMPLIANT = "mqtt-version-compliant";
+    private MqttPublisher mqttClient;
 
-  private MqttClient mqttClient;
 
-  @Override
-  public IDataSinkConfiguration declareConfig() {
-    return DataSinkConfiguration.create(
-        MqttPublisherSink::new,
-        
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.mqtt", 0)
-            .category(DataSinkType.MESSAGING)
-            .withLocales(Locales.EN)
-            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
-            .requiredStream(StreamRequirementsBuilder.any())
-            .requiredTextParameter(Labels.withId(TOPIC))
-            .requiredTextParameter(Labels.withId(HOST))
-            .requiredIntegerParameter(Labels.withId(PORT), DEFAULT_MQTT_PORT)
-            .requiredAlternatives(
-                Labels.withId(AUTH_MODE),
-                Alternatives.from(Labels.withId(NO_AUTH_ALTERNATIVE), true),
-                Alternatives.from(Labels.withId(AUTH_ALTERNATIVE),
-                    StaticProperties.group(Labels.withId(USERNAME_GROUP),
-                        
StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME)),
-                        
StaticProperties.secretValue(Labels.withId(PASSWORD)))))
-            .requiredSingleValueSelection(
-                Labels.withId(ENCRYPTION_MODE),
-                Arrays.asList(
-                    new Option("TCP", true),
-                    // SSL not yet supported
-                    new Option("SSL/TLS", false)))
-            .requiredSingleValueSelection(
-                Labels.withId(QOS_LEVEL_KEY),
-                Arrays.asList(
-                    new Option("0 - at-most-once", false),
-                    new Option("1 - at-least-once", true),
-                    new Option("2 - exactly-once", false)))
-            .requiredSingleValueSelection(
-                Labels.withId(RETAIN),
-                Arrays.asList(
-                    new Option("Yes", false),
-                    new Option("No", true)))
-            .requiredSingleValueSelection(
-                Labels.withId(CLEAN_SESSION_KEY),
-                Arrays.asList(
-                    new Option("Yes", true),
-                    new Option("No", false)))
-            .requiredIntegerParameter(Labels.withId(RECONNECT_PERIOD_IN_SEC), 
DEFAULT_RECONNECT_PERIOD)
-            .requiredIntegerParameter(Labels.withId(KEEP_ALIVE_IN_SEC), 
DEFAULT_KEEP_ALIVE)
-            .requiredSingleValueSelection(
-                Labels.withId(MQTT_COMPLIANT),
-                Arrays.asList(
-                    new Option("Yes", true),
-                    new Option("No", false)))
-            .requiredAlternatives(
-                Labels.withId(WILL_MODE),
-                Alternatives.from(Labels.withId(NO_WILL_ALTERNATIVE), true),
-                Alternatives.from(Labels.withId(WILL_ALTERNATIVE),
-                    StaticProperties.group(Labels.withId(WILL_GROUP),
-                        
StaticProperties.stringFreeTextProperty(Labels.withId(WILL_TOPIC)),
-                        
StaticProperties.stringFreeTextProperty(Labels.withId(WILL_MESSAGE)),
-                        
StaticProperties.singleValueSelection(Labels.withId(WILL_RETAIN),
-                            Arrays.asList(
-                                new Option("Yes", false),
-                                new Option("No", true))),
-                        StaticProperties.singleValueSelection(
-                            Labels.withId(WILL_QOS),
-                            Arrays.asList(
-                                new Option("0 - at-most-once", true),
-                                new Option("1 - at-least-once", false),
-                                new Option("2 - exactly-once", false))))))
-            .build()
-    );
-  }
+    /**
+     * Declares the sink: its id and version, category, locales, assets,
+     * stream requirement and the static properties shown to the user, all
+     * of which are provided by {@link MqttConnectUtils}.
+     */
+    @Override
+    public IDataSinkConfiguration declareConfig() {
+        return DataSinkConfiguration.create(
+                MqttPublisherSink::new,
+                DataSinkBuilder.create(ID, 1)
+                        .category(DataSinkType.MESSAGING)
+                        .withLocales(Locales.EN)
+                        .withAssets(
+                                ExtensionAssetType.DOCUMENTATION,
+                                ExtensionAssetType.ICON)
+                        .requiredStream(StreamRequirementsBuilder.any())
+                        .requiredTextParameter(
+                                MqttConnectUtils.getBrokerUrlLabel())
+                        .requiredAlternatives(
+                                MqttConnectUtils.getAccessModeLabel(),
+                                MqttConnectUtils.getAnonymousAccess(),
+                                MqttConnectUtils.getUsernameAccess(),
+                                MqttConnectUtils.getClientCertAccess())
+                        .requiredTextParameter(
+                                MqttConnectUtils.getTopicLabel())
+                        .requiredSingleValueSelection(
+                                MqttConnectUtils.getQosLevelLabel(),
+                                MqttConnectUtils.getQOSLevelSelection())
+                        .requiredSingleValueSelection(
+                                MqttConnectUtils.getRetainLabel(),
+                                MqttConnectUtils.getRetainSelection())
+                        .requiredSingleValueSelection(
+                                MqttConnectUtils.getCleanSessionLabel(),
+                                MqttConnectUtils.getCleanSessionSelection())
+                        .requiredIntegerParameter(
+                                MqttConnectUtils.getReconnectPeriodLabel(),
+                                DEFAULT_RECONNECT_PERIOD)
+                        .requiredIntegerParameter(
+                                MqttConnectUtils.getKeepAliveLabel(),
+                                DEFAULT_KEEP_ALIVE)
+                        .requiredSingleValueSelection(
+                                MqttConnectUtils.getMqttComplient(),
+                                MqttConnectUtils.getMqttSelection())
+                        .requiredAlternatives(
+                                MqttConnectUtils.getWillModeLabel(),
+                                MqttConnectUtils.getNoWillAlternative(),
+                                MqttConnectUtils.getWillAlternative())
+                        .build());
+    }
 
-  @Override
-  public void onPipelineStarted(IDataSinkParameters params, 
EventSinkRuntimeContext runtimeContext) {
-    this.mqttClient = new MqttClient(params);
-    this.mqttClient.connect();
-  }
+    @Override
+    public void onPipelineStarted(IDataSinkParameters params, 
EventSinkRuntimeContext runtimeContext) {
+        this.mqttClient = new MqttPublisher(params);
+        this.mqttClient.connect();
+    }
 
-  @Override
-  public void onEvent(Event event) throws SpRuntimeException {
-    this.mqttClient.publish(event);
-  }
+    @Override
+    public void onEvent(Event event) throws SpRuntimeException {
+        this.mqttClient.publish(event);
+    }
 
-  @Override
-  public void onPipelineStopped() {
-    this.mqttClient.disconnect();
-  }
+    @Override
+    public void onPipelineStopped() {
+        this.mqttClient.disconnect();
+    }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttClient.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttClient.java
deleted file mode 100644
index 21b5e786df..0000000000
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttClient.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.extensions.connectors.mqtt.sink.common;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
-import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-import org.apache.streampipes.model.runtime.Event;
-
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-
-import java.net.URI;
-
-public class MqttClient {
-
-  private final MqttOptions options;
-  private URI uri;
-  private MQTT mqtt;
-  private BlockingConnection conn;
-
-  public MqttClient(IDataSinkParameters params) {
-    this.options = new MqttOptions(params);
-    this.createMqttClient();
-  }
-
-  /**
-   * Create new MQTT client
-   */
-  public void createMqttClient() {
-    this.mqtt = new MQTT();
-    this.uri = MqttUtils.makeMqttServerUri(options.getProtocol(), 
options.getHost(), options.getPort());
-    try {
-      /**
-       * Sets the url for connecting to the MQTT broker, e.g. {@code: 
tcp://localhost:1883}.
-       */
-      mqtt.setHost(uri);
-
-      // authentication
-      if (options.isBasicAuth()) {
-        /**
-         * The username for authenticated sessions.
-         */
-        mqtt.setUserName(options.getUsername());
-        /**
-         * The password for authenticated sessions.
-         */
-        mqtt.setPassword(options.getPassword());
-      }
-
-      /**
-       * The client id used when connecting to the MQTT broker.
-       */
-      mqtt.setClientId(options.getClientId());
-
-      /**
-       * Set to false if you want the MQTT server to persist topic 
subscriptions and ack positions across
-       * client sessions. Defaults to true.
-       */
-      mqtt.setCleanSession(options.isCleanSession());
-
-      /**
-       * The maximum amount of time in ms to wait between reconnect attempts. 
Defaults to 30,000.
-       */
-      mqtt.setReconnectDelayMax(options.getReconnectDelayMaxInMs());
-
-      /**
-       * Configures the Keep Alive timer in seconds. Defines the maximum time 
interval between messages
-       * received from a client. It enables the server to detect that the 
network connection to a client has
-       * dropped, without having to wait for the long TCP/IP timeout.
-       */
-      mqtt.setKeepAlive(options.getKeepAliveInSec());
-
-      /**
-       * Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the 
3.1 protocol version.
-       */
-      mqtt.setVersion(options.getMqttProtocolVersion());
-
-      // last will and testament options
-      if (options.isLastWill()) {
-        /**
-         * If set the server will publish the client's Will message to the 
specified topics if the client has
-         * an unexpected disconnection.
-         */
-        mqtt.setWillTopic(options.getWillTopic());
-
-        /**
-         * Sets the quality of service to use for the Will message. Defaults 
to QoS.AT_MOST_ONCE.
-         */
-        mqtt.setWillQos(options.getWillQoS());
-
-        /**
-         * The Will message to send. Defaults to a zero length message.
-         */
-        mqtt.setWillMessage(options.getWillMessage());
-
-        /**
-         * Set to true if you want the Will to be published with the retain 
option.
-         */
-        mqtt.setWillRetain(options.getWillRetain());
-      }
-    } catch (Exception e) {
-      throw new SpRuntimeException("Failed to initialize MQTT Client: " + 
e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Start blocking connection to MQTT broker.
-   */
-  public void connect() {
-    try {
-      this.conn = mqtt.blockingConnection();
-      this.conn.connect();
-    } catch (Exception e) {
-      throw new SpRuntimeException("Could not connect to MQTT broker: "
-          + uri.toString() + ", " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Publish received event to MQTT broker.
-   *
-   * @param event event to be published
-   */
-  public void publish(Event event) {
-    JsonDataFormatDefinition dataFormatDefinition = new 
JsonDataFormatDefinition();
-    byte[] payload = new 
String(dataFormatDefinition.fromMap(event.getRaw())).getBytes();
-    try {
-      this.conn.publish(options.getTopic(), payload, options.getQos(), 
options.isRetain());
-    } catch (Exception e) {
-      throw new SpRuntimeException("Could not publish to MQTT broker: "
-          + uri.toString() + ", " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Disconnect from MQTT broker.
-   */
-  public void disconnect() {
-    try {
-      if (this.conn.isConnected()) {
-        this.conn.disconnect();
-      }
-    } catch (Exception e) {
-      throw new SpRuntimeException("Could not disconnect from MQTT broker: "
-          + uri.toString() + ", " + e.getMessage(), e);
-    }
-  }
-
-}
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttOptions.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttOptions.java
deleted file mode 100644
index d6823c843c..0000000000
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttOptions.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.extensions.connectors.mqtt.sink.common;
-
-import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-
-import org.fusesource.mqtt.client.QoS;
-
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.AUTH_ALTERNATIVE;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.AUTH_MODE;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.CLEAN_SESSION_KEY;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.ENCRYPTION_MODE;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.HOST;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.KEEP_ALIVE_IN_SEC;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.MQTT_COMPLIANT;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.PASSWORD;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.PORT;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.QOS_LEVEL_KEY;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.RECONNECT_PERIOD_IN_SEC;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.RETAIN;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.TOPIC;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.USERNAME;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_ALTERNATIVE;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_MESSAGE;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_MODE;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_QOS;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_RETAIN;
-import static 
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_TOPIC;
-
-public class MqttOptions {
-
-  private final String clientId;
-  private final String host;
-  private final int port;
-  private final String topic;
-  private final String protocol;
-  private final QoS qos;
-  private final long reconnectDelayMaxInMs;
-  private final boolean cleanSession;
-  private final boolean retain;
-  private final short keepAliveInSec;
-
-  private String username = "";
-  private String password = "";
-  private boolean isBasicAuth = false;
-  private boolean isLastWill = false;
-  private QoS willQoS = QoS.AT_MOST_ONCE;
-  private Boolean willRetain = false;
-  private String willTopic = "";
-  private String willMessage = "";
-  private String mqttProtocolVersion = "3.1";
-
-  public MqttOptions(IDataSinkParameters params) {
-    var extract = params.extractor();
-
-    this.clientId = 
MqttUtils.runningInstanceId(params.getModel().getElementId());
-    this.topic = extract.singleValueParameter(TOPIC, String.class);
-    this.host = extract.singleValueParameter(HOST, String.class);
-    this.port = extract.singleValueParameter(PORT, Integer.class);
-    this.protocol = extract.selectedSingleValue(ENCRYPTION_MODE, String.class);
-
-    this.qos = 
MqttUtils.extractQoSFromString(extract.selectedSingleValue(QOS_LEVEL_KEY, 
String.class));
-    this.reconnectDelayMaxInMs =
-        
MqttUtils.fromSecToMs(extract.singleValueParameter(RECONNECT_PERIOD_IN_SEC, 
Long.class));
-    this.keepAliveInSec = extract.singleValueParameter(KEEP_ALIVE_IN_SEC, 
Short.class);
-    this.cleanSession = 
MqttUtils.extractBoolean(extract.selectedSingleValue(CLEAN_SESSION_KEY, 
String.class));
-    this.retain = MqttUtils.extractBoolean(extract.selectedSingleValue(RETAIN, 
String.class));
-
-    boolean isCompliant = 
MqttUtils.extractBoolean(extract.selectedSingleValue(MQTT_COMPLIANT, 
String.class));
-    if (isCompliant) {
-      this.mqttProtocolVersion = "3.1.1";
-    }
-
-    String accessMode = extract.selectedAlternativeInternalId(AUTH_MODE);
-    if (accessMode.equals(AUTH_ALTERNATIVE)) {
-      this.isBasicAuth = true;
-      this.username = extract.singleValueParameter(USERNAME, String.class);
-      this.password = extract.secretValue(PASSWORD);
-    }
-
-    String willMode = extract.selectedAlternativeInternalId(WILL_MODE);
-    if (willMode.equals(WILL_ALTERNATIVE)) {
-      this.isLastWill = true;
-      this.willTopic = extract.singleValueParameter(WILL_TOPIC, String.class);
-      this.willMessage = extract.singleValueParameter(WILL_MESSAGE, 
String.class);
-      this.willQoS = 
MqttUtils.extractQoSFromString(extract.selectedSingleValue(WILL_QOS, 
String.class));
-      this.willRetain = 
MqttUtils.extractBoolean(extract.selectedSingleValue(WILL_RETAIN, 
String.class));
-    }
-  }
-
-  public String getClientId() {
-    return clientId;
-  }
-
-  public String getHost() {
-    return host;
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  public String getTopic() {
-    return topic;
-  }
-
-  public String getProtocol() {
-    return protocol;
-  }
-
-  public QoS getQos() {
-    return qos;
-  }
-
-  public long getReconnectDelayMaxInMs() {
-    return reconnectDelayMaxInMs;
-  }
-
-  public boolean isCleanSession() {
-    return cleanSession;
-  }
-
-  public boolean isRetain() {
-    return retain;
-  }
-
-  public short getKeepAliveInSec() {
-    return keepAliveInSec;
-  }
-
-  public String getUsername() {
-    return username;
-  }
-
-  public String getPassword() {
-    return password;
-  }
-
-  public boolean isBasicAuth() {
-    return isBasicAuth;
-  }
-
-  public boolean isLastWill() {
-    return isLastWill;
-  }
-
-  public QoS getWillQoS() {
-    return willQoS;
-  }
-
-  public Boolean getWillRetain() {
-    return willRetain;
-  }
-
-  public String getWillTopic() {
-    return willTopic;
-  }
-
-  public String getWillMessage() {
-    return willMessage;
-  }
-
-  public String getMqttProtocolVersion() {
-    return mqttProtocolVersion;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttUtils.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttUtils.java
deleted file mode 100644
index 9c3233df0c..0000000000
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttUtils.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.extensions.connectors.mqtt.sink.common;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-
-import org.fusesource.mqtt.client.QoS;
-
-import java.net.URI;
-
-public class MqttUtils {
-
-  private static final String TCP = "TCP";
-  private static final String SSL_TLS = "SSL/TLS";
-  private static final String TCP_PROTOCOL = "tcp://";
-  private static final String SSL_PROTOCOL = "ssl://";
-  private static final String COLON = ":";
-
-  public MqttUtils() {
-  }
-
-  // remove non-digits
-  public static QoS extractQoSFromString(String s) {
-    int qos = Integer.parseInt(s.replaceAll("\\D+", ""));
-    switch (qos) {
-      case 0:
-        return QoS.AT_MOST_ONCE;
-      case 1:
-        return QoS.AT_LEAST_ONCE;
-      case 2:
-        return QoS.EXACTLY_ONCE;
-    }
-    throw new SpRuntimeException("Could not retrieve QoS level: QoS " + qos);
-  }
-
-  public static String runningInstanceId(String elementId) {
-    return elementId.substring(elementId.lastIndexOf(".") + 1);
-  }
-
-  public static URI makeMqttServerUri(String protocol, String host, int port) {
-    if (protocol.contains(TCP)) {
-      return URI.create(TCP_PROTOCOL + host + COLON + port);
-    } else if (protocol.contains(SSL_TLS)) {
-      return URI.create(SSL_PROTOCOL + host + COLON + port);
-    }
-    throw new SpRuntimeException("Connection protocol not supported! Use 
tcp:// or ssl://");
-  }
-
-  public static boolean extractBoolean(String s) {
-    switch (s) {
-      case "Yes":
-        return true;
-      case "No":
-        return false;
-    }
-    throw new SpRuntimeException("Could not map string value to boolean: " + 
s);
-  }
-
-  public static long fromSecToMs(Long value) {
-    return value * 1000;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/documentation.md
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/documentation.md
index ec5f30804a..b71d0247a9 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/documentation.md
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/documentation.md
@@ -37,7 +37,7 @@ The MQTT protocol adapter enables StreamPipes to consume 
messages from an MQTT b
 ## Configuration
 
 ### Broker Settings
-* **Broker URL**: The URL of the MQTT broker (e.g., 
tcp://test-server.com:1883). The protocol (tcp://) and port are required.
+* **Broker URL**: The URL of the MQTT broker (e.g., tcp://test-server.com:1883 
for plain TCP or ssl://test-server.com:8883 for TLS). The protocol (tcp:// or 
ssl://) and the port are required.
 
 ### Topic Settings
 * **Topic**: The MQTT topic to subscribe to (e.g., test/topic)
@@ -48,6 +48,9 @@ The MQTT protocol adapter enables StreamPipes to consume 
messages from an MQTT b
   * **Username/Password**: Basic authentication with username and password
     * **Username**: The username for authentication
     * **Password**: The password for authentication
+  * **Client Certificate**: Authentication with a client certificate and private key
+    * **Certificate PEM**: The client certificate in PEM format
+    * **Private Key PEM**: The private (RSA) key in PEM format (must not be password-protected)
 
 ***
 
@@ -61,7 +64,7 @@ The MQTT protocol adapter enables StreamPipes to consume 
messages from an MQTT b
 * **Security**:
   * Basic authentication support
   * TCP protocol support
-  * SSL/TLS support (coming soon)
+  * SSL/TLS support
 
 * **Protocol Support**:
   * MQTT 3.1
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/strings.en
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/strings.en
index 53b1da5689..3ba2f8dce8 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/strings.en
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/strings.en
@@ -19,8 +19,8 @@
 org.apache.streampipes.connect.iiot.protocol.stream.mqtt.title=MQTT
 org.apache.streampipes.connect.iiot.protocol.stream.mqtt.description=Consumes 
messages from a broker using the MQTT protocol
 
-access-mode.title=Access Mode
-access-mode.description=0
+access-mode.title=User Authentication
+access-mode.description=Choose an authentication method for the user
 
 anonymous-alternative.title=Unauthenticated
 anonymous-alternative.description=
@@ -31,14 +31,23 @@ username-alternative.description=
 username-group.title=User Group
 username-group.description=
 
+client-cert-alternative.title= Client Certificate
+client-cert-alternative.description=
+
 username.title=Username
 username.description=
 
 password.title=Password
 password.description=
 
+clientcert.title=Certificate PEM
+clientcert.description = Public key in PEM format
+
+clientkey.title=Private Key PEM
+clientkey.description=Private key in PEM format
+
 broker_url.title=Broker URL
-broker_url.description=Example: tcp://test-server.com:1883 (Protocol required. 
Port required)"
+broker_url.description=Examples: tcp://test-server.com:1883 (plain TCP) or 
ssl://test-server.com:8883 (TLS). The protocol and the port are both 
required.
 
 topic.title=Topic
 topic.description=Example: test/topic
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
index 431612c40a..c27528b4fc 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
@@ -18,12 +18,8 @@
 org.apache.streampipes.sinks.brokers.jvm.mqtt.title=MQTT Publisher
 org.apache.streampipes.sinks.brokers.jvm.mqtt.description=Publishes events to 
a MQTT topic
 
-host.title=Host
-host.description=IP address or DNS of MQTT broker
-
-port.title=Port
-port.description=Port of MQTT broker (default 1883)
-
+broker_url.title=Broker URL
+broker_url.description=Examples: tcp://test-server.com:1883 (plain TCP) or 
ssl://test-server.com:8883 (TLS). The protocol and the port are both 
required.
 topic.title=Topic
 topic.description=Enter MQTT topic
 
@@ -33,17 +29,32 @@ username.description=The username to authenticate with the 
broker
 password.title=Password
 password.description=The password to authenticate with the broker
 
-auth-mode.title=Authentication
-auth-mode.description=Unauthenticated or basic auth
+clientcert.title=Certificate PEM
+clientcert.description = Public key in PEM format
+
+clientkey.title=Private Key PEM
+clientkey.description=Private key in PEM format
+
+access-mode.title=User Authentication
+access-mode.description=Choose an authentication method for the user
+
 
-no-auth-alternative.title=Unauthenticated
-no-auth-alternative.description=No authentication
+anonymous-alternative.title=Unauthenticated
+anonymous-alternative.description=
 
-basic-auth-alternative.title=Basic auth
-basic-auth-alternative.description=Username and password
+username-alternative.title=Username/Password
+username-alternative.description=
+
+username-group.title=User Group
+username-group.description=
+
+client-cert-alternative.title= Client Certificate
+client-cert-alternative.description=
 
 username-group.title=Username and password
 
+cert-group.title=Client Certificate
+
 qos-level.title=QoS
 qos-level.description=Quality of Service level (default: at-least-once)
 
@@ -53,9 +64,6 @@ clean-session.description=Deselect to persist topic 
subscriptions & ack position
 will-retain.title=Retain will message?
 will-retain.description=Select if you want the Will to be published with the 
retain option (default: No)
 
-encryption-mode.title=Encryption
-encryption-mode.description=Select protocol. TCP (plaintext), SSL/TLS 
(encrypted, not yet supported!)
-
 reconnect-period.title=Reconnect period (seconds)
 reconnect-period.description=Amount of time to wait between reconnect attempts 
(default: 30s)
 
diff --git a/streampipes-integration-tests/pom.xml 
b/streampipes-integration-tests/pom.xml
index 2430cb3fed..dd8a7fc524 100644
--- a/streampipes-integration-tests/pom.xml
+++ b/streampipes-integration-tests/pom.xml
@@ -33,7 +33,6 @@
   <properties>
     <owasp.check.skip>true</owasp.check.skip>
     <maven.deploy.skip>false</maven.deploy.skip>
-
     <pulsar.version>3.2.2</pulsar.version>
   </properties>
 
@@ -134,13 +133,67 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-               <propertyExpansion>
-                               
checkstyle.config.base.path=${project.parent.basedir}/tools/maven
-            </propertyExpansion>
+          <propertyExpansion>
+            checkstyle.config.base.path=${project.parent.basedir}/tools/maven
+          </propertyExpansion>
+        </configuration>
+      </plugin>
+
+      <!-- Surefire Plugin for Test Execution -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>3.2.5</version>
+        <configuration>
+          <skipTests>true</skipTests> <!-- Skip tests during mvn clean package 
-->
         </configuration>
       </plugin>
     </plugins>
   </build>
 
+  <profiles>
+    <!-- Self-Signed Certificate Profile -->
+    <profile>
+      <id>selfsigned</id>
+      <activation>
+        <activeByDefault>true</activeByDefault> <!-- You can change this to 
false if you don't want it to be active by default -->
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>3.2.5</version>
+            <configuration>
+            <skipTests>false</skipTests>
+              <environmentVariables>
+                
<SP_SECURITY_ALLOW_SELFSIGNED>true</SP_SECURITY_ALLOW_SELFSIGNED>
+              </environmentVariables>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
 
-</project>
+    <!-- Keystore Profile -->
+    <profile>
+      <id>keystore</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>3.2.5</version>
+            <configuration>
+            <skipTests>false</skipTests>
+              <environmentVariables>
+                
<SP_SECURITY_KEYSTORE_FILENAME>src/test/resources/cacerts.pfx</SP_SECURITY_KEYSTORE_FILENAME>
+                
<SP_SECURITY_KEYSTORE_PASSWORD>changeit</SP_SECURITY_KEYSTORE_PASSWORD>
+              </environmentVariables>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
\ No newline at end of file
diff --git 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
index 63bdc9f1c5..932d54c0b9 100644
--- 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
+++ 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
@@ -38,6 +38,7 @@ public abstract class AdapterTesterBase implements 
AutoCloseable {
 
   private int counter;
 
+
   /**
    Executes the test procedure for the adapter integration test.
    This method performs the necessary actions to test the full integraion of 
adapters with third party services.
@@ -55,8 +56,7 @@ public abstract class AdapterTesterBase implements 
AutoCloseable {
   public void run() throws Exception {
     // prepare the third party docker service for the test (e.g. MQTT Broker)
     startAdapterService();
-
-    // generate the AdapterConfiguration for the test adapter
+    // generate the AdapterConfiguration for the test adapter
     IAdapterConfiguration adapterConfiguration = prepareAdapter();
 
     // start the adapter instance
@@ -70,7 +70,7 @@ public abstract class AdapterTesterBase implements 
AutoCloseable {
 
     // send events to thrird party service
     publishEvents(expectedEvents);
-
+    
     // validate that events where send correctly
     validate(expectedEvents);
   }
@@ -91,8 +91,11 @@ public abstract class AdapterTesterBase implements 
AutoCloseable {
 
     adapter.onAdapterStarted(extractor, (event -> {
       // This collector validates that the events are sent correctly and 
within the right order
+
       assertTrue(Maps.difference(event, 
expectedEvents.get(counter)).areEqual());
       counter++;
+     
+     
     }), null);
     return adapter;
 
@@ -138,6 +141,8 @@ public abstract class AdapterTesterBase implements 
AutoCloseable {
    */
   public void validate(List<Map<String, Object>> expectedEvents) throws 
InterruptedException {
     int retry = 0;
+  
+
     while (counter != expectedEvents.size() && retry < 5) {
       Thread.sleep(1000);
       retry++;
diff --git 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersIntegrationTest.java
similarity index 75%
rename from 
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
rename to 
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersIntegrationTest.java
index dad51a0991..3b311d4369 100644
--- 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
+++ 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersIntegrationTest.java
@@ -17,30 +17,53 @@
  */
 package org.apache.streampipes.integration.adapters;
 
+
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 
-public class AdaptersTest {
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class AdaptersIntegrationTest {
 
 
   @Test
-  public void testPulsarAdapter() throws Exception {
-    try (PulsarAdapterTester pulsarAdapterTester = new PulsarAdapterTester()) {
-      pulsarAdapterTester.run();
+  @Order(1)
+  public void testMqttAdapter() throws Exception {
+    try (MqttAdapterTester mqttAdapterTester = new MqttAdapterTester()) {
+      mqttAdapterTester.run();
     }
   }
 
+    @Test
+      @Order(2)
+  public void testMqttTLSAdapter() throws Exception {
+
+try (MqttAdapterTLSTester mqttAdapterTLSTester = new MqttAdapterTLSTester()) {
+      mqttAdapterTLSTester.run();
+    }
+  }
+  
   @Test
-  public void testMqttAdapter() throws Exception {
-    try (MqttAdapterTester mqttAdapterTester = new MqttAdapterTester()) {
-      mqttAdapterTester.run();
+  @Order(3)
+  public void testPulsarAdapter() throws Exception {
+    try (PulsarAdapterTester pulsarAdapterTester = new PulsarAdapterTester()) {
+      pulsarAdapterTester.run();
     }
   }
 
 
+
+
+
   @Test
+  @Order(4)
   public void testKafkaAdapter() throws Exception {
+
     try (KafkaAdapterTester kafkaAdapterTester = new KafkaAdapterTester()) {
       kafkaAdapterTester.run();
     }
+
+   
   }
 }
diff --git 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MQTTPublisherUtils.java
 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MQTTPublisherUtils.java
new file mode 100644
index 0000000000..bb34078ca3
--- /dev/null
+++ 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MQTTPublisherUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.adapters;
+
+import org.apache.streampipes.integration.containers.MosquittoContainer;
+import org.apache.streampipes.messaging.mqtt.MqttPublisher;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.List;
+import java.util.Map;
+
+public class MQTTPublisherUtils {
+
+    public static void publishEvents(MqttPublisher publisher, List<Map<String, 
Object>> events) {
+        var objectMapper = new ObjectMapper();
+
+        events.forEach(event -> {
+
+            try {
+                var serializedEvent = objectMapper.writeValueAsBytes(event);
+                publisher.publish(serializedEvent);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        publisher.disconnect();
+    }
+
+    @NotNull
+    public static MqttPublisher getMqttPublisher(MosquittoContainer 
mosquittoContainer, String topic) {
+        MqttTransportProtocol mqttSettings = new MqttTransportProtocol(
+                mosquittoContainer.getBrokerHost(),
+                mosquittoContainer.getBrokerPort(),
+                topic);
+        MqttPublisher publisher = new MqttPublisher(mqttSettings);
+        publisher.connect();
+        return publisher;
+    }
+
+}
diff --git 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTLSTester.java
similarity index 79%
copy from 
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
copy to 
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTLSTester.java
index 4b94590f35..7bf210b708 100644
--- 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
+++ 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTLSTester.java
@@ -28,13 +28,9 @@ import 
org.apache.streampipes.integration.containers.MosquittoDevContainer;
 import org.apache.streampipes.integration.utils.Utils;
 import org.apache.streampipes.manager.template.AdapterTemplateHandler;
 import org.apache.streampipes.messaging.mqtt.MqttPublisher;
-import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
 import org.apache.streampipes.model.template.PipelineElementTemplate;
 
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.ArrayList;
@@ -42,7 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class MqttAdapterTester extends AdapterTesterBase {
+public class MqttAdapterTLSTester extends AdapterTesterBase {
 
   MosquittoContainer mosquittoContainer;
 
@@ -66,7 +62,8 @@ public class MqttAdapterTester extends AdapterTesterBase {
 
     List<Map<String, Object>> configs = new ArrayList<>();
     configs.add(Map.of(MqttConnectUtils.TOPIC, TOPIC));
-    configs.add(Map.of(MqttConnectUtils.BROKER_URL, 
mosquittoContainer.getBrokerUrl()));
+    configs.add(Map.of(MqttConnectUtils.BROKER_URL, 
mosquittoContainer.getBrokerUrlTLS()));
+
 
     var template = new PipelineElementTemplate("name", "description", configs);
 
@@ -84,10 +81,11 @@ public class MqttAdapterTester extends AdapterTesterBase {
         .get(0)
         .setSelected(true);
 
+
     // Set format to Json
     ((StaticPropertyAlternatives) (desc)
         .getConfig()
-        .get(3))
+        .get(3)) 
         .getAlternatives()
         .get(0)
         .setSelected(true);
@@ -107,31 +105,14 @@ public class MqttAdapterTester extends AdapterTesterBase {
 
 
   @Override
-  public void publishEvents(List<Map<String, Object>> events) {
-    var publisher = getMqttPublisher();
-    var objectMapper = new ObjectMapper();
-
-    events.forEach(event -> {
-      try {
-        var serializedEvent = objectMapper.writeValueAsBytes(event);
-        publisher.publish(serializedEvent);
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
-    });
-
-    publisher.disconnect();
+  public void publishEvents(List<Map<String, Object>> events) { 
+      var pub = getMqttPublisher();
+      MQTTPublisherUtils.publishEvents(pub, events);
   }
 
   @NotNull
   private MqttPublisher getMqttPublisher() {
-    MqttTransportProtocol mqttSettings = new MqttTransportProtocol(
-        mosquittoContainer.getBrokerHost(),
-        mosquittoContainer.getBrokerPort(),
-        TOPIC);
-    MqttPublisher publisher = new MqttPublisher(mqttSettings);
-    publisher.connect();
-    return publisher;
+    return MQTTPublisherUtils.getMqttPublisher(mosquittoContainer, TOPIC);
   }
 
   @Override
diff --git 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
index 4b94590f35..23be3ea134 100644
--- 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
+++ 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
@@ -28,13 +28,9 @@ import 
org.apache.streampipes.integration.containers.MosquittoDevContainer;
 import org.apache.streampipes.integration.utils.Utils;
 import org.apache.streampipes.manager.template.AdapterTemplateHandler;
 import org.apache.streampipes.messaging.mqtt.MqttPublisher;
-import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
 import org.apache.streampipes.model.template.PipelineElementTemplate;
 
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.ArrayList;
@@ -84,6 +80,7 @@ public class MqttAdapterTester extends AdapterTesterBase {
         .get(0)
         .setSelected(true);
 
+
     // Set format to Json
     ((StaticPropertyAlternatives) (desc)
         .getConfig()
@@ -105,33 +102,15 @@ public class MqttAdapterTester extends AdapterTesterBase {
     return Utils.getSimpleTestEvents();
   }
 
-
   @Override
-  public void publishEvents(List<Map<String, Object>> events) {
-    var publisher = getMqttPublisher();
-    var objectMapper = new ObjectMapper();
-
-    events.forEach(event -> {
-      try {
-        var serializedEvent = objectMapper.writeValueAsBytes(event);
-        publisher.publish(serializedEvent);
-      } catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
-    });
-
-    publisher.disconnect();
+  public void publishEvents(List<Map<String, Object>> events) { 
+      var pub = getMqttPublisher();
+      MQTTPublisherUtils.publishEvents(pub, events);
   }
 
   @NotNull
   private MqttPublisher getMqttPublisher() {
-    MqttTransportProtocol mqttSettings = new MqttTransportProtocol(
-        mosquittoContainer.getBrokerHost(),
-        mosquittoContainer.getBrokerPort(),
-        TOPIC);
-    MqttPublisher publisher = new MqttPublisher(mqttSettings);
-    publisher.connect();
-    return publisher;
+    return MQTTPublisherUtils.getMqttPublisher(mosquittoContainer, TOPIC);
   }
 
   @Override
diff --git 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataTest.java
 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataIntegrationTest.java
similarity index 95%
rename from 
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataTest.java
rename to 
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataIntegrationTest.java
index 0115b0bc3f..069947d16a 100644
--- 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataTest.java
+++ 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataIntegrationTest.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.integration.client;
 
 import org.junit.jupiter.api.Test;
 
-public class ClientLiveDataTest {
+public class ClientLiveDataIntegrationTest {
 
   @Test
   public void testNatsClient() throws Exception {
diff --git 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
index a22de39ec1..690634fed9 100644
--- 
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
+++ 
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
@@ -20,23 +20,33 @@ package org.apache.streampipes.integration.containers;
 
 import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.Wait;
 
 public class MosquittoContainer extends GenericContainer<MosquittoContainer> {
 
-  protected static final int MOSQUITTO_PORT = 1883;
+  protected static final Integer[] MOSQUITTO_PORTS = {1883, 8883};
 
   public MosquittoContainer() {
     super("eclipse-mosquitto:latest");
   }
 
   public void start() {
-    this.waitStrategy = Wait.forLogMessage(".*listen socket on port 1883.*", 
1);
-    this.withExposedPorts(MOSQUITTO_PORT);
+    this.withExposedPorts(MOSQUITTO_PORTS);
     this.withClasspathResourceMapping(
         "mosquitto.conf",
         "/mosquitto/config/mosquitto.conf",
         BindMode.READ_ONLY);
+    this.withClasspathResourceMapping(
+        "mosquitto.crt",
+        "/mosquitto/config/mosquitto.crt",
+        BindMode.READ_ONLY);
+      this.withClasspathResourceMapping(
+        "mosquitto.key",
+        "/mosquitto/config/mosquitto.key",
+       BindMode.READ_ONLY);
+        this.withClasspathResourceMapping(
+        "passwd",
+        "/mosquitto/config/passwd",
+        BindMode.READ_ONLY);
     super.start();
   }
 
@@ -45,10 +55,17 @@ public class MosquittoContainer extends 
GenericContainer<MosquittoContainer> {
   }
 
   public Integer getBrokerPort() {
-    return getMappedPort(MOSQUITTO_PORT);
+    return getMappedPort(MOSQUITTO_PORTS[0]);
+  }
+  public Integer getBrokerTLSPort() {
+    return getMappedPort(MOSQUITTO_PORTS[1]);
   }
 
   public String getBrokerUrl() {
     return "tcp://" + getBrokerHost() + ":" + getBrokerPort();
   }
+
+  public String getBrokerUrlTLS() {
+    return "ssl://" + getBrokerHost() + ":" + getBrokerTLSPort();
+  }
 }
diff --git a/streampipes-integration-tests/src/test/resources/cacerts.pfx 
b/streampipes-integration-tests/src/test/resources/cacerts.pfx
new file mode 100644
index 0000000000..acc6512475
Binary files /dev/null and 
b/streampipes-integration-tests/src/test/resources/cacerts.pfx differ
diff --git a/streampipes-integration-tests/src/test/resources/mosquitto 
copy.conf b/streampipes-integration-tests/src/test/resources/mosquitto copy.conf
new file mode 100644
index 0000000000..fb0917ee2a
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/mosquitto copy.conf      
@@ -0,0 +1,43 @@
+# mosquitto.conf
+# Basic listener configuration (for non-TLS)
+listener 1883
+allow_anonymous false
+
+# TLS listener (for secure MQTT connection)
+listener 8883
+cafile /mosquitto/config/mosquitto.crt
+certfile /mosquitto/config/mosquitto.crt
+keyfile /mosquitto/config/mosquitto.key
+allow_anonymous true # Optional: you can enforce authentication on TLS as well
+
+# WebSocket listener (non-TLS)
+listener 9001
+protocol websockets
+allow_anonymous false
+
+# WebSocket listener (TLS)
+listener 9002
+protocol websockets
+cafile /mosquitto/config/mosquitto.crt
+certfile /mosquitto/config/mosquitto.crt
+keyfile /mosquitto/config/mosquitto.key
+allow_anonymous true  # Optional: enforce authentication for WebSockets over 
TLS as well
+
+# Point to password file (for username/password authentication)
+password_file /mosquitto/config/passwd
+
+# Persistence settings (to retain message state)
+persistence true
+persistence_location /mosquitto/data/
+
+# Logging configuration
+log_dest file /mosquitto/log/mosquitto.log
+log_type error
+log_type warning
+log_type notice
+log_type information
+
+# Uncomment and adjust paths if using additional configurations (like custom 
log destinations or file locations)
+#log_dest stdout
+#log_dest syslog
+#log_dest topic
\ No newline at end of file
diff --git a/streampipes-integration-tests/src/test/resources/mosquitto.conf 
b/streampipes-integration-tests/src/test/resources/mosquitto.conf
index 3c4318c1ad..dc0aea6993 100644
--- a/streampipes-integration-tests/src/test/resources/mosquitto.conf
+++ b/streampipes-integration-tests/src/test/resources/mosquitto.conf
@@ -16,6 +16,14 @@
 #
 #
 
-allow_anonymous true
 listener 1883
-log_type all
+allow_anonymous true
+
+
+listener 8883
+cafile /mosquitto/config/mosquitto.crt
+certfile /mosquitto/config/mosquitto.crt
+keyfile /mosquitto/config/mosquitto.key
+allow_anonymous true 
+
+password_file /mosquitto/config/passwd
\ No newline at end of file
diff --git a/streampipes-integration-tests/src/test/resources/mosquitto.crt 
b/streampipes-integration-tests/src/test/resources/mosquitto.crt
new file mode 100644
index 0000000000..273c47cf21
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/mosquitto.crt
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDYDCCAkgCCQD+NxhAtLaXiDANBgkqhkiG9w0BAQsFADByMQswCQYDVQQGEwJV
+UzETMBEGA1UECAwKQ2FsaWZvcm5pYTEVMBMGA1UEBwwMU2FuRnJhbmNpc2NvMRIw
+EAYDVQQKDAlNeUNvbXBhbnkxDzANBgNVBAsMBk15RGVwdDESMBAGA1UEAwwJbG9j
+YWxob3N0MB4XDTI1MTAyNzEzNTQzMFoXDTI2MTAyNzEzNTQzMFowcjELMAkGA1UE
+BhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExFTATBgNVBAcMDFNhbkZyYW5jaXNj
+bzESMBAGA1UECgwJTXlDb21wYW55MQ8wDQYDVQQLDAZNeURlcHQxEjAQBgNVBAMM
+CWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAP5kU4a2
+3XWkcFYTISsgJ2x59tSzcYn05EHNT/sr3j4lKSMNFO3/wLimchuoQxb08DE/9P4h
+KkzzdWM8TiOWo2usfNcQnRUfgvjnWkSwsTHlDqrH7cRQldvWWBzbm27y+I0fu7IN
+4Lw4X1jTLRE7cy5XWkmKHWDUlFYzn6CACi0BzcXO9eYqkcncbVe4rIVGB0BThM80
+JtXuWKwDyTpJAobdCY8RKwvqIPwo265UOidGeMrsC4A5TFGh8oobCArYgIGl/QoZ
+pEpOq2NOjykJqsRKu/Kxrom1bRj2Qp8K/9jtIUeOPubqBNMeh83ojgdgfA5ZwUxq
+vEz2teJeJe8N3jMCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEA624V/aM1Dyyhj/oS
++irtAZJFPAHkyaWJkiIeWMpQtY9kkYPIrO5oIdZ9oVZdBFW2rs6KvFf14FT5vTyU
+raq+FZ5UMUK53sH52KeqKbeejVZCaOG9jnys02mkhI0Gx+l3lWtrhN8xqzvcfh9g
+aYMLfh5yAJ6d5hkKwFMdJsNh/PUBhW0onRYfVq304PaDaM9Mtbzk7tbijr5KNgsB
+4vOPn09MC+L6KOSxfMB9urHeeAgtHyURIQpJjnsaG0UdSCOTenRrxqqweOeyo+Q1
+/zVkf3EIQ66CFkjxoPdjXtUeA320pZPw9R45DoWXUHwsLT7Ddj9nngm9e2Es+wGd
+5gQGIQ==
+-----END CERTIFICATE-----
diff --git a/streampipes-integration-tests/src/test/resources/mosquitto.key 
b/streampipes-integration-tests/src/test/resources/mosquitto.key
new file mode 100644
index 0000000000..3b1ef10f59
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/mosquitto.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQD+ZFOGtt11pHBW
+EyErICdsefbUs3GJ9ORBzU/7K94+JSkjDRTt/8C4pnIbqEMW9PAxP/T+ISpM83Vj
+PE4jlqNrrHzXEJ0VH4L451pEsLEx5Q6qx+3EUJXb1lgc25tu8viNH7uyDeC8OF9Y
+0y0RO3MuV1pJih1g1JRWM5+ggAotAc3FzvXmKpHJ3G1XuKyFRgdAU4TPNCbV7lis
+A8k6SQKG3QmPESsL6iD8KNuuVDonRnjK7AuAOUxRofKKGwgK2ICBpf0KGaRKTqtj
+To8pCarESrvysa6JtW0Y9kKfCv/Y7SFHjj7m6gTTHofN6I4HYHwOWcFMarxM9rXi
+XiXvDd4zAgMBAAECggEAQQOI6Tvg20j8QLNA3dGo4atF7tQxZy2EDGEZWLq8YKuE
+mOWl/LFJIqe/L9xP0RUmMaADz9LQCbyKuqLV4XiFKWZ6vUgMrTJReaU+x6FUl8jP
+d8wCsxJZSka8XBuv3KoR5Zc/k+DCF7hcfcnykZ3c8PH4LCU1HuMVSfaFjDJM53U1
+HeH/S9rOB60zTSFeegx+dq5WcCUGAGz4zGhbGaM47vd0etj5JYQwMrc0aCW2L+io
+LMF75T2lbXqNvDc6o/SJQVhoFxfkVCzh1ACh/toy1R/cOuLalBsdfqDynvQd+UN2
+xiSYBzCbP/mpmNt1FS56PvAiSSKHTrO0dhsD4f3yuQKBgQD/5UvAqEtnl+2KOHiX
+6ox6Rktoveqm6LcDRZRpXSfXG0o5Z78tOs00tnNsnwbCVMnBu8XbDHS+qYW6eiFM
+6Gq50bI+hNzK/NKm8r/teDN5nXqkpraX36MtV9JSv5lcVWFcOJx/z3nUBQJKcu+P
+A7HN6K/b/UdX+c6cY3NBsYT7JQKBgQD+ft+ZmhjKBIT3SuSAde97WdpsJqHrFFB1
+N3akYl/3AbzamoM5ET3ZoG5rkczAkSIPu8cIiIf4fk6zTgf2iuFemgoRdjabN02s
+uUYTfeAOKRRkWY1aopYpC60UuaoliIG6LowKcooXVQ72x2ZnSmz7IPcrtJWTWgjT
+NI0bKE6gdwKBgQDH3lyQmeJrg2rxbrIiVfxq9MSphszkmRd44rvMoAoiJRqQQ8w6
+k5b7+RWmXX92AaukOfmL4eq9kML2p7Wi0FWr1XGXC0c49MfDxg7Kd/wcnTfRqrUr
+Ym2dWN7Z6vTp/XYSBdWWroLFazQi2irqVURnQ7s35Ff5CxCpbbP0N6daUQKBgQDo
+NLEmQID/yrHbxTzKrVDuVqTB61nv2WA0I4AgKxZulOpQ94xlxIKPkB9QDP8qcQII
+IwhOk+ykYfLDDZ6caEmL/LbVCex3ITXBNGdpH4AQy5Csoz0jhpfGKb4p2+IQTwY2
+74OdgLbY5SY7KuMXucPIO2LrQOD9SrgkpZ1eOx/KrwKBgHm/mklfnvz25ps9LFBm
+AgFBIv5n/vq26o5GK5K8sAdN43T27K9WAtlgnOz0KO1i8HSHj8jdtm9NtKSJBAMg
+ImCvK32/YlP9R6Lo77M1POC658YCI4Yd1DXGwFz/YHgVYteVWjGlAIW26f9XI+pL
+Rp0UHFAdSaXiYQDiwwCbDYLn
+-----END PRIVATE KEY-----
diff --git a/streampipes-integration-tests/src/test/resources/passwd 
b/streampipes-integration-tests/src/test/resources/passwd
new file mode 100644
index 0000000000..dd3b2e2657
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/passwd
@@ -0,0 +1 @@
+username:$7$101$Kb+Z4WgvFpg/VLZB$6TYUfv4iH0NqBLaZe5ApIOdBVDQ3Vmd0XH6/2DW0B8aHkZWGZRXy2gPZnf8K9n55uKiuMxiDIDAms+8FA1s/bQ==
diff --git 
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
 
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
index 1793698efb..1ca0db7d46 100644
--- 
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
+++ 
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
@@ -33,6 +33,7 @@ import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableGroupStaticP
 import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableOneOfStaticProperty;
 import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputStaticProperty;
 import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.model.staticproperty.SlideToggleStaticProperty;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
@@ -64,6 +65,20 @@ public class StaticProperties {
     return alternativesContainer;
   }
 
+  /**
+   * Creates a slide toggle static property described by the given label.
+   *
+   * @param label        supplies the internal id, label text and description
+   * @param defaultValue passed to the constructor and set as the selected state
+   * @return the newly created slide toggle static property
+   */
+  public static SlideToggleStaticProperty toggleAlternative(Label label, boolean defaultValue) {
+    SlideToggleStaticProperty toggle = new SlideToggleStaticProperty(
+        label.getInternalId(), label.getLabel(), label.getDescription(), defaultValue);
+    toggle.setSelected(defaultValue);
+    return toggle;
+  }
+
   public static MappingPropertyUnary mappingPropertyUnary(Label label, 
RequirementsSelector requirementsSelector,
                                                           PropertyScope 
propertyScope) {
     MappingPropertyUnary mp = new MappingPropertyUnary(label.getInternalId(), 
label

Reply via email to