This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 3353-support-security-policies-signsignandencrypt-in-opc-ua-adapter-and-sink
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/3353-support-security-policies-signsignandencrypt-in-opc-ua-adapter-and-sink by this push:
     new 7219703e36 feat(#3353): Support sign and encrypt security modes in OPC-UA adapter and sink
7219703e36 is described below

commit 7219703e361da95c2c2c26cd22a70d66a1f6335a
Author: Dominik Riemer <[email protected]>
AuthorDate: Mon Nov 25 17:57:12 2024 +0100

    feat(#3353): Support sign and encrypt security modes in OPC-UA adapter and sink
---
 pom.xml                                            |   2 +-
 .../apache/streampipes/commons/constants/Envs.java |   7 +-
 .../commons/environment/DefaultEnvironment.java    |  19 ++++
 .../commons/environment/Environment.java           |  13 +++
 .../streampipes-connectors-opcua/pom.xml           |   4 +-
 .../opcua/OpcUaConnectorsModuleExport.java         |  18 ++-
 .../connectors/opcua/adapter/OpcUaAdapter.java     |  46 ++++----
 .../connectors/opcua/adapter/OpcUaNodeBrowser.java |   7 +-
 .../opcua/adapter/OpcUaNodeMetadataExtractor.java  |   6 +-
 ...pOpcUaClient.java => ConnectedOpcUaClient.java} |  62 +++--------
 .../opcua/client/OpcUaClientProvider.java          |  75 +++++++++++++
 .../connectors/opcua/client/SpOpcUaClient.java     | 121 +--------------------
 .../config/MiloOpcUaConfigurationProvider.java     |  67 ++----------
 .../connectors/opcua/config/OpcUaConfig.java       |  44 ++++----
 .../opcua/config/SharedUserConfiguration.java      |  28 +++--
 .../opcua/config/SpOpcUaConfigExtractor.java       |  39 +++++--
 .../config/identity/AnonymousIdentityConfig.java   |  35 ++++++
 .../opcua/config/identity/IdentityConfig.java      |  26 +++++
 .../identity/UsernamePasswordIdentityConfig.java   |  44 ++++++++
 .../opcua/config/security/KeyStoreLoader.java      |  87 +++++++++++++++
 .../opcua/config/security/SecurityConfig.java      | 110 +++++++++++++++++++
 .../opcua/migration/OpcUaAdapterMigrationV4.java   | 121 +++++++++++++++++++++
 .../opcua/migration/OpcUaSinkMigrationV1.java      |  48 ++++++++
 .../extensions/connectors/opcua/sink/OpcUa.java    |  53 +++++----
 .../connectors/opcua/sink/OpcUaParameters.java     |  36 +-----
 .../connectors/opcua/sink/OpcUaSink.java           |  18 ++-
 .../connectors/opcua/utils/OpcUaUtil.java          |  51 +++++----
 .../connectors/opcua/utils/SecurityUtils.java      |  48 ++++++++
 .../documentation.md                               |  53 +++++++--
 .../strings.en                                     |  14 ++-
 .../documentation.md                               |  50 ++++++++-
 .../strings.en                                     |  12 ++
 ...data-explorer-widget-data-settings.component.ts |   6 +-
 33 files changed, 977 insertions(+), 393 deletions(-)

diff --git a/pom.xml b/pom.xml
index c1285c0a95..e059400d30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
         <commons-pool2.version>2.12.0</commons-pool2.version>
         <commons-text.version>1.12.0</commons-text.version>
         <ditto-client.version>1.0.0</ditto-client.version>
-        <eclipse.milo.version>0.6.9</eclipse.milo.version>
+        <eclipse.milo.version>0.6.14</eclipse.milo.version>
         <file-management.version>3.1.0</file-management.version>
         <flink.version>1.13.5</flink.version>
         <fogsy-qudt.version>1.0</fogsy-qudt.version>
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 7ad56fe402..5723633b43 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -108,7 +108,12 @@ public enum Envs {
 
   // expects a comma separated string of service names
   SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""),
-  SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", "");
+  SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", ""),
+
+  SP_OPCUA_SECURITY_DIR("SP_OPCUA_SECURITY_DIR", "/opc-ua-security"),
+  SP_OPCUA_KEYSTORE_FILE("SP_OPCUA_KEYSTORE_FILE", "keystore.pfx"),
+  SP_OPCUA_KEYSTORE_PASSWORD("SP_OPCUA_KEYSTORE_PASSWORD", "password"),
+  SP_OPCUA_APPLICATION_URI("SP_OPCUA_APPLICATION_URI", 
"urn:org:apache:streampipes:opcua:client");
 
   private final String envVariableName;
   private String defaultValue;
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 36ed402360..018cd5e6f4 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -328,4 +328,23 @@ public class DefaultEnvironment implements Environment {
     return new StringEnvironmentVariable(Envs.SP_ALLOWED_UPLOAD_FILETYPES);
   }
 
+  @Override
+  public StringEnvironmentVariable getOpcUaSecurityDir() {
+    return new StringEnvironmentVariable(Envs.SP_OPCUA_SECURITY_DIR);
+  }
+
+  @Override
+  public StringEnvironmentVariable getOpcUaKeystoreFile() {
+    return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_FILE);
+  }
+
+  @Override
+  public StringEnvironmentVariable getOpcUaKeystorePassword() {
+    return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_PASSWORD);
+  }
+
+  @Override
+  public StringEnvironmentVariable getOpcUaApplicationUri() {
+    return new StringEnvironmentVariable(Envs.SP_OPCUA_APPLICATION_URI);
+  }
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index d1d6efab67..7a99b96283 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -36,7 +36,9 @@ public interface Environment {
   IntEnvironmentVariable getServicePort();
 
   StringEnvironmentVariable getSpCoreScheme();
+
   StringEnvironmentVariable getSpCoreHost();
+
   IntEnvironmentVariable getSpCorePort();
 
   // Time series storage env variables
@@ -144,12 +146,15 @@ public interface Environment {
 
   // Broker defaults
   StringEnvironmentVariable getKafkaHost();
+
   IntEnvironmentVariable getKafkaPort();
 
   StringEnvironmentVariable getMqttHost();
+
   IntEnvironmentVariable getMqttPort();
 
   StringEnvironmentVariable getNatsHost();
+
   IntEnvironmentVariable getNatsPort();
 
   StringEnvironmentVariable getPulsarUrl();
@@ -158,4 +163,12 @@ public interface Environment {
 
   StringEnvironmentVariable getAllowedUploadFiletypes();
 
+  StringEnvironmentVariable getOpcUaSecurityDir();
+
+  StringEnvironmentVariable getOpcUaKeystoreFile();
+
+  StringEnvironmentVariable getOpcUaKeystorePassword();
+
+  StringEnvironmentVariable getOpcUaApplicationUri();
+
 }
diff --git a/streampipes-extensions/streampipes-connectors-opcua/pom.xml 
b/streampipes-extensions/streampipes-connectors-opcua/pom.xml
index 71eaf09917..673e6858c6 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/pom.xml
+++ b/streampipes-extensions/streampipes-connectors-opcua/pom.xml
@@ -21,9 +21,9 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.streampipes</groupId>
-        <artifactId>streampipes-parent</artifactId>
+        <artifactId>streampipes-extensions</artifactId>
         <version>0.97.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
+        <relativePath>../pom.xml</relativePath>
     </parent>
 
     <artifactId>streampipes-connectors-opcua</artifactId>
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
index 63aacdab58..03fc8448a4 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
@@ -23,25 +23,35 @@ import 
org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
 import org.apache.streampipes.extensions.api.migration.IModelMigrator;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
 import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import 
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
 import 
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV1;
 import 
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV2;
 import 
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV3;
+import 
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV4;
+import 
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaSinkMigrationV1;
 import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
 
 import java.util.List;
 
 public class OpcUaConnectorsModuleExport implements IExtensionModuleExport {
+
+  private final OpcUaClientProvider clientProvider;
+
+  public OpcUaConnectorsModuleExport() {
+    this.clientProvider = new OpcUaClientProvider();
+  }
+
   @Override
   public List<StreamPipesAdapter> adapters() {
     return List.of(
-        new OpcUaAdapter()
+        new OpcUaAdapter(clientProvider)
     );
   }
 
   @Override
   public List<IStreamPipesPipelineElement<?>> pipelineElements() {
     return List.of(
-        new OpcUaSink()
+        new OpcUaSink(clientProvider)
     );
   }
 
@@ -50,7 +60,9 @@ public class OpcUaConnectorsModuleExport implements 
IExtensionModuleExport {
     return List.of(
         new OpcUaAdapterMigrationV1(),
         new OpcUaAdapterMigrationV2(),
-        new OpcUaAdapterMigrationV3()
+        new OpcUaAdapterMigrationV3(),
+        new OpcUaAdapterMigrationV4(),
+        new OpcUaSinkMigrationV1()
     );
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
index baf5cfe95b..526ff7bb24 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
@@ -29,7 +29,8 @@ import 
org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeCont
 import 
org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import 
org.apache.streampipes.extensions.connectors.opcua.client.ConnectedOpcUaClient;
+import 
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
 import 
org.apache.streampipes.extensions.connectors.opcua.config.OpcUaAdapterConfig;
 import 
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
 import 
org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
@@ -42,7 +43,6 @@ import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.connect.rules.schema.DeleteRuleDescription;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
-import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
 import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -66,7 +66,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
-import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.SUBSCRIPTION_MODE;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil.getSchema;
@@ -78,7 +77,9 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
   private static final Logger LOG = 
LoggerFactory.getLogger(OpcUaAdapter.class);
 
   private int pullingIntervalMilliSeconds;
-  private SpOpcUaClient<OpcUaAdapterConfig> spOpcUaClient;
+  private final OpcUaClientProvider clientProvider;
+  private ConnectedOpcUaClient connectedClient;
+  private OpcUaAdapterConfig opcUaAdapterConfig;
   private List<OpcNode> allNodes;
   private List<NodeId> allNodeIds;
   private int numberProperties;
@@ -92,15 +93,14 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
    */
   private final Map<String, String> nodeIdToLabelMapping;
 
-  public OpcUaAdapter() {
-    super();
+  public OpcUaAdapter(OpcUaClientProvider clientProvider) {
+    this.clientProvider = clientProvider;
     this.numberProperties = 0;
     this.event = new HashMap<>();
     this.nodeIdToLabelMapping = new HashMap<>();
   }
 
   private void prepareAdapter(IAdapterParameterExtractor extractor) throws 
AdapterException {
-
     this.allNodeIds = new ArrayList<>();
     List<String> deleteKeys = extractor
         .getAdapterDescription()
@@ -111,9 +111,9 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
         .collect(Collectors.toList());
 
     try {
-      this.spOpcUaClient.connect();
+      this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
       OpcUaNodeBrowser browserClient =
-          new OpcUaNodeBrowser(this.spOpcUaClient.getClient(), 
this.spOpcUaClient.getSpOpcConfig());
+          new OpcUaNodeBrowser(this.connectedClient.getClient(), 
this.opcUaAdapterConfig);
       this.allNodes = browserClient.findNodes(deleteKeys);
 
 
@@ -121,11 +121,11 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
         this.allNodeIds.add(node.getNodeId());
       }
 
-      if (spOpcUaClient.getSpOpcConfig().inPullMode()) {
-        this.pullingIntervalMilliSeconds = 
spOpcUaClient.getSpOpcConfig().getPullIntervalMilliSeconds();
+      if (opcUaAdapterConfig.inPullMode()) {
+        this.pullingIntervalMilliSeconds = 
opcUaAdapterConfig.getPullIntervalMilliSeconds();
       } else {
         this.numberProperties = this.allNodeIds.size();
-        this.spOpcUaClient.createListSubscription(this.allNodeIds, this);
+        this.connectedClient.createListSubscription(this.allNodeIds, this);
       }
 
       this.allNodes.forEach(node -> 
this.nodeIdToLabelMapping.put(node.getNodeId().toString(), node.getLabel()));
@@ -139,7 +139,7 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
   @Override
   public void pullData() throws ExecutionException, RuntimeException, 
InterruptedException, TimeoutException {
     var response =
-        this.spOpcUaClient.getClient().readValues(0, TimestampsToReturn.Both, 
this.allNodeIds);
+        this.connectedClient.getClient().readValues(0, 
TimestampsToReturn.Both, this.allNodeIds);
     boolean badStatusCodeReceived = false;
     boolean emptyValueReceived = false;
     List<DataValue> returnValues =
@@ -168,7 +168,7 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
 
   private boolean shouldSkipEvent(boolean badStatusCodeReceived) {
     return badStatusCodeReceived
-        && this.spOpcUaClient.getSpOpcConfig().getIncompleteEventStrategy()
+        && this.opcUaAdapterConfig.getIncompleteEventStrategy()
         .equalsIgnoreCase(SharedUserConfiguration.INCOMPLETE_OPTION_IGNORE);
   }
 
@@ -208,13 +208,13 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
   public void onAdapterStarted(IAdapterParameterExtractor extractor,
                                IEventCollector collector,
                                IAdapterRuntimeContext adapterRuntimeContext) 
throws AdapterException {
-    this.spOpcUaClient = new SpOpcUaClient<>(
-        
SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor())
-    );
+    this.opcUaAdapterConfig =
+        
SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor());
+    //this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
     this.collector = collector;
     this.prepareAdapter(extractor);
 
-    if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
+    if (this.opcUaAdapterConfig.inPullMode()) {
       this.pullAdapterScheduler = new PullAdapterScheduler();
       this.pullAdapterScheduler.schedule(this, 
extractor.getAdapterDescription().getElementId());
     }
@@ -223,9 +223,9 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
   @Override
   public void onAdapterStopped(IAdapterParameterExtractor extractor,
                                IAdapterRuntimeContext adapterRuntimeContext) 
throws AdapterException {
-    this.spOpcUaClient.disconnect();
+    clientProvider.releaseClient(this.opcUaAdapterConfig);
 
-    if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
+    if (this.opcUaAdapterConfig.inPullMode()) {
       this.pullAdapterScheduler.shutdown();
     }
   }
@@ -233,12 +233,12 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
   @Override
   public StaticProperty resolveConfiguration(String staticPropertyInternalName,
                                              IStaticPropertyExtractor 
extractor) throws SpConfigurationException {
-    return OpcUaUtil.resolveConfig(staticPropertyInternalName, extractor);
+    return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName, 
extractor);
   }
 
   @Override
   public IAdapterConfiguration declareConfig() {
-    var builder = AdapterConfigurationBuilder.create(ID, 3, OpcUaAdapter::new)
+    var builder = AdapterConfigurationBuilder.create(ID, 4, () -> new 
OpcUaAdapter(clientProvider))
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .withLocales(Locales.EN)
         .withCategory(AdapterType.Generic, AdapterType.Manufacturing)
@@ -255,6 +255,6 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
   @Override
   public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor,
                                        IAdapterGuessSchemaContext 
adapterGuessSchemaContext) throws AdapterException {
-    return getSchema(extractor);
+    return getSchema(clientProvider, extractor);
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
index e7dde9d990..2a150fa355 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
@@ -25,6 +25,7 @@ import 
org.apache.streampipes.model.staticproperty.TreeInputNode;
 
 import org.eclipse.milo.opcua.sdk.client.AddressSpace;
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
 import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
 import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
@@ -45,13 +46,13 @@ import java.util.stream.Collectors;
 
 public class OpcUaNodeBrowser {
 
-  private final OpcUaClient client;
+  private final UaClient client;
   private final OpcUaConfig spOpcConfig;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(OpcUaNodeBrowser.class);
 
   public OpcUaNodeBrowser(
-      OpcUaClient client,
+      UaClient client,
       OpcUaConfig spOpcUaClientConfig
   ) {
     this.client = client;
@@ -127,7 +128,7 @@ public class OpcUaNodeBrowser {
   }
 
   private List<TreeInputNode> findChildren(
-      OpcUaClient client,
+      UaClient client,
       NodeId nodeId
   ) throws UaException {
     return client
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
index 04dda6eb62..bcaf11a476 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.extensions.connectors.opcua.adapter;
 
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
 import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
 import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
 import org.eclipse.milo.opcua.stack.core.StatusCodes;
@@ -32,12 +32,12 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public class OpcUaNodeMetadataExtractor {
-  private final OpcUaClient client;
+  private final UaClient client;
   private final UaNode node;
 
   private final Map<String, Object> metadata;
 
-  public OpcUaNodeMetadataExtractor(OpcUaClient client, UaNode node) {
+  public OpcUaNodeMetadataExtractor(UaClient client, UaNode node) {
     this.client = client;
     this.node = node;
     this.metadata = new HashMap<>();
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
similarity index 79%
copy from 
streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
copy to 
streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
index 95fda81dbd..d71d85646f 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
@@ -18,19 +18,13 @@
 
 package org.apache.streampipes.extensions.connectors.opcua.client;
 
-
-import org.apache.streampipes.commons.exceptions.SpConfigurationException;
 import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
-import 
org.apache.streampipes.extensions.connectors.opcua.config.MiloOpcUaConfigurationProvider;
-import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
 
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
-import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
 import 
org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
 import org.eclipse.milo.opcua.stack.core.AttributeId;
-import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
@@ -44,53 +38,21 @@ import 
org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
 
-/***
- * Wrapper class for all OPC UA specific stuff.
- */
-public class SpOpcUaClient<T extends OpcUaConfig> {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(SpOpcUaClient.class);
-
-  private OpcUaClient client;
-  private final T spOpcConfig;
+public class ConnectedOpcUaClient {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(ConnectedOpcUaClient.class);
+  private final UaClient client;
   private static final AtomicLong clientHandles = new AtomicLong(1L);
 
-  public SpOpcUaClient(T config) {
-    this.spOpcConfig = config;
-  }
-
-  /***
-   *
-   * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
-   */
-  public OpcUaClient getClient() {
-    return this.client;
-  }
-
-  /***
-   * Establishes appropriate connection to OPC UA endpoint depending on the 
{@link SpOpcUaClient} instance
-   *
-   * @throws UaException An exception occurring during OPC connection
-   */
-  public void connect()
-      throws UaException, ExecutionException, InterruptedException, 
SpConfigurationException, URISyntaxException {
-    OpcUaClientConfig clientConfig = new 
MiloOpcUaConfigurationProvider().makeClientConfig(spOpcConfig);
-    this.client = OpcUaClient.create(clientConfig);
-    client.connect().get();
-  }
-
-  public void disconnect() {
-    client.disconnect();
+  public ConnectedOpcUaClient(UaClient client) {
+    this.client = client;
   }
 
   /***
@@ -178,7 +140,15 @@ public class SpOpcUaClient<T extends OpcUaConfig> {
     }
   }
 
-  public T getSpOpcConfig() {
-    return spOpcConfig;
+  /***
+   *
+   * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
+   */
+  public UaClient getClient() {
+    return this.client;
+  }
+
+  public void disconnect() {
+    client.disconnect();
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java
new file mode 100644
index 0000000000..b3f907bd4e
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.client;
+
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
+import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
+
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+public class OpcUaClientProvider {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(OpcUaClientProvider.class);
+
+  private final Map<String, ConnectedOpcUaClient> clients = new 
ConcurrentHashMap<>();
+  private final Map<String, Integer> consumers = new ConcurrentHashMap<>();
+
+  public synchronized <T extends OpcUaConfig> ConnectedOpcUaClient getClient(T 
config)
+      throws UaException, SpConfigurationException, URISyntaxException, 
ExecutionException, InterruptedException {
+    var serverId = config.getUniqueServerId();
+    if (clients.containsKey(serverId)) {
+      LOG.debug("Adding new consumer to client {}", serverId);
+      consumers.put(serverId, consumers.get(config.getUniqueServerId()) + 1);
+      return clients.get(serverId);
+    } else {
+      LOG.debug("Creating new client {}", serverId);
+      var connectedClient = new SpOpcUaClient<>(config).connect();
+      clients.put(serverId, connectedClient);
+      consumers.put(serverId, 1);
+      return connectedClient;
+    }
+  }
+
+  public <T extends OpcUaConfig> void releaseClient(T config) {
+    String serverId = config.getUniqueServerId();
+    LOG.debug("Releasing client {}", serverId);
+
+    synchronized (this) {
+      consumers.computeIfPresent(serverId, (key, count) -> {
+        int updatedCount = count - 1;
+        if (updatedCount <= 0) {
+          LOG.debug("Disconnecting client {}", serverId);
+          if (clients.containsKey(serverId)) {
+            clients.get(serverId).disconnect();
+            clients.remove(serverId);
+          }
+          return null;
+        }
+        return updatedCount;
+      });
+    }
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
index 95fda81dbd..a17d373b66 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
@@ -20,38 +20,17 @@ package 
org.apache.streampipes.extensions.connectors.opcua.client;
 
 
 import org.apache.streampipes.commons.exceptions.SpConfigurationException;
-import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
 import 
org.apache.streampipes.extensions.connectors.opcua.config.MiloOpcUaConfigurationProvider;
 import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
 
 import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
-import 
org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
-import org.eclipse.milo.opcua.stack.core.AttributeId;
 import org.eclipse.milo.opcua.stack.core.UaException;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
-import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
-import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
-import 
org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
-import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
-import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static 
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
 
 /***
  * Wrapper class for all OPC UA specific stuff.
@@ -63,120 +42,26 @@ public class SpOpcUaClient<T extends OpcUaConfig> {
   private OpcUaClient client;
   private final T spOpcConfig;
 
-  private static final AtomicLong clientHandles = new AtomicLong(1L);
-
   public SpOpcUaClient(T config) {
     this.spOpcConfig = config;
   }
 
-  /***
-   *
-   * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
-   */
-  public OpcUaClient getClient() {
-    return this.client;
-  }
-
   /***
    * Establishes appropriate connection to OPC UA endpoint depending on the 
{@link SpOpcUaClient} instance
    *
    * @throws UaException An exception occurring during OPC connection
    */
-  public void connect()
+  public ConnectedOpcUaClient connect()
       throws UaException, ExecutionException, InterruptedException, 
SpConfigurationException, URISyntaxException {
     OpcUaClientConfig clientConfig = new 
MiloOpcUaConfigurationProvider().makeClientConfig(spOpcConfig);
-    this.client = OpcUaClient.create(clientConfig);
+    var client = OpcUaClient.create(clientConfig);
     client.connect().get();
+    return new ConnectedOpcUaClient(client);
   }
 
-  public void disconnect() {
-    client.disconnect();
-  }
-
-  /***
-   * Register subscriptions for given OPC UA nodes
-   * @param nodes List of {@link 
org.eclipse.milo.opcua.stack.core.types.builtin.NodeId}
-   * @param opcUaAdapter current instance of {@link OpcUaAdapter}
-   * @throws Exception
-   */
-  public void createListSubscription(List<NodeId> nodes,
-                                     OpcUaAdapter opcUaAdapter) throws 
Exception {
-    client.getSubscriptionManager().addSubscriptionListener(new 
UaSubscriptionManager.SubscriptionListener() {
-      @Override
-      public void onSubscriptionTransferFailed(UaSubscription subscription, 
StatusCode statusCode) {
-        LOG.warn("Transfer for subscriptionId={} failed: {}", 
subscription.getSubscriptionId(), statusCode);
-        try {
-          initSubscription(nodes, opcUaAdapter);
-        } catch (Exception e) {
-          LOG.error("Re-creating the subscription failed", e);
-        }
-      }
-    });
-
-    initSubscription(nodes, opcUaAdapter);
-  }
-
-
-  public void initSubscription(List<NodeId> nodes,
-                               OpcUaAdapter opcUaAdapter) throws Exception {
-    /*
-     * create a subscription @ 1000ms
-     */
-    UaSubscription subscription = 
this.client.getSubscriptionManager().createSubscription(1000.0).get();
-
-    List<CompletableFuture<DataValue>> values = new ArrayList<>();
-
-    for (NodeId node : nodes) {
-      values.add(this.client.readValue(0, TimestampsToReturn.Both, node));
-    }
-
-    for (CompletableFuture<DataValue> value : values) {
-      if (value.get().getValue().toString().contains("null")) {
-        LOG.error("Node has no value");
-      }
-    }
 
 
-    List<ReadValueId> readValues = new ArrayList<>();
-    // Read a specific value attribute
-    for (NodeId node : nodes) {
-      readValues.add(new ReadValueId(node, AttributeId.Value.uid(), null, 
QualifiedName.NULL_VALUE));
-    }
 
-    List<MonitoredItemCreateRequest> requests = new ArrayList<>();
-
-    for (ReadValueId readValue : readValues) {
-      // important: client handle must be unique per item
-      UInteger clientHandle = uint(clientHandles.getAndIncrement());
-
-      MonitoringParameters parameters = new MonitoringParameters(
-          clientHandle,
-          1000.0,     // sampling interval
-          null,      // filter, null means use default
-          uint(10),   // queue size
-          true         // discard oldest
-      );
-
-      requests.add(new MonitoredItemCreateRequest(readValue, 
MonitoringMode.Reporting, parameters));
-    }
-
-    UaSubscription.ItemCreationCallback onItemCreated =
-        (item, i) -> item.setValueConsumer(opcUaAdapter::onSubscriptionValue);
-    List<UaMonitoredItem> items = subscription.createMonitoredItems(
-        TimestampsToReturn.Both,
-        requests,
-        onItemCreated
-    ).get();
-
-    for (UaMonitoredItem item : items) {
-      NodeId tagId = item.getReadValueId().getNodeId();
-      if (item.getStatusCode().isGood()) {
-        LOG.info("item created for nodeId=" + tagId);
-      } else {
-        LOG.error("failed to create item for " + 
item.getReadValueId().getNodeId() + item.getStatusCode());
-      }
-    }
-  }
 
   public T getSpOpcConfig() {
     return spOpcConfig;
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
index 9813d52fbb..8db41a8e11 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
@@ -18,19 +18,15 @@
 
 package org.apache.streampipes.extensions.connectors.opcua.config;
 
+import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.commons.exceptions.SpConfigurationException;
 
 import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
-import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
-import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
 import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
-import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
 import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
 import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
 
-import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -39,65 +35,16 @@ public class MiloOpcUaConfigurationProvider {
   public OpcUaClientConfig makeClientConfig(OpcUaConfig spOpcConfig)
       throws ExecutionException, InterruptedException, 
SpConfigurationException, URISyntaxException {
     String opcServerUrl = spOpcConfig.getOpcServerURL();
+    String applicationUri = 
Environments.getEnvironment().getOpcUaApplicationUri().getValueOrDefault();
     List<EndpointDescription> endpoints = 
DiscoveryClient.getEndpoints(opcServerUrl).get();
-    String host = opcServerUrl.split("://")[1].split(":")[0];
 
-    EndpointDescription tmpEndpoint = endpoints
-        .stream()
-        .filter(e -> 
e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
-        .findFirst()
-        .orElseThrow(() -> new SpConfigurationException("No endpoint with 
security policy none"));
+    var builder = OpcUaClientConfig.builder()
+        .setApplicationName(LocalizedText.english("Apache StreamPipes"))
+        .setApplicationUri(applicationUri);
 
-    tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
-    endpoints = Collections.singletonList(tmpEndpoint);
+    spOpcConfig.getSecurityConfig().configureSecurityPolicy(opcServerUrl, 
endpoints, builder);
+    spOpcConfig.getIdentityConfig().configureIdentity(builder);
 
-    EndpointDescription endpoint = endpoints
-        .stream()
-        .filter(e -> 
e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
-        .findFirst().orElseThrow(() -> new SpConfigurationException("no 
desired endpoints returned"));
-
-    return buildConfig(endpoint, spOpcConfig);
-  }
-
-  private OpcUaClientConfig buildConfig(EndpointDescription endpoint,
-                                        OpcUaConfig spOpcConfig) {
-
-    OpcUaClientConfigBuilder builder = defaultBuilder(endpoint);
-    if (!spOpcConfig.isUnauthenticated()) {
-      builder.setIdentityProvider(new 
UsernameProvider(spOpcConfig.getUsername(), spOpcConfig.getPassword()));
-    }
     return builder.build();
   }
-
-  private OpcUaClientConfigBuilder defaultBuilder(EndpointDescription 
endpoint) {
-    return OpcUaClientConfig.builder()
-        .setApplicationName(LocalizedText.english("eclipse milo opc-ua 
client"))
-        .setApplicationUri("urn:eclipse:milo:examples:client")
-        .setEndpoint(endpoint);
-  }
-
-  private EndpointDescription updateEndpointUrl(
-      EndpointDescription original, String hostname) throws URISyntaxException 
{
-
-    URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
-
-    String endpointUrl = String.format(
-        "%s://%s:%s%s",
-        uri.getScheme(),
-        hostname,
-        uri.getPort(),
-        uri.getPath()
-    );
-
-    return new EndpointDescription(
-        endpointUrl,
-        original.getServer(),
-        original.getServerCertificate(),
-        original.getSecurityMode(),
-        original.getSecurityPolicyUri(),
-        original.getUserIdentityTokens(),
-        original.getTransportProfileUri(),
-        original.getSecurityLevel()
-    );
-  }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
index 6db027d861..128f1bc1f9 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
@@ -18,15 +18,21 @@
 
 package org.apache.streampipes.extensions.connectors.opcua.config;
 
+import 
org.apache.streampipes.extensions.connectors.opcua.config.identity.IdentityConfig;
+import 
org.apache.streampipes.extensions.connectors.opcua.config.security.SecurityConfig;
+
 import java.util.List;
 
 public class OpcUaConfig {
 
   private String opcServerURL;
-  private boolean unauthenticated;
-  private String username;
-  private String password;
   private List<String> selectedNodeNames;
+  private IdentityConfig identityConfig;
+  private SecurityConfig securityPolicyConfig;
+
+  public OpcUaConfig() {
+
+  }
 
   public String getOpcServerURL() {
     return opcServerURL;
@@ -36,35 +42,31 @@ public class OpcUaConfig {
     this.opcServerURL = opcServerURL;
   }
 
-  public boolean isUnauthenticated() {
-    return unauthenticated;
-  }
-
-  public void setUnauthenticated(boolean unauthenticated) {
-    this.unauthenticated = unauthenticated;
+  public List<String> getSelectedNodeNames() {
+    return selectedNodeNames;
   }
 
-  public String getUsername() {
-    return username;
+  public void setSelectedNodeNames(List<String> selectedNodeNames) {
+    this.selectedNodeNames = selectedNodeNames;
   }
 
-  public void setUsername(String username) {
-    this.username = username;
+  public IdentityConfig getIdentityConfig() {
+    return identityConfig;
   }
 
-  public String getPassword() {
-    return password;
+  public void setIdentityConfig(IdentityConfig identityConfig) {
+    this.identityConfig = identityConfig;
   }
 
-  public void setPassword(String password) {
-    this.password = password;
+  public SecurityConfig getSecurityConfig() {
+    return securityPolicyConfig;
   }
 
-  public List<String> getSelectedNodeNames() {
-    return selectedNodeNames;
+  public void setSecurityConfig(SecurityConfig securityPolicyConfig) {
+    this.securityPolicyConfig = securityPolicyConfig;
   }
 
-  public void setSelectedNodeNames(List<String> selectedNodeNames) {
-    this.selectedNodeNames = selectedNodeNames;
+  public String getUniqueServerId() {
+    return String.format("%s-%s-%s", opcServerURL, securityPolicyConfig, 
identityConfig);
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
index 9b5f541071..1059ce7e38 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.extensions.connectors.opcua.config;
 
+import org.apache.streampipes.extensions.connectors.opcua.utils.SecurityUtils;
 import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
 import org.apache.streampipes.model.staticproperty.Option;
 import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
@@ -29,7 +30,6 @@ import org.apache.streampipes.sdk.helpers.Labels;
 import java.util.List;
 
 import static 
org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter.PULL_GROUP;
-import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ACCESS_MODE;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.AVAILABLE_NODES;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.HOST_PORT;
@@ -41,7 +41,6 @@ import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabe
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_URL;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
-import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.UNAUTHENTICATED;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
 
@@ -51,14 +50,27 @@ public class SharedUserConfiguration {
   public static final String INCOMPLETE_OPTION_IGNORE = "ignore-event";
   public static final String INCOMPLETE_OPTION_SEND = "send-event";
 
+  public static final String SECURITY_MODE = "securityMode";
+  public static final String SECURITY_POLICY = "securityPolicy";
+  public static final String USER_AUTHENTICATION = "userAuthentication";
+  public static final String USER_AUTHENTICATION_ANONYMOUS = "anonymous";
+
   public static void 
appendSharedOpcUaConfig(AbstractConfigurablePipelineElementBuilder<?, ?> 
builder,
                                              boolean adapterConfig) {
 
     var dependsOn = getDependsOn(adapterConfig);
 
     builder
-        .requiredAlternatives(Labels.withId(ACCESS_MODE),
-            Alternatives.from(Labels.withId(UNAUTHENTICATED)),
+        .requiredSingleValueSelection(
+            Labels.withId(SECURITY_MODE),
+            SecurityUtils.getAvailableSecurityModes().stream().map(mode -> new 
Option(mode.k, mode.v)).toList()
+        )
+        .requiredSingleValueSelection(
+            Labels.withId(SECURITY_POLICY),
+            SecurityUtils.getAvailableSecurityPolicies().stream().map(p -> new 
Option(p.name())).toList()
+        )
+        .requiredAlternatives(Labels.withId(USER_AUTHENTICATION),
+            Alternatives.from(Labels.withId(USER_AUTHENTICATION_ANONYMOUS)),
             Alternatives.from(Labels.withId(USERNAME_GROUP),
                 StaticProperties.group(
                     Labels.withId(USERNAME_GROUP),
@@ -104,7 +116,7 @@ public class SharedUserConfiguration {
 
   public static OneOfStaticProperty getIncompleteEventConfig() {
     return StaticProperties.singleValueSelection(
-      Labels.withId(INCOMPLETE_EVENT_HANDLING_KEY),
+        Labels.withId(INCOMPLETE_EVENT_HANDLING_KEY),
         List.of(
             new Option("Ignore (only complete messages are sent)", 
INCOMPLETE_OPTION_IGNORE),
             new Option("Send (incomplete messages are sent)", 
INCOMPLETE_OPTION_SEND)
@@ -115,10 +127,12 @@ public class SharedUserConfiguration {
   public static List<String> getDependsOn(boolean adapterConfig) {
     return adapterConfig ? List.of(
         ADAPTER_TYPE.name(),
-        ACCESS_MODE.name(),
+        SECURITY_MODE,
+        SECURITY_POLICY,
         OPC_HOST_OR_URL.name()
     ) : List.of(
-        ACCESS_MODE.name(),
+        SECURITY_MODE,
+        SECURITY_POLICY,
         OPC_HOST_OR_URL.name());
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
index 6741a3e6f8..333dbadc06 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
@@ -20,11 +20,16 @@ package 
org.apache.streampipes.extensions.connectors.opcua.config;
 
 import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import 
org.apache.streampipes.extensions.connectors.opcua.config.identity.AnonymousIdentityConfig;
+import 
org.apache.streampipes.extensions.connectors.opcua.config.identity.UsernamePasswordIdentityConfig;
+import 
org.apache.streampipes.extensions.connectors.opcua.config.security.SecurityConfig;
 import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
 
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+
 import java.util.List;
 
-import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ACCESS_MODE;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.AVAILABLE_NODES;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_HOST_OR_URL;
@@ -35,7 +40,6 @@ import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabe
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
-import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.UNAUTHENTICATED;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
 
 public class SpOpcUaConfigExtractor {
@@ -71,20 +75,31 @@ public class SpOpcUaConfigExtractor {
   }
 
   public static <T extends OpcUaConfig> T 
extractSharedConfig(IParameterExtractor extractor,
-                                                               T config) {
+                                                              T config) {
 
     String selectedAlternativeConnection =
         extractor.selectedAlternativeInternalId(OPC_HOST_OR_URL.name());
+
     String selectedAlternativeAuthentication =
-        extractor.selectedAlternativeInternalId(ACCESS_MODE.name());
+        
extractor.selectedAlternativeInternalId(SharedUserConfiguration.USER_AUTHENTICATION);
+
     List<String> selectedNodeNames =
         extractor.selectedTreeNodesInternalNames(AVAILABLE_NODES.name(), 
String.class);
-
     config.setSelectedNodeNames(selectedNodeNames);
 
-    boolean useURL = selectedAlternativeConnection.equals(OPC_URL.name());
-    boolean unauthenticated = 
selectedAlternativeAuthentication.equals(UNAUTHENTICATED.name());
+    String selectedSecurityMode = extractor.selectedSingleValueInternalName(
+        SharedUserConfiguration.SECURITY_MODE,
+        String.class
+    );
+    String selectedSecurityPolicy = extractor.selectedSingleValue(
+        SharedUserConfiguration.SECURITY_POLICY,
+        String.class
+    );
+    config.setSecurityConfig(new SecurityConfig(
+        MessageSecurityMode.valueOf(selectedSecurityMode),
+        SecurityPolicy.valueOf(selectedSecurityPolicy)));
 
+    boolean useURL = selectedAlternativeConnection.equals(OPC_URL.name());
     if (useURL) {
       String serverAddress =
           extractor.singleValueParameter(OPC_SERVER_URL.name(), String.class);
@@ -97,15 +112,15 @@ public class SpOpcUaConfigExtractor {
       config.setOpcServerURL(serverAddress + ":" + port);
     }
 
+    boolean unauthenticated = selectedAlternativeAuthentication.equals(
+        SharedUserConfiguration.USER_AUTHENTICATION_ANONYMOUS
+    );
     if (unauthenticated) {
-      config.setUnauthenticated(true);
+      config.setIdentityConfig(new AnonymousIdentityConfig());
     } else {
       String username = extractor.singleValueParameter(USERNAME.name(), 
String.class);
       String password = extractor.secretValue(PASSWORD.name());
-
-      config.setUsername(username);
-      config.setPassword(password);
-      config.setUnauthenticated(false);
+      config.setIdentityConfig(new UsernamePasswordIdentityConfig(username, 
password));
     }
 
     return config;
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java
new file mode 100644
index 0000000000..ecaad661df
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+
+/**
+ * {@link IdentityConfig} for connections that do not supply user credentials.
+ * It installs Milo's {@link AnonymousProvider} on the client configuration.
+ */
+public class AnonymousIdentityConfig implements IdentityConfig {
+
+  /** Textual representation of this identity, returned by {@link #toString()}. */
+  private static final String LABEL = "anonymous";
+
+  /**
+   * Registers an anonymous identity provider on the given builder.
+   *
+   * @param builder the client configuration builder to modify
+   */
+  @Override
+  public void configureIdentity(OpcUaClientConfigBuilder builder) {
+    var anonymousProvider = new AnonymousProvider();
+    builder.setIdentityProvider(anonymousProvider);
+  }
+
+  /**
+   * @return the constant string {@code anonymous}
+   */
+  @Override
+  public String toString() {
+    return LABEL;
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java
new file mode 100644
index 0000000000..db51fd3449
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+
+/**
+ * Abstraction over the user identity that is applied to an OPC UA client configuration.
+ */
+public interface IdentityConfig {
+
+  /**
+   * Applies this identity to the given client configuration builder.
+   *
+   * @param builder the client configuration builder to modify
+   */
+  void configureIdentity(OpcUaClientConfigBuilder builder);
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java
new file mode 100644
index 0000000000..825fe1fd9a
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+
+/**
+ * {@link IdentityConfig} that authenticates against the server with a username and password
+ * by installing Milo's {@link UsernameProvider} on the client configuration.
+ */
+public class UsernamePasswordIdentityConfig implements IdentityConfig {
+
+  private final String username;
+  private final String password;
+
+  /**
+   * @param username user name presented to the server
+   * @param password password presented to the server
+   */
+  public UsernamePasswordIdentityConfig(String username, String password) {
+    this.username = username;
+    this.password = password;
+  }
+
+  /**
+   * Registers a username/password identity provider on the given builder.
+   *
+   * @param builder the client configuration builder to modify
+   */
+  @Override
+  public void configureIdentity(OpcUaClientConfigBuilder builder) {
+    builder.setIdentityProvider(new UsernameProvider(username, password));
+  }
+
+  /**
+   * Returns the user name followed by the SHA-256 hex digest of the password, so the
+   * plain-text password never appears in the textual representation.
+   *
+   * @return {@code <username>-<sha256 hex of password>}
+   */
+  @Override
+  public String toString() {
+    // %s (not %S): keep the digest exactly as produced instead of upper-casing it
+    // through the default locale
+    return String.format("%s-%s", username, DigestUtils.sha256Hex(password));
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
new file mode 100644
index 0000000000..75955fcbd8
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.security;
+
+import org.apache.streampipes.commons.environment.Environment;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+
+/**
+ * Loads the client certificate, the client certificate chain and the client key pair
+ * from a PKCS12 keystore. All three are read from the entry stored under the alias
+ * {@code apache-streampipes}.
+ */
+public class KeyStoreLoader {
+
+  private static final String CLIENT_ALIAS = "apache-streampipes";
+
+  private static final Logger LOG = LoggerFactory.getLogger(KeyStoreLoader.class);
+
+  private X509Certificate[] clientCertificateChain;
+  private X509Certificate clientCertificate;
+  private KeyPair clientKeyPair;
+
+  /**
+   * Reads the keystore and populates certificate, certificate chain and key pair.
+   *
+   * @param env         environment providing the keystore file name and the keystore password
+   * @param securityDir directory against which the keystore file name is resolved
+   * @return this loader, so that the getters can be chained
+   * @throws IllegalStateException if the keystore has no private key entry for the client alias
+   * @throws Exception             if the keystore cannot be read or the key cannot be recovered
+   */
+  KeyStoreLoader load(Environment env,
+                      Path securityDir) throws Exception {
+    var keyStore = KeyStore.getInstance("PKCS12");
+    var keyStoreFile = env.getOpcUaKeystoreFile().getValueOrDefault();
+    var keyStorePassword = env.getOpcUaKeystorePassword().getValueOrDefault();
+    Path keyStorePath = securityDir.resolve(keyStoreFile);
+    // the same password is used for the keystore itself and for the key entry
+    char[] keyStorePasswordChars = keyStorePassword.toCharArray();
+
+    LOG.info("Loading KeyStore at {}", keyStorePath);
+
+    try (InputStream in = Files.newInputStream(keyStorePath)) {
+      keyStore.load(in, keyStorePasswordChars);
+    }
+
+    Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS, keyStorePasswordChars);
+    if (!(clientPrivateKey instanceof PrivateKey)) {
+      // Fail fast: returning silently would leave certificate and key pair null and only
+      // surface later as an obscure error when the secured connection is established.
+      throw new IllegalStateException(
+          String.format(
+              "Keystore %s does not contain a private key entry with alias '%s'",
+              keyStorePath,
+              CLIENT_ALIAS
+          )
+      );
+    }
+
+    clientCertificate = (X509Certificate) keyStore.getCertificate(CLIENT_ALIAS);
+
+    clientCertificateChain = Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS))
+        .map(X509Certificate.class::cast)
+        .toArray(X509Certificate[]::new);
+
+    PublicKey clientPublicKey = clientCertificate.getPublicKey();
+    clientKeyPair = new KeyPair(clientPublicKey, (PrivateKey) clientPrivateKey);
+
+    return this;
+  }
+
+  /**
+   * @return the certificate stored under the client alias, or {@code null} before {@code load} was called
+   */
+  X509Certificate getClientCertificate() {
+    return clientCertificate;
+  }
+
+  /**
+   * @return the certificate chain stored under the client alias, or {@code null} before {@code load} was called
+   */
+  public X509Certificate[] getClientCertificateChain() {
+    return clientCertificateChain;
+  }
+
+  /**
+   * @return the public/private key pair of the client, or {@code null} before {@code load} was called
+   */
+  KeyPair getClientKeyPair() {
+    return clientKeyPair;
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java
new file mode 100644
index 0000000000..b64660ebe9
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.security;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import 
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.List;
+
+/**
+ * Applies the configured OPC-UA message security mode and security policy to a client configuration.
+ */
+public class SecurityConfig {
+
+  private final MessageSecurityMode securityMode;
+  private final SecurityPolicy securityPolicy;
+
+  /**
+   * @param securityMode   message security mode an endpoint must offer
+   * @param securityPolicy security policy an endpoint must offer
+   */
+  public SecurityConfig(MessageSecurityMode securityMode,
+                        SecurityPolicy securityPolicy) {
+    this.securityMode = securityMode;
+    this.securityPolicy = securityPolicy;
+  }
+
+  /**
+   * Selects the first endpoint matching the configured security mode and policy, rewrites its
+   * URL to use the host of {@code opcServerUrl} and registers it on the builder. For every
+   * security mode other than {@code None}, the client key pair, certificate, certificate chain
+   * and a certificate validator are registered as well.
+   *
+   * @param opcServerUrl server URL as configured by the user, expected as {@code scheme://host[:port]...}
+   * @param endpoints    endpoints to choose from
+   * @param builder      client configuration builder to configure
+   * @throws SpConfigurationException if no endpoint matches or the keystore cannot be loaded
+   * @throws URISyntaxException       if the URL of the selected endpoint cannot be parsed
+   */
+  public void configureSecurityPolicy(String opcServerUrl,
+                                      List<EndpointDescription> endpoints,
+                                      OpcUaClientConfigBuilder builder)
+      throws SpConfigurationException, URISyntaxException {
+    String host = opcServerUrl.split("://")[1].split(":")[0];
+
+    EndpointDescription tmpEndpoint = endpoints
+        .stream()
+        .filter(e -> e.getSecurityMode() == securityMode)
+        .filter(e -> e.getSecurityPolicyUri().equals(securityPolicy.getUri()))
+        .findFirst()
+        .orElseThrow(() ->
+            // exception messages are not logger templates, so the values are formatted in explicitly
+            new SpConfigurationException(
+                String.format(
+                    "No endpoint available with security mode %s and security policy %s",
+                    securityMode,
+                    securityPolicy
+                )
+            )
+        );
+
+    tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
+
+    if (securityMode != MessageSecurityMode.None) {
+      try {
+        var env = Environments.getEnvironment();
+        var securityDir = Paths.get(env.getOpcUaSecurityDir().getValueOrDefault());
+        var trustListManager = new DefaultTrustListManager(securityDir.resolve("pki").toFile());
+
+        var certificateValidator = new DefaultClientCertificateValidator(trustListManager);
+        var loader = new KeyStoreLoader().load(env, securityDir);
+        builder.setKeyPair(loader.getClientKeyPair());
+        builder.setCertificate(loader.getClientCertificate());
+        builder.setCertificateChain(loader.getClientCertificateChain());
+        builder.setCertificateValidator(certificateValidator);
+      } catch (Exception e) {
+        throw new SpConfigurationException(
+            "Failed to load keystore - check that all required environment variables "
+                + "are defined and the keystore exists",
+            e
+        );
+      }
+    }
+
+    builder.setEndpoint(tmpEndpoint);
+  }
+
+  /**
+   * Returns a copy of the given endpoint whose URL uses {@code hostname} instead of the host
+   * contained in the original endpoint URL. Scheme, port and path are taken from the original URL.
+   *
+   * @param original endpoint to copy
+   * @param hostname host name to put into the endpoint URL
+   * @return endpoint description with the rewritten URL
+   * @throws URISyntaxException if the original endpoint URL cannot be parsed
+   */
+  private EndpointDescription updateEndpointUrl(EndpointDescription original,
+                                                String hostname) throws URISyntaxException {
+
+    URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
+
+    // URI#getPort returns -1 when the URL has no explicit port; omit it instead of rendering ":-1"
+    String port = uri.getPort() == -1 ? "" : ":" + uri.getPort();
+    String endpointUrl = String.format("%s://%s%s%s", uri.getScheme(), hostname, port, uri.getPath());
+
+    return new EndpointDescription(
+        endpointUrl,
+        original.getServer(),
+        original.getServerCertificate(),
+        original.getSecurityMode(),
+        original.getSecurityPolicyUri(),
+        original.getUserIdentityTokens(),
+        original.getTransportProfileUri(),
+        original.getSecurityLevel());
+  }
+
+  /**
+   * Returns the security mode and the security policy joined by a dash.
+   */
+  @Override
+  public String toString() {
+    return String.format("%s-%s", securityMode, securityPolicy);
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java
new file mode 100644
index 0000000000..d4311bf6d5
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.migration;
+
+import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import org.apache.streampipes.extensions.connectors.opcua.utils.SecurityUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+import java.util.List;
+
+import static 
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.SECURITY_MODE;
+import static 
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.SECURITY_POLICY;
+import static 
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.USER_AUTHENTICATION;
+import static 
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.USER_AUTHENTICATION_ANONYMOUS;
+import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
+import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
+import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
+
+/**
+ * Migrates the static properties of the OPC-UA adapter from version 3 to version 4: the
+ * property at index 1 is replaced by a security mode selection, a security policy selection
+ * and a user authentication alternative.
+ */
+public class OpcUaAdapterMigrationV4 implements IAdapterMigrator {
+
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(OpcUaAdapter.ID, SpServiceTagPrefix.ADAPTER, 3, 4);
+  }
+
+  @Override
+  public MigrationResult<AdapterDescription> migrate(AdapterDescription element,
+                                                     IStaticPropertyExtractor extractor) throws RuntimeException {
+    element.setConfig(migrate(element.getConfig(), extractor));
+    return MigrationResult.success(element);
+  }
+
+  /**
+   * Rewrites the given static properties in place and returns the same list.
+   *
+   * @param staticProperties properties of the element to migrate; modified by this method
+   * @param extractor        extractor used to read the previously selected authentication settings
+   * @return the modified {@code staticProperties} list
+   */
+  public List<StaticProperty> migrate(List<StaticProperty> staticProperties,
+                                      IParameterExtractor extractor) {
+    // security mode selection, first option pre-selected
+    var securityMode = StaticProperties.singleValueSelection(
+        Labels.withId(SECURITY_MODE),
+        SecurityUtils.getAvailableSecurityModes()
+            .stream()
+            .map(mode -> new Option(mode.k, mode.v))
+            .toList()
+    );
+    securityMode.getOptions().get(0).setSelected(true);
+
+    // security policy selection, first option pre-selected
+    var securityPolicy = StaticProperties.singleValueSelection(
+        Labels.withId(SECURITY_POLICY),
+        SecurityUtils.getAvailableSecurityPolicies()
+            .stream()
+            .map(p -> new Option(p.name()))
+            .toList()
+    );
+    securityPolicy.getOptions().get(0).setSelected(true);
+
+    // anything but the username group of the old access mode is treated as anonymous access
+    boolean usesCredentials = extractor
+        .selectedAlternativeInternalId("ACCESS_MODE")
+        .equals("USERNAME_GROUP");
+
+    var authentication = StaticProperties.alternatives(
+        Labels.withId(USER_AUTHENTICATION),
+        Alternatives.from(Labels.withId(USER_AUTHENTICATION_ANONYMOUS)),
+        Alternatives.from(
+            Labels.withId(USERNAME_GROUP),
+            StaticProperties.group(
+                Labels.withId(USERNAME_GROUP),
+                StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME)),
+                StaticProperties.secretValue(Labels.withId(PASSWORD))
+            )
+        )
+    );
+
+    if (!usesCredentials) {
+      authentication.getAlternatives().get(0).setSelected(true);
+    } else {
+      var credentialsAlternative = authentication.getAlternatives().get(1);
+      credentialsAlternative.setSelected(true);
+
+      // carry the previously configured credentials over into the new group
+      var username = extractor.singleValueParameter("USERNAME", String.class);
+      var password = extractor.secretValue("PASSWORD");
+      var credentials = ((StaticPropertyGroup) credentialsAlternative.getStaticProperty()).getStaticProperties();
+      ((FreeTextStaticProperty) credentials.get(0)).setValue(username);
+      var passwordProperty = (SecretStaticProperty) credentials.get(1);
+      passwordProperty.setValue(password);
+      passwordProperty.setEncrypted(false);
+    }
+
+    // drop the old authentication property and put the three new properties in its place
+    staticProperties.remove(1);
+    staticProperties.add(1, securityMode);
+    staticProperties.add(2, securityPolicy);
+    staticProperties.add(3, authentication);
+
+    return staticProperties;
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java
new file mode 100644
index 0000000000..bc24f4d88d
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.migration;
+
+import 
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+
+/**
+ * Migrates the OPC-UA sink from version 0 to version 1 by applying the static property
+ * migration of {@link OpcUaAdapterMigrationV4} to the sink's static properties.
+ */
+public class OpcUaSinkMigrationV1 implements IDataSinkMigrator {
+
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(OpcUaSink.ID, SpServiceTagPrefix.DATA_SINK, 0, 1);
+  }
+
+  @Override
+  public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element,
+                                                     IDataSinkParameterExtractor extractor) throws RuntimeException {
+    var currentProperties = element.getStaticProperties();
+    var propertyMigration = new OpcUaAdapterMigrationV4();
+    element.setStaticProperties(propertyMigration.migrate(currentProperties, extractor));
+    return MigrationResult.success(element);
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
index 78babe0027..e5c9485bf7 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
@@ -19,12 +19,13 @@
 package org.apache.streampipes.extensions.connectors.opcua.sink;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import 
org.apache.streampipes.extensions.connectors.opcua.client.ConnectedOpcUaClient;
+import 
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
+import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.PrimitiveField;
 import org.apache.streampipes.vocabulary.XSD;
 
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
@@ -42,7 +43,8 @@ public class OpcUa {
 
   private static final Logger LOG = LoggerFactory.getLogger(OpcUa.class);
 
-  private OpcUaClient opcUaClient;
+  private ConnectedOpcUaClient connectedClient;
+  private final OpcUaConfig opcUaConfig;
   private OpcUaParameters params;
 
   private NodeId node;
@@ -73,24 +75,30 @@ public class OpcUa {
     compatibleDataTypes.put(String.class, new Class[]{String.class});
   }
 
-  public void onInvocation(OpcUaParameters params) throws
+  private final OpcUaClientProvider clientProvider;
+
+  /**
+   * Creates the sink runtime for the given parameters.
+   *
+   * @param clientProvider provider used to obtain and release the connected OPC-UA client
+   * @param params         sink parameters; stored because {@code onInvocation()} and the write path
+   *                       read the selected node, the mapping property and the server URL from them
+   */
+  public OpcUa(OpcUaClientProvider clientProvider,
+               OpcUaParameters params) {
+    this.clientProvider = clientProvider;
+    // must be assigned here: onInvocation() no longer receives the parameters as an argument
+    this.params = params;
+    this.opcUaConfig = params.config();
+  }
+
+  public void onInvocation() throws
       SpRuntimeException {
 
     try {
-      this.params = params;
-      this.node = NodeId.parse(params.getSelectedNode());
-      opcUaClient = new SpOpcUaClient<>(params.getConfig()).getClient();
-      opcUaClient.connect().get();
+      this.node = NodeId.parse(params.selectedNode());
+      this.connectedClient = clientProvider.getClient(opcUaConfig);
 
     } catch (Exception e) {
-      throw new SpRuntimeException("Could not connect to OPC-UA server: " + 
params.getConfig().getOpcServerURL());
+      throw new SpRuntimeException("Could not connect to OPC-UA server: " + 
params.config().getOpcServerURL());
     }
 
     // check whether input data type and target data type are compatible
     try {
-      Variant value = 
opcUaClient.getAddressSpace().getVariableNode(node).readValue().getValue();
+      Variant value = 
this.connectedClient.getClient().getAddressSpace().getVariableNode(node).readValue().getValue();
       targetDataType = value.getValue().getClass();
-      sourceDataType = XSDMatchings.get(params.getMappingPropertyType());
+      sourceDataType = XSDMatchings.get(params.mappingPropertyType());
       if (!sourceDataType.equals(targetDataType)) {
         if 
(Arrays.stream(compatibleDataTypes.get(sourceDataType)).noneMatch(dt -> 
dt.equals(targetDataType))) {
           throw new SpRuntimeException("Data Type of event of target node are 
not compatible");
@@ -107,40 +115,45 @@ public class OpcUa {
     Variant v = getValue(inputEvent);
 
     if (v == null) {
-      LOG.error("Mapping property type: " + 
this.params.getMappingPropertyType() + " is not supported");
+      LOG.error("Mapping property type: " + this.params.mappingPropertyType() 
+ " is not supported");
     } else {
 
       DataValue value = new DataValue(v);
-      CompletableFuture<StatusCode> f = opcUaClient.writeValue(node, value);
+      CompletableFuture<StatusCode> f = 
this.connectedClient.getClient().writeValue(node, value);
 
       try {
         StatusCode status = f.get();
         if (status.isBad()) {
           if (status.getValue() == 0x80740000L) {
-            LOG.error("Type missmatch! Tried to write value of type: " + 
this.params.getMappingPropertyType()
+            LOG.error("Type missmatch! Tried to write value of type {} ", 
this.params.mappingPropertyType()
                 + " but server did not accept this");
           } else if (status.getValue() == 0x803B0000L) {
             LOG.error("Wrong access level. Not allowed to write to nodes");
           }
           LOG.error(
-              "Value: " + value.getValue().toString() + " could not be written 
to node Id: "
-                  + node.getIdentifier() + " on " + "OPC-UA server: " + 
params.getConfig().getOpcServerURL());
+              "Value: {} could not be written to node Id {} on OPC-UA server 
{}",
+              value.getValue().toString(),
+              node.getIdentifier(),
+              params.config().getOpcServerURL());
         }
       } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Exception: Value: " + value.getValue().toString() + " could 
not be written to node Id: "
-            + node.getIdentifier() + " on " + "OPC-UA server: " + 
params.getConfig().getOpcServerURL());
+        LOG.error(
+            "Exception: Value {} could not be written to node Id {} on OPC_UA 
server {}",
+            value.getValue().toString(),
+            node.getIdentifier(),
+            params.config().getOpcServerURL());
       }
     }
   }
 
   public void onDetach() throws SpRuntimeException {
-    opcUaClient.disconnect();
+    clientProvider.releaseClient(opcUaConfig);
   }
 
   private Variant getValue(Event inputEvent) {
     Variant result = null;
     PrimitiveField propertyPrimitive =
-        
inputEvent.getFieldBySelector(this.params.getMappingPropertySelector()).getAsPrimitive();
+        
inputEvent.getFieldBySelector(this.params.mappingPropertySelector()).getAsPrimitive();
 
     if (targetDataType.equals(Integer.class)) {
       result = new Variant(propertyPrimitive.getAsInt());
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
index 15906af4aa..438bf154e8 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
@@ -20,36 +20,8 @@ package 
org.apache.streampipes.extensions.connectors.opcua.sink;
 
 import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
 
-public final class OpcUaParameters {
-  private final String selectedNode;
-  private final String mappingPropertySelector;
-  private final String mappingPropertyType;
-
-  private final OpcUaConfig config;
-
-  public OpcUaParameters(OpcUaConfig config,
-                         String mappingPropertySelector,
-                         String mappingPropertyType,
-                         String selectedNode) {
-    this.config = config;
-    this.mappingPropertySelector = mappingPropertySelector;
-    this.mappingPropertyType = mappingPropertyType;
-    this.selectedNode = selectedNode;
-  }
-
-  public String getSelectedNode() {
-    return selectedNode;
-  }
-
-  public String getMappingPropertySelector() {
-    return mappingPropertySelector;
-  }
-
-  public String getMappingPropertyType() {
-    return mappingPropertyType;
-  }
-
-  public OpcUaConfig getConfig() {
-    return config;
-  }
+/**
+ * Immutable parameter holder of the OPC-UA sink.
+ *
+ * @param config                  OPC-UA connection configuration
+ * @param mappingPropertySelector selector used to look up the field of an incoming event
+ * @param mappingPropertyType     type of the mapped property, used as lookup key for the source data type
+ * @param selectedNode            textual node id of the target node, parsed with {@code NodeId.parse}
+ */
+public record OpcUaParameters(OpcUaConfig config,
+                              String mappingPropertySelector,
+                              String mappingPropertyType,
+                              String selectedNode) {
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
index 50a6069cdf..b81f20d49c 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
@@ -26,6 +26,7 @@ import 
org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
 import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
+import 
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
 import 
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
 import 
org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
 import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
@@ -45,11 +46,18 @@ import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabe
 
 public class OpcUaSink implements IStreamPipesDataSink, SupportsRuntimeConfig {
 
+  public static final String ID = 
"org.apache.streampipes.sinks.databases.jvm.opcua";
+
   private OpcUa opcUa;
+  private final OpcUaClientProvider clientProvider;
+
+  public OpcUaSink(OpcUaClientProvider clientProvider) {
+    this.clientProvider = clientProvider;
+  }
 
   @Override
   public IDataSinkConfiguration declareConfig() {
-    var builder = 
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.opcua", 0)
+    var builder = DataSinkBuilder.create(ID, 0)
         .withLocales(Locales.EN)
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .category(DataSinkType.FORWARD)
@@ -62,7 +70,7 @@ public class OpcUaSink implements IStreamPipesDataSink, 
SupportsRuntimeConfig {
     SharedUserConfiguration.appendSharedOpcUaConfig(builder, false);
 
     return DataSinkConfiguration.create(
-        OpcUaSink::new,
+        () -> new OpcUaSink(clientProvider),
         builder.build()
     );
   }
@@ -89,8 +97,8 @@ public class OpcUaSink implements IStreamPipesDataSink, 
SupportsRuntimeConfig {
         config.getSelectedNodeNames().get(0)
     );
 
-    this.opcUa = new OpcUa();
-    this.opcUa.onInvocation(params);
+    this.opcUa = new OpcUa(clientProvider, params);
+    this.opcUa.onInvocation();
   }
 
   @Override
@@ -106,6 +114,6 @@ public class OpcUaSink implements IStreamPipesDataSink, 
SupportsRuntimeConfig {
   @Override
   public StaticProperty resolveConfiguration(String staticPropertyInternalName,
                                              IStaticPropertyExtractor 
extractor) throws SpConfigurationException {
-    return OpcUaUtil.resolveConfig(staticPropertyInternalName, extractor);
+    return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName, 
extractor);
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
index aa49a7a5f8..8cd76401f9 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
@@ -25,8 +25,9 @@ import 
org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtracto
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import 
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
 import 
org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaNodeBrowser;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import 
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
 import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
+import 
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
 import 
org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
 import org.apache.streampipes.extensions.connectors.opcua.model.OpcNode;
 import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
@@ -37,7 +38,7 @@ import 
org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputSta
 import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
 
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
 import org.eclipse.milo.opcua.stack.core.AttributeId;
 import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
@@ -78,7 +79,8 @@ public class OpcUaUtil {
    * @throws AdapterException
    * @throws ParseException
    */
-  public static GuessSchema getSchema(IAdapterParameterExtractor extractor)
+  public static GuessSchema getSchema(OpcUaClientProvider clientProvider,
+                                      IAdapterParameterExtractor extractor)
       throws AdapterException, ParseException {
     var builder = GuessSchemaBuilder.create();
     EventSchema eventSchema = new EventSchema();
@@ -86,14 +88,13 @@ public class OpcUaUtil {
     Map<String, FieldStatusInfo> fieldStatusInfos = new HashMap<>();
     List<EventProperty> allProperties = new ArrayList<>();
 
-    SpOpcUaClient<OpcUaConfig> spOpcUaClient = new SpOpcUaClient<>(
-        
SpOpcUaConfigExtractor.extractSharedConfig(extractor.getStaticPropertyExtractor(),
 new OpcUaConfig())
+    var opcUaConfig = SpOpcUaConfigExtractor.extractSharedConfig(
+        extractor.getStaticPropertyExtractor(), new OpcUaConfig()
     );
-
     try {
-      spOpcUaClient.connect();
+      var connectedClient = clientProvider.getClient(opcUaConfig);
       OpcUaNodeBrowser nodeBrowser =
-          new OpcUaNodeBrowser(spOpcUaClient.getClient(), 
spOpcUaClient.getSpOpcConfig());
+          new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
       List<OpcNode> selectedNodes = nodeBrowser.findNodes();
 
       if (!selectedNodes.isEmpty()) {
@@ -116,12 +117,12 @@ public class OpcUaUtil {
       var nodeIds = selectedNodes.stream()
                                  .map(OpcNode::getNodeId)
                                  .collect(Collectors.toList());
-      var response = spOpcUaClient.getClient()
+      var response = connectedClient.getClient()
                                   .readValues(0, TimestampsToReturn.Both, 
nodeIds);
 
       var returnValues = response.get();
 
-      spOpcUaClient.disconnect();
+      //clientProvider.releaseClient(opcUaConfig);
 
       makeEventPreview(selectedNodes, eventPreview, fieldStatusInfos, 
returnValues);
 
@@ -129,7 +130,9 @@ public class OpcUaUtil {
     } catch (Exception e) {
       throw new AdapterException("Could not guess schema for opc node:  " + 
e.getMessage(), e);
     } finally {
-      spOpcUaClient.disconnect();
+      // TODO
+      //spOpcUaClient.disconnect();
+      clientProvider.releaseClient(opcUaConfig);
     }
 
     eventSchema.setEventProperties(allProperties);
@@ -172,7 +175,8 @@ public class OpcUaUtil {
    * @param parameterExtractor to extract parameters from the OPC UA config
    * @return {@code List<Option>} with available node names for the given OPC 
UA configuration
    */
-  public static RuntimeResolvableTreeInputStaticProperty resolveConfig(String 
internalName,
+  public static RuntimeResolvableTreeInputStaticProperty 
resolveConfig(OpcUaClientProvider clientProvider,
+                                                                       String 
internalName,
                                                                        
IStaticPropertyExtractor parameterExtractor)
       throws SpConfigurationException {
 
@@ -181,19 +185,18 @@ public class OpcUaUtil {
     // access mode and host/url have to be selected
     try {
       
parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
-      
parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
+      
parameterExtractor.selectedSingleValueInternalName(SharedUserConfiguration.SECURITY_MODE,
 String.class);
+      
parameterExtractor.selectedSingleValue(SharedUserConfiguration.SECURITY_POLICY, 
String.class);
     } catch (NullPointerException nullPointerException) {
       return config;
     }
 
-    SpOpcUaClient spOpcUaClient = new SpOpcUaClient(
-        SpOpcUaConfigExtractor.extractSharedConfig(parameterExtractor, new 
OpcUaConfig())
-    );
+    var opcUaConfig = 
SpOpcUaConfigExtractor.extractSharedConfig(parameterExtractor, new 
OpcUaConfig());
 
     try {
-      spOpcUaClient.connect();
+      var connectedClient = clientProvider.getClient(opcUaConfig);
       OpcUaNodeBrowser nodeBrowser =
-          new OpcUaNodeBrowser(spOpcUaClient.getClient(), 
spOpcUaClient.getSpOpcConfig());
+          new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
 
       var nodes = 
nodeBrowser.buildNodeTreeFromOrigin(config.getNextBaseNodeToResolve());
       if (Objects.isNull(config.getNextBaseNodeToResolve())) {
@@ -204,7 +207,7 @@ public class OpcUaUtil {
 
       if (!config.getSelectedNodesInternalNames().isEmpty()) {
         config.setSelectedNodesInternalNames(
-            filterMissingNodes(spOpcUaClient.getClient(), 
config.getSelectedNodesInternalNames())
+            filterMissingNodes(connectedClient.getClient(), 
config.getSelectedNodesInternalNames())
         );
       }
 
@@ -215,13 +218,15 @@ public class OpcUaUtil {
     } catch (ExecutionException | InterruptedException | URISyntaxException e) 
{
       throw new SpConfigurationException("Could not connect to the OPC UA 
server with the provided settings", e);
     } finally {
-      if (spOpcUaClient.getClient() != null) {
-        spOpcUaClient.disconnect();
-      }
+      clientProvider.releaseClient(opcUaConfig);
+      // TODO
+//      if (spOpcUaClient.getClient() != null) {
+//        spOpcUaClient.disconnect();
+//      }
     }
   }
 
-  public static List<String> filterMissingNodes(OpcUaClient opcUaClient,
+  public static List<String> filterMissingNodes(UaClient opcUaClient,
                                                 List<String> selectedNodes) {
     return selectedNodes.stream().filter(selectedNode -> {
       try {
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java
new file mode 100644
index 0000000000..576f3eeb6f
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.utils;
+
+import org.apache.streampipes.model.Tuple2;
+
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+
+import java.util.List;
+
+public class SecurityUtils {
+
+  public static List<Tuple2<String, String>> getAvailableSecurityModes() {
+    return List.of(
+        new Tuple2<>("None", MessageSecurityMode.None.name()),
+        new Tuple2<>("Sign", MessageSecurityMode.Sign.name()),
+        new Tuple2<>("Sign & Encrypt", 
MessageSecurityMode.SignAndEncrypt.name())
+    );
+  }
+
+  public static List<SecurityPolicy> getAvailableSecurityPolicies() {
+    return List.of(
+        SecurityPolicy.None,
+        SecurityPolicy.Basic128Rsa15,
+        SecurityPolicy.Basic256,
+        SecurityPolicy.Basic256Sha256,
+        SecurityPolicy.Aes128_Sha256_RsaOaep,
+        SecurityPolicy.Aes256_Sha256_RsaPss
+    );
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
index 079243ab0f..185776e43a 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
@@ -26,7 +26,40 @@
 
 ## Description
 
-Reads values from an OPC-UA server repeatedly
+This adapter reads node values from an OPC-UA server.
+The adapter supports both signed/encrypted and unencrypted communication.
+
+Certificates must be provided directly to the service and cannot be added from 
the UI or REST APIs.
+To establish connections using a `Sign` or `Sign & Encrypt` security mode, 
+the following environment variables must be provided to the extension service:
+
+* SP_OPCUA_SECURITY_DIR the directory where the keystore and trusted 
certificates are located
+* SP_OPCUA_KEYSTORE_FILE the keystore file (e.g., keystore.pfx, must be of 
type PKCS12)
+* SP_OPCUA_KEYSTORE_PASSWORD the password to the keystore
+* SP_OPCUA_APPLICATION_URI the application URI used by the client to identify 
itself
+
+Certificate requirements:
+
+The X509 certificate must provide the following extras:
+* Key Usage: Certificate Sign
+* Subject Alternative Name: Application URI
+* Basic Constraints: Must provide CA:FALSE when using a self-signed certificate
+* Extended Key Usage: TLS Web Server Authentication, TLS Web Client 
Authentication
+
+The directory layout of the `SP_OPCUA_SECURITY_DIR` looks as follows:
+
+```
+SP_OPCUA_SECURITY_DIR/
+├─ pki/
+│  ├─ issuers/
+│  ├─ rejected/
+│  ├─ trusted/
+│  │  ├─ certs/
+│  │  ├─ crl/
+```
+
+Trusted certs need to be present in the `pki/trusted/certs` folder.
+Rejected certificates are stored in the `pki/rejected` folder.
 
 ***
 
@@ -40,7 +73,15 @@ Reads values from an OPC-UA server repeatedly
 
 Duration of the polling interval in seconds
 
-### Anonymous vs. Username/Password
+### Security Mode
+
+Can be either `None`, `Sign` or `Sign & Encrypt`
+
+### Security Policy
+
+Choose one of the OPC-UA security policies or `None`
+
+### User Authentication
 
 Choose whether you want to connect anonymously or authenticate using your 
credentials.
 
@@ -54,14 +95,6 @@ Where can the OPC UA server be found?
 &nbsp;&nbsp;&nbsp;&nbsp; **URL**: Specify the server's full `URL` (including 
port), can be with our without leading `opc.tcp://`<br/>
 &nbsp;&nbsp;&nbsp;&nbsp; **Host/Port**: Insert the `host` address (with or 
without leading `opc.tcp://`) and the `port`<br/>
 
-### Namespace Index
-
-Requires the index of the namespace you want to connect to.
-
-### Node ID
-
-The identifier of the node you want to read from, numbers and strings are both 
valid.
-
 ### Available Nodes
 
 Shows all available nodes once namespace index and node ID are given.
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
index 995045ec7a..15f54c5768 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
@@ -38,10 +38,22 @@ OPC_SERVER_HOST.description=Example: test-server.com, 
opc.tcp://test-server.com)
 OPC_SERVER_PORT.title=Port
 OPC_SERVER_PORT.description=Example: 4840
 
+securityMode.title=Security Mode
+securityMode.description=The OPC-UA security mode
+
+securityPolicy.title=Security Policy
+securityPolicy.description=The OPC-UA security policy. Choose "None" if 
security mode is "None"
+
+userAuthentication.title=User Authentication
+userAuthentication.description=Choose an authentication method for the user
+
+anonymous.title=Anonymous
+anonymous.description=
+
 ACCESS_MODE.title=Security Level
 ACCESS_MODE.description=Select the OPC UA security level for the connection
 
-USERNAME_GROUP.title=Sign (username & password)
+USERNAME_GROUP.title=Username & Password
 USERNAME_GROUP.description=
 
 UNAUTHENTICATED.title=None
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
index 27221467c9..ee28512b27 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
@@ -26,7 +26,40 @@
 
 ## Description
 
-Allows to write events to an OPC-UA server.
+This data sink can be used to write values to an OPC-UA server.
+The sink supports both signed/encrypted and unencrypted communication.
+
+Certificates must be provided directly to the service and cannot be added from 
the UI or REST APIs.
+To establish connections using a `Sign` or `Sign & Encrypt` security mode,
+the following environment variables must be provided to the extension service:
+
+* SP_OPCUA_SECURITY_DIR the directory where the keystore and trusted 
certificates are located
+* SP_OPCUA_KEYSTORE_FILE the keystore file (e.g., keystore.pfx, must be of 
type PKCS12)
+* SP_OPCUA_KEYSTORE_PASSWORD the password to the keystore
+* SP_OPCUA_APPLICATION_URI the application URI used by the client to identify 
itself
+
+Certificate requirements:
+
+The X509 certificate must provide the following extras:
+* Key Usage: Certificate Sign
+* Subject Alternative Name: Application URI
+* Basic Constraints: Must provide CA:FALSE when using a self-signed certificate
+* Extended Key Usage: TLS Web Server Authentication, TLS Web Client 
Authentication
+
+The directory layout of the `SP_OPCUA_SECURITY_DIR` looks as follows:
+
+```
+SP_OPCUA_SECURITY_DIR/
+├─ pki/
+│  ├─ issuers/
+│  ├─ rejected/
+│  ├─ trusted/
+│  │  ├─ certs/
+│  │  ├─ crl/
+```
+
+Trusted certs need to be present in the `pki/trusted/certs` folder.
+Rejected certificates are stored in the `pki/rejected` folder.
 
 ***
 
@@ -46,6 +79,21 @@ The hostname of the OPC-UA server.
 
 The port of the OPC-UA server.
 
+### Security Mode
+
+Can be either `None`, `Sign` or `Sign & Encrypt`
+
+### Security Policy
+
+Choose one of the OPC-UA security policies or `None`
+
+### User Authentication
+
+Choose whether you want to connect anonymously or authenticate using your 
credentials.
+
+&nbsp;&nbsp;&nbsp;&nbsp; **Anonymous**: No further information required <br/>
+&nbsp;&nbsp;&nbsp;&nbsp; **Username/Password**: Insert your `username` and 
`password` to access the OPC UA server
+
 ### Namespace Index
 
 The namespace index in which the node should be written
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
index 7e78d16a1a..b26c09c598 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
@@ -56,3 +56,15 @@ AVAILABLE_NODES.description=Select the nodes that are 
relevant for you. Please e
 
 MAPPING_PROPERY.title=Field to write
 MAPPING_PROPERY.description=The field that should be written to the OPC-UA 
server
+
+securityMode.title=Security Mode
+securityMode.description=The OPC-UA security mode
+
+securityPolicy.title=Security Policy
+securityPolicy.description=The OPC-UA security policy. Choose "None" if 
security mode is "None"
+
+userAuthentication.title=User Authentication
+userAuthentication.description=Choose an authentication method for the user
+
+anonymous.title=Anonymous
+anonymous.description=
diff --git 
a/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
 
b/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
index c7c1a4bfe7..8823d519b3 100644
--- 
a/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
+++ 
b/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
@@ -83,7 +83,11 @@ export class DataExplorerWidgetDataSettingsComponent 
implements OnInit {
             this.dataExplorerService.getAllPersistedDataStreams(),
             this.datalakeRestService.getAllMeasurementSeries(),
         ).subscribe(response => {
-            this.availablePipelines = response[0];
+            this.availablePipelines = response[0].filter(
+                p =>
+                    response[1].find(m => m.measureName === p.measureName) !==
+                    undefined,
+            );
             this.availableMeasurements = response[1];
 
             // replace pipeline event schemas. Reason: Available measures do 
not contain field for timestamp

Reply via email to