This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch
3353-support-security-policies-signsignandencrypt-in-opc-ua-adapter-and-sink
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3353-support-security-policies-signsignandencrypt-in-opc-ua-adapter-and-sink
by this push:
new 7219703e36 feat(#3353): Support sign and encrypt security modes in
OPC-UA adapter and sink
7219703e36 is described below
commit 7219703e361da95c2c2c26cd22a70d66a1f6335a
Author: Dominik Riemer <[email protected]>
AuthorDate: Mon Nov 25 17:57:12 2024 +0100
feat(#3353): Support sign and encrypt security modes in OPC-UA adapter and
sink
---
pom.xml | 2 +-
.../apache/streampipes/commons/constants/Envs.java | 7 +-
.../commons/environment/DefaultEnvironment.java | 19 ++++
.../commons/environment/Environment.java | 13 +++
.../streampipes-connectors-opcua/pom.xml | 4 +-
.../opcua/OpcUaConnectorsModuleExport.java | 18 ++-
.../connectors/opcua/adapter/OpcUaAdapter.java | 46 ++++----
.../connectors/opcua/adapter/OpcUaNodeBrowser.java | 7 +-
.../opcua/adapter/OpcUaNodeMetadataExtractor.java | 6 +-
...pOpcUaClient.java => ConnectedOpcUaClient.java} | 62 +++--------
.../opcua/client/OpcUaClientProvider.java | 75 +++++++++++++
.../connectors/opcua/client/SpOpcUaClient.java | 121 +--------------------
.../config/MiloOpcUaConfigurationProvider.java | 67 ++----------
.../connectors/opcua/config/OpcUaConfig.java | 44 ++++----
.../opcua/config/SharedUserConfiguration.java | 28 +++--
.../opcua/config/SpOpcUaConfigExtractor.java | 39 +++++--
.../config/identity/AnonymousIdentityConfig.java | 35 ++++++
.../opcua/config/identity/IdentityConfig.java | 26 +++++
.../identity/UsernamePasswordIdentityConfig.java | 44 ++++++++
.../opcua/config/security/KeyStoreLoader.java | 87 +++++++++++++++
.../opcua/config/security/SecurityConfig.java | 110 +++++++++++++++++++
.../opcua/migration/OpcUaAdapterMigrationV4.java | 121 +++++++++++++++++++++
.../opcua/migration/OpcUaSinkMigrationV1.java | 48 ++++++++
.../extensions/connectors/opcua/sink/OpcUa.java | 53 +++++----
.../connectors/opcua/sink/OpcUaParameters.java | 36 +-----
.../connectors/opcua/sink/OpcUaSink.java | 18 ++-
.../connectors/opcua/utils/OpcUaUtil.java | 51 +++++----
.../connectors/opcua/utils/SecurityUtils.java | 48 ++++++++
.../documentation.md | 53 +++++++--
.../strings.en | 14 ++-
.../documentation.md | 50 ++++++++-
.../strings.en | 12 ++
...data-explorer-widget-data-settings.component.ts | 6 +-
33 files changed, 977 insertions(+), 393 deletions(-)
diff --git a/pom.xml b/pom.xml
index c1285c0a95..e059400d30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
<commons-pool2.version>2.12.0</commons-pool2.version>
<commons-text.version>1.12.0</commons-text.version>
<ditto-client.version>1.0.0</ditto-client.version>
- <eclipse.milo.version>0.6.9</eclipse.milo.version>
+ <eclipse.milo.version>0.6.14</eclipse.milo.version>
<file-management.version>3.1.0</file-management.version>
<flink.version>1.13.5</flink.version>
<fogsy-qudt.version>1.0</fogsy-qudt.version>
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 7ad56fe402..5723633b43 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -108,7 +108,12 @@ public enum Envs {
// expects a comma separated string of service names
SP_SERVICE_TAGS("SP_SERVICE_TAGS", ""),
- SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", "");
+ SP_ALLOWED_UPLOAD_FILETYPES("SP_ALLOWED_UPLOAD_FILETYPES", "", ""),
+
+ SP_OPCUA_SECURITY_DIR("SP_OPCUA_SECURITY_DIR", "/opc-ua-security"),
+ SP_OPCUA_KEYSTORE_FILE("SP_OPCUA_KEYSTORE_FILE", "keystore.pfx"),
+ SP_OPCUA_KEYSTORE_PASSWORD("SP_OPCUA_KEYSTORE_PASSWORD", "password"),
+ SP_OPCUA_APPLICATION_URI("SP_OPCUA_APPLICATION_URI",
"urn:org:apache:streampipes:opcua:client");
private final String envVariableName;
private String defaultValue;
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 36ed402360..018cd5e6f4 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -328,4 +328,23 @@ public class DefaultEnvironment implements Environment {
return new StringEnvironmentVariable(Envs.SP_ALLOWED_UPLOAD_FILETYPES);
}
+ @Override
+ public StringEnvironmentVariable getOpcUaSecurityDir() {
+ return new StringEnvironmentVariable(Envs.SP_OPCUA_SECURITY_DIR);
+ }
+
+ @Override
+ public StringEnvironmentVariable getOpcUaKeystoreFile() {
+ return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_FILE);
+ }
+
+ @Override
+ public StringEnvironmentVariable getOpcUaKeystorePassword() {
+ return new StringEnvironmentVariable(Envs.SP_OPCUA_KEYSTORE_PASSWORD);
+ }
+
+ @Override
+ public StringEnvironmentVariable getOpcUaApplicationUri() {
+ return new StringEnvironmentVariable(Envs.SP_OPCUA_APPLICATION_URI);
+ }
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index d1d6efab67..7a99b96283 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -36,7 +36,9 @@ public interface Environment {
IntEnvironmentVariable getServicePort();
StringEnvironmentVariable getSpCoreScheme();
+
StringEnvironmentVariable getSpCoreHost();
+
IntEnvironmentVariable getSpCorePort();
// Time series storage env variables
@@ -144,12 +146,15 @@ public interface Environment {
// Broker defaults
StringEnvironmentVariable getKafkaHost();
+
IntEnvironmentVariable getKafkaPort();
StringEnvironmentVariable getMqttHost();
+
IntEnvironmentVariable getMqttPort();
StringEnvironmentVariable getNatsHost();
+
IntEnvironmentVariable getNatsPort();
StringEnvironmentVariable getPulsarUrl();
@@ -158,4 +163,12 @@ public interface Environment {
StringEnvironmentVariable getAllowedUploadFiletypes();
+ StringEnvironmentVariable getOpcUaSecurityDir();
+
+ StringEnvironmentVariable getOpcUaKeystoreFile();
+
+ StringEnvironmentVariable getOpcUaKeystorePassword();
+
+ StringEnvironmentVariable getOpcUaApplicationUri();
+
}
diff --git a/streampipes-extensions/streampipes-connectors-opcua/pom.xml
b/streampipes-extensions/streampipes-connectors-opcua/pom.xml
index 71eaf09917..673e6858c6 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/pom.xml
+++ b/streampipes-extensions/streampipes-connectors-opcua/pom.xml
@@ -21,9 +21,9 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-parent</artifactId>
+ <artifactId>streampipes-extensions</artifactId>
<version>0.97.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
+ <relativePath>../pom.xml</relativePath>
</parent>
<artifactId>streampipes-connectors-opcua</artifactId>
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
index 63aacdab58..03fc8448a4 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
@@ -23,25 +23,35 @@ import
org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV1;
import
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV2;
import
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV3;
+import
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV4;
+import
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaSinkMigrationV1;
import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
import java.util.List;
public class OpcUaConnectorsModuleExport implements IExtensionModuleExport {
+
+ private final OpcUaClientProvider clientProvider;
+
+ public OpcUaConnectorsModuleExport() {
+ this.clientProvider = new OpcUaClientProvider();
+ }
+
@Override
public List<StreamPipesAdapter> adapters() {
return List.of(
- new OpcUaAdapter()
+ new OpcUaAdapter(clientProvider)
);
}
@Override
public List<IStreamPipesPipelineElement<?>> pipelineElements() {
return List.of(
- new OpcUaSink()
+ new OpcUaSink(clientProvider)
);
}
@@ -50,7 +60,9 @@ public class OpcUaConnectorsModuleExport implements
IExtensionModuleExport {
return List.of(
new OpcUaAdapterMigrationV1(),
new OpcUaAdapterMigrationV2(),
- new OpcUaAdapterMigrationV3()
+ new OpcUaAdapterMigrationV3(),
+ new OpcUaAdapterMigrationV4(),
+ new OpcUaSinkMigrationV1()
);
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
index baf5cfe95b..526ff7bb24 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
@@ -29,7 +29,8 @@ import
org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeCont
import
org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import
org.apache.streampipes.extensions.connectors.opcua.client.ConnectedOpcUaClient;
+import
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import
org.apache.streampipes.extensions.connectors.opcua.config.OpcUaAdapterConfig;
import
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import
org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
@@ -42,7 +43,6 @@ import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.connect.rules.schema.DeleteRuleDescription;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.staticproperty.StaticProperty;
-import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -66,7 +66,6 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
-import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.SUBSCRIPTION_MODE;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil.getSchema;
@@ -78,7 +77,9 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
private static final Logger LOG =
LoggerFactory.getLogger(OpcUaAdapter.class);
private int pullingIntervalMilliSeconds;
- private SpOpcUaClient<OpcUaAdapterConfig> spOpcUaClient;
+ private final OpcUaClientProvider clientProvider;
+ private ConnectedOpcUaClient connectedClient;
+ private OpcUaAdapterConfig opcUaAdapterConfig;
private List<OpcNode> allNodes;
private List<NodeId> allNodeIds;
private int numberProperties;
@@ -92,15 +93,14 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
*/
private final Map<String, String> nodeIdToLabelMapping;
- public OpcUaAdapter() {
- super();
+ public OpcUaAdapter(OpcUaClientProvider clientProvider) {
+ this.clientProvider = clientProvider;
this.numberProperties = 0;
this.event = new HashMap<>();
this.nodeIdToLabelMapping = new HashMap<>();
}
private void prepareAdapter(IAdapterParameterExtractor extractor) throws
AdapterException {
-
this.allNodeIds = new ArrayList<>();
List<String> deleteKeys = extractor
.getAdapterDescription()
@@ -111,9 +111,9 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
.collect(Collectors.toList());
try {
- this.spOpcUaClient.connect();
+ this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
OpcUaNodeBrowser browserClient =
- new OpcUaNodeBrowser(this.spOpcUaClient.getClient(),
this.spOpcUaClient.getSpOpcConfig());
+ new OpcUaNodeBrowser(this.connectedClient.getClient(),
this.opcUaAdapterConfig);
this.allNodes = browserClient.findNodes(deleteKeys);
@@ -121,11 +121,11 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
this.allNodeIds.add(node.getNodeId());
}
- if (spOpcUaClient.getSpOpcConfig().inPullMode()) {
- this.pullingIntervalMilliSeconds =
spOpcUaClient.getSpOpcConfig().getPullIntervalMilliSeconds();
+ if (opcUaAdapterConfig.inPullMode()) {
+ this.pullingIntervalMilliSeconds =
opcUaAdapterConfig.getPullIntervalMilliSeconds();
} else {
this.numberProperties = this.allNodeIds.size();
- this.spOpcUaClient.createListSubscription(this.allNodeIds, this);
+ this.connectedClient.createListSubscription(this.allNodeIds, this);
}
this.allNodes.forEach(node ->
this.nodeIdToLabelMapping.put(node.getNodeId().toString(), node.getLabel()));
@@ -139,7 +139,7 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
@Override
public void pullData() throws ExecutionException, RuntimeException,
InterruptedException, TimeoutException {
var response =
- this.spOpcUaClient.getClient().readValues(0, TimestampsToReturn.Both,
this.allNodeIds);
+ this.connectedClient.getClient().readValues(0,
TimestampsToReturn.Both, this.allNodeIds);
boolean badStatusCodeReceived = false;
boolean emptyValueReceived = false;
List<DataValue> returnValues =
@@ -168,7 +168,7 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
private boolean shouldSkipEvent(boolean badStatusCodeReceived) {
return badStatusCodeReceived
- && this.spOpcUaClient.getSpOpcConfig().getIncompleteEventStrategy()
+ && this.opcUaAdapterConfig.getIncompleteEventStrategy()
.equalsIgnoreCase(SharedUserConfiguration.INCOMPLETE_OPTION_IGNORE);
}
@@ -208,13 +208,13 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
public void onAdapterStarted(IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext)
throws AdapterException {
- this.spOpcUaClient = new SpOpcUaClient<>(
-
SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor())
- );
+ this.opcUaAdapterConfig =
+
SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor());
+ //this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
this.collector = collector;
this.prepareAdapter(extractor);
- if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
+ if (this.opcUaAdapterConfig.inPullMode()) {
this.pullAdapterScheduler = new PullAdapterScheduler();
this.pullAdapterScheduler.schedule(this,
extractor.getAdapterDescription().getElementId());
}
@@ -223,9 +223,9 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
@Override
public void onAdapterStopped(IAdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext)
throws AdapterException {
- this.spOpcUaClient.disconnect();
+ clientProvider.releaseClient(this.opcUaAdapterConfig);
- if (this.spOpcUaClient.getSpOpcConfig().inPullMode()) {
+ if (this.opcUaAdapterConfig.inPullMode()) {
this.pullAdapterScheduler.shutdown();
}
}
@@ -233,12 +233,12 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
@Override
public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor
extractor) throws SpConfigurationException {
- return OpcUaUtil.resolveConfig(staticPropertyInternalName, extractor);
+ return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName,
extractor);
}
@Override
public IAdapterConfiguration declareConfig() {
- var builder = AdapterConfigurationBuilder.create(ID, 3, OpcUaAdapter::new)
+ var builder = AdapterConfigurationBuilder.create(ID, 4, () -> new
OpcUaAdapter(clientProvider))
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)
@@ -255,6 +255,6 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
@Override
public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor,
IAdapterGuessSchemaContext
adapterGuessSchemaContext) throws AdapterException {
- return getSchema(extractor);
+ return getSchema(clientProvider, extractor);
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
index e7dde9d990..2a150fa355 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
@@ -25,6 +25,7 @@ import
org.apache.streampipes.model.staticproperty.TreeInputNode;
import org.eclipse.milo.opcua.sdk.client.AddressSpace;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
import org.eclipse.milo.opcua.stack.core.Identifiers;
@@ -45,13 +46,13 @@ import java.util.stream.Collectors;
public class OpcUaNodeBrowser {
- private final OpcUaClient client;
+ private final UaClient client;
private final OpcUaConfig spOpcConfig;
private static final Logger LOG =
LoggerFactory.getLogger(OpcUaNodeBrowser.class);
public OpcUaNodeBrowser(
- OpcUaClient client,
+ UaClient client,
OpcUaConfig spOpcUaClientConfig
) {
this.client = client;
@@ -127,7 +128,7 @@ public class OpcUaNodeBrowser {
}
private List<TreeInputNode> findChildren(
- OpcUaClient client,
+ UaClient client,
NodeId nodeId
) throws UaException {
return client
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
index 04dda6eb62..bcaf11a476 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeMetadataExtractor.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.extensions.connectors.opcua.adapter;
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
@@ -32,12 +32,12 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
public class OpcUaNodeMetadataExtractor {
- private final OpcUaClient client;
+ private final UaClient client;
private final UaNode node;
private final Map<String, Object> metadata;
- public OpcUaNodeMetadataExtractor(OpcUaClient client, UaNode node) {
+ public OpcUaNodeMetadataExtractor(UaClient client, UaNode node) {
this.client = client;
this.node = node;
this.metadata = new HashMap<>();
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
similarity index 79%
copy from
streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
copy to
streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
index 95fda81dbd..d71d85646f 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
@@ -18,19 +18,13 @@
package org.apache.streampipes.extensions.connectors.opcua.client;
-
-import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
-import
org.apache.streampipes.extensions.connectors.opcua.config.MiloOpcUaConfigurationProvider;
-import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
-import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import
org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
import org.eclipse.milo.opcua.stack.core.AttributeId;
-import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
@@ -44,53 +38,21 @@ import
org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import static
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
-/***
- * Wrapper class for all OPC UA specific stuff.
- */
-public class SpOpcUaClient<T extends OpcUaConfig> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(SpOpcUaClient.class);
-
- private OpcUaClient client;
- private final T spOpcConfig;
+public class ConnectedOpcUaClient {
+ private static final Logger LOG =
LoggerFactory.getLogger(ConnectedOpcUaClient.class);
+ private final UaClient client;
private static final AtomicLong clientHandles = new AtomicLong(1L);
- public SpOpcUaClient(T config) {
- this.spOpcConfig = config;
- }
-
- /***
- *
- * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
- */
- public OpcUaClient getClient() {
- return this.client;
- }
-
- /***
- * Establishes appropriate connection to OPC UA endpoint depending on the
{@link SpOpcUaClient} instance
- *
- * @throws UaException An exception occurring during OPC connection
- */
- public void connect()
- throws UaException, ExecutionException, InterruptedException,
SpConfigurationException, URISyntaxException {
- OpcUaClientConfig clientConfig = new
MiloOpcUaConfigurationProvider().makeClientConfig(spOpcConfig);
- this.client = OpcUaClient.create(clientConfig);
- client.connect().get();
- }
-
- public void disconnect() {
- client.disconnect();
+ public ConnectedOpcUaClient(UaClient client) {
+ this.client = client;
}
/***
@@ -178,7 +140,15 @@ public class SpOpcUaClient<T extends OpcUaConfig> {
}
}
- public T getSpOpcConfig() {
- return spOpcConfig;
+ /***
+ *
+ * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
+ */
+ public UaClient getClient() {
+ return this.client;
+ }
+
+ public void disconnect() {
+ client.disconnect();
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java
new file mode 100644
index 0000000000..b3f907bd4e
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/OpcUaClientProvider.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.client;
+
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
+import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
+
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+public class OpcUaClientProvider {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OpcUaClientProvider.class);
+
+ private final Map<String, ConnectedOpcUaClient> clients = new
ConcurrentHashMap<>();
+ private final Map<String, Integer> consumers = new ConcurrentHashMap<>();
+
+ public synchronized <T extends OpcUaConfig> ConnectedOpcUaClient getClient(T
config)
+ throws UaException, SpConfigurationException, URISyntaxException,
ExecutionException, InterruptedException {
+ var serverId = config.getUniqueServerId();
+ if (clients.containsKey(serverId)) {
+ LOG.debug("Adding new consumer to client {}", serverId);
+ consumers.put(serverId, consumers.get(config.getUniqueServerId()) + 1);
+ return clients.get(serverId);
+ } else {
+ LOG.debug("Creating new client {}", serverId);
+ var connectedClient = new SpOpcUaClient<>(config).connect();
+ clients.put(serverId, connectedClient);
+ consumers.put(serverId, 1);
+ return connectedClient;
+ }
+ }
+
+ public <T extends OpcUaConfig> void releaseClient(T config) {
+ String serverId = config.getUniqueServerId();
+ LOG.debug("Releasing client {}", serverId);
+
+ synchronized (this) {
+ consumers.computeIfPresent(serverId, (key, count) -> {
+ int updatedCount = count - 1;
+ if (updatedCount <= 0) {
+ LOG.debug("Disconnecting client {}", serverId);
+ if (clients.containsKey(serverId)) {
+ clients.get(serverId).disconnect();
+ clients.remove(serverId);
+ }
+ return null;
+ }
+ return updatedCount;
+ });
+ }
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
index 95fda81dbd..a17d373b66 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
@@ -20,38 +20,17 @@ package
org.apache.streampipes.extensions.connectors.opcua.client;
import org.apache.streampipes.commons.exceptions.SpConfigurationException;
-import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
import
org.apache.streampipes.extensions.connectors.opcua.config.MiloOpcUaConfigurationProvider;
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
-import
org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
-import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.UaException;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
-import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
-import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
-import
org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
-import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
-import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
/***
* Wrapper class for all OPC UA specific stuff.
@@ -63,120 +42,26 @@ public class SpOpcUaClient<T extends OpcUaConfig> {
private OpcUaClient client;
private final T spOpcConfig;
- private static final AtomicLong clientHandles = new AtomicLong(1L);
-
public SpOpcUaClient(T config) {
this.spOpcConfig = config;
}
- /***
- *
- * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
- */
- public OpcUaClient getClient() {
- return this.client;
- }
-
/***
* Establishes appropriate connection to OPC UA endpoint depending on the
{@link SpOpcUaClient} instance
*
* @throws UaException An exception occurring during OPC connection
*/
- public void connect()
+ public ConnectedOpcUaClient connect()
throws UaException, ExecutionException, InterruptedException,
SpConfigurationException, URISyntaxException {
OpcUaClientConfig clientConfig = new
MiloOpcUaConfigurationProvider().makeClientConfig(spOpcConfig);
- this.client = OpcUaClient.create(clientConfig);
+ var client = OpcUaClient.create(clientConfig);
client.connect().get();
+ return new ConnectedOpcUaClient(client);
}
- public void disconnect() {
- client.disconnect();
- }
-
- /***
- * Register subscriptions for given OPC UA nodes
- * @param nodes List of {@link
org.eclipse.milo.opcua.stack.core.types.builtin.NodeId}
- * @param opcUaAdapter current instance of {@link OpcUaAdapter}
- * @throws Exception
- */
- public void createListSubscription(List<NodeId> nodes,
- OpcUaAdapter opcUaAdapter) throws
Exception {
- client.getSubscriptionManager().addSubscriptionListener(new
UaSubscriptionManager.SubscriptionListener() {
- @Override
- public void onSubscriptionTransferFailed(UaSubscription subscription,
StatusCode statusCode) {
- LOG.warn("Transfer for subscriptionId={} failed: {}",
subscription.getSubscriptionId(), statusCode);
- try {
- initSubscription(nodes, opcUaAdapter);
- } catch (Exception e) {
- LOG.error("Re-creating the subscription failed", e);
- }
- }
- });
-
- initSubscription(nodes, opcUaAdapter);
- }
-
-
- public void initSubscription(List<NodeId> nodes,
- OpcUaAdapter opcUaAdapter) throws Exception {
- /*
- * create a subscription @ 1000ms
- */
- UaSubscription subscription =
this.client.getSubscriptionManager().createSubscription(1000.0).get();
-
- List<CompletableFuture<DataValue>> values = new ArrayList<>();
-
- for (NodeId node : nodes) {
- values.add(this.client.readValue(0, TimestampsToReturn.Both, node));
- }
-
- for (CompletableFuture<DataValue> value : values) {
- if (value.get().getValue().toString().contains("null")) {
- LOG.error("Node has no value");
- }
- }
- List<ReadValueId> readValues = new ArrayList<>();
- // Read a specific value attribute
- for (NodeId node : nodes) {
- readValues.add(new ReadValueId(node, AttributeId.Value.uid(), null,
QualifiedName.NULL_VALUE));
- }
- List<MonitoredItemCreateRequest> requests = new ArrayList<>();
-
- for (ReadValueId readValue : readValues) {
- // important: client handle must be unique per item
- UInteger clientHandle = uint(clientHandles.getAndIncrement());
-
- MonitoringParameters parameters = new MonitoringParameters(
- clientHandle,
- 1000.0, // sampling interval
- null, // filter, null means use default
- uint(10), // queue size
- true // discard oldest
- );
-
- requests.add(new MonitoredItemCreateRequest(readValue,
MonitoringMode.Reporting, parameters));
- }
-
- UaSubscription.ItemCreationCallback onItemCreated =
- (item, i) -> item.setValueConsumer(opcUaAdapter::onSubscriptionValue);
- List<UaMonitoredItem> items = subscription.createMonitoredItems(
- TimestampsToReturn.Both,
- requests,
- onItemCreated
- ).get();
-
- for (UaMonitoredItem item : items) {
- NodeId tagId = item.getReadValueId().getNodeId();
- if (item.getStatusCode().isGood()) {
- LOG.info("item created for nodeId=" + tagId);
- } else {
- LOG.error("failed to create item for " +
item.getReadValueId().getNodeId() + item.getStatusCode());
- }
- }
- }
public T getSpOpcConfig() {
return spOpcConfig;
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
index 9813d52fbb..8db41a8e11 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/MiloOpcUaConfigurationProvider.java
@@ -18,19 +18,15 @@
package org.apache.streampipes.extensions.connectors.opcua.config;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpConfigurationException;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
-import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
-import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
-import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
-import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -39,65 +35,16 @@ public class MiloOpcUaConfigurationProvider {
public OpcUaClientConfig makeClientConfig(OpcUaConfig spOpcConfig)
throws ExecutionException, InterruptedException,
SpConfigurationException, URISyntaxException {
String opcServerUrl = spOpcConfig.getOpcServerURL();
+ String applicationUri =
Environments.getEnvironment().getOpcUaApplicationUri().getValueOrDefault();
List<EndpointDescription> endpoints =
DiscoveryClient.getEndpoints(opcServerUrl).get();
- String host = opcServerUrl.split("://")[1].split(":")[0];
- EndpointDescription tmpEndpoint = endpoints
- .stream()
- .filter(e ->
e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
- .findFirst()
- .orElseThrow(() -> new SpConfigurationException("No endpoint with
security policy none"));
+ var builder = OpcUaClientConfig.builder()
+ .setApplicationName(LocalizedText.english("Apache StreamPipes"))
+ .setApplicationUri(applicationUri);
- tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
- endpoints = Collections.singletonList(tmpEndpoint);
+ spOpcConfig.getSecurityConfig().configureSecurityPolicy(opcServerUrl,
endpoints, builder);
+ spOpcConfig.getIdentityConfig().configureIdentity(builder);
- EndpointDescription endpoint = endpoints
- .stream()
- .filter(e ->
e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
- .findFirst().orElseThrow(() -> new SpConfigurationException("no
desired endpoints returned"));
-
- return buildConfig(endpoint, spOpcConfig);
- }
-
- private OpcUaClientConfig buildConfig(EndpointDescription endpoint,
- OpcUaConfig spOpcConfig) {
-
- OpcUaClientConfigBuilder builder = defaultBuilder(endpoint);
- if (!spOpcConfig.isUnauthenticated()) {
- builder.setIdentityProvider(new
UsernameProvider(spOpcConfig.getUsername(), spOpcConfig.getPassword()));
- }
return builder.build();
}
-
- private OpcUaClientConfigBuilder defaultBuilder(EndpointDescription
endpoint) {
- return OpcUaClientConfig.builder()
- .setApplicationName(LocalizedText.english("eclipse milo opc-ua
client"))
- .setApplicationUri("urn:eclipse:milo:examples:client")
- .setEndpoint(endpoint);
- }
-
- private EndpointDescription updateEndpointUrl(
- EndpointDescription original, String hostname) throws URISyntaxException
{
-
- URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
-
- String endpointUrl = String.format(
- "%s://%s:%s%s",
- uri.getScheme(),
- hostname,
- uri.getPort(),
- uri.getPath()
- );
-
- return new EndpointDescription(
- endpointUrl,
- original.getServer(),
- original.getServerCertificate(),
- original.getSecurityMode(),
- original.getSecurityPolicyUri(),
- original.getUserIdentityTokens(),
- original.getTransportProfileUri(),
- original.getSecurityLevel()
- );
- }
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
index 6db027d861..128f1bc1f9 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaConfig.java
@@ -18,15 +18,21 @@
package org.apache.streampipes.extensions.connectors.opcua.config;
+import
org.apache.streampipes.extensions.connectors.opcua.config.identity.IdentityConfig;
+import
org.apache.streampipes.extensions.connectors.opcua.config.security.SecurityConfig;
+
import java.util.List;
public class OpcUaConfig {
private String opcServerURL;
- private boolean unauthenticated;
- private String username;
- private String password;
private List<String> selectedNodeNames;
+ private IdentityConfig identityConfig;
+ private SecurityConfig securityPolicyConfig;
+
+ public OpcUaConfig() {
+
+ }
public String getOpcServerURL() {
return opcServerURL;
@@ -36,35 +42,31 @@ public class OpcUaConfig {
this.opcServerURL = opcServerURL;
}
- public boolean isUnauthenticated() {
- return unauthenticated;
- }
-
- public void setUnauthenticated(boolean unauthenticated) {
- this.unauthenticated = unauthenticated;
+ public List<String> getSelectedNodeNames() {
+ return selectedNodeNames;
}
- public String getUsername() {
- return username;
+ public void setSelectedNodeNames(List<String> selectedNodeNames) {
+ this.selectedNodeNames = selectedNodeNames;
}
- public void setUsername(String username) {
- this.username = username;
+ public IdentityConfig getIdentityConfig() {
+ return identityConfig;
}
- public String getPassword() {
- return password;
+ public void setIdentityConfig(IdentityConfig identityConfig) {
+ this.identityConfig = identityConfig;
}
- public void setPassword(String password) {
- this.password = password;
+ public SecurityConfig getSecurityConfig() {
+ return securityPolicyConfig;
}
- public List<String> getSelectedNodeNames() {
- return selectedNodeNames;
+ public void setSecurityConfig(SecurityConfig securityPolicyConfig) {
+ this.securityPolicyConfig = securityPolicyConfig;
}
- public void setSelectedNodeNames(List<String> selectedNodeNames) {
- this.selectedNodeNames = selectedNodeNames;
+ public String getUniqueServerId() {
+ return String.format("%s-%s-%s", opcServerURL, securityPolicyConfig,
identityConfig);
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
index 9b5f541071..1059ce7e38 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.extensions.connectors.opcua.config;
+import org.apache.streampipes.extensions.connectors.opcua.utils.SecurityUtils;
import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
@@ -29,7 +30,6 @@ import org.apache.streampipes.sdk.helpers.Labels;
import java.util.List;
import static
org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter.PULL_GROUP;
-import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ACCESS_MODE;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.AVAILABLE_NODES;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.HOST_PORT;
@@ -41,7 +41,6 @@ import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabe
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_URL;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
-import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.UNAUTHENTICATED;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
@@ -51,14 +50,27 @@ public class SharedUserConfiguration {
public static final String INCOMPLETE_OPTION_IGNORE = "ignore-event";
public static final String INCOMPLETE_OPTION_SEND = "send-event";
+ public static final String SECURITY_MODE = "securityMode";
+ public static final String SECURITY_POLICY = "securityPolicy";
+ public static final String USER_AUTHENTICATION = "userAuthentication";
+ public static final String USER_AUTHENTICATION_ANONYMOUS = "anonymous";
+
public static void
appendSharedOpcUaConfig(AbstractConfigurablePipelineElementBuilder<?, ?>
builder,
boolean adapterConfig) {
var dependsOn = getDependsOn(adapterConfig);
builder
- .requiredAlternatives(Labels.withId(ACCESS_MODE),
- Alternatives.from(Labels.withId(UNAUTHENTICATED)),
+ .requiredSingleValueSelection(
+ Labels.withId(SECURITY_MODE),
+ SecurityUtils.getAvailableSecurityModes().stream().map(mode -> new
Option(mode.k, mode.v)).toList()
+ )
+ .requiredSingleValueSelection(
+ Labels.withId(SECURITY_POLICY),
+ SecurityUtils.getAvailableSecurityPolicies().stream().map(p -> new
Option(p.name())).toList()
+ )
+ .requiredAlternatives(Labels.withId(USER_AUTHENTICATION),
+ Alternatives.from(Labels.withId(USER_AUTHENTICATION_ANONYMOUS)),
Alternatives.from(Labels.withId(USERNAME_GROUP),
StaticProperties.group(
Labels.withId(USERNAME_GROUP),
@@ -104,7 +116,7 @@ public class SharedUserConfiguration {
public static OneOfStaticProperty getIncompleteEventConfig() {
return StaticProperties.singleValueSelection(
- Labels.withId(INCOMPLETE_EVENT_HANDLING_KEY),
+ Labels.withId(INCOMPLETE_EVENT_HANDLING_KEY),
List.of(
new Option("Ignore (only complete messages are sent)",
INCOMPLETE_OPTION_IGNORE),
new Option("Send (incomplete messages are sent)",
INCOMPLETE_OPTION_SEND)
@@ -115,10 +127,12 @@ public class SharedUserConfiguration {
public static List<String> getDependsOn(boolean adapterConfig) {
return adapterConfig ? List.of(
ADAPTER_TYPE.name(),
- ACCESS_MODE.name(),
+ SECURITY_MODE,
+ SECURITY_POLICY,
OPC_HOST_OR_URL.name()
) : List.of(
- ACCESS_MODE.name(),
+ SECURITY_MODE,
+ SECURITY_POLICY,
OPC_HOST_OR_URL.name());
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
index 6741a3e6f8..333dbadc06 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
@@ -20,11 +20,16 @@ package
org.apache.streampipes.extensions.connectors.opcua.config;
import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import
org.apache.streampipes.extensions.connectors.opcua.config.identity.AnonymousIdentityConfig;
+import
org.apache.streampipes.extensions.connectors.opcua.config.identity.UsernamePasswordIdentityConfig;
+import
org.apache.streampipes.extensions.connectors.opcua.config.security.SecurityConfig;
import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+
import java.util.List;
-import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ACCESS_MODE;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.AVAILABLE_NODES;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_HOST_OR_URL;
@@ -35,7 +40,6 @@ import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabe
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
-import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.UNAUTHENTICATED;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
public class SpOpcUaConfigExtractor {
@@ -71,20 +75,31 @@ public class SpOpcUaConfigExtractor {
}
public static <T extends OpcUaConfig> T
extractSharedConfig(IParameterExtractor extractor,
- T config) {
+ T config) {
String selectedAlternativeConnection =
extractor.selectedAlternativeInternalId(OPC_HOST_OR_URL.name());
+
String selectedAlternativeAuthentication =
- extractor.selectedAlternativeInternalId(ACCESS_MODE.name());
+
extractor.selectedAlternativeInternalId(SharedUserConfiguration.USER_AUTHENTICATION);
+
List<String> selectedNodeNames =
extractor.selectedTreeNodesInternalNames(AVAILABLE_NODES.name(),
String.class);
-
config.setSelectedNodeNames(selectedNodeNames);
- boolean useURL = selectedAlternativeConnection.equals(OPC_URL.name());
- boolean unauthenticated =
selectedAlternativeAuthentication.equals(UNAUTHENTICATED.name());
+ String selectedSecurityMode = extractor.selectedSingleValueInternalName(
+ SharedUserConfiguration.SECURITY_MODE,
+ String.class
+ );
+ String selectedSecurityPolicy = extractor.selectedSingleValue(
+ SharedUserConfiguration.SECURITY_POLICY,
+ String.class
+ );
+ config.setSecurityConfig(new SecurityConfig(
+ MessageSecurityMode.valueOf(selectedSecurityMode),
+ SecurityPolicy.valueOf(selectedSecurityPolicy)));
+ boolean useURL = selectedAlternativeConnection.equals(OPC_URL.name());
if (useURL) {
String serverAddress =
extractor.singleValueParameter(OPC_SERVER_URL.name(), String.class);
@@ -97,15 +112,15 @@ public class SpOpcUaConfigExtractor {
config.setOpcServerURL(serverAddress + ":" + port);
}
+ boolean unauthenticated = selectedAlternativeAuthentication.equals(
+ SharedUserConfiguration.USER_AUTHENTICATION_ANONYMOUS
+ );
if (unauthenticated) {
- config.setUnauthenticated(true);
+ config.setIdentityConfig(new AnonymousIdentityConfig());
} else {
String username = extractor.singleValueParameter(USERNAME.name(),
String.class);
String password = extractor.secretValue(PASSWORD.name());
-
- config.setUsername(username);
- config.setPassword(password);
- config.setUnauthenticated(false);
+ config.setIdentityConfig(new UsernamePasswordIdentityConfig(username,
password));
}
return config;
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java
new file mode 100644
index 0000000000..ecaad661df
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/AnonymousIdentityConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+
+public class AnonymousIdentityConfig implements IdentityConfig {
+
+ @Override
+ public void configureIdentity(OpcUaClientConfigBuilder builder) {
+ builder.setIdentityProvider(new AnonymousProvider());
+ }
+
+ @Override
+ public String toString() {
+ return "anonymous";
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java
new file mode 100644
index 0000000000..db51fd3449
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/IdentityConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+
+public interface IdentityConfig {
+
+ void configureIdentity(OpcUaClientConfigBuilder builder);
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java
new file mode 100644
index 0000000000..825fe1fd9a
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/identity/UsernamePasswordIdentityConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.identity;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+
+public class UsernamePasswordIdentityConfig implements IdentityConfig {
+
+ private final String username;
+ private final String password;
+
+ public UsernamePasswordIdentityConfig(String username, String password) {
+ this.username = username;
+ this.password = password;
+ }
+
+ @Override
+ public void configureIdentity(OpcUaClientConfigBuilder builder) {
+ builder.setIdentityProvider(new UsernameProvider(username, password));
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s-%S", username, DigestUtils.sha256Hex(password));
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
new file mode 100644
index 0000000000..75955fcbd8
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/KeyStoreLoader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.security;
+
+import org.apache.streampipes.commons.environment.Environment;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+
+public class KeyStoreLoader {
+
+ private static final String CLIENT_ALIAS = "apache-streampipes";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KeyStoreLoader.class);
+
+ private X509Certificate[] clientCertificateChain;
+ private X509Certificate clientCertificate;
+ private KeyPair clientKeyPair;
+
+ KeyStoreLoader load(Environment env,
+ Path securityDir) throws Exception {
+ var keyStore = KeyStore.getInstance("PKCS12");
+ var keyStoreFile = env.getOpcUaKeystoreFile().getValueOrDefault();
+ var keyStorePassword = env.getOpcUaKeystorePassword().getValueOrDefault();
+ Path serverKeyStore = securityDir.resolve(keyStoreFile);
+ char[] serverKeyStorePassword = keyStorePassword.toCharArray();
+
+ LOG.info("Loading KeyStore at {}", serverKeyStore);
+
+ try (InputStream in = Files.newInputStream(serverKeyStore)) {
+ keyStore.load(in, serverKeyStorePassword);
+ }
+
+ Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS,
serverKeyStorePassword);
+ if (clientPrivateKey instanceof PrivateKey) {
+ clientCertificate = (X509Certificate)
keyStore.getCertificate(CLIENT_ALIAS);
+
+ clientCertificateChain =
Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS))
+ .map(X509Certificate.class::cast)
+ .toArray(X509Certificate[]::new);
+
+ PublicKey serverPublicKey = clientCertificate.getPublicKey();
+ clientKeyPair = new KeyPair(serverPublicKey, (PrivateKey)
clientPrivateKey);
+ }
+
+ return this;
+ }
+
+ X509Certificate getClientCertificate() {
+ return clientCertificate;
+ }
+
+ public X509Certificate[] getClientCertificateChain() {
+ return clientCertificateChain;
+ }
+
+ KeyPair getClientKeyPair() {
+ return clientKeyPair;
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java
new file mode 100644
index 0000000000..b64660ebe9
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/SecurityConfig.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.config.security;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
+
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import
org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class SecurityConfig {
+
+ private final MessageSecurityMode securityMode;
+ private final SecurityPolicy securityPolicy;
+
+ public SecurityConfig(MessageSecurityMode securityMode,
+ SecurityPolicy securityPolicy) {
+ this.securityMode = securityMode;
+ this.securityPolicy = securityPolicy;
+ }
+
+ public void configureSecurityPolicy(String opcServerUrl,
+ List<EndpointDescription> endpoints,
+ OpcUaClientConfigBuilder builder)
+ throws SpConfigurationException, URISyntaxException {
+ String host = opcServerUrl.split("://")[1].split(":")[0];
+
+ EndpointDescription tmpEndpoint = endpoints
+ .stream()
+ .filter(e -> e.getSecurityMode() == securityMode)
+ .filter(e -> e.getSecurityPolicyUri().equals(securityPolicy.getUri()))
+ .findFirst()
+ .orElseThrow(() ->
+ new SpConfigurationException("No endpoint available with security
mode {} and security policy {}")
+ );
+
+ tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
+
+ if (securityMode != MessageSecurityMode.None) {
+ try {
+ var env = Environments.getEnvironment();
+ var securityDir =
Paths.get(env.getOpcUaSecurityDir().getValueOrDefault());
+ var trustListManager = new
DefaultTrustListManager(securityDir.resolve("pki").toFile());
+
+ var certificateValidator = new
DefaultClientCertificateValidator(trustListManager);
+ var loader = new KeyStoreLoader().load(env, securityDir);
+ builder.setKeyPair(loader.getClientKeyPair());
+ builder.setCertificate(loader.getClientCertificate());
+ builder.setCertificateChain(loader.getClientCertificateChain());
+ builder.setCertificateValidator(certificateValidator);
+ } catch (Exception e) {
+ throw new SpConfigurationException(
+ "Failed to load keystore - check that all required environment
variables "
+ + "are defined and the keystore exists",
+ e
+ );
+ }
+ }
+
+ builder.setEndpoint(tmpEndpoint);
+ }
+
+ private EndpointDescription updateEndpointUrl(EndpointDescription original,
+ String hostname) throws
URISyntaxException {
+
+ URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
+
+ String endpointUrl = String.format("%s://%s:%s%s", uri.getScheme(),
hostname, uri.getPort(), uri.getPath());
+
+ return new EndpointDescription(
+ endpointUrl,
+ original.getServer(),
+ original.getServerCertificate(),
+ original.getSecurityMode(),
+ original.getSecurityPolicyUri(),
+ original.getUserIdentityTokens(),
+ original.getTransportProfileUri(),
+ original.getSecurityLevel());
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s-%s", securityMode, securityPolicy);
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java
new file mode 100644
index 0000000000..d4311bf6d5
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV4.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.migration;
+
+import org.apache.streampipes.extensions.api.extractor.IParameterExtractor;
+import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import org.apache.streampipes.extensions.connectors.opcua.utils.SecurityUtils;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+import java.util.List;
+
+import static
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.SECURITY_MODE;
+import static
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.SECURITY_POLICY;
+import static
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.USER_AUTHENTICATION;
+import static
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration.USER_AUTHENTICATION_ANONYMOUS;
+import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
+import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
+import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
+
+public class OpcUaAdapterMigrationV4 implements IAdapterMigrator {
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ OpcUaAdapter.ID,
+ SpServiceTagPrefix.ADAPTER,
+ 3,
+ 4
+ );
+ }
+
+ @Override
+ public MigrationResult<AdapterDescription> migrate(AdapterDescription
element,
+ IStaticPropertyExtractor
extractor) throws RuntimeException {
+ var config = element.getConfig();
+ element.setConfig(migrate(config, extractor));
+
+ return MigrationResult.success(element);
+ }
+
+ public List<StaticProperty> migrate(List<StaticProperty> staticProperties,
+ IParameterExtractor extractor) {
+ var securityMode =
+ StaticProperties.singleValueSelection(
+ Labels.withId(SECURITY_MODE),
+ SecurityUtils.getAvailableSecurityModes().stream().map(mode -> new
Option(mode.k, mode.v)).toList()
+ );
+ securityMode.getOptions().get(0).setSelected(true);
+
+ var securityPolicy = StaticProperties.singleValueSelection(
+ Labels.withId(SECURITY_POLICY),
+ SecurityUtils.getAvailableSecurityPolicies().stream().map(p -> new
Option(p.name())).toList()
+ );
+ securityPolicy.getOptions().get(0).setSelected(true);
+
+ boolean anonymous = true;
+ var currentAuthSettings = extractor.selectedAlternativeInternalId(
+ "ACCESS_MODE"
+ );
+ if (currentAuthSettings.equals("USERNAME_GROUP")) {
+ anonymous = false;
+ }
+ var authentication =
StaticProperties.alternatives(Labels.withId(USER_AUTHENTICATION),
+ Alternatives.from(Labels.withId(USER_AUTHENTICATION_ANONYMOUS)),
+ Alternatives.from(Labels.withId(USERNAME_GROUP),
+ StaticProperties.group(
+ Labels.withId(USERNAME_GROUP),
+ StaticProperties.stringFreeTextProperty(
+ Labels.withId(USERNAME)),
+ StaticProperties.secretValue(Labels.withId(PASSWORD))
+ ))
+ );
+ if (anonymous) {
+ authentication.getAlternatives().get(0).setSelected(true);
+ } else {
+ authentication.getAlternatives().get(1).setSelected(true);
+ var username = extractor.singleValueParameter("USERNAME", String.class);
+ var password = extractor.secretValue("PASSWORD");
+ var group = (StaticPropertyGroup)
authentication.getAlternatives().get(1).getStaticProperty();
+ ((FreeTextStaticProperty)
group.getStaticProperties().get(0)).setValue(username);
+ ((SecretStaticProperty)
group.getStaticProperties().get(1)).setValue(password);
+ ((SecretStaticProperty)
group.getStaticProperties().get(1)).setEncrypted(false);
+ }
+
+ // remove old authentication property, add new properties for
securityMode, policy and authentication options
+ staticProperties.remove(1);
+ staticProperties.add(1, securityMode);
+ staticProperties.add(2, securityPolicy);
+ staticProperties.add(3, authentication);
+
+ return staticProperties;
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java
new file mode 100644
index 0000000000..bc24f4d88d
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaSinkMigrationV1.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.migration;
+
+import
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+
+public class OpcUaSinkMigrationV1 implements IDataSinkMigrator {
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ OpcUaSink.ID,
+ SpServiceTagPrefix.DATA_SINK,
+ 0,
+ 1
+ );
+ }
+
+ @Override
+ public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation
element,
+
IDataSinkParameterExtractor extractor) throws RuntimeException {
+ var config = element.getStaticProperties();
+ var migratedConfigs = new OpcUaAdapterMigrationV4().migrate(config,
extractor);
+ element.setStaticProperties(migratedConfigs);
+ return MigrationResult.success(element);
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
index 78babe0027..e5c9485bf7 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUa.java
@@ -19,12 +19,13 @@
package org.apache.streampipes.extensions.connectors.opcua.sink;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import
org.apache.streampipes.extensions.connectors.opcua.client.ConnectedOpcUaClient;
+import
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
+import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.PrimitiveField;
import org.apache.streampipes.vocabulary.XSD;
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
@@ -42,7 +43,8 @@ public class OpcUa {
private static final Logger LOG = LoggerFactory.getLogger(OpcUa.class);
- private OpcUaClient opcUaClient;
+ private ConnectedOpcUaClient connectedClient;
+ private final OpcUaConfig opcUaConfig;
private OpcUaParameters params;
private NodeId node;
@@ -73,24 +75,30 @@ public class OpcUa {
compatibleDataTypes.put(String.class, new Class[]{String.class});
}
- public void onInvocation(OpcUaParameters params) throws
+ private final OpcUaClientProvider clientProvider;
+
+ public OpcUa(OpcUaClientProvider clientProvider,
+ OpcUaParameters params) {
+ this.clientProvider = clientProvider;
+ this.opcUaConfig = params.config();
+ }
+
+ public void onInvocation() throws
SpRuntimeException {
try {
- this.params = params;
- this.node = NodeId.parse(params.getSelectedNode());
- opcUaClient = new SpOpcUaClient<>(params.getConfig()).getClient();
- opcUaClient.connect().get();
+ this.node = NodeId.parse(params.selectedNode());
+ this.connectedClient = clientProvider.getClient(opcUaConfig);
} catch (Exception e) {
- throw new SpRuntimeException("Could not connect to OPC-UA server: " +
params.getConfig().getOpcServerURL());
+ throw new SpRuntimeException("Could not connect to OPC-UA server: " +
params.config().getOpcServerURL());
}
// check whether input data type and target data type are compatible
try {
- Variant value =
opcUaClient.getAddressSpace().getVariableNode(node).readValue().getValue();
+ Variant value =
this.connectedClient.getClient().getAddressSpace().getVariableNode(node).readValue().getValue();
targetDataType = value.getValue().getClass();
- sourceDataType = XSDMatchings.get(params.getMappingPropertyType());
+ sourceDataType = XSDMatchings.get(params.mappingPropertyType());
if (!sourceDataType.equals(targetDataType)) {
if
(Arrays.stream(compatibleDataTypes.get(sourceDataType)).noneMatch(dt ->
dt.equals(targetDataType))) {
throw new SpRuntimeException("Data Type of event of target node are
not compatible");
@@ -107,40 +115,45 @@ public class OpcUa {
Variant v = getValue(inputEvent);
if (v == null) {
- LOG.error("Mapping property type: " +
this.params.getMappingPropertyType() + " is not supported");
+ LOG.error("Mapping property type: " + this.params.mappingPropertyType()
+ " is not supported");
} else {
DataValue value = new DataValue(v);
- CompletableFuture<StatusCode> f = opcUaClient.writeValue(node, value);
+ CompletableFuture<StatusCode> f =
this.connectedClient.getClient().writeValue(node, value);
try {
StatusCode status = f.get();
if (status.isBad()) {
if (status.getValue() == 0x80740000L) {
- LOG.error("Type missmatch! Tried to write value of type: " +
this.params.getMappingPropertyType()
+ LOG.error("Type missmatch! Tried to write value of type {} ",
this.params.mappingPropertyType()
+ " but server did not accept this");
} else if (status.getValue() == 0x803B0000L) {
LOG.error("Wrong access level. Not allowed to write to nodes");
}
LOG.error(
- "Value: " + value.getValue().toString() + " could not be written
to node Id: "
- + node.getIdentifier() + " on " + "OPC-UA server: " +
params.getConfig().getOpcServerURL());
+ "Value: {} could not be written to node Id {} on OPC-UA server
{}",
+ value.getValue().toString(),
+ node.getIdentifier(),
+ params.config().getOpcServerURL());
}
} catch (InterruptedException | ExecutionException e) {
- LOG.error("Exception: Value: " + value.getValue().toString() + " could
not be written to node Id: "
- + node.getIdentifier() + " on " + "OPC-UA server: " +
params.getConfig().getOpcServerURL());
+ LOG.error(
+ "Exception: Value {} could not be written to node Id {} on OPC_UA
server {}",
+ value.getValue().toString(),
+ node.getIdentifier(),
+ params.config().getOpcServerURL());
}
}
}
public void onDetach() throws SpRuntimeException {
- opcUaClient.disconnect();
+ clientProvider.releaseClient(opcUaConfig);
}
private Variant getValue(Event inputEvent) {
Variant result = null;
PrimitiveField propertyPrimitive =
-
inputEvent.getFieldBySelector(this.params.getMappingPropertySelector()).getAsPrimitive();
+
inputEvent.getFieldBySelector(this.params.mappingPropertySelector()).getAsPrimitive();
if (targetDataType.equals(Integer.class)) {
result = new Variant(propertyPrimitive.getAsInt());
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
index 15906af4aa..438bf154e8 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaParameters.java
@@ -20,36 +20,8 @@ package
org.apache.streampipes.extensions.connectors.opcua.sink;
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
-public final class OpcUaParameters {
- private final String selectedNode;
- private final String mappingPropertySelector;
- private final String mappingPropertyType;
-
- private final OpcUaConfig config;
-
- public OpcUaParameters(OpcUaConfig config,
- String mappingPropertySelector,
- String mappingPropertyType,
- String selectedNode) {
- this.config = config;
- this.mappingPropertySelector = mappingPropertySelector;
- this.mappingPropertyType = mappingPropertyType;
- this.selectedNode = selectedNode;
- }
-
- public String getSelectedNode() {
- return selectedNode;
- }
-
- public String getMappingPropertySelector() {
- return mappingPropertySelector;
- }
-
- public String getMappingPropertyType() {
- return mappingPropertyType;
- }
-
- public OpcUaConfig getConfig() {
- return config;
- }
+public record OpcUaParameters(OpcUaConfig config,
+ String mappingPropertySelector,
+ String mappingPropertyType,
+ String selectedNode) {
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
index 50a6069cdf..b81f20d49c 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
@@ -26,6 +26,7 @@ import
org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
+import
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import
org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
@@ -45,11 +46,18 @@ import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabe
public class OpcUaSink implements IStreamPipesDataSink, SupportsRuntimeConfig {
+ public static final String ID =
"org.apache.streampipes.sinks.databases.jvm.opcua";
+
private OpcUa opcUa;
+ private final OpcUaClientProvider clientProvider;
+
+ public OpcUaSink(OpcUaClientProvider clientProvider) {
+ this.clientProvider = clientProvider;
+ }
@Override
public IDataSinkConfiguration declareConfig() {
- var builder =
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.opcua", 0)
+ var builder = DataSinkBuilder.create(ID, 0)
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.category(DataSinkType.FORWARD)
@@ -62,7 +70,7 @@ public class OpcUaSink implements IStreamPipesDataSink,
SupportsRuntimeConfig {
SharedUserConfiguration.appendSharedOpcUaConfig(builder, false);
return DataSinkConfiguration.create(
- OpcUaSink::new,
+ () -> new OpcUaSink(clientProvider),
builder.build()
);
}
@@ -89,8 +97,8 @@ public class OpcUaSink implements IStreamPipesDataSink,
SupportsRuntimeConfig {
config.getSelectedNodeNames().get(0)
);
- this.opcUa = new OpcUa();
- this.opcUa.onInvocation(params);
+ this.opcUa = new OpcUa(clientProvider, params);
+ this.opcUa.onInvocation();
}
@Override
@@ -106,6 +114,6 @@ public class OpcUaSink implements IStreamPipesDataSink,
SupportsRuntimeConfig {
@Override
public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor
extractor) throws SpConfigurationException {
- return OpcUaUtil.resolveConfig(staticPropertyInternalName, extractor);
+ return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName,
extractor);
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
index aa49a7a5f8..8cd76401f9 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
@@ -25,8 +25,9 @@ import
org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtracto
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
import
org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaNodeBrowser;
-import org.apache.streampipes.extensions.connectors.opcua.client.SpOpcUaClient;
+import
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
+import
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import
org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
import org.apache.streampipes.extensions.connectors.opcua.model.OpcNode;
import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
@@ -37,7 +38,7 @@ import
org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputSta
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
@@ -78,7 +79,8 @@ public class OpcUaUtil {
* @throws AdapterException
* @throws ParseException
*/
- public static GuessSchema getSchema(IAdapterParameterExtractor extractor)
+ public static GuessSchema getSchema(OpcUaClientProvider clientProvider,
+ IAdapterParameterExtractor extractor)
throws AdapterException, ParseException {
var builder = GuessSchemaBuilder.create();
EventSchema eventSchema = new EventSchema();
@@ -86,14 +88,13 @@ public class OpcUaUtil {
Map<String, FieldStatusInfo> fieldStatusInfos = new HashMap<>();
List<EventProperty> allProperties = new ArrayList<>();
- SpOpcUaClient<OpcUaConfig> spOpcUaClient = new SpOpcUaClient<>(
-
SpOpcUaConfigExtractor.extractSharedConfig(extractor.getStaticPropertyExtractor(),
new OpcUaConfig())
+ var opcUaConfig = SpOpcUaConfigExtractor.extractSharedConfig(
+ extractor.getStaticPropertyExtractor(), new OpcUaConfig()
);
-
try {
- spOpcUaClient.connect();
+ var connectedClient = clientProvider.getClient(opcUaConfig);
OpcUaNodeBrowser nodeBrowser =
- new OpcUaNodeBrowser(spOpcUaClient.getClient(),
spOpcUaClient.getSpOpcConfig());
+ new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
List<OpcNode> selectedNodes = nodeBrowser.findNodes();
if (!selectedNodes.isEmpty()) {
@@ -116,12 +117,12 @@ public class OpcUaUtil {
var nodeIds = selectedNodes.stream()
.map(OpcNode::getNodeId)
.collect(Collectors.toList());
- var response = spOpcUaClient.getClient()
+ var response = connectedClient.getClient()
.readValues(0, TimestampsToReturn.Both,
nodeIds);
var returnValues = response.get();
- spOpcUaClient.disconnect();
+ //clientProvider.releaseClient(opcUaConfig);
makeEventPreview(selectedNodes, eventPreview, fieldStatusInfos,
returnValues);
@@ -129,7 +130,9 @@ public class OpcUaUtil {
} catch (Exception e) {
throw new AdapterException("Could not guess schema for opc node: " +
e.getMessage(), e);
} finally {
- spOpcUaClient.disconnect();
+ // TODO
+ //spOpcUaClient.disconnect();
+ clientProvider.releaseClient(opcUaConfig);
}
eventSchema.setEventProperties(allProperties);
@@ -172,7 +175,8 @@ public class OpcUaUtil {
* @param parameterExtractor to extract parameters from the OPC UA config
* @return {@code List<Option>} with available node names for the given OPC
UA configuration
*/
- public static RuntimeResolvableTreeInputStaticProperty resolveConfig(String
internalName,
+ public static RuntimeResolvableTreeInputStaticProperty
resolveConfig(OpcUaClientProvider clientProvider,
+ String
internalName,
IStaticPropertyExtractor parameterExtractor)
throws SpConfigurationException {
@@ -181,19 +185,18 @@ public class OpcUaUtil {
// access mode and host/url have to be selected
try {
parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
-
parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
+
parameterExtractor.selectedSingleValueInternalName(SharedUserConfiguration.SECURITY_MODE,
String.class);
+
parameterExtractor.selectedSingleValue(SharedUserConfiguration.SECURITY_POLICY,
String.class);
} catch (NullPointerException nullPointerException) {
return config;
}
- SpOpcUaClient spOpcUaClient = new SpOpcUaClient(
- SpOpcUaConfigExtractor.extractSharedConfig(parameterExtractor, new
OpcUaConfig())
- );
+ var opcUaConfig =
SpOpcUaConfigExtractor.extractSharedConfig(parameterExtractor, new
OpcUaConfig());
try {
- spOpcUaClient.connect();
+ var connectedClient = clientProvider.getClient(opcUaConfig);
OpcUaNodeBrowser nodeBrowser =
- new OpcUaNodeBrowser(spOpcUaClient.getClient(),
spOpcUaClient.getSpOpcConfig());
+ new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
var nodes =
nodeBrowser.buildNodeTreeFromOrigin(config.getNextBaseNodeToResolve());
if (Objects.isNull(config.getNextBaseNodeToResolve())) {
@@ -204,7 +207,7 @@ public class OpcUaUtil {
if (!config.getSelectedNodesInternalNames().isEmpty()) {
config.setSelectedNodesInternalNames(
- filterMissingNodes(spOpcUaClient.getClient(),
config.getSelectedNodesInternalNames())
+ filterMissingNodes(connectedClient.getClient(),
config.getSelectedNodesInternalNames())
);
}
@@ -215,13 +218,15 @@ public class OpcUaUtil {
} catch (ExecutionException | InterruptedException | URISyntaxException e)
{
throw new SpConfigurationException("Could not connect to the OPC UA
server with the provided settings", e);
} finally {
- if (spOpcUaClient.getClient() != null) {
- spOpcUaClient.disconnect();
- }
+ clientProvider.releaseClient(opcUaConfig);
+ // TODO
+// if (spOpcUaClient.getClient() != null) {
+// spOpcUaClient.disconnect();
+// }
}
}
- public static List<String> filterMissingNodes(OpcUaClient opcUaClient,
+ public static List<String> filterMissingNodes(UaClient opcUaClient,
List<String> selectedNodes) {
return selectedNodes.stream().filter(selectedNode -> {
try {
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java
new file mode 100644
index 0000000000..576f3eeb6f
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/SecurityUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.utils;
+
+import org.apache.streampipes.model.Tuple2;
+
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
+
+import java.util.List;
+
+public class SecurityUtils {
+
+ public static List<Tuple2<String, String>> getAvailableSecurityModes() {
+ return List.of(
+ new Tuple2<>("None", MessageSecurityMode.None.name()),
+ new Tuple2<>("Sign", MessageSecurityMode.Sign.name()),
+ new Tuple2<>("Sign & Encrypt",
MessageSecurityMode.SignAndEncrypt.name())
+ );
+ }
+
+ public static List<SecurityPolicy> getAvailableSecurityPolicies() {
+ return List.of(
+ SecurityPolicy.None,
+ SecurityPolicy.Basic128Rsa15,
+ SecurityPolicy.Basic256,
+ SecurityPolicy.Basic256Sha256,
+ SecurityPolicy.Aes128_Sha256_RsaOaep,
+ SecurityPolicy.Aes256_Sha256_RsaPss
+ );
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
index 079243ab0f..185776e43a 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/documentation.md
@@ -26,7 +26,40 @@
## Description
-Reads values from an OPC-UA server repeatedly
+This adapter reads node values from an OPC-UA server.
+The adapter supports both signed/encrypted and unencrypted communication.
+
+Certificates must be provided directly to the service and cannot be added from
the UI or REST APIs.
+To establish connections using a `Sign` or `Sign & Encrypt` security mode,
+the following environment variables must be provided to the extension service:
+
+* SP_OPCUA_SECURITY_DIR the directory where the keystore and trusted
certificates are located
+* SP_OPCUA_KEYSTORE_FILE the keystore file (e.g., keystore.pfx, must be of
type PKCS12)
+* SP_OPCUA_KEYSTORE_PASSWORD the password to the keystore
+* SP_OPCUA_APPLICATION_URI the application URI used by the client to identify
itself
+
+Certificate requirements:
+
+The X509 certificate must provide the following extras:
+* Key Usage: Certificate Sign
+* Subject Alternative Name: Application URI
+* Basic Constraints: Must provide CA:FALSE when using a self-signed certificate
+* Extended Key Usage: TLS Web Server Authentication, TLS Web Client
Authentication
+
+The directory layout of the `SP_OPCUA_SECURITY_DIR` look as follows:
+
+```
+SP_OPC_SECURITY_DIR/
+├─ pki/
+│ ├─ issuers/
+│ ├─ rejected/
+│ ├─ trusted/
+│ │ ├─ certs/
+│ │ ├─ crl/
+```
+
+Trusted certs need to be present in the `pki/trusted/certs` folder.
+Rejected certificates are stored in the `rejected` folder.
***
@@ -40,7 +73,15 @@ Reads values from an OPC-UA server repeatedly
Duration of the polling interval in seconds
-### Anonymous vs. Username/Password
+### Security Mode
+
+Can be either None, Signed or Signed & Encrypt
+
+### Security Policy
+
+Choose one of the OPC-UA security policies or `None`
+
+### User Authentication
Choose whether you want to connect anonymously or authenticate using your
credentials.
@@ -54,14 +95,6 @@ Where can the OPC UA server be found?
**URL**: Specify the server's full `URL` (including
port), can be with our without leading `opc.tcp://`<br/>
**Host/Port**: Insert the `host` address (with or
without leading `opc.tcp://`) and the `port`<br/>
-### Namespace Index
-
-Requires the index of the namespace you want to connect to.
-
-### Node ID
-
-The identifier of the node you want to read from, numbers and strings are both
valid.
-
### Available Nodes
Shows all available nodes once namespace index and node ID are given.
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
index 995045ec7a..15f54c5768 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
@@ -38,10 +38,22 @@ OPC_SERVER_HOST.description=Example: test-server.com,
opc.tcp://test-server.com)
OPC_SERVER_PORT.title=Port
OPC_SERVER_PORT.description=Example: 4840
+securityMode.title=Security Mode
+securityMode.description=The OPC-UA security mode
+
+securityPolicy.title=Security Policy
+securityPolicy.description=The OPC-UA security policy. Choose "None" if
security mode is "None"
+
+userAuthentication.title=User Authentication
+userAuthentication.description=Choose an authentication method for the user
+
+anonymous.title=Anonymous
+anonymous.description=
+
ACCESS_MODE.title=Security Level
ACCESS_MODE.description=Select the OPC UA security level for the connection
-USERNAME_GROUP.title=Sign (username & password)
+USERNAME_GROUP.title=Username & Password
USERNAME_GROUP.description=
UNAUTHENTICATED.title=None
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
index 27221467c9..ee28512b27 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/documentation.md
@@ -26,7 +26,40 @@
## Description
-Allows to write events to an OPC-UA server.
+This data sink can be used to write values to an OPC-UA server.
+The sink supports both signed/encrypted and unencrypted communication.
+
+Certificates must be provided directly to the service and cannot be added from
the UI or REST APIs.
+To establish connections using a `Sign` or `Sign & Encrypt` security mode,
+the following environment variables must be provided to the extension service:
+
+* SP_OPCUA_SECURITY_DIR the directory where the keystore and trusted
certificates are located
+* SP_OPCUA_KEYSTORE_FILE the keystore file (e.g., keystore.pfx, must be of
type PKCS12)
+* SP_OPCUA_KEYSTORE_PASSWORD the password to the keystore
+* SP_OPCUA_APPLICATION_URI the application URI used by the client to identify
itself
+
+Certificate requirements:
+
+The X509 certificate must provide the following extras:
+* Key Usage: Certificate Sign
+* Subject Alternative Name: Application URI
+* Basic Constraints: Must provide CA:FALSE when using a self-signed certificate
+* Extended Key Usage: TLS Web Server Authentication, TLS Web Client
Authentication
+
+The directory layout of the `SP_OPCUA_SECURITY_DIR` look as follows:
+
+```
+SP_OPC_SECURITY_DIR/
+├─ pki/
+│ ├─ issuers/
+│ ├─ rejected/
+│ ├─ trusted/
+│ │ ├─ certs/
+│ │ ├─ crl/
+```
+
+Trusted certs need to be present in the `pki/trusted/certs` folder.
+Rejected certificates are stored in the `rejected` folder.
***
@@ -46,6 +79,21 @@ The hostname of the OPC-UA server.
The port of the OPC-UA server.
+### Security Mode
+
+Can be either None, Signed or Signed & Encrypt
+
+### Security Policy
+
+Choose one of the OPC-UA security policies or `None`
+
+### User Authentication
+
+Choose whether you want to connect anonymously or authenticate using your
credentials.
+
+ **Anonymous**: No further information required <br/>
+ **Username/Password**: Insert your `username` and
`password` to access the OPC UA server
+
### Namespace Index
The namespace index in which the node should be written
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
index 7e78d16a1a..b26c09c598 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.sinks.databases.jvm.opcua/strings.en
@@ -56,3 +56,15 @@ AVAILABLE_NODES.description=Select the nodes that are
relevant for you. Please e
MAPPING_PROPERY.title=Field to write
MAPPING_PROPERY.description=The field that should be written to the OPC-UA
server
+
+securityMode.title=Security Mode
+securityMode.description=The OPC-UA security mode
+
+securityPolicy.title=Security Policy
+securityPolicy.description=The OPC-UA security policy. Choose "None" if
security mode is "None"
+
+userAuthentication.title=User Authentication
+userAuthentication.description=Choose an authentication method for the user
+
+anonymous.title=Anonymous
+anonymous.description=
diff --git
a/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
b/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
index c7c1a4bfe7..8823d519b3 100644
---
a/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
+++
b/ui/src/app/data-explorer/components/data-view/data-view-designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
@@ -83,7 +83,11 @@ export class DataExplorerWidgetDataSettingsComponent
implements OnInit {
this.dataExplorerService.getAllPersistedDataStreams(),
this.datalakeRestService.getAllMeasurementSeries(),
).subscribe(response => {
- this.availablePipelines = response[0];
+ this.availablePipelines = response[0].filter(
+ p =>
+ response[1].find(m => m.measureName === p.measureName) !==
+ undefined,
+ );
this.availableMeasurements = response[1];
// replace pipeline event schemas. Reason: Available measures do
not contain field for timestamp