dominikriemer commented on code in PR #3934:
URL: https://github.com/apache/streampipes/pull/3934#discussion_r2535662390


##########
streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/migration/Oi4AdapterMigrationV1.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.iiot.adapters.oi4.migration;
+
+import org.apache.streampipes.connect.iiot.adapters.oi4.Oi4Adapter;
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class Oi4AdapterMigrationV1 implements IAdapterMigrator {
+
+
+    @Override
+    public ModelMigratorConfig config() {
+        return new ModelMigratorConfig(
+                Oi4Adapter.ID,
+                SpServiceTagPrefix.ADAPTER,
+                0,
+                1);
+
+    }
+
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(Oi4AdapterMigrationV1.class);
+
+    @Override
+    public MigrationResult<AdapterDescription> migrate(AdapterDescription 
element,
+            IStaticPropertyExtractor extractor) throws RuntimeException { 
+
+        
+        LOG.info("Migrate Broker URL ");

Review Comment:
   We can reduce the amount of logging here.



##########
streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.shared;
+
+import org.apache.streampipes.commons.environment.Environments;
+import 
org.apache.streampipes.extensions.connectors.mqtt.security.SecurityUtils;
+
+import com.hivemq.client.mqtt.MqttClient;
+import com.hivemq.client.mqtt.MqttClientSslConfig;
+import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.UUID;
+
+public class MqttBase {
+
+    protected final MqttConfig mqttConfig;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MqttBase.class);
+
+    public MqttBase(MqttConfig mqttConfig) {
+        this.mqttConfig = mqttConfig;
+    }
+
+    protected Mqtt3AsyncClient setupMqttClient() throws Exception {
+         URI brokerUri = new URI(mqttConfig.getUrl());
+        boolean tls = tlsEnabled(brokerUri);
+
+
+        var builder = MqttClient.builder()
+                .identifier(UUID.randomUUID().toString())
+                .serverHost(brokerUri.getHost())
+                .serverPort(resolvePort(brokerUri))
+                .useMqttVersion3(); 
+
+        if (mqttConfig.getAuthenticated()) {
+            builder.simpleAuth()
+                    .username(mqttConfig.getUsername())
+                    .password(mqttConfig.getPassword().getBytes())
+                    .applySimpleAuth();
+        }
+
+        if (tls) {
+            var sslContext = configureTls();
+            builder.sslConfig(sslContext);
+        }
+
+        Mqtt3AsyncClient client = builder.buildAsync();
+
+        return client;
+    }
+
+
+    private int resolvePort(URI uri) {
+        return uri.getPort();
+        
+    }
+
+
+private static boolean tlsEnabled(URI brokerUri) {

Review Comment:
   code formatting (indentation?)



##########
pom.xml:
##########
@@ -641,10 +643,19 @@
                 <artifactId>rdf4j-rio-rdfxml</artifactId>
                 <version>${rdf4j.version}</version>
             </dependency>
+
             <dependency>
-                <groupId>org.fusesource.mqtt-client</groupId>
-                <artifactId>mqtt-client</artifactId>
+      <groupId>org.reactivestreams</groupId>
+      <artifactId>reactive-streams</artifactId>
+      <version>${reactive-streams.version}</version>
+    </dependency>
+        <dependency>
+                   <groupId>com.hivemq</groupId>
+    <artifactId>hivemq-mqtt-client</artifactId>

Review Comment:
   looks like a formatting issue



##########
streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConnectUtils.java:
##########
@@ -51,31 +77,138 @@ public static Label getTopicLabel() {
     return Labels.withId(TOPIC);
   }
 
-  public static StaticPropertyAlternative getAlternativesOne() {
+  public static Label getQosLevelLabel() {
+    return Labels.withId(QOS_LEVEL_KEY);
+  }
+
+  public static Label getRetainLabel() {
+    return Labels.withId(RETAIN);
+  }
+
+  public static Label getCleanSessionLabel() {
+    return Labels.withId(CLEAN_SESSION_KEY);
+  }
+
+  public static Label getReconnectPeriodLabel() {
+    return Labels.withId(RECONNECT_PERIOD_IN_SEC);
+  }
+
+  public static Label getKeepAliveLabel() {
+    return Labels.withId(KEEP_ALIVE_IN_SEC);
+  }
+
+  public static Label getMqttComplient() {
+    return Labels.withId(MQTT_COMPLIANT);
+  }
+
+  public static Label getWillModeLabel() {
+    return Labels.withId(WILL_MODE);
+  }
+
+  public static StaticPropertyAlternative getNoWillAlternative() {
+    return Alternatives.from(Labels.withId(NO_WILL_ALTERNATIVE), true);
+  }
+
+  public static StaticPropertyAlternative getWillAlternative() {
+    return Alternatives.from(Labels.withId(WILL_ALTERNATIVE),
+        StaticProperties.group(Labels.withId(WILL_GROUP),
+            StaticProperties.stringFreeTextProperty(Labels.withId(WILL_TOPIC)),
+            
StaticProperties.stringFreeTextProperty(Labels.withId(WILL_MESSAGE)),
+            StaticProperties.singleValueSelection(Labels.withId(WILL_RETAIN),
+                Arrays.asList(
+                    new Option("Yes", false),
+                    new Option("No", true))),
+            StaticProperties.singleValueSelection(
+                Labels.withId(WILL_QOS),
+                Arrays.asList(
+                    new Option("0 - at-most-once", true),
+                    new Option("1 - at-least-once", false),
+                    new Option("2 - exactly-once", false)))));
+  }
+
+  public static StaticPropertyAlternative getAnonymousAccess() {
     return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
 
   }
 
-  public static StaticPropertyAlternative getAlternativesOne(boolean selected) 
{
+  public static StaticPropertyAlternative getAnonymousAccess(boolean selected) 
{
     return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS), selected);
 
   }
 
-  public static StaticPropertyAlternative getAlternativesTwo() {
+  public static StaticPropertyAlternative getUsernameAccess() {
     return Alternatives.from(Labels.withId(USERNAME_ACCESS),
         StaticProperties.group(Labels.withId(USERNAME_GROUP),
             StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME)),
             StaticProperties.secretValue(Labels.withId(PASSWORD))));
 
   }
 
+  public static List<Option> getQOSLevelSelection() {
+    return Arrays.asList(
+        new Option("0 - at-most-once", false),
+        new Option("1 - at-least-once", true),
+        new Option("2 - exactly-once", false));
+  }
+
+  public static List<Option> getRetainSelection() {
+    return Arrays.asList(
+        new Option("Yes", false),
+        new Option("No", true));
+  }
+
+  public static List<Option> getCleanSessionSelection() {
+    return Arrays.asList(
+        new Option("Yes", true),
+        new Option("No", false));
+  }
+
+  public static List<Option> getMqttSelection() {
+    return Arrays.asList(
+        new Option("Yes", true),
+        new Option("No", false));
+  }
+
+  public static StaticPropertyAlternative getClientCertAccess() {
+    var group = StaticProperties.group(
+        Labels.withId(CLIENT_CERT_GROUP),
+        StaticProperties.stringFreeTextProperty(Labels.withId(CLIENTCERT), 
true, false),
+        StaticProperties.secretValue(Labels.withId(CLIENTKEY)));
+    group.setHorizontalRendering(false);
+    return Alternatives.from(Labels.withId(CLIENT_CERT_ACCESS), group);
+
+  }
+
   public static MqttConfig getMqttConfig(IParameterExtractor extractor) {
     return getMqttConfig(extractor, null);
   }
 
+  public static String getProtocol(String brokeruri) {
+    String protocol = null;
+
+    try {
+      URI uri = new URI(brokeruri);
+      protocol = uri.getScheme();
+    } catch (URISyntaxException e) {
+

Review Comment:
   do we need any logging here?



##########
streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.adapters.oi4/strings.en:
##########
@@ -34,11 +34,20 @@ username-group.description=
 username.title=Username
 username.description=
 
+client-cert-alternative.title= Client Certificate
+client-cert-alternative.description=
+
+clientcert.title=Certificate PEM
+clientcert.description = Public key in PEM format
+
+clientkey.title=Private Key PEM
+clientkey.description=Private key in PEM format
+
 password.title=Password
 password.description=
 
 broker_url.title=Broker URL
-broker_url.description=Example: tcp://test-server.com:1883 (Protocol required. 
Port required)"
+broker_url.description=Example: tcp://test-server.com:1883 (Protocol required. 
Port required), with TLS ssl://test-server.com:8883 (Protocol required. Port 
required)"

Review Comment:
   Didn't we remove the Netio adapter?



##########
streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/migration/MQTTAdapterMigrationV1.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.migration;
+
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.mqtt.adapter.MqttProtocol;
+import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import java.util.List;
+
+public class MQTTAdapterMigrationV1 implements IAdapterMigrator {
+
+
+    @Override
+    public ModelMigratorConfig config() {
+        return new ModelMigratorConfig(
+                MqttProtocol.ID,
+                SpServiceTagPrefix.ADAPTER,
+                0,
+                1);
+

Review Comment:
   ```suggestion
   ```



##########
streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/migration/Oi4AdapterMigrationV1.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.connect.iiot.adapters.oi4.migration;
+
+import org.apache.streampipes.connect.iiot.adapters.oi4.Oi4Adapter;
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class Oi4AdapterMigrationV1 implements IAdapterMigrator {
+
+
+    @Override
+    public ModelMigratorConfig config() {
+        return new ModelMigratorConfig(
+                Oi4Adapter.ID,
+                SpServiceTagPrefix.ADAPTER,
+                0,
+                1);
+
+    }
+
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(Oi4AdapterMigrationV1.class);
+
+    @Override
+    public MigrationResult<AdapterDescription> migrate(AdapterDescription 
element,
+            IStaticPropertyExtractor extractor) throws RuntimeException { 
+
+        
+        LOG.info("Migrate Broker URL ");
+
+       changeUrlDescription(element);
+
+        LOG.info("Migrate Access Mode ");
+
+        accessModeDescription(element);
+
+         LOG.info("Migrate Security ");
+
+        migrateSecurity((StaticPropertyAlternatives) 
element.getConfig().get(1));
+
+         LOG.info("Finsihed");

Review Comment:
   typo



##########
streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConfig.java:
##########
@@ -39,23 +59,181 @@ public MqttConfig(String url, String topic, String 
username, String password) {
     this.password = password;
   }
 
+  public MqttConfig(String url, String topic, Boolean tlsEnabled, String 
clientCertificate, String clientKey) {
+    this.authenticated = false;
+    this.url = url;
+    this.topic = topic;
+    this.tls = tlsEnabled;
+    this.clientCertificate = clientCertificate;
+    this.clientKey = clientKey;
+  }
+
+  public MqttConfig(String url, String topic, String username, String 
password, Boolean tlsEnabled) {
+    this(url, topic, username, password);
+    this.tls = tlsEnabled;
+  }
+
+  public MqttConfig(String url, String topic, Boolean tlsEnabled) {
+    this(url, topic);
+    this.tls = tlsEnabled;
+  }
+
   public Boolean getAuthenticated() {
     return authenticated;
   }
 
+  public void setAuthenticated(Boolean authenticated) {
+    this.authenticated = authenticated;
+  }
+
   public String getUrl() {
     return url;
   }
 
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
   public String getTopic() {
     return topic;
   }
 
+  public void setTopic(String topic) {
+    this.topic = topic;
+  }
+
   public String getUsername() {
     return username;
   }
 
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
   public String getPassword() {
     return password;
   }
-}
+    public String getClientId() {
+    return clientId;
+  }
+
+      public void setClientId(String clientId) {

Review Comment:
   code formatting



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to