This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 9bc44e2c8f TLS Support for MQTT Data Sink (#3895)
9bc44e2c8f is described below
commit 9bc44e2c8fc9d7660edcdfbf932e18a5f2cd989a
Author: Jacqueline Höllig <[email protected]>
AuthorDate: Thu Nov 20 15:17:02 2025 +0100
TLS Support for MQTT Data Sink (#3895)
Co-authored-by: Philipp Zehnder <[email protected]>
---
.github/workflows/mvn-integration-test.yml | 56 ++++++
pom.xml | 9 +
.../commons/environment/Environment.java | 2 +-
.../iiot/IIoTAdaptersExtensionModuleExport.java | 3 +-
.../connect/iiot/adapters/oi4/Oi4Adapter.java | 6 +-
.../oi4/migration/Oi4AdapterMigrationV1.java | 84 ++++++++
.../strings.en | 15 +-
.../strings.en | 2 +-
.../mqtt/MqttConnectorsModuleExport.java | 8 +-
.../connectors/mqtt/adapter/MqttProtocol.java | 6 +-
.../mqtt/migration/MQTTAdapterMigrationV1.java | 87 ++++++++
.../mqtt/migration/MQTTSinkMigrationV1.java | 124 ++++++++++++
.../connectors/mqtt/security/SecurityUtils.java | 191 ++++++++++++++++++
.../connectors/mqtt/shared/MqttBase.java | 113 +++++++++++
.../connectors/mqtt/shared/MqttConfig.java | 179 +++++++++++++++-
.../connectors/mqtt/shared/MqttConnectUtils.java | 224 ++++++++++++++++++++-
.../connectors/mqtt/shared/MqttConsumer.java | 101 +++++-----
.../connectors/mqtt/shared/MqttPublisher.java | 98 +++++++++
.../connectors/mqtt/sink/MqttPublisherSink.java | 170 +++++-----------
.../connectors/mqtt/sink/common/MqttClient.java | 165 ---------------
.../connectors/mqtt/sink/common/MqttOptions.java | 181 -----------------
.../connectors/mqtt/sink/common/MqttUtils.java | 77 -------
.../documentation.md | 7 +-
.../strings.en | 15 +-
.../strings.en | 38 ++--
streampipes-integration-tests/pom.xml | 63 +++++-
.../integration/adapters/AdapterTesterBase.java | 11 +-
...ptersTest.java => AdaptersIntegrationTest.java} | 37 +++-
.../integration/adapters/MQTTPublisherUtils.java | 61 ++++++
...dapterTester.java => MqttAdapterTLSTester.java} | 37 +---
.../integration/adapters/MqttAdapterTester.java | 31 +--
...est.java => ClientLiveDataIntegrationTest.java} | 2 +-
.../integration/containers/MosquittoContainer.java | 27 ++-
.../src/test/resources/cacerts.pfx | Bin 0 -> 187040 bytes
.../src/test/resources/mosquitto copy.conf | 43 ++++
.../src/test/resources/mosquitto.conf | 12 +-
.../src/test/resources/mosquitto.crt | 21 ++
.../src/test/resources/mosquitto.key | 28 +++
.../src/test/resources/passwd | 1 +
.../apache/streampipes/sdk/StaticProperties.java | 15 ++
40 files changed, 1645 insertions(+), 705 deletions(-)
diff --git a/.github/workflows/mvn-integration-test.yml
b/.github/workflows/mvn-integration-test.yml
new file mode 100644
index 0000000000..0f95d120f1
--- /dev/null
+++ b/.github/workflows/mvn-integration-test.yml
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+name: run-mvn-integration-tests
+
+on:
+ pull_request:
+
+
+jobs:
+ build_and_integration_test:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v5
+
+ - name: Set up JDK 17
+ uses: actions/setup-java@v5
+ with:
+ distribution: 'temurin'
+ java-version: '17'
+ cache: 'maven'
+ - name: Set up Docker
+ uses: docker/setup-buildx-action@v2
+ with:
+ version: latest
+ - name: Install Maven
+ run: |
+ sudo apt update
+ sudo apt install -y maven
+ # Step 1: Build everything but skip tests
+ - name: Build all StreamPipes modules (skip tests)
+ run: mvn -B clean install -DskipTests
+
+ # Step 2: Run only integration tests (selfsigned)
+ - name: Run Integration Tests (selfsigned)
+ run: |
+ mvn -B test -P selfsigned -pl streampipes-integration-tests
-Dtest='**/*Test'
+
+ # Step 3: Run only integration tests (keystore)
+ - name: Run Integration Tests (keystore)
+ run: |
+ mvn -B test -P keystore -pl streampipes-integration-tests
-Dtest='**/*Test'
+
diff --git a/pom.xml b/pom.xml
index ccd94266e7..8eead72d48 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1498,6 +1498,15 @@
<!-- Exclude GO -->
<exclude>**/go.sum</exclude>
+
+
+ <!-- Resource files for Integration Test-->
+
<exclude>streampipes-integration-tests/src/test/resources/mosquitto.crt</exclude>
+ <exclude>streampipes-integration-tests/src/test/resources/mosquitto
copy.conf</exclude>
+
<exclude>streampipes-integration-tests/src/test/resources/mosquitto.key</exclude>
+
<exclude>streampipes-integration-tests/src/test/resources/passwd</exclude>
+
+
</excludes>
</configuration>
</plugin>
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index 628f03ef40..af604ac00e 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -187,7 +187,7 @@ public interface Environment {
StringEnvironmentVariable getTruststoreType();
BooleanEnvironmentVariable getAllowSelfSignedCertificates();
-
+
IntEnvironmentVariable getPlc4xMaxWaitTimeMs();
IntEnvironmentVariable getPlc4xMaxLeaseTimeMs();
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
index 31c984e0b6..5c9dbf874c 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/IIoTAdaptersExtensionModuleExport.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.connect.iiot;
import org.apache.streampipes.connect.iiot.adapters.oi4.Oi4Adapter;
+import
org.apache.streampipes.connect.iiot.adapters.oi4.migration.Oi4AdapterMigrationV1;
import
org.apache.streampipes.connect.iiot.adapters.simulator.machine.MachineDataSimulatorAdapter;
import org.apache.streampipes.connect.iiot.protocol.stream.FileReplayAdapter;
import org.apache.streampipes.connect.iiot.protocol.stream.HttpServerProtocol;
@@ -50,6 +51,6 @@ public class IIoTAdaptersExtensionModuleExport implements
IExtensionModuleExport
@Override
public List<IModelMigrator<?, ?>> migrators() {
- return List.of();
+ return List.of(new Oi4AdapterMigrationV1());
}
}
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
index b191a47922..b83a98a249 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
@@ -99,13 +99,13 @@ public class Oi4Adapter implements StreamPipesAdapter {
public IAdapterConfiguration declareConfig() {
return AdapterConfigurationBuilder
- .create(ID, 0, Oi4Adapter::new)
+ .create(ID, 1, Oi4Adapter::new)
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)
.requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
- .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(),
MqttConnectUtils.getAlternativesOne(),
- MqttConnectUtils.getAlternativesTwo()
+ .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(),
MqttConnectUtils.getAnonymousAccess(),
+ MqttConnectUtils.getUsernameAccess(),
MqttConnectUtils.getClientCertAccess()
)
.requiredAlternatives(
Labels.withId(OI4AdapterLabels.LABEL_KEY_SENSOR_DESCRIPTION),
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/migration/Oi4AdapterMigrationV1.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/migration/Oi4AdapterMigrationV1.java
new file mode 100644
index 0000000000..be2281bbd0
--- /dev/null
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/migration/Oi4AdapterMigrationV1.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.iiot.adapters.oi4.migration;
+
+import org.apache.streampipes.connect.iiot.adapters.oi4.Oi4Adapter;
+import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import java.util.List;
+
+public class Oi4AdapterMigrationV1 implements IAdapterMigrator { // Migrator for stored OI4 adapter descriptions: updates two texts and appends the client-certificate access alternative
+
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ Oi4Adapter.ID,
+ SpServiceTagPrefix.ADAPTER,
+ 0, // presumably the source version this migrator applies to - TODO confirm against ModelMigratorConfig
+ 1); // presumably the target version; Oi4Adapter.declareConfig() is bumped from 0 to 1 in the same change
+
+ }
+
+ @Override
+ public MigrationResult<AdapterDescription> migrate(AdapterDescription
element,
+ IStaticPropertyExtractor extractor) throws RuntimeException {
+
+ changeUrlDescription(element); // config index 0: broker URL free-text property
+
+ accessModeDescription(element); // config index 1: access-mode alternatives (label and description only)
+
+ migrateSecurity((StaticPropertyAlternatives)
element.getConfig().get(1)); // appends the client-certificate alternative to the same index-1 property
+
+ return MigrationResult.success(element); // the extractor argument is not used by this migration
+ }
+
+ private void migrateSecurity(StaticPropertyAlternatives
securityAlternatives) {
+ migrateGroup(securityAlternatives.getAlternatives()); // thin wrapper: passes the alternatives list on to migrateGroup
+ }
+
+ private void changeUrlDescription(AdapterDescription element) {
+ var url = (FreeTextStaticProperty) element.getConfig().get(0); // positional lookup: relies on the broker URL being the first config entry
+ url.setDescription(
+ "Example: tcp://test-server.com:1883 (Protocol required. Port
required), with TLS ssl://test-server.com:8883 (Protocol required. Port
required)");
+ element.getConfig().set(0, url); // writes back the same instance that was read from index 0
+ }
+
+ private void accessModeDescription(AdapterDescription element) {
+ var accessmode = (StaticPropertyAlternatives)
element.getConfig().get(1); // positional lookup: relies on the access-mode alternatives being the second config entry
+
+ accessmode.setLabel("User Authentication"); // same text as access-mode.title in the OI4 strings.en
+ accessmode.setDescription(
+ "Choose an authentication method for the user");
+ element.getConfig().set(1, accessmode); // writes back the same instance that was read from index 1
+ }
+
+ private void migrateGroup(List<StaticPropertyAlternative> alternatives) {
+ alternatives.add(MqttConnectUtils.getClientCertAccess()); // appended last, matching the anonymous / username / client-cert order used in Oi4Adapter.declareConfig()
+
+ }
+
+}
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.adapters.oi4/strings.en
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.adapters.oi4/strings.en
index 5acd27029c..d861e1f675 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.adapters.oi4/strings.en
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.adapters.oi4/strings.en
@@ -19,8 +19,8 @@
org.apache.streampipes.connect.iiot.adapters.oi4.title=OI4
org.apache.streampipes.connect.iiot.adapters.oi4.description=Connects to an
Open Industry 4.0 (OI4)-compatible device via MQTT
-access-mode.title=Access Mode
-access-mode.description=Provide user credentials for the MQTT broker
+access-mode.title=User Authentication
+access-mode.description=Choose an authentication method for the user
anonymous-alternative.title=Unauthenticated
anonymous-alternative.description=
@@ -34,11 +34,20 @@ username-group.description=
username.title=Username
username.description=
+client-cert-alternative.title= Client Certificate
+client-cert-alternative.description=
+
+clientcert.title=Certificate PEM
+clientcert.description = Public key in PEM format
+
+clientkey.title=Private Key PEM
+clientkey.description=Private key in PEM format
+
password.title=Password
password.description=
broker_url.title=Broker URL
-broker_url.description=Example: tcp://test-server.com:1883 (Protocol required.
Port required)"
+broker_url.description=Example: tcp://test-server.com:1883 (Protocol required.
Port required), with TLS ssl://test-server.com:8883 (Protocol required. Port
required)"
topic.title=Topic
topic.description=Example: test/topic
diff --git
a/streampipes-extensions/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.iiot.adapters.netio.mqtt/strings.en
b/streampipes-extensions/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.iiot.adapters.netio.mqtt/strings.en
index 9cd6bb514d..c51d33b164 100644
---
a/streampipes-extensions/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.iiot.adapters.netio.mqtt/strings.en
+++
b/streampipes-extensions/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.iiot.adapters.netio.mqtt/strings.en
@@ -38,7 +38,7 @@ password.title=Password
password.description=
broker_url.title=Broker URL
-broker_url.description=Example: tcp://test-server.com:1883 (Protocol required.
Port required)"
+broker_url.description=Example: tcp://test-server.com:1883 (Protocol required.
Port required), with TLS ssl://test-server.com:8883 (Protocol required. Port
required)"
topic.title=Topic
topic.description=Example: test/topic
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/MqttConnectorsModuleExport.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/MqttConnectorsModuleExport.java
index a1c72f0fdf..2f47367fab 100644
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/MqttConnectorsModuleExport.java
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/MqttConnectorsModuleExport.java
@@ -23,9 +23,10 @@ import
org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.extensions.connectors.mqtt.adapter.MqttProtocol;
+import
org.apache.streampipes.extensions.connectors.mqtt.migration.MQTTAdapterMigrationV1;
+import
org.apache.streampipes.extensions.connectors.mqtt.migration.MQTTSinkMigrationV1;
import
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink;
-import java.util.Collections;
import java.util.List;
public class MqttConnectorsModuleExport implements IExtensionModuleExport {
@@ -45,6 +46,9 @@ public class MqttConnectorsModuleExport implements
IExtensionModuleExport {
@Override
public List<IModelMigrator<?, ?>> migrators() {
- return Collections.emptyList();
+ return List.of(
+ new MQTTAdapterMigrationV1(),
+ new MQTTSinkMigrationV1()
+ );
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
index af9f02893a..e50a234a1b 100644
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
@@ -65,14 +65,14 @@ public class MqttProtocol implements StreamPipesAdapter {
@Override
public IAdapterConfiguration declareConfig() {
return AdapterConfigurationBuilder
- .create(ID, 0, MqttProtocol::new)
+ .create(ID, 1, MqttProtocol::new)
.withSupportedParsers(Parsers.defaultParsers())
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)
.requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
- .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(),
MqttConnectUtils.getAlternativesOne(),
- MqttConnectUtils.getAlternativesTwo())
+ .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(),
MqttConnectUtils.getAnonymousAccess(),
+ MqttConnectUtils.getUsernameAccess(),
MqttConnectUtils.getClientCertAccess())
.requiredTextParameter(MqttConnectUtils.getTopicLabel())
.buildConfiguration();
}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTAdapterMigrationV1.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTAdapterMigrationV1.java
new file mode 100644
index 0000000000..1323d5e732
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTAdapterMigrationV1.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.migration;
+
+import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.mqtt.adapter.MqttProtocol;
+import
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import java.util.List;
+
+public class MQTTAdapterMigrationV1 implements IAdapterMigrator { // Migrator for stored MQTT adapter descriptions: updates two texts and appends the client-certificate access alternative
+
+
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ MqttProtocol.ID,
+ SpServiceTagPrefix.ADAPTER,
+ 0, // presumably the source version this migrator applies to - TODO confirm against ModelMigratorConfig
+ 1); // presumably the target version; MqttProtocol.declareConfig() is bumped from 0 to 1 in the same change
+
+ }
+
+ @Override
+ public MigrationResult<AdapterDescription> migrate(AdapterDescription
element,
+ IStaticPropertyExtractor extractor) throws RuntimeException {
+
+ changeUrlDescription(element); // config index 0: broker URL free-text property
+
+ accessModeDescription(element); // config index 1: access-mode alternatives (label and description only)
+
+ migrateSecurity((StaticPropertyAlternatives)
element.getConfig().get(1)); // appends the client-certificate alternative to the same index-1 property
+
+ return MigrationResult.success(element); // the extractor argument is not used by this migration
+ }
+
+ private void migrateSecurity(StaticPropertyAlternatives
securityAlternatives) {
+ migrateGroup(securityAlternatives.getAlternatives()); // thin wrapper: passes the alternatives list on to migrateGroup
+ }
+
+
+
+ private void changeUrlDescription(AdapterDescription element){
+ var url = (FreeTextStaticProperty) element.getConfig().get(0); // positional lookup: relies on the broker URL being the first config entry
+ url.setDescription(
+ "Example: tcp://test-server.com:1883 (Protocol required. Port
required), with TLS ssl://test-server.com:8883 (Protocol required. Port
required)");
+ element.getConfig().set(0, url); // writes back the same instance that was read from index 0
+ }
+
+ private void accessModeDescription(AdapterDescription element){
+ var accessmode = (StaticPropertyAlternatives)
element.getConfig().get(1); // positional lookup: relies on the access-mode alternatives being the second config entry
+
+ accessmode.setLabel("User Authentication"); // hard-coded text for already stored descriptions
+ accessmode.setDescription(
+ "Choose an authentication method for the user");
+ element.getConfig().set(1, accessmode); // writes back the same instance that was read from index 1
+ }
+
+ private void migrateGroup(List<StaticPropertyAlternative> alternatives) {
+ alternatives.add(MqttConnectUtils.getClientCertAccess()); // appended last, matching the anonymous / username / client-cert order used in MqttProtocol.declareConfig()
+
+ }
+
+}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTSinkMigrationV1.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTSinkMigrationV1.java
new file mode 100644
index 0000000000..8e790bfd4a
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTSinkMigrationV1.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.migration;
+
+import
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+import org.apache.streampipes.sdk.StaticProperties;
+
+public class MQTTSinkMigrationV1 implements IDataSinkMigrator { // Migrator for stored MQTT sink invocations: replaces host, port and encryption with one broker URL and rebuilds the access alternatives
+
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+
"org.apache.streampipes.sinks.brokers.jvm.mqtt",
+ SpServiceTagPrefix.DATA_SINK,
+ 0, // presumably the source version this migrator applies to - TODO confirm against ModelMigratorConfig
+ 1); // presumably the target version
+
+ }
+
+ @Override
+ public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation
element,
+ IDataSinkParameterExtractor extractor) throws
RuntimeException {
+ // Rename the topic property (old index 0) to the shared MQTT topic label and internal id
+ var topic =
migrateTopicToNewNaming(element.getStaticProperties().get(0));
+ // Build the broker URI from host (index 1), port (index 2) and the encryption selection (index 4), then store it at index 0
+ migrateData(element);
+ // Put the renamed topic at index 4, overwriting the encryption selection that was already folded into the URI
+ element.getStaticProperties().set(4, topic);
+ // Remove index 2, which buildBrokerURI reads as the port; do this before index 1 so the lower index stays valid
+ element.getStaticProperties().remove(2);
+ // Remove index 1, which buildBrokerURI reads as the host; the list is now broker URL, access alternatives, topic
+ element.getStaticProperties().remove(1);
+ // Replace the access alternatives (now at index 1) with the new set that includes the client-certificate option
+ migrateSecurity(element);
+ return MigrationResult.success(element); // the extractor argument is not used by this migration
+ }
+
+ private StaticProperty migrateTopicToNewNaming(StaticProperty topic) {
+ topic.setLabel(MqttConnectUtils.getTopicLabel().getLabel());
+
topic.setInternalName(MqttConnectUtils.getTopicLabel().getInternalId());
+ return topic; // same instance, mutated in place
+
+ }
+
+ private String buildBrokerURI(DataSinkInvocation element) {
+
+ var host = ((FreeTextStaticProperty)
element.getStaticProperties().get(1)).getValue(); // positional lookup: index 1 is treated as the old host field
+ var port = ((FreeTextStaticProperty)
element.getStaticProperties().get(2)).getValue(); // positional lookup: index 2 is treated as the old port field
+ var encryptionAlternative = ((OneOfStaticProperty)
element.getStaticProperties().get(4))
+ .getOptions(); // positional lookup: index 4 is treated as the old encryption one-of selection
+ var encryption = "";
+ for (var i = 0; i < encryptionAlternative.size(); i++) {
+ Option alternative = encryptionAlternative.get(i);
+
+ if (alternative.isSelected()) {
+ encryption = alternative.getName(); // no break: if several options are selected, the last one wins
+ }
+ }
+ String protocol = "tcp";
+
+ if ("SSL".equalsIgnoreCase(encryption)) { // any other selection, or none, keeps the tcp default
+ protocol = "ssl";
+ }
+
+ var brokerUri = protocol + "://" + host + ":" + port;
+ return brokerUri;
+
+ }
+
+ private void migrateData(DataSinkInvocation element) {
+
+ var brokerUri = buildBrokerURI(element); // must run before any set or remove call, because it reads the old indices 1, 2 and 4
+
+ var broker =
StaticProperties.stringFreeTextProperty(MqttConnectUtils.getBrokerUrlLabel(),
brokerUri);
+
+ element.getStaticProperties().set(0, broker); // overwrites the old topic slot; migrate() already holds a reference to the topic
+
+ }
+
+ private void migrateSecurity(DataSinkInvocation element) {
+ var oldSecurityAlternatives = (StaticPropertyAlternatives)
element.getStaticProperties().get(1); // index 1 only after migrate() has removed the old host and port entries
+
+ var securityAlternative =
StaticProperties.alternatives(MqttConnectUtils.getAccessModeLabel(),
+ MqttConnectUtils.getAnonymousAccess(),
+ MqttConnectUtils.getUsernameAccess(),
MqttConnectUtils.getClientCertAccess());
+ for (var i = 0; i <
oldSecurityAlternatives.getAlternatives().size(); i++) { // index-based copy: assumes the old alternatives come in the same order as the new ones and number at most three
+ StaticPropertyAlternative alternative =
oldSecurityAlternatives.getAlternatives().get(i);
+ if (alternative.getSelected()) {
+
securityAlternative.getAlternatives().get(i).setSelected(true);
+ } else {
+
securityAlternative.getAlternatives().get(i).setSelected(false);
+ }
+ }
+ element.getStaticProperties().set(1, securityAlternative); // NOTE(review): only the selected flags are copied above; values nested in the old alternatives are not carried over - confirm stored credentials are not lost
+ }
+
+}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/security/SecurityUtils.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/security/SecurityUtils.java
new file mode 100644
index 0000000000..6834c0513b
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/security/SecurityUtils.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.security;
+
+import org.apache.streampipes.commons.environment.Environments;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.Socket;
+import java.security.KeyFactory;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.Base64;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Static helpers that build the TLS material (trust managers, key managers and
+ * key stores) needed to open secured connections.
+ */
+public class SecurityUtils {
+
+  private static final String CERTIFICATE_BEGIN = "-----BEGIN CERTIFICATE-----";
+  private static final String CERTIFICATE_END = "-----END CERTIFICATE-----";
+  private static final String PKCS1_BEGIN = "-----BEGIN RSA PRIVATE KEY-----";
+  private static final String PKCS1_END = "-----END RSA PRIVATE KEY-----";
+  private static final String PKCS8_BEGIN = "-----BEGIN PRIVATE KEY-----";
+  private static final String PKCS8_END = "-----END PRIVATE KEY-----";
+
+  /**
+   * Creates trust managers that accept every client and server certificate
+   * without performing any validation. Connections secured this way are
+   * encrypted but the peer is not authenticated.
+   *
+   * @return an array holding a single permissive {@link X509TrustManager}
+   */
+  public static TrustManager[] acceptAllCerts() {
+    return new TrustManager[] {
+        new X509TrustManager() {
+          @Override
+          public X509Certificate[] getAcceptedIssuers() {
+            // The X509TrustManager contract requires a non-null (possibly empty) array.
+            return new X509Certificate[0];
+          }
+
+          @Override
+          public void checkClientTrusted(X509Certificate[] certs, String authType) {
+            // Intentionally empty: every client certificate is accepted.
+          }
+
+          @Override
+          public void checkServerTrusted(X509Certificate[] certs, String authType) {
+            // Intentionally empty: every server certificate is accepted.
+          }
+        }
+    };
+  }
+
+  /**
+   * Opens a TLS connection with the JVM default socket factory, performs the
+   * handshake and returns the first certificate of the chain presented by the
+   * peer.
+   *
+   * @param host host name to connect to
+   * @param port port to connect to
+   * @return the first certificate of the peer's certificate chain
+   * @throws IOException if the connection or the handshake fails
+   */
+  public static X509Certificate getServerCertificate(String host, int port)
+      throws NoSuchAlgorithmException, IOException, CertificateException {
+
+    SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
+    try (Socket socket = factory.createSocket(host, port)) {
+      SSLSocket sslSocket = (SSLSocket) socket;
+      sslSocket.startHandshake();
+
+      SSLSession session = sslSocket.getSession();
+      return (X509Certificate) session.getPeerCertificates()[0];
+    }
+  }
+
+  /**
+   * Loads the key store whose file name, type and password are taken from the
+   * environment configuration.
+   *
+   * @return the loaded key store
+   * @throws FileNotFoundException if the configured key store file does not exist
+   * @throws IOException           if the file cannot be read or the password is wrong
+   */
+  public static KeyStore loadServerKeyStore() throws FileNotFoundException, KeyStoreException, IOException,
+      NoSuchAlgorithmException, CertificateException {
+
+    var env = Environments.getEnvironment();
+    String keystoreFilename = env.getKeystoreFilename().getValueOrDefault();
+    String keystoreType = env.getKeystoreType().getValueOrDefault();
+    String keystorePassword = env.getKeystorePassword().getValueOrDefault();
+
+    try (FileInputStream keystoreFile = new FileInputStream(keystoreFilename)) {
+      KeyStore keystore = KeyStore.getInstance(keystoreType);
+      keystore.load(keystoreFile, keystorePassword.toCharArray());
+      return keystore;
+    }
+  }
+
+  /**
+   * Builds key managers for client-certificate authentication from a PEM
+   * encoded certificate and a PEM encoded private key. Both are placed into an
+   * in-memory PKCS12 key store that is never written to disk.
+   *
+   * @param certPem PEM encoded X.509 client certificate
+   * @param keyPem  PEM encoded private key, see {@link #parsePrivateKeyFromPem(String)}
+   * @return key managers backed by the in-memory key store
+   * @throws Exception if the certificate or the key cannot be parsed
+   */
+  public static KeyManager[] loadClientKeyManagers(String certPem, String keyPem) throws Exception {
+    X509Certificate certificate = parseCertificateFromPem(certPem);
+    PrivateKey privateKey = parsePrivateKeyFromPem(keyPem);
+
+    String password = ""; // no password for in-memory keystore
+    KeyStore keyStore = KeyStore.getInstance("PKCS12");
+    keyStore.load(null, null);
+    keyStore.setKeyEntry("client", privateKey, password.toCharArray(),
+        new java.security.cert.Certificate[]{certificate});
+
+    KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+    kmf.init(keyStore, password.toCharArray());
+    return kmf.getKeyManagers();
+  }
+
+  /**
+   * Parses a single PEM encoded X.509 certificate. Real line breaks as well as
+   * escaped {@code \n} / {@code \r} sequences (as accepted by
+   * {@link #parsePrivateKeyFromPem(String)}) are tolerated.
+   *
+   * @param pem PEM text including the BEGIN/END CERTIFICATE markers
+   * @return the parsed certificate
+   * @throws Exception if the Base64 payload or the certificate is invalid
+   */
+  public static X509Certificate parseCertificateFromPem(String pem) throws Exception {
+    String base64 = pem
+        // A backslash never occurs in valid PEM, so these can only be escaped line breaks.
+        .replace("\\n", "")
+        .replace("\\r", "")
+        .replace(CERTIFICATE_BEGIN, "")
+        .replace(CERTIFICATE_END, "")
+        .replaceAll("\\s+", "");
+    byte[] decoded = Base64.getDecoder().decode(base64);
+    CertificateFactory cf = CertificateFactory.getInstance("X.509");
+    return (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(decoded));
+  }
+
+  /**
+   * Parses a PEM encoded private key. Supported formats are PKCS#1 RSA keys
+   * ({@code BEGIN RSA PRIVATE KEY}) and unencrypted PKCS#8 keys
+   * ({@code BEGIN PRIVATE KEY}) holding an RSA or EC key.
+   *
+   * @param pem PEM text; escaped {@code \n} sequences are converted to line breaks
+   * @return the parsed private key
+   * @throws IllegalArgumentException if none of the supported markers is present
+   * @throws Exception                if the key material itself is invalid
+   */
+  public static PrivateKey parsePrivateKeyFromPem(String pem) throws Exception {
+    pem = pem.replace("\\n", "\n")
+        .replace("\\r", "")
+        .replace("\r", "")
+        .trim();
+
+    if (pem.contains(PKCS1_BEGIN)) {
+      return parsePkcs1PrivateKey(pem);
+    }
+    if (pem.contains(PKCS8_BEGIN)) {
+      return parsePkcs8PrivateKey(pem);
+    }
+    throw new IllegalArgumentException("Unsupported key format: missing BEGIN/END markers");
+  }
+
+  /**
+   * Parses a PKCS#1 ({@code BEGIN RSA PRIVATE KEY}) PEM block by wrapping the
+   * decoded bytes into a PKCS#8 structure that the JDK RSA key factory accepts.
+   *
+   * @param pem PEM text including the RSA PRIVATE KEY markers
+   * @return the parsed RSA private key
+   * @throws Exception if the payload cannot be decoded or is not a valid RSA key
+   */
+  public static PrivateKey parsePkcs1PrivateKey(String pem) throws Exception {
+    String base64 = pem
+        .replace(PKCS1_BEGIN, "")
+        .replace(PKCS1_END, "")
+        .replaceAll("\\s", "");
+
+    byte[] pkcs1Bytes = Base64.getMimeDecoder().decode(base64);
+    byte[] pkcs8Bytes = convertPkcs1ToPkcs8(pkcs1Bytes);
+
+    PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(pkcs8Bytes);
+    KeyFactory keyFactory = KeyFactory.getInstance("RSA");
+    return keyFactory.generatePrivate(keySpec);
+  }
+
+  /**
+   * Parses an unencrypted PKCS#8 ({@code BEGIN PRIVATE KEY}) PEM block. The key
+   * algorithm is not known up front, so RSA is tried first and EC second.
+   */
+  private static PrivateKey parsePkcs8PrivateKey(String pem) throws Exception {
+    String base64 = pem
+        .replace(PKCS8_BEGIN, "")
+        .replace(PKCS8_END, "")
+        .replaceAll("\\s", "");
+
+    PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(Base64.getMimeDecoder().decode(base64));
+    try {
+      return KeyFactory.getInstance("RSA").generatePrivate(keySpec);
+    } catch (java.security.spec.InvalidKeySpecException notAnRsaKey) {
+      return KeyFactory.getInstance("EC").generatePrivate(keySpec);
+    }
+  }
+
+  /**
+   * Prepends a fixed PKCS#8 PrivateKeyInfo header (version 0, rsaEncryption
+   * OID, NULL parameters) to a PKCS#1 RSA key. Both length fields are written
+   * as two bytes, so the result is only well-formed DER when the PKCS#1
+   * payload is between 256 and roughly 65000 bytes long.
+   */
+  private static byte[] convertPkcs1ToPkcs8(byte[] pkcs1Bytes) throws IOException {
+    final byte[] pkcs8Header = new byte[] {
+        0x30, (byte) 0x82,             // SEQUENCE, two length bytes follow
+        0, 0,                          // total length, patched below
+        0x02, 0x01, 0x00,              // INTEGER version = 0
+        0x30, 0x0d,                    // SEQUENCE AlgorithmIdentifier
+        0x06, 0x09,                    // OID, 9 bytes
+        0x2a, (byte) 0x86, 0x48, (byte) 0x86, (byte) 0xf7, 0x0d, 0x01, 0x01, 0x01,
+        0x05, 0x00,                    // NULL parameters
+        0x04, (byte) 0x82,             // OCTET STRING, two length bytes follow
+        0, 0                           // PKCS#1 length, patched below
+    };
+
+    int pkcs1Length = pkcs1Bytes.length;
+    int totalLength = pkcs8Header.length + pkcs1Length;
+
+    // The outer length excludes the 4 bytes of the outer tag and length field.
+    pkcs8Header[2] = (byte) ((totalLength - 4) >> 8);
+    pkcs8Header[3] = (byte) (totalLength - 4);
+    pkcs8Header[pkcs8Header.length - 2] = (byte) (pkcs1Length >> 8);
+    pkcs8Header[pkcs8Header.length - 1] = (byte) (pkcs1Length);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    out.write(pkcs8Header);
+    out.write(pkcs1Bytes);
+    return out.toByteArray();
+  }
+
+  /**
+   * Creates a trust manager factory of the JVM default algorithm that trusts
+   * the certificates contained in the given key store.
+   *
+   * @param keystore key store holding the trusted certificates
+   * @return the initialized trust manager factory
+   */
+  public static TrustManagerFactory createTrustManagerFactory(KeyStore keystore) throws Exception {
+    TrustManagerFactory trustManagerFactory =
+        TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+    trustManagerFactory.init(keystore);
+    return trustManagerFactory;
+  }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
new file mode 100644
index 0000000000..51a9160391
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.shared;
+
+import org.apache.streampipes.commons.environment.Environments;
+import
org.apache.streampipes.extensions.connectors.mqtt.security.SecurityUtils;
+
+import org.fusesource.mqtt.client.MQTT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+
+public class MqttBase {
+
+ protected final MqttConfig mqttConfig;
+
+ private static final Logger LOG = LoggerFactory.getLogger(MqttBase.class);
+
+ public MqttBase(MqttConfig mqttConfig) {
+ this.mqttConfig = mqttConfig;
+ }
+
+ protected MQTT setupMqttClient() throws Exception {
+ MQTT mqtt = new MQTT();
+ mqtt.setHost(mqttConfig.getUrl());
+ mqtt.setConnectAttemptsMax(1);
+
+ if (mqttConfig.getAuthenticated()) {
+ mqtt.setUserName(mqttConfig.getUsername());
+ mqtt.setPassword(mqttConfig.getPassword());
+ }
+
+ if (tlsEnabled(new URI(mqttConfig.getUrl()))) {
+ configureTls(mqtt);
+ }
+
+ return mqtt;
+ }
+
+
+private static boolean tlsEnabled(URI brokerUri) {
+ String protocol = brokerUri.getScheme();
+ if (protocol == null) {
+ return false;
+ }
+ String proto = protocol.toLowerCase();
+ return proto.equals("ssl") || proto.equals("tls") || proto.equals("mqtts");
+ }
+ private void configureTls(MQTT mqtt) throws Exception {
+ LOG.info("Configuring TLS for MQTT connection...");
+ KeyStore keyStore = null;
+
+ var env = Environments.getEnvironment();
+ boolean acceptAllCerts =
env.getAllowSelfSignedCertificates().getValueOrDefault();
+
+ if (acceptAllCerts) {
+ LOG.info("Accepting all certificates...");
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(null, SecurityUtils.acceptAllCerts(), new
SecureRandom());
+ mqtt.setSslContext(sslContext);
+ return;
+ }
+
+ keyStore = loadKeyStore();
+ TrustManagerFactory trustManagerFactory =
SecurityUtils.createTrustManagerFactory(keyStore);
+
+ KeyManager[] keyManagers = null;
+ if (mqttConfig.getClientCertificatePath() != null &&
mqttConfig.getClientKeyPath() != null) {
+ keyManagers = SecurityUtils.loadClientKeyManagers(
+ mqttConfig.getClientCertificatePath(),
+ mqttConfig.getClientKeyPath());
+ }
+
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(keyManagers, trustManagerFactory.getTrustManagers(),
new SecureRandom());
+ mqtt.setSslContext(sslContext);
+ }
+
+ private KeyStore loadKeyStore() throws IOException,
NoSuchAlgorithmException, CertificateException, KeyStoreException {
+ try {
+ return SecurityUtils.loadServerKeyStore();
+ } catch (IOException | NoSuchAlgorithmException | CertificateException
| KeyStoreException e) {
+ LOG.error("Error loading keystore from file: {}", e);
+ throw e; // Re-throwing to handle it at the top level
+ }
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConfig.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConfig.java
index 4a3b30a224..25c8a9ae07 100644
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConfig.java
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConfig.java
@@ -17,6 +17,8 @@
*/
package org.apache.streampipes.extensions.connectors.mqtt.shared;
+import org.fusesource.mqtt.client.QoS;
+
public class MqttConfig {
private Boolean authenticated;
@@ -26,6 +28,23 @@ public class MqttConfig {
private String username;
private String password;
+ private String clientCertificate = null;
+ private String clientKey = null;
+ private Boolean tls = false;
+
+ private boolean isLastWill = false;
+ private QoS willQoS = QoS.AT_MOST_ONCE;
+ private Boolean willRetain = false;
+ private String willTopic = "";
+ private String willMessage = "";
+ private String mqttProtocolVersion = "3.1";
+ private QoS qos = QoS.AT_MOST_ONCE;
+ private long reconnectDelayMaxInMs = 10000L;
+ private boolean cleanSession = true;
+ private boolean retain = false;
+ private short keepAliveInSec = 60;
+ private String clientId = "";
+
public MqttConfig(String url, String topic) {
this.authenticated = false;
this.url = url;
@@ -39,23 +58,181 @@ public class MqttConfig {
this.password = password;
}
+ public MqttConfig(String url, String topic, Boolean tlsEnabled, String
clientCertificate, String clientKey) {
+ this.authenticated = false;
+ this.url = url;
+ this.topic = topic;
+ this.tls = tlsEnabled;
+ this.clientCertificate = clientCertificate;
+ this.clientKey = clientKey;
+ }
+
+ public MqttConfig(String url, String topic, String username, String
password, Boolean tlsEnabled) {
+ this(url, topic, username, password);
+ this.tls = tlsEnabled;
+ }
+
+ public MqttConfig(String url, String topic, Boolean tlsEnabled) {
+ this(url, topic);
+ this.tls = tlsEnabled;
+ }
+
public Boolean getAuthenticated() {
return authenticated;
}
+ public void setAuthenticated(Boolean authenticated) {
+ this.authenticated = authenticated;
+ }
+
public String getUrl() {
return url;
}
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
public String getTopic() {
return topic;
}
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
public String getUsername() {
return username;
}
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
public String getPassword() {
return password;
}
-}
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getClientCertificatePath() {
+ return clientCertificate;
+ }
+
+ public void setClientCertificatePath(String clientCertificate) {
+ this.clientCertificate = clientCertificate;
+ }
+
+ public String getClientKeyPath() {
+ return clientKey;
+ }
+
+ public void setClientKey(String clientKey) {
+ this.clientKey = clientKey;
+ }
+
+ public Boolean getTlsEnabled() {
+ return tls;
+ }
+
+ public void setTlsEnabled(Boolean tlsEnabled) {
+ this.tls = tlsEnabled;
+ }
+
+ public boolean isLastWill() {
+ return isLastWill;
+ }
+
+ public void setLastWill(boolean lastWill) {
+ this.isLastWill = lastWill;
+ }
+
+ public QoS getWillQoS() {
+ return willQoS;
+ }
+
+ public void setWillQoS(QoS willQoS) {
+ this.willQoS = willQoS;
+ }
+
+ public Boolean getWillRetain() {
+ return willRetain;
+ }
+
+ public void setWillRetain(Boolean willRetain) {
+ this.willRetain = willRetain;
+ }
+
+ public String getWillTopic() {
+ return willTopic;
+ }
+
+ public void setWillTopic(String willTopic) {
+ this.willTopic = willTopic;
+ }
+
+ public String getWillMessage() {
+ return willMessage;
+ }
+
+ public void setWillMessage(String willMessage) {
+ this.willMessage = willMessage;
+ }
+
+ public String getMqttProtocolVersion() {
+ return mqttProtocolVersion;
+ }
+
+ public void setMqttProtocolVersion(String mqttProtocolVersion) {
+ this.mqttProtocolVersion = mqttProtocolVersion;
+ }
+
+ public QoS getQos() {
+ return qos;
+ }
+
+ public void setQos(QoS qos) {
+ this.qos = qos;
+ }
+
+ public long getReconnectDelayMaxInMs() {
+ return reconnectDelayMaxInMs;
+ }
+
+ public void setReconnectDelayMaxInMs(long reconnectDelayMaxInMs) {
+ this.reconnectDelayMaxInMs = reconnectDelayMaxInMs;
+ }
+
+ public boolean isCleanSession() {
+ return cleanSession;
+ }
+
+ public void setCleanSession(boolean cleanSession) {
+ this.cleanSession = cleanSession;
+ }
+
+ public boolean isRetain() {
+ return retain;
+ }
+
+ public void setRetain(boolean retain) {
+ this.retain = retain;
+ }
+
+ public short getKeepAliveInSec() {
+ return keepAliveInSec;
+ }
+
+ public void setKeepAliveInSec(short keepAliveInSec) {
+ this.keepAliveInSec = keepAliveInSec;
+ }
+}
\ No newline at end of file
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java
index 6b4b4a7e55..b579fc5f16 100644
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java
@@ -18,26 +18,56 @@
package org.apache.streampipes.extensions.connectors.mqtt.shared;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
+import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.Label;
import org.apache.streampipes.sdk.helpers.Labels;
+import org.fusesource.mqtt.client.QoS;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
public class MqttConnectUtils {
/**
* Keys of user configuration parameters
*/
+ // Adapter
public static final String ACCESS_MODE = "access-mode";
public static final String ANONYMOUS_ACCESS = "anonymous-alternative";
public static final String USERNAME_ACCESS = "username-alternative";
+ public static final String CLIENT_CERT_ACCESS = "client-cert-alternative";
public static final String USERNAME_GROUP = "username-group";
+ public static final String CLIENT_CERT_GROUP = "client-cert-group";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
+ public static final String CLIENTCERT = "clientcert";
+ public static final String CLIENTKEY = "clientkey";
public static final String BROKER_URL = "broker_url";
public static final String TOPIC = "topic";
+ // Publisher
+ public static final String QOS_LEVEL_KEY = "qos-level";
+ public static final String CLEAN_SESSION_KEY = "clean-session";
+ public static final String WILL_RETAIN = "will-retain";
+ public static final String RECONNECT_PERIOD_IN_SEC = "reconnect-period";
+ public static final String WILL_MODE = "lwt-mode";
+ public static final String NO_WILL_ALTERNATIVE = "no-lwt-alternative";
+ public static final String WILL_ALTERNATIVE = "lwt-alternative";
+ public static final String WILL_GROUP = "lwt-group";
+ public static final String WILL_TOPIC = "lwt-topic";
+ public static final String WILL_MESSAGE = "lwt-message";
+ public static final String WILL_QOS = "lwt-qos-level";
+ public static final String RETAIN = "retain";
+ public static final String KEEP_ALIVE_IN_SEC = "keep-alive";
+ public static final String MQTT_COMPLIANT = "mqtt-version-compliant";
public static Label getAccessModeLabel() {
return Labels.withId(ACCESS_MODE);
@@ -51,17 +81,66 @@ public class MqttConnectUtils {
return Labels.withId(TOPIC);
}
- public static StaticPropertyAlternative getAlternativesOne() {
+ public static Label getQosLevelLabel() {
+ return Labels.withId(QOS_LEVEL_KEY);
+ }
+
+ public static Label getRetainLabel() {
+ return Labels.withId(RETAIN);
+ }
+
+ public static Label getCleanSessionLabel() {
+ return Labels.withId(CLEAN_SESSION_KEY);
+ }
+
+ public static Label getReconnectPeriodLabel() {
+ return Labels.withId(RECONNECT_PERIOD_IN_SEC);
+ }
+
+ public static Label getKeepAliveLabel() {
+ return Labels.withId(KEEP_ALIVE_IN_SEC);
+ }
+
+ public static Label getMqttComplient() {
+ return Labels.withId(MQTT_COMPLIANT);
+ }
+
+ public static Label getWillModeLabel() {
+ return Labels.withId(WILL_MODE);
+ }
+
+ public static StaticPropertyAlternative getNoWillAlternative() {
+ return Alternatives.from(Labels.withId(NO_WILL_ALTERNATIVE), true);
+ }
+
+ public static StaticPropertyAlternative getWillAlternative() {
+ return Alternatives.from(Labels.withId(WILL_ALTERNATIVE),
+ StaticProperties.group(Labels.withId(WILL_GROUP),
+ StaticProperties.stringFreeTextProperty(Labels.withId(WILL_TOPIC)),
+
StaticProperties.stringFreeTextProperty(Labels.withId(WILL_MESSAGE)),
+ StaticProperties.singleValueSelection(Labels.withId(WILL_RETAIN),
+ Arrays.asList(
+ new Option("Yes", false),
+ new Option("No", true))),
+ StaticProperties.singleValueSelection(
+ Labels.withId(WILL_QOS),
+ Arrays.asList(
+ new Option("0 - at-most-once", true),
+ new Option("1 - at-least-once", false),
+ new Option("2 - exactly-once", false)))));
+ }
+
+ public static StaticPropertyAlternative getAnonymousAccess() {
return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
}
- public static StaticPropertyAlternative getAlternativesOne(boolean selected)
{
+ public static StaticPropertyAlternative getAnonymousAccess(boolean selected)
{
return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS), selected);
}
- public static StaticPropertyAlternative getAlternativesTwo() {
+ public static StaticPropertyAlternative getUsernameAccess() {
return Alternatives.from(Labels.withId(USERNAME_ACCESS),
StaticProperties.group(Labels.withId(USERNAME_GROUP),
StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME)),
@@ -69,13 +148,71 @@ public class MqttConnectUtils {
}
+ public static List<Option> getQOSLevelSelection() {
+ return Arrays.asList(
+ new Option("0 - at-most-once", false),
+ new Option("1 - at-least-once", true),
+ new Option("2 - exactly-once", false));
+ }
+
+ public static List<Option> getRetainSelection() {
+ return Arrays.asList(
+ new Option("Yes", false),
+ new Option("No", true));
+ }
+
+ public static List<Option> getCleanSessionSelection() {
+ return Arrays.asList(
+ new Option("Yes", true),
+ new Option("No", false));
+ }
+
+ public static List<Option> getMqttSelection() {
+ return Arrays.asList(
+ new Option("Yes", true),
+ new Option("No", false));
+ }
+
+ public static StaticPropertyAlternative getClientCertAccess() {
+ var group = StaticProperties.group(
+ Labels.withId(CLIENT_CERT_GROUP),
+ StaticProperties.stringFreeTextProperty(Labels.withId(CLIENTCERT),
true, false),
+ StaticProperties.secretValue(Labels.withId(CLIENTKEY)));
+ group.setHorizontalRendering(false);
+ return Alternatives.from(Labels.withId(CLIENT_CERT_ACCESS), group);
+
+ }
+
public static MqttConfig getMqttConfig(IParameterExtractor extractor) {
return getMqttConfig(extractor, null);
}
+  /**
+   * Extracts the scheme (for example "tcp" or "ssl") from a broker URI.
+   *
+   * @param brokeruri broker URI as configured by the user, may be null
+   * @return the scheme, or null when the input is null, carries no scheme or
+   *         is not a syntactically valid URI
+   */
+  public static String getProtocol(String brokeruri) {
+    if (brokeruri == null) {
+      return null;
+    }
+
+    try {
+      return new URI(brokeruri).getScheme();
+    } catch (URISyntaxException ignored) {
+      // Best effort on purpose: an unparsable URL is reported as "no protocol"
+      // here and is rejected later when the client is actually set up.
+      return null;
+    }
+  }
+
+ public static boolean tlsEnabled(String protocol) {
+ if (protocol == null) {
+ return false;
+ }
+ String proto = protocol.toLowerCase();
+ return proto.equals("ssl") || proto.equals("tls") || proto.equals("mqtts");
+ }
+
public static MqttConfig getMqttConfig(IParameterExtractor extractor, String
topicInput) {
MqttConfig mqttConfig;
String brokerUrl = extractor.singleValueParameter(BROKER_URL,
String.class);
+ String protocol = getProtocol(brokerUrl);
+ boolean tlsEnabled = tlsEnabled(protocol);
String topic;
if (topicInput == null) {
@@ -88,13 +225,92 @@ public class MqttConnectUtils {
if (selectedAlternative.equals(ANONYMOUS_ACCESS)) {
mqttConfig = new MqttConfig(brokerUrl, topic);
+ if (tlsEnabled) {
+ mqttConfig = new MqttConfig(brokerUrl, topic, true);
+ }
+ } else if (selectedAlternative.equals(CLIENT_CERT_ACCESS)) {
+
+ String clientcert = extractor.singleValueParameter(CLIENTCERT,
String.class);
+ String clientkey = extractor.secretValue(CLIENTKEY);
+ // TWO way auth so TLS needs to be enabled
+ mqttConfig = new MqttConfig(brokerUrl, topic, true, clientcert,
clientkey);
+
} else {
String username = extractor.singleValueParameter(USERNAME, String.class);
String password = extractor.secretValue(PASSWORD);
- mqttConfig = new MqttConfig(brokerUrl, topic, username, password);
+ if (tlsEnabled) {
+ mqttConfig = new MqttConfig(brokerUrl, topic, username, password,
true);
+ } else {
+ mqttConfig = new MqttConfig(brokerUrl, topic, username, password);
+ }
}
return mqttConfig;
}
+  /**
+   * Maps a selection label such as "1 - at-least-once" to the matching QoS
+   * level. All non-digit characters are dropped and the remaining digits are
+   * parsed as the level.
+   *
+   * @param s selection label containing the QoS level as digits
+   * @return the QoS constant for level 0, 1 or 2
+   * @throws SpRuntimeException    if the parsed level is not 0, 1 or 2
+   * @throws NumberFormatException if the label contains no digits
+   */
+  public static QoS extractQoSFromString(String s) {
+    final int level = Integer.parseInt(s.replaceAll("\\D+", ""));
+    if (level == 0) {
+      return QoS.AT_MOST_ONCE;
+    }
+    if (level == 1) {
+      return QoS.AT_LEAST_ONCE;
+    }
+    if (level == 2) {
+      return QoS.EXACTLY_ONCE;
+    }
+    throw new SpRuntimeException("Could not retrieve QoS level: QoS " + level);
+  }
+
+
+  /**
+   * Converts the "Yes" / "No" labels used by the selection options into a
+   * boolean. The comparison is case-sensitive.
+   *
+   * @param s selection label, must not be null
+   * @return true for "Yes", false for "No"
+   * @throws SpRuntimeException if the label is neither "Yes" nor "No"
+   */
+  public static boolean extractBoolean(String s) {
+    if (s.equals("Yes")) {
+      return true;
+    }
+    if (s.equals("No")) {
+      return false;
+    }
+    throw new SpRuntimeException("Could not map string value to boolean: " + s);
+  }
+
+ public static long fromSecToMs(Long value) {
+ return value * 1000;
+ }
+
+
+ public static MqttConfig extractDataSinkParams(IParameterExtractor
extractor) {
+
+
+ MqttConfig mqttConfig = getMqttConfig(extractor);
+
+ mqttConfig.setQos(MqttConnectUtils.extractQoSFromString(
+ extractor.selectedSingleValue(QOS_LEVEL_KEY, String.class)));
+
+ mqttConfig.setClientId(UUID.randomUUID().toString());
+
+ mqttConfig.setReconnectDelayMaxInMs(MqttConnectUtils
+ .fromSecToMs(extractor.singleValueParameter(RECONNECT_PERIOD_IN_SEC,
Long.class)));
+
+
mqttConfig.setKeepAliveInSec(extractor.singleValueParameter(KEEP_ALIVE_IN_SEC,
Short.class));
+
+ mqttConfig
+
.setCleanSession(MqttConnectUtils.extractBoolean(extractor.selectedSingleValue(CLEAN_SESSION_KEY,
String.class)));
+
mqttConfig.setRetain(MqttConnectUtils.extractBoolean(extractor.selectedSingleValue(RETAIN,
String.class)));
+
+ boolean isCompliant =
MqttConnectUtils.extractBoolean(extractor.selectedSingleValue(MQTT_COMPLIANT,
String.class));
+ if (isCompliant) {
+ mqttConfig.setMqttProtocolVersion("3.1.1");
+ }
+
+
+ String willMode = extractor.selectedAlternativeInternalId(WILL_MODE);
+ if (willMode.equals(WILL_ALTERNATIVE)) {
+ mqttConfig.setLastWill(true);
+ mqttConfig.setWillTopic(extractor.singleValueParameter(WILL_TOPIC,
String.class));
+ mqttConfig.setWillMessage(extractor.singleValueParameter(WILL_MESSAGE,
String.class));
+
mqttConfig.setWillQoS(MqttConnectUtils.extractQoSFromString(extractor.selectedSingleValue(WILL_QOS,
String.class)));
+
mqttConfig.setWillRetain(MqttConnectUtils.extractBoolean(extractor.selectedSingleValue(WILL_RETAIN,
String.class)));
+ }
+
+ return mqttConfig;
+ }
}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
index db81aab8ba..19895b80f6 100644
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
@@ -24,63 +24,64 @@ import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class MqttConsumer implements Runnable {
+public class MqttConsumer extends MqttBase implements Runnable {
- private final InternalEventProcessor<byte[]> consumer;
- private boolean running;
- private int maxElementsToReceive = -1;
- private int messageCount = 0;
+ private final InternalEventProcessor<byte[]> consumer;
+ private boolean running;
+ private int maxElementsToReceive = -1;
+ private int messageCount = 0;
- private final MqttConfig mqttConfig;
+ private static final Logger LOG =
LoggerFactory.getLogger(MqttConsumer.class);
- public MqttConsumer(MqttConfig mqttConfig,
- InternalEventProcessor<byte[]> consumer) {
- this.mqttConfig = mqttConfig;
- this.consumer = consumer;
- }
+ public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]>
consumer) {
+ super(mqttConfig);
+ this.consumer = consumer;
+ }
- public MqttConsumer(MqttConfig mqttConfig,
- InternalEventProcessor<byte[]> consumer,
- int maxElementsToReceive) {
- this(mqttConfig, consumer);
- this.maxElementsToReceive = maxElementsToReceive;
- }
+ public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]>
consumer, int maxElementsToReceive) {
+ this(mqttConfig, consumer);
+ this.maxElementsToReceive = maxElementsToReceive;
+ }
- @Override
- public void run() {
- this.running = true;
- MQTT mqtt = new MQTT();
- try {
- mqtt.setHost(mqttConfig.getUrl());
- mqtt.setConnectAttemptsMax(1);
- if (mqttConfig.getAuthenticated()) {
- mqtt.setUserName(mqttConfig.getUsername());
- mqtt.setPassword(mqttConfig.getPassword());
- }
- BlockingConnection connection = mqtt.blockingConnection();
- connection.connect();
- Topic[] topics = {new Topic(mqttConfig.getTopic(), QoS.AT_LEAST_ONCE)};
- byte[] qoses = connection.subscribe(topics);
+ @Override
+ public void run() {
+ this.running = true;
+ try {
+ MQTT mqtt = super.setupMqttClient();
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+ subscribeToTopic(connection);
+ processMessages(connection);
+ connection.disconnect();
+ } catch (Exception e) {
+ LOG.error("Error in MQTT consumer: ", e);
+ throw new RuntimeException("Error when receiving data from MQTT",
e);
+ }
+ }
- while (running && ((maxElementsToReceive == -1) || (this.messageCount <=
maxElementsToReceive))) {
- Message message = connection.receive();
- byte[] payload = message.getPayload();
- consumer.onEvent(payload);
- message.ack();
- this.messageCount++;
- }
- connection.disconnect();
- } catch (Exception e) {
- throw new RuntimeException("Error when receiving data from MQTT", e);
+ private void processMessages(BlockingConnection connection) throws
Exception {
+ while (running && (maxElementsToReceive == -1 || messageCount <
maxElementsToReceive)) {
+ Message message = connection.receive();
+ byte[] payload = message.getPayload();
+ consumer.onEvent(payload);
+ message.ack();
+ messageCount++;
+ }
}
- }
- public void close() {
- this.running = false;
- }
+ private void subscribeToTopic(BlockingConnection connection) throws
Exception {
+ Topic[] topics = { new Topic(super.mqttConfig.getTopic(),
QoS.AT_LEAST_ONCE) };
+ connection.subscribe(topics);
+ }
- public Integer getMessageCount() {
- return messageCount;
- }
-}
+ public void close() {
+ this.running = false;
+ }
+
+ public Integer getMessageCount() {
+ return messageCount;
+ }
+}
\ No newline at end of file
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
new file mode 100644
index 0000000000..b3c8e2d0de
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.shared;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
+import org.apache.streampipes.model.runtime.Event;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.net.URI;
+
+
+/**
+ * Publishes StreamPipes events to an MQTT broker over a blocking connection.
+ * The client is obtained from {@code setupMqttClient()} of the base class,
+ * using the configuration extracted from the data sink parameters.
+ */
+public class MqttPublisher extends MqttBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class);
+
+  // Broker URI read back from the configured client; only used in error messages.
+  private URI uri;
+  private MQTT mqtt;
+  private BlockingConnection conn;
+
+  /**
+   * Extracts the MQTT settings from the sink parameters and sets up the client.
+   *
+   * @param params data sink parameters holding the user-provided configuration
+   * @throws SpRuntimeException if the MQTT client cannot be set up
+   */
+  public MqttPublisher(IDataSinkParameters params) {
+    super(MqttConnectUtils.extractDataSinkParams(params.extractor()));
+    try {
+      this.mqtt = super.setupMqttClient();
+      // The URI was never assigned before, so every error path below died with a
+      // NullPointerException that hid the real cause.
+      this.uri = this.mqtt.getHost();
+    } catch (Exception e) {
+      LOG.error("Error in MQTT publisher: ", e);
+      throw new SpRuntimeException("Failed to initialize MQTT client: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Start blocking connection to MQTT broker.
+   *
+   * @throws SpRuntimeException if the connection cannot be established
+   */
+  public void connect() {
+    try {
+      this.conn = mqtt.blockingConnection();
+      this.conn.connect();
+    } catch (Exception e) {
+      // String concatenation is null-safe, unlike uri.toString().
+      throw new SpRuntimeException("Could not connect to MQTT broker: "
+          + uri + ", " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Publish received event to MQTT broker.
+   *
+   * @param event event to be published
+   * @throws SpRuntimeException if the event cannot be published
+   */
+  public void publish(Event event) {
+    JsonDataFormatDefinition dataFormatDefinition = new JsonDataFormatDefinition();
+    // NOTE(review): this round trip uses the platform default charset - confirm
+    // that this is intended for non-ASCII payloads.
+    byte[] payload = new String(dataFormatDefinition.fromMap(event.getRaw())).getBytes();
+    try {
+      this.conn.publish(super.mqttConfig.getTopic(), payload,
+          super.mqttConfig.getQos(), super.mqttConfig.isRetain());
+    } catch (Exception e) {
+      throw new SpRuntimeException("Could not publish to MQTT broker: "
+          + uri + ", " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Disconnect from MQTT broker. Does nothing when no connection was opened.
+   *
+   * @throws SpRuntimeException if disconnecting fails
+   */
+  public void disconnect() {
+    try {
+      // conn stays null when connect() was never called or failed early.
+      if (this.conn != null && this.conn.isConnected()) {
+        this.conn.disconnect();
+      }
+    } catch (Exception e) {
+      throw new SpRuntimeException("Could not disconnect from MQTT broker: "
+          + uri + ", " + e.getMessage(), e);
+    }
+  }
+
+}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/MqttPublisherSink.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/MqttPublisherSink.java
index 273e23ddf0..f6c509ab6e 100644
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/MqttPublisherSink.java
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/MqttPublisherSink.java
@@ -22,136 +22,74 @@ import
org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-import
org.apache.streampipes.extensions.connectors.mqtt.sink.common.MqttClient;
+import
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttPublisher;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
-import java.util.Arrays;
-
public class MqttPublisherSink implements IStreamPipesDataSink {
+ public static final String ID =
"org.apache.streampipes.sinks.brokers.jvm.mqtt";
+
+ private static final int DEFAULT_MQTT_PORT = 1883;
+ private static final int DEFAULT_RECONNECT_PERIOD = 30;
+ private static final int DEFAULT_KEEP_ALIVE = 30;
- private static final int DEFAULT_MQTT_PORT = 1883;
- private static final int DEFAULT_RECONNECT_PERIOD = 30;
- private static final int DEFAULT_KEEP_ALIVE = 30;
- public static final String TOPIC = "topic";
- public static final String HOST = "host";
- public static final String PORT = "port";
- public static final String AUTH_MODE = "auth-mode";
- public static final String NO_AUTH_ALTERNATIVE = "no-auth-alternative";
- public static final String AUTH_ALTERNATIVE = "basic-auth-alternative";
- public static final String USERNAME_GROUP = "username-group";
- public static final String USERNAME = "username";
- public static final String PASSWORD = "password";
- public static final String QOS_LEVEL_KEY = "qos-level";
- public static final String CLEAN_SESSION_KEY = "clean-session";
- public static final String WILL_RETAIN = "will-retain";
- public static final String ENCRYPTION_MODE = "encryption-mode";
- public static final String RECONNECT_PERIOD_IN_SEC = "reconnect-period";
- public static final String WILL_MODE = "lwt-mode";
- public static final String NO_WILL_ALTERNATIVE = "no-lwt-alternative";
- public static final String WILL_ALTERNATIVE = "lwt-alternative";
- public static final String WILL_GROUP = "lwt-group";
- public static final String WILL_TOPIC = "lwt-topic";
- public static final String WILL_MESSAGE = "lwt-message";
- public static final String WILL_QOS = "lwt-qos-level";
- public static final String RETAIN = "retain";
- public static final String KEEP_ALIVE_IN_SEC = "keep-alive";
- public static final String MQTT_COMPLIANT = "mqtt-version-compliant";
+ private MqttPublisher mqttClient;
- private MqttClient mqttClient;
- @Override
- public IDataSinkConfiguration declareConfig() {
- return DataSinkConfiguration.create(
- MqttPublisherSink::new,
-
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.mqtt", 0)
- .category(DataSinkType.MESSAGING)
- .withLocales(Locales.EN)
- .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
- .requiredStream(StreamRequirementsBuilder.any())
- .requiredTextParameter(Labels.withId(TOPIC))
- .requiredTextParameter(Labels.withId(HOST))
- .requiredIntegerParameter(Labels.withId(PORT), DEFAULT_MQTT_PORT)
- .requiredAlternatives(
- Labels.withId(AUTH_MODE),
- Alternatives.from(Labels.withId(NO_AUTH_ALTERNATIVE), true),
- Alternatives.from(Labels.withId(AUTH_ALTERNATIVE),
- StaticProperties.group(Labels.withId(USERNAME_GROUP),
-
StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME)),
-
StaticProperties.secretValue(Labels.withId(PASSWORD)))))
- .requiredSingleValueSelection(
- Labels.withId(ENCRYPTION_MODE),
- Arrays.asList(
- new Option("TCP", true),
- // SSL not yet supported
- new Option("SSL/TLS", false)))
- .requiredSingleValueSelection(
- Labels.withId(QOS_LEVEL_KEY),
- Arrays.asList(
- new Option("0 - at-most-once", false),
- new Option("1 - at-least-once", true),
- new Option("2 - exactly-once", false)))
- .requiredSingleValueSelection(
- Labels.withId(RETAIN),
- Arrays.asList(
- new Option("Yes", false),
- new Option("No", true)))
- .requiredSingleValueSelection(
- Labels.withId(CLEAN_SESSION_KEY),
- Arrays.asList(
- new Option("Yes", true),
- new Option("No", false)))
- .requiredIntegerParameter(Labels.withId(RECONNECT_PERIOD_IN_SEC),
DEFAULT_RECONNECT_PERIOD)
- .requiredIntegerParameter(Labels.withId(KEEP_ALIVE_IN_SEC),
DEFAULT_KEEP_ALIVE)
- .requiredSingleValueSelection(
- Labels.withId(MQTT_COMPLIANT),
- Arrays.asList(
- new Option("Yes", true),
- new Option("No", false)))
- .requiredAlternatives(
- Labels.withId(WILL_MODE),
- Alternatives.from(Labels.withId(NO_WILL_ALTERNATIVE), true),
- Alternatives.from(Labels.withId(WILL_ALTERNATIVE),
- StaticProperties.group(Labels.withId(WILL_GROUP),
-
StaticProperties.stringFreeTextProperty(Labels.withId(WILL_TOPIC)),
-
StaticProperties.stringFreeTextProperty(Labels.withId(WILL_MESSAGE)),
-
StaticProperties.singleValueSelection(Labels.withId(WILL_RETAIN),
- Arrays.asList(
- new Option("Yes", false),
- new Option("No", true))),
- StaticProperties.singleValueSelection(
- Labels.withId(WILL_QOS),
- Arrays.asList(
- new Option("0 - at-most-once", true),
- new Option("1 - at-least-once", false),
- new Option("2 - exactly-once", false))))))
- .build()
- );
- }
+  /**
+   * Declares the configuration of the MQTT publisher sink (version 1): broker
+   * URL, access mode (anonymous, username/password or client certificate),
+   * topic, QoS level, retain flag, clean-session flag, reconnect period,
+   * keep-alive, MQTT compliance selection and last-will settings. All labels
+   * and selections come from {@link MqttConnectUtils}.
+   *
+   * @return the sink configuration used to register this data sink
+   */
+  @Override
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(
+        MqttPublisherSink::new,
+        // Use the shared ID constant instead of repeating the literal app id.
+        DataSinkBuilder.create(ID, 1)
+            .category(DataSinkType.MESSAGING)
+            .withLocales(Locales.EN)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+            .requiredStream(StreamRequirementsBuilder.any())
+            .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
+            .requiredAlternatives(
+                MqttConnectUtils.getAccessModeLabel(),
+                MqttConnectUtils.getAnonymousAccess(),
+                MqttConnectUtils.getUsernameAccess(),
+                MqttConnectUtils.getClientCertAccess())
+            .requiredTextParameter(MqttConnectUtils.getTopicLabel())
+            .requiredSingleValueSelection(
+                MqttConnectUtils.getQosLevelLabel(),
+                MqttConnectUtils.getQOSLevelSelection())
+            .requiredSingleValueSelection(
+                MqttConnectUtils.getRetainLabel(),
+                MqttConnectUtils.getRetainSelection())
+            .requiredSingleValueSelection(
+                MqttConnectUtils.getCleanSessionLabel(),
+                MqttConnectUtils.getCleanSessionSelection())
+            .requiredIntegerParameter(
+                MqttConnectUtils.getReconnectPeriodLabel(), DEFAULT_RECONNECT_PERIOD)
+            .requiredIntegerParameter(
+                MqttConnectUtils.getKeepAliveLabel(), DEFAULT_KEEP_ALIVE)
+            .requiredSingleValueSelection(
+                MqttConnectUtils.getMqttComplient(),
+                MqttConnectUtils.getMqttSelection())
+            .requiredAlternatives(
+                MqttConnectUtils.getWillModeLabel(),
+                MqttConnectUtils.getNoWillAlternative(),
+                MqttConnectUtils.getWillAlternative())
+            .build());
+  }
- @Override
- public void onPipelineStarted(IDataSinkParameters params,
EventSinkRuntimeContext runtimeContext) {
- this.mqttClient = new MqttClient(params);
- this.mqttClient.connect();
- }
+ /**
+  * Creates the MQTT publisher from the sink parameters and connects it to
+  * the broker.
+  *
+  * @param params         parameters of this sink invocation
+  * @param runtimeContext runtime context (not used by this method)
+  */
+ @Override
+ public void onPipelineStarted(IDataSinkParameters params,
EventSinkRuntimeContext runtimeContext) {
+ this.mqttClient = new MqttPublisher(params);
+ this.mqttClient.connect();
+ }
- @Override
- public void onEvent(Event event) throws SpRuntimeException {
- this.mqttClient.publish(event);
- }
+ /**
+  * Forwards the incoming event to the MQTT publisher.
+  *
+  * @param event event to publish
+  * @throws SpRuntimeException declared by the sink interface
+  */
+ @Override
+ public void onEvent(Event event) throws SpRuntimeException {
+ this.mqttClient.publish(event);
+ }
- @Override
- public void onPipelineStopped() {
- this.mqttClient.disconnect();
- }
+ /**
+  * Disconnects the MQTT publisher when the pipeline is stopped.
+  */
+ @Override
+ public void onPipelineStopped() {
+ // NOTE(review): mqttClient is only assigned in onPipelineStarted - confirm
+ // this is never invoked without a successful start.
+ this.mqttClient.disconnect();
+ }
}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttClient.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttClient.java
deleted file mode 100644
index 21b5e786df..0000000000
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttClient.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.extensions.connectors.mqtt.sink.common;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.JsonDataFormatDefinition;
-import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-import org.apache.streampipes.model.runtime.Event;
-
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-
-import java.net.URI;
-
-public class MqttClient {
-
- private final MqttOptions options;
- private URI uri;
- private MQTT mqtt;
- private BlockingConnection conn;
-
- public MqttClient(IDataSinkParameters params) {
- this.options = new MqttOptions(params);
- this.createMqttClient();
- }
-
- /**
- * Create new MQTT client
- */
- public void createMqttClient() {
- this.mqtt = new MQTT();
- this.uri = MqttUtils.makeMqttServerUri(options.getProtocol(),
options.getHost(), options.getPort());
- try {
- /**
- * Sets the url for connecting to the MQTT broker, e.g. {@code:
tcp://localhost:1883}.
- */
- mqtt.setHost(uri);
-
- // authentication
- if (options.isBasicAuth()) {
- /**
- * The username for authenticated sessions.
- */
- mqtt.setUserName(options.getUsername());
- /**
- * The password for authenticated sessions.
- */
- mqtt.setPassword(options.getPassword());
- }
-
- /**
- * The client id used when connecting to the MQTT broker.
- */
- mqtt.setClientId(options.getClientId());
-
- /**
- * Set to false if you want the MQTT server to persist topic
subscriptions and ack positions across
- * client sessions. Defaults to true.
- */
- mqtt.setCleanSession(options.isCleanSession());
-
- /**
- * The maximum amount of time in ms to wait between reconnect attempts.
Defaults to 30,000.
- */
- mqtt.setReconnectDelayMax(options.getReconnectDelayMaxInMs());
-
- /**
- * Configures the Keep Alive timer in seconds. Defines the maximum time
interval between messages
- * received from a client. It enables the server to detect that the
network connection to a client has
- * dropped, without having to wait for the long TCP/IP timeout.
- */
- mqtt.setKeepAlive(options.getKeepAliveInSec());
-
- /**
- * Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the
3.1 protocol version.
- */
- mqtt.setVersion(options.getMqttProtocolVersion());
-
- // last will and testament options
- if (options.isLastWill()) {
- /**
- * If set the server will publish the client's Will message to the
specified topics if the client has
- * an unexpected disconnection.
- */
- mqtt.setWillTopic(options.getWillTopic());
-
- /**
- * Sets the quality of service to use for the Will message. Defaults
to QoS.AT_MOST_ONCE.
- */
- mqtt.setWillQos(options.getWillQoS());
-
- /**
- * The Will message to send. Defaults to a zero length message.
- */
- mqtt.setWillMessage(options.getWillMessage());
-
- /**
- * Set to true if you want the Will to be published with the retain
option.
- */
- mqtt.setWillRetain(options.getWillRetain());
- }
- } catch (Exception e) {
- throw new SpRuntimeException("Failed to initialize MQTT Client: " +
e.getMessage(), e);
- }
- }
-
- /**
- * Start blocking connection to MQTT broker.
- */
- public void connect() {
- try {
- this.conn = mqtt.blockingConnection();
- this.conn.connect();
- } catch (Exception e) {
- throw new SpRuntimeException("Could not connect to MQTT broker: "
- + uri.toString() + ", " + e.getMessage(), e);
- }
- }
-
- /**
- * Publish received event to MQTT broker.
- *
- * @param event event to be published
- */
- public void publish(Event event) {
- JsonDataFormatDefinition dataFormatDefinition = new
JsonDataFormatDefinition();
- byte[] payload = new
String(dataFormatDefinition.fromMap(event.getRaw())).getBytes();
- try {
- this.conn.publish(options.getTopic(), payload, options.getQos(),
options.isRetain());
- } catch (Exception e) {
- throw new SpRuntimeException("Could not publish to MQTT broker: "
- + uri.toString() + ", " + e.getMessage(), e);
- }
- }
-
- /**
- * Disconnect from MQTT broker.
- */
- public void disconnect() {
- try {
- if (this.conn.isConnected()) {
- this.conn.disconnect();
- }
- } catch (Exception e) {
- throw new SpRuntimeException("Could not disconnect from MQTT broker: "
- + uri.toString() + ", " + e.getMessage(), e);
- }
- }
-
-}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttOptions.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttOptions.java
deleted file mode 100644
index d6823c843c..0000000000
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttOptions.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.extensions.connectors.mqtt.sink.common;
-
-import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
-
-import org.fusesource.mqtt.client.QoS;
-
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.AUTH_ALTERNATIVE;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.AUTH_MODE;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.CLEAN_SESSION_KEY;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.ENCRYPTION_MODE;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.HOST;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.KEEP_ALIVE_IN_SEC;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.MQTT_COMPLIANT;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.PASSWORD;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.PORT;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.QOS_LEVEL_KEY;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.RECONNECT_PERIOD_IN_SEC;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.RETAIN;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.TOPIC;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.USERNAME;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_ALTERNATIVE;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_MESSAGE;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_MODE;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_QOS;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_RETAIN;
-import static
org.apache.streampipes.extensions.connectors.mqtt.sink.MqttPublisherSink.WILL_TOPIC;
-
-public class MqttOptions {
-
- private final String clientId;
- private final String host;
- private final int port;
- private final String topic;
- private final String protocol;
- private final QoS qos;
- private final long reconnectDelayMaxInMs;
- private final boolean cleanSession;
- private final boolean retain;
- private final short keepAliveInSec;
-
- private String username = "";
- private String password = "";
- private boolean isBasicAuth = false;
- private boolean isLastWill = false;
- private QoS willQoS = QoS.AT_MOST_ONCE;
- private Boolean willRetain = false;
- private String willTopic = "";
- private String willMessage = "";
- private String mqttProtocolVersion = "3.1";
-
- public MqttOptions(IDataSinkParameters params) {
- var extract = params.extractor();
-
- this.clientId =
MqttUtils.runningInstanceId(params.getModel().getElementId());
- this.topic = extract.singleValueParameter(TOPIC, String.class);
- this.host = extract.singleValueParameter(HOST, String.class);
- this.port = extract.singleValueParameter(PORT, Integer.class);
- this.protocol = extract.selectedSingleValue(ENCRYPTION_MODE, String.class);
-
- this.qos =
MqttUtils.extractQoSFromString(extract.selectedSingleValue(QOS_LEVEL_KEY,
String.class));
- this.reconnectDelayMaxInMs =
-
MqttUtils.fromSecToMs(extract.singleValueParameter(RECONNECT_PERIOD_IN_SEC,
Long.class));
- this.keepAliveInSec = extract.singleValueParameter(KEEP_ALIVE_IN_SEC,
Short.class);
- this.cleanSession =
MqttUtils.extractBoolean(extract.selectedSingleValue(CLEAN_SESSION_KEY,
String.class));
- this.retain = MqttUtils.extractBoolean(extract.selectedSingleValue(RETAIN,
String.class));
-
- boolean isCompliant =
MqttUtils.extractBoolean(extract.selectedSingleValue(MQTT_COMPLIANT,
String.class));
- if (isCompliant) {
- this.mqttProtocolVersion = "3.1.1";
- }
-
- String accessMode = extract.selectedAlternativeInternalId(AUTH_MODE);
- if (accessMode.equals(AUTH_ALTERNATIVE)) {
- this.isBasicAuth = true;
- this.username = extract.singleValueParameter(USERNAME, String.class);
- this.password = extract.secretValue(PASSWORD);
- }
-
- String willMode = extract.selectedAlternativeInternalId(WILL_MODE);
- if (willMode.equals(WILL_ALTERNATIVE)) {
- this.isLastWill = true;
- this.willTopic = extract.singleValueParameter(WILL_TOPIC, String.class);
- this.willMessage = extract.singleValueParameter(WILL_MESSAGE,
String.class);
- this.willQoS =
MqttUtils.extractQoSFromString(extract.selectedSingleValue(WILL_QOS,
String.class));
- this.willRetain =
MqttUtils.extractBoolean(extract.selectedSingleValue(WILL_RETAIN,
String.class));
- }
- }
-
- public String getClientId() {
- return clientId;
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public String getProtocol() {
- return protocol;
- }
-
- public QoS getQos() {
- return qos;
- }
-
- public long getReconnectDelayMaxInMs() {
- return reconnectDelayMaxInMs;
- }
-
- public boolean isCleanSession() {
- return cleanSession;
- }
-
- public boolean isRetain() {
- return retain;
- }
-
- public short getKeepAliveInSec() {
- return keepAliveInSec;
- }
-
- public String getUsername() {
- return username;
- }
-
- public String getPassword() {
- return password;
- }
-
- public boolean isBasicAuth() {
- return isBasicAuth;
- }
-
- public boolean isLastWill() {
- return isLastWill;
- }
-
- public QoS getWillQoS() {
- return willQoS;
- }
-
- public Boolean getWillRetain() {
- return willRetain;
- }
-
- public String getWillTopic() {
- return willTopic;
- }
-
- public String getWillMessage() {
- return willMessage;
- }
-
- public String getMqttProtocolVersion() {
- return mqttProtocolVersion;
- }
-}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttUtils.java
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttUtils.java
deleted file mode 100644
index 9c3233df0c..0000000000
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/sink/common/MqttUtils.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.extensions.connectors.mqtt.sink.common;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-
-import org.fusesource.mqtt.client.QoS;
-
-import java.net.URI;
-
-public class MqttUtils {
-
- private static final String TCP = "TCP";
- private static final String SSL_TLS = "SSL/TLS";
- private static final String TCP_PROTOCOL = "tcp://";
- private static final String SSL_PROTOCOL = "ssl://";
- private static final String COLON = ":";
-
- public MqttUtils() {
- }
-
- // remove non-digits
- public static QoS extractQoSFromString(String s) {
- int qos = Integer.parseInt(s.replaceAll("\\D+", ""));
- switch (qos) {
- case 0:
- return QoS.AT_MOST_ONCE;
- case 1:
- return QoS.AT_LEAST_ONCE;
- case 2:
- return QoS.EXACTLY_ONCE;
- }
- throw new SpRuntimeException("Could not retrieve QoS level: QoS " + qos);
- }
-
- public static String runningInstanceId(String elementId) {
- return elementId.substring(elementId.lastIndexOf(".") + 1);
- }
-
- public static URI makeMqttServerUri(String protocol, String host, int port) {
- if (protocol.contains(TCP)) {
- return URI.create(TCP_PROTOCOL + host + COLON + port);
- } else if (protocol.contains(SSL_TLS)) {
- return URI.create(SSL_PROTOCOL + host + COLON + port);
- }
- throw new SpRuntimeException("Connection protocol not supported! Use
tcp:// or ssl://");
- }
-
- public static boolean extractBoolean(String s) {
- switch (s) {
- case "Yes":
- return true;
- case "No":
- return false;
- }
- throw new SpRuntimeException("Could not map string value to boolean: " +
s);
- }
-
- public static long fromSecToMs(Long value) {
- return value * 1000;
- }
-}
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/documentation.md
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/documentation.md
index ec5f30804a..b71d0247a9 100644
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/documentation.md
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/documentation.md
@@ -37,7 +37,7 @@ The MQTT protocol adapter enables StreamPipes to consume
messages from an MQTT b
## Configuration
### Broker Settings
-* **Broker URL**: The URL of the MQTT broker (e.g.,
tcp://test-server.com:1883). The protocol (tcp://) and port are required.
+* **Broker URL**: The URL of the MQTT broker (e.g., tcp://test-server.com:1883
for plain TCP or ssl://test-server.com:8883 for TLS). The protocol (tcp:// or
ssl://) and the port are required.
### Topic Settings
* **Topic**: The MQTT topic to subscribe to (e.g., test/topic)
@@ -48,6 +48,9 @@ The MQTT protocol adapter enables StreamPipes to consume
messages from an MQTT b
* **Username/Password**: Basic authentication with username and password
* **Username**: The username for authentication
* **Password**: The password for authentication
+ * **Client Certificate**
+ * **Certificate PEM**: Public Key in PEM format
+ * **Private Key PEM**: Private (RSA) Key in PEM format (without password !)
***
@@ -61,7 +64,7 @@ The MQTT protocol adapter enables StreamPipes to consume
messages from an MQTT b
* **Security**:
* Basic authentication support
* TCP protocol support
- * SSL/TLS support (coming soon)
+ * SSL/TLS support
* **Protocol Support**:
* MQTT 3.1
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/strings.en
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/strings.en
index 53b1da5689..3ba2f8dce8 100644
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/strings.en
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.mqtt/strings.en
@@ -19,8 +19,8 @@
org.apache.streampipes.connect.iiot.protocol.stream.mqtt.title=MQTT
org.apache.streampipes.connect.iiot.protocol.stream.mqtt.description=Consumes
messages from a broker using the MQTT protocol
-access-mode.title=Access Mode
-access-mode.description=0
+access-mode.title=User Authentication
+access-mode.description=Choose an authentication method for the user
anonymous-alternative.title=Unauthenticated
anonymous-alternative.description=
@@ -31,14 +31,23 @@ username-alternative.description=
username-group.title=User Group
username-group.description=
+client-cert-alternative.title= Client Certificate
+client-cert-alternative.description=
+
username.title=Username
username.description=
password.title=Password
password.description=
+clientcert.title=Certificate PEM
+clientcert.description = Public key in PEM format
+
+clientkey.title=Private Key PEM
+clientkey.description=Private key in PEM format
+
broker_url.title=Broker URL
-broker_url.description=Example: tcp://test-server.com:1883 (Protocol required.
Port required)"
+broker_url.description=Example: tcp://test-server.com:1883, or with TLS
ssl://test-server.com:8883 (protocol and port are required)
topic.title=Topic
topic.description=Example: test/topic
diff --git
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
index 431612c40a..c27528b4fc 100644
---
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
+++
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
@@ -18,12 +18,8 @@
org.apache.streampipes.sinks.brokers.jvm.mqtt.title=MQTT Publisher
org.apache.streampipes.sinks.brokers.jvm.mqtt.description=Publishes events to
a MQTT topic
-host.title=Host
-host.description=IP address or DNS of MQTT broker
-
-port.title=Port
-port.description=Port of MQTT broker (default 1883)
-
+broker_url.title=Broker URL
+broker_url.description=Example: tcp://test-server.com:1883 (Protocol required.
Port required), with TLS ssl://test-server.com:8883 (Protocol required. Port
required)
topic.title=Topic
topic.description=Enter MQTT topic
@@ -33,17 +29,32 @@ username.description=The username to authenticate with the
broker
password.title=Password
password.description=The password to authenticate with the broker
-auth-mode.title=Authentication
-auth-mode.description=Unauthenticated or basic auth
+clientcert.title=Certificate PEM
+clientcert.description = Client certificate in PEM format
+
+clientkey.title=Private Key PEM
+clientkey.description=Private key in PEM format
+
+access-mode.title=User Authentication
+access-mode.description=Choose an authentication method for the user
+
-no-auth-alternative.title=Unauthenticated
-no-auth-alternative.description=No authentication
+anonymous-alternative.title=Unauthenticated
+anonymous-alternative.description=
-basic-auth-alternative.title=Basic auth
-basic-auth-alternative.description=Username and password
+username-alternative.title=Username/Password
+username-alternative.description=
+
+username-group.title=User Group
+username-group.description=
+
+client-cert-alternative.title=Client Certificate
+client-cert-alternative.description=
username-group.title=Username and password
+cert-group.title=Client Certificate
+
qos-level.title=QoS
qos-level.description=Quality of Service level (default: at-least-once)
@@ -53,9 +64,6 @@ clean-session.description=Deselect to persist topic
subscriptions & ack position
will-retain.title=Retain will message?
will-retain.description=Select if you want the Will to be published with the
retain option (default: No)
-encryption-mode.title=Encryption
-encryption-mode.description=Select protocol. TCP (plaintext), SSL/TLS
(encrypted, not yet supported!)
-
reconnect-period.title=Reconnect period (seconds)
reconnect-period.description=Amount of time to wait between reconnect attempts
(default: 30s)
diff --git a/streampipes-integration-tests/pom.xml
b/streampipes-integration-tests/pom.xml
index 2430cb3fed..dd8a7fc524 100644
--- a/streampipes-integration-tests/pom.xml
+++ b/streampipes-integration-tests/pom.xml
@@ -33,7 +33,6 @@
<properties>
<owasp.check.skip>true</owasp.check.skip>
<maven.deploy.skip>false</maven.deploy.skip>
-
<pulsar.version>3.2.2</pulsar.version>
</properties>
@@ -134,13 +133,67 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <propertyExpansion>
-
checkstyle.config.base.path=${project.parent.basedir}/tools/maven
- </propertyExpansion>
+ <propertyExpansion>
+ checkstyle.config.base.path=${project.parent.basedir}/tools/maven
+ </propertyExpansion>
+ </configuration>
+ </plugin>
+
+ <!-- Surefire Plugin for Test Execution -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>3.2.5</version>
+ <configuration>
+ <skipTests>true</skipTests> <!-- Skip tests during mvn clean package
-->
</configuration>
</plugin>
</plugins>
</build>
+ <profiles>
+ <!-- Self-Signed Certificate Profile -->
+ <profile>
+ <id>selfsigned</id>
+ <activation>
+ <activeByDefault>true</activeByDefault> <!-- You can change this to
false if you don't want it to be active by default -->
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>3.2.5</version>
+ <configuration>
+ <skipTests>false</skipTests>
+ <environmentVariables>
+
<SP_SECURITY_ALLOW_SELFSIGNED>true</SP_SECURITY_ALLOW_SELFSIGNED>
+ </environmentVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
-</project>
+ <!-- Keystore Profile -->
+ <profile>
+ <id>keystore</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>3.2.5</version>
+ <configuration>
+ <skipTests>false</skipTests>
+ <environmentVariables>
+
<SP_SECURITY_KEYSTORE_FILENAME>src/test/resources/cacerts.pfx</SP_SECURITY_KEYSTORE_FILENAME>
+
<SP_SECURITY_KEYSTORE_PASSWORD>changeit</SP_SECURITY_KEYSTORE_PASSWORD>
+ </environmentVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
\ No newline at end of file
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
index 63bdc9f1c5..932d54c0b9 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdapterTesterBase.java
@@ -38,6 +38,7 @@ public abstract class AdapterTesterBase implements
AutoCloseable {
private int counter;
+
/**
Executes the test procedure for the adapter integration test.
This method performs the necessary actions to test the full integraion of
adapters with third party services.
@@ -55,8 +56,7 @@ public abstract class AdapterTesterBase implements
AutoCloseable {
public void run() throws Exception {
// prepare the third party docker service for the test (e.g. MQTT Broker)
startAdapterService();
-
- // generate the AdapterConfiguration for the test adapter
+ // generate the AdapterConfiguration for the test adapter
IAdapterConfiguration adapterConfiguration = prepareAdapter();
// start the adapter instance
@@ -70,7 +70,7 @@ public abstract class AdapterTesterBase implements
AutoCloseable {
// send events to thrird party service
publishEvents(expectedEvents);
-
+
// validate that events where send correctly
validate(expectedEvents);
}
@@ -91,8 +91,11 @@ public abstract class AdapterTesterBase implements
AutoCloseable {
adapter.onAdapterStarted(extractor, (event -> {
// This collector validates that the events are sent correctly and
within the right order
+
assertTrue(Maps.difference(event,
expectedEvents.get(counter)).areEqual());
counter++;
+
+
}), null);
return adapter;
@@ -138,6 +141,8 @@ public abstract class AdapterTesterBase implements
AutoCloseable {
*/
public void validate(List<Map<String, Object>> expectedEvents) throws
InterruptedException {
int retry = 0;
+
+
while (counter != expectedEvents.size() && retry < 5) {
Thread.sleep(1000);
retry++;
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersIntegrationTest.java
similarity index 75%
rename from
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
rename to
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersIntegrationTest.java
index dad51a0991..3b311d4369 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersTest.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/AdaptersIntegrationTest.java
@@ -17,30 +17,53 @@
*/
package org.apache.streampipes.integration.adapters;
+
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
-public class AdaptersTest {
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class AdaptersIntegrationTest {
@Test
- public void testPulsarAdapter() throws Exception {
- try (PulsarAdapterTester pulsarAdapterTester = new PulsarAdapterTester()) {
- pulsarAdapterTester.run();
+ @Order(1)
+ public void testMqttAdapter() throws Exception {
+ try (MqttAdapterTester mqttAdapterTester = new MqttAdapterTester()) {
+ mqttAdapterTester.run();
}
}
+ @Test
+ @Order(2)
+ public void testMqttTLSAdapter() throws Exception {
+
+try (MqttAdapterTLSTester mqttAdapterTLSTester = new MqttAdapterTLSTester()) {
+ mqttAdapterTLSTester.run();
+ }
+ }
+
@Test
- public void testMqttAdapter() throws Exception {
- try (MqttAdapterTester mqttAdapterTester = new MqttAdapterTester()) {
- mqttAdapterTester.run();
+ @Order(3)
+ public void testPulsarAdapter() throws Exception {
+ try (PulsarAdapterTester pulsarAdapterTester = new PulsarAdapterTester()) {
+ pulsarAdapterTester.run();
}
}
+
+
+
@Test
+ @Order(4)
public void testKafkaAdapter() throws Exception {
+
try (KafkaAdapterTester kafkaAdapterTester = new KafkaAdapterTester()) {
kafkaAdapterTester.run();
}
+
+
}
}
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MQTTPublisherUtils.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MQTTPublisherUtils.java
new file mode 100644
index 0000000000..bb34078ca3
--- /dev/null
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MQTTPublisherUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.integration.adapters;
+
+import org.apache.streampipes.integration.containers.MosquittoContainer;
+import org.apache.streampipes.messaging.mqtt.MqttPublisher;
+import org.apache.streampipes.model.grounding.MqttTransportProtocol;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.List;
+import java.util.Map;
+
+public class MQTTPublisherUtils {
+
+ public static void publishEvents(MqttPublisher publisher, List<Map<String,
Object>> events) {
+ var objectMapper = new ObjectMapper();
+
+ events.forEach(event -> {
+
+ try {
+ var serializedEvent = objectMapper.writeValueAsBytes(event);
+ publisher.publish(serializedEvent);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ publisher.disconnect();
+ }
+
+ @NotNull
+ public static MqttPublisher getMqttPublisher(MosquittoContainer
mosquittoContainer, String topic) {
+ MqttTransportProtocol mqttSettings = new MqttTransportProtocol(
+ mosquittoContainer.getBrokerHost(),
+ mosquittoContainer.getBrokerPort(),
+ topic);
+ MqttPublisher publisher = new MqttPublisher(mqttSettings);
+ publisher.connect();
+ return publisher;
+ }
+
+}
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTLSTester.java
similarity index 79%
copy from
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
copy to
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTLSTester.java
index 4b94590f35..7bf210b708 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTLSTester.java
@@ -28,13 +28,9 @@ import
org.apache.streampipes.integration.containers.MosquittoDevContainer;
import org.apache.streampipes.integration.utils.Utils;
import org.apache.streampipes.manager.template.AdapterTemplateHandler;
import org.apache.streampipes.messaging.mqtt.MqttPublisher;
-import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
import org.apache.streampipes.model.template.PipelineElementTemplate;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
@@ -42,7 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class MqttAdapterTester extends AdapterTesterBase {
+public class MqttAdapterTLSTester extends AdapterTesterBase {
MosquittoContainer mosquittoContainer;
@@ -66,7 +62,8 @@ public class MqttAdapterTester extends AdapterTesterBase {
List<Map<String, Object>> configs = new ArrayList<>();
configs.add(Map.of(MqttConnectUtils.TOPIC, TOPIC));
- configs.add(Map.of(MqttConnectUtils.BROKER_URL,
mosquittoContainer.getBrokerUrl()));
+ configs.add(Map.of(MqttConnectUtils.BROKER_URL,
mosquittoContainer.getBrokerUrlTLS()));
+
var template = new PipelineElementTemplate("name", "description", configs);
@@ -84,10 +81,11 @@ public class MqttAdapterTester extends AdapterTesterBase {
.get(0)
.setSelected(true);
+
// Set format to Json
((StaticPropertyAlternatives) (desc)
.getConfig()
- .get(3))
+ .get(3))
.getAlternatives()
.get(0)
.setSelected(true);
@@ -107,31 +105,14 @@ public class MqttAdapterTester extends AdapterTesterBase {
@Override
- public void publishEvents(List<Map<String, Object>> events) {
- var publisher = getMqttPublisher();
- var objectMapper = new ObjectMapper();
-
- events.forEach(event -> {
- try {
- var serializedEvent = objectMapper.writeValueAsBytes(event);
- publisher.publish(serializedEvent);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- });
-
- publisher.disconnect();
+ public void publishEvents(List<Map<String, Object>> events) {
+ var pub = getMqttPublisher();
+ MQTTPublisherUtils.publishEvents(pub, events);
}
@NotNull
private MqttPublisher getMqttPublisher() {
- MqttTransportProtocol mqttSettings = new MqttTransportProtocol(
- mosquittoContainer.getBrokerHost(),
- mosquittoContainer.getBrokerPort(),
- TOPIC);
- MqttPublisher publisher = new MqttPublisher(mqttSettings);
- publisher.connect();
- return publisher;
+ return MQTTPublisherUtils.getMqttPublisher(mosquittoContainer, TOPIC);
}
@Override
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
index 4b94590f35..23be3ea134 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MqttAdapterTester.java
@@ -28,13 +28,9 @@ import
org.apache.streampipes.integration.containers.MosquittoDevContainer;
import org.apache.streampipes.integration.utils.Utils;
import org.apache.streampipes.manager.template.AdapterTemplateHandler;
import org.apache.streampipes.messaging.mqtt.MqttPublisher;
-import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
import org.apache.streampipes.model.template.PipelineElementTemplate;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
@@ -84,6 +80,7 @@ public class MqttAdapterTester extends AdapterTesterBase {
.get(0)
.setSelected(true);
+
// Set format to Json
((StaticPropertyAlternatives) (desc)
.getConfig()
@@ -105,33 +102,15 @@ public class MqttAdapterTester extends AdapterTesterBase {
return Utils.getSimpleTestEvents();
}
-
@Override
- public void publishEvents(List<Map<String, Object>> events) {
- var publisher = getMqttPublisher();
- var objectMapper = new ObjectMapper();
-
- events.forEach(event -> {
- try {
- var serializedEvent = objectMapper.writeValueAsBytes(event);
- publisher.publish(serializedEvent);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- });
-
- publisher.disconnect();
+ public void publishEvents(List<Map<String, Object>> events) {
+ var pub = getMqttPublisher();
+ MQTTPublisherUtils.publishEvents(pub, events);
}
@NotNull
private MqttPublisher getMqttPublisher() {
- MqttTransportProtocol mqttSettings = new MqttTransportProtocol(
- mosquittoContainer.getBrokerHost(),
- mosquittoContainer.getBrokerPort(),
- TOPIC);
- MqttPublisher publisher = new MqttPublisher(mqttSettings);
- publisher.connect();
- return publisher;
+ return MQTTPublisherUtils.getMqttPublisher(mosquittoContainer, TOPIC);
}
@Override
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataTest.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataIntegrationTest.java
similarity index 95%
rename from
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataTest.java
rename to
streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataIntegrationTest.java
index 0115b0bc3f..069947d16a 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataTest.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/client/ClientLiveDataIntegrationTest.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.integration.client;
import org.junit.jupiter.api.Test;
-public class ClientLiveDataTest {
+public class ClientLiveDataIntegrationTest {
@Test
public void testNatsClient() throws Exception {
diff --git
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
index a22de39ec1..690634fed9 100644
---
a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
+++
b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/MosquittoContainer.java
@@ -20,23 +20,33 @@ package org.apache.streampipes.integration.containers;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.Wait;
public class MosquittoContainer extends GenericContainer<MosquittoContainer> {
- protected static final int MOSQUITTO_PORT = 1883;
+ protected static final Integer[] MOSQUITTO_PORTS = {1883, 8883};
public MosquittoContainer() {
super("eclipse-mosquitto:latest");
}
public void start() {
- this.waitStrategy = Wait.forLogMessage(".*listen socket on port 1883.*",
1);
- this.withExposedPorts(MOSQUITTO_PORT);
+ this.withExposedPorts(MOSQUITTO_PORTS);
this.withClasspathResourceMapping(
"mosquitto.conf",
"/mosquitto/config/mosquitto.conf",
BindMode.READ_ONLY);
+ this.withClasspathResourceMapping(
+ "mosquitto.crt",
+ "/mosquitto/config/mosquitto.crt",
+ BindMode.READ_ONLY);
+ this.withClasspathResourceMapping(
+ "mosquitto.key",
+ "/mosquitto/config/mosquitto.key",
+ BindMode.READ_ONLY);
+ this.withClasspathResourceMapping(
+ "passwd",
+ "/mosquitto/config/passwd",
+ BindMode.READ_ONLY);
super.start();
}
@@ -45,10 +55,17 @@ public class MosquittoContainer extends
GenericContainer<MosquittoContainer> {
}
public Integer getBrokerPort() {
- return getMappedPort(MOSQUITTO_PORT);
+ return getMappedPort(MOSQUITTO_PORTS[0]);
+ }
+ public Integer getBrokerTLSPort() {
+ return getMappedPort(MOSQUITTO_PORTS[1]);
}
public String getBrokerUrl() {
return "tcp://" + getBrokerHost() + ":" + getBrokerPort();
}
+
+ public String getBrokerUrlTLS() {
+ return "ssl://" + getBrokerHost() + ":" + getBrokerTLSPort();
+ }
}
diff --git a/streampipes-integration-tests/src/test/resources/cacerts.pfx
b/streampipes-integration-tests/src/test/resources/cacerts.pfx
new file mode 100644
index 0000000000..acc6512475
Binary files /dev/null and
b/streampipes-integration-tests/src/test/resources/cacerts.pfx differ
diff --git a/streampipes-integration-tests/src/test/resources/mosquitto
copy.conf b/streampipes-integration-tests/src/test/resources/mosquitto copy.conf
new file mode 100644
index 0000000000..fb0917ee2a
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/mosquitto copy.conf
@@ -0,0 +1,43 @@
+# mosquitto.conf
+# Basic listener configuration (for non-TLS)
+listener 1883
+allow_anonymous false
+
+# TLS listener (for secure MQTT connection)
+listener 8883
+cafile /mosquitto/config/mosquitto.crt
+certfile /mosquitto/config/mosquitto.crt
+keyfile /mosquitto/config/mosquitto.key
+allow_anonymous true # Optional: you can enforce authentication on TLS as well
+
+# WebSocket listener (non-TLS)
+listener 9001
+protocol websockets
+allow_anonymous false
+
+# WebSocket listener (TLS)
+listener 9002
+protocol websockets
+cafile /mosquitto/config/mosquitto.crt
+certfile /mosquitto/config/mosquitto.crt
+keyfile /mosquitto/config/mosquitto.key
+allow_anonymous true # Optional: enforce authentication for WebSockets over
TLS as well
+
+# Point to password file (for username/password authentication)
+password_file /mosquitto/config/passwd
+
+# Persistence settings (to retain message state)
+persistence true
+persistence_location /mosquitto/data/
+
+# Logging configuration
+log_dest file /mosquitto/log/mosquitto.log
+log_type error
+log_type warning
+log_type notice
+log_type information
+
+# Uncomment and adjust paths if using additional configurations (like custom
log destinations or file locations)
+#log_dest stdout
+#log_dest syslog
+#log_dest topic
\ No newline at end of file
diff --git a/streampipes-integration-tests/src/test/resources/mosquitto.conf
b/streampipes-integration-tests/src/test/resources/mosquitto.conf
index 3c4318c1ad..dc0aea6993 100644
--- a/streampipes-integration-tests/src/test/resources/mosquitto.conf
+++ b/streampipes-integration-tests/src/test/resources/mosquitto.conf
@@ -16,6 +16,14 @@
#
#
-allow_anonymous true
listener 1883
-log_type all
+allow_anonymous true
+
+
+listener 8883
+cafile /mosquitto/config/mosquitto.crt
+certfile /mosquitto/config/mosquitto.crt
+keyfile /mosquitto/config/mosquitto.key
+allow_anonymous true
+
+password_file /mosquitto/config/passwd
\ No newline at end of file
diff --git a/streampipes-integration-tests/src/test/resources/mosquitto.crt
b/streampipes-integration-tests/src/test/resources/mosquitto.crt
new file mode 100644
index 0000000000..273c47cf21
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/mosquitto.crt
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDYDCCAkgCCQD+NxhAtLaXiDANBgkqhkiG9w0BAQsFADByMQswCQYDVQQGEwJV
+UzETMBEGA1UECAwKQ2FsaWZvcm5pYTEVMBMGA1UEBwwMU2FuRnJhbmNpc2NvMRIw
+EAYDVQQKDAlNeUNvbXBhbnkxDzANBgNVBAsMBk15RGVwdDESMBAGA1UEAwwJbG9j
+YWxob3N0MB4XDTI1MTAyNzEzNTQzMFoXDTI2MTAyNzEzNTQzMFowcjELMAkGA1UE
+BhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExFTATBgNVBAcMDFNhbkZyYW5jaXNj
+bzESMBAGA1UECgwJTXlDb21wYW55MQ8wDQYDVQQLDAZNeURlcHQxEjAQBgNVBAMM
+CWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAP5kU4a2
+3XWkcFYTISsgJ2x59tSzcYn05EHNT/sr3j4lKSMNFO3/wLimchuoQxb08DE/9P4h
+KkzzdWM8TiOWo2usfNcQnRUfgvjnWkSwsTHlDqrH7cRQldvWWBzbm27y+I0fu7IN
+4Lw4X1jTLRE7cy5XWkmKHWDUlFYzn6CACi0BzcXO9eYqkcncbVe4rIVGB0BThM80
+JtXuWKwDyTpJAobdCY8RKwvqIPwo265UOidGeMrsC4A5TFGh8oobCArYgIGl/QoZ
+pEpOq2NOjykJqsRKu/Kxrom1bRj2Qp8K/9jtIUeOPubqBNMeh83ojgdgfA5ZwUxq
+vEz2teJeJe8N3jMCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEA624V/aM1Dyyhj/oS
++irtAZJFPAHkyaWJkiIeWMpQtY9kkYPIrO5oIdZ9oVZdBFW2rs6KvFf14FT5vTyU
+raq+FZ5UMUK53sH52KeqKbeejVZCaOG9jnys02mkhI0Gx+l3lWtrhN8xqzvcfh9g
+aYMLfh5yAJ6d5hkKwFMdJsNh/PUBhW0onRYfVq304PaDaM9Mtbzk7tbijr5KNgsB
+4vOPn09MC+L6KOSxfMB9urHeeAgtHyURIQpJjnsaG0UdSCOTenRrxqqweOeyo+Q1
+/zVkf3EIQ66CFkjxoPdjXtUeA320pZPw9R45DoWXUHwsLT7Ddj9nngm9e2Es+wGd
+5gQGIQ==
+-----END CERTIFICATE-----
diff --git a/streampipes-integration-tests/src/test/resources/mosquitto.key
b/streampipes-integration-tests/src/test/resources/mosquitto.key
new file mode 100644
index 0000000000..3b1ef10f59
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/mosquitto.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQD+ZFOGtt11pHBW
+EyErICdsefbUs3GJ9ORBzU/7K94+JSkjDRTt/8C4pnIbqEMW9PAxP/T+ISpM83Vj
+PE4jlqNrrHzXEJ0VH4L451pEsLEx5Q6qx+3EUJXb1lgc25tu8viNH7uyDeC8OF9Y
+0y0RO3MuV1pJih1g1JRWM5+ggAotAc3FzvXmKpHJ3G1XuKyFRgdAU4TPNCbV7lis
+A8k6SQKG3QmPESsL6iD8KNuuVDonRnjK7AuAOUxRofKKGwgK2ICBpf0KGaRKTqtj
+To8pCarESrvysa6JtW0Y9kKfCv/Y7SFHjj7m6gTTHofN6I4HYHwOWcFMarxM9rXi
+XiXvDd4zAgMBAAECggEAQQOI6Tvg20j8QLNA3dGo4atF7tQxZy2EDGEZWLq8YKuE
+mOWl/LFJIqe/L9xP0RUmMaADz9LQCbyKuqLV4XiFKWZ6vUgMrTJReaU+x6FUl8jP
+d8wCsxJZSka8XBuv3KoR5Zc/k+DCF7hcfcnykZ3c8PH4LCU1HuMVSfaFjDJM53U1
+HeH/S9rOB60zTSFeegx+dq5WcCUGAGz4zGhbGaM47vd0etj5JYQwMrc0aCW2L+io
+LMF75T2lbXqNvDc6o/SJQVhoFxfkVCzh1ACh/toy1R/cOuLalBsdfqDynvQd+UN2
+xiSYBzCbP/mpmNt1FS56PvAiSSKHTrO0dhsD4f3yuQKBgQD/5UvAqEtnl+2KOHiX
+6ox6Rktoveqm6LcDRZRpXSfXG0o5Z78tOs00tnNsnwbCVMnBu8XbDHS+qYW6eiFM
+6Gq50bI+hNzK/NKm8r/teDN5nXqkpraX36MtV9JSv5lcVWFcOJx/z3nUBQJKcu+P
+A7HN6K/b/UdX+c6cY3NBsYT7JQKBgQD+ft+ZmhjKBIT3SuSAde97WdpsJqHrFFB1
+N3akYl/3AbzamoM5ET3ZoG5rkczAkSIPu8cIiIf4fk6zTgf2iuFemgoRdjabN02s
+uUYTfeAOKRRkWY1aopYpC60UuaoliIG6LowKcooXVQ72x2ZnSmz7IPcrtJWTWgjT
+NI0bKE6gdwKBgQDH3lyQmeJrg2rxbrIiVfxq9MSphszkmRd44rvMoAoiJRqQQ8w6
+k5b7+RWmXX92AaukOfmL4eq9kML2p7Wi0FWr1XGXC0c49MfDxg7Kd/wcnTfRqrUr
+Ym2dWN7Z6vTp/XYSBdWWroLFazQi2irqVURnQ7s35Ff5CxCpbbP0N6daUQKBgQDo
+NLEmQID/yrHbxTzKrVDuVqTB61nv2WA0I4AgKxZulOpQ94xlxIKPkB9QDP8qcQII
+IwhOk+ykYfLDDZ6caEmL/LbVCex3ITXBNGdpH4AQy5Csoz0jhpfGKb4p2+IQTwY2
+74OdgLbY5SY7KuMXucPIO2LrQOD9SrgkpZ1eOx/KrwKBgHm/mklfnvz25ps9LFBm
+AgFBIv5n/vq26o5GK5K8sAdN43T27K9WAtlgnOz0KO1i8HSHj8jdtm9NtKSJBAMg
+ImCvK32/YlP9R6Lo77M1POC658YCI4Yd1DXGwFz/YHgVYteVWjGlAIW26f9XI+pL
+Rp0UHFAdSaXiYQDiwwCbDYLn
+-----END PRIVATE KEY-----
diff --git a/streampipes-integration-tests/src/test/resources/passwd
b/streampipes-integration-tests/src/test/resources/passwd
new file mode 100644
index 0000000000..dd3b2e2657
--- /dev/null
+++ b/streampipes-integration-tests/src/test/resources/passwd
@@ -0,0 +1 @@
+username:$7$101$Kb+Z4WgvFpg/VLZB$6TYUfv4iH0NqBLaZe5ApIOdBVDQ3Vmd0XH6/2DW0B8aHkZWGZRXy2gPZnf8K9n55uKiuMxiDIDAms+8FA1s/bQ==
diff --git
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
index 1793698efb..1ca0db7d46 100644
---
a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
+++
b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/StaticProperties.java
@@ -33,6 +33,7 @@ import
org.apache.streampipes.model.staticproperty.RuntimeResolvableGroupStaticP
import
org.apache.streampipes.model.staticproperty.RuntimeResolvableOneOfStaticProperty;
import
org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputStaticProperty;
import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.model.staticproperty.SlideToggleStaticProperty;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
@@ -64,6 +65,20 @@ public class StaticProperties {
return alternativesContainer;
}
+ public static SlideToggleStaticProperty toggleAlternative (Label label,
boolean defaultValue){
+
+ SlideToggleStaticProperty slideToggle = new SlideToggleStaticProperty(
+ label.getInternalId(),
+ label.getLabel(),
+ label.getDescription(),
+ defaultValue);
+
+ slideToggle.setSelected(defaultValue);
+
+ return slideToggle;
+
+ }
+
public static MappingPropertyUnary mappingPropertyUnary(Label label,
RequirementsSelector requirementsSelector,
PropertyScope
propertyScope) {
MappingPropertyUnary mp = new MappingPropertyUnary(label.getInternalId(),
label