This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch 3579-support-extensionobjects-in-opc-ua-adapter
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3579-support-extensionobjects-in-opc-ua-adapter by this push:
new 3776c752da feat(#3579): Add support for OPC-UA extension objects
3776c752da is described below
commit 3776c752dab27b0643a1854e39c5e699263e9a40
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Apr 24 22:06:59 2025 +0200
feat(#3579): Add support for OPC-UA extension objects
---
pom.xml | 11 +-
.../streampipes-connectors-opcua/pom.xml | 4 +
.../connectors/opcua/adapter/OpcUaAdapter.java | 55 ++++-----
.../connectors/opcua/adapter/OpcUaNodeBrowser.java | 43 +++----
.../opcua/adapter/OpcUaNodeProvider.java} | 33 +++---
.../opcua/adapter/OpcUaSchemaProvider.java | 129 +++++++++++++++++++++
.../opcua/client/ConnectedOpcUaClient.java | 8 +-
.../connectors/opcua/client/SpOpcUaClient.java | 16 +--
.../opcua/config/SpOpcUaConfigExtractor.java | 6 +-
.../opcua/model/BasicVariableNodeInfo.java} | 32 ++---
.../opcua/model/ExtensionObjectOpcUaNode.java | 108 +++++++++++++++++
.../extensions/connectors/opcua/model/OpcNode.java | 129 ---------------------
.../connectors/opcua/model/OpcUaNode.java | 48 ++++++++
.../connectors/opcua/model/OpcUaNodeFactory.java} | 28 +++--
.../connectors/opcua/model/PrimitiveOpcUaNode.java | 89 ++++++++++++++
.../connectors/opcua/sink/OpcUaSink.java | 4 +-
.../connectors/opcua/utils/OpcUaTypes.java | 31 +++++
.../utils/{OpcUaUtil.java => OpcUaUtils.java} | 120 +------------------
.../connectors/opcua/utils/OpcUaUtilTest.java | 6 +-
.../manager/setup/CouchDbInstallationStep.java | 1 -
.../streampipes/storage/couchdb/utils/Utils.java | 4 -
21 files changed, 524 insertions(+), 381 deletions(-)
diff --git a/pom.xml b/pom.xml
index 148d28b9ba..6eb14421ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@
<commons-text.version>1.12.0</commons-text.version>
<commoms-collection4.version>4.4</commoms-collection4.version>
<ditto-client.version>1.0.0</ditto-client.version>
- <eclipse.milo.version>0.6.14</eclipse.milo.version>
+ <eclipse.milo.version>0.6.16</eclipse.milo.version>
<error-prone.version>2.10.0</error-prone.version>
<file-management.version>3.1.0</file-management.version>
<flink.version>1.13.5</flink.version>
@@ -72,8 +72,8 @@
<influxdb.version>2.24</influxdb.version>
<inlong.version>1.13.0</inlong.version>
<iotdb.version>1.3.0</iotdb.version>
- <jackson.version>2.17.0</jackson.version>
- <jackson.databind.version>2.17.0</jackson.databind.version>
+ <jackson.version>2.18.3</jackson.version>
+ <jackson.databind.version>2.18.3</jackson.databind.version>
<jakarta-annotation.version>3.0.0</jakarta-annotation.version>
<jakarta-activation-api.version>2.1.3</jakarta-activation-api.version>
<jakarta-inject-api.version>2.0.1</jakarta-inject-api.version>
@@ -725,6 +725,11 @@
<artifactId>sdk-client</artifactId>
<version>${eclipse.milo.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.milo</groupId>
+ <artifactId>dictionary-reader</artifactId>
+ <version>${eclipse.milo.version}</version>
+ </dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-sse</artifactId>
diff --git a/streampipes-extensions/streampipes-connectors-opcua/pom.xml
b/streampipes-extensions/streampipes-connectors-opcua/pom.xml
index 80e07326c2..d7cdc2e3ae 100644
--- a/streampipes-extensions/streampipes-connectors-opcua/pom.xml
+++ b/streampipes-extensions/streampipes-connectors-opcua/pom.xml
@@ -54,6 +54,10 @@
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.milo</groupId>
+ <artifactId>dictionary-reader</artifactId>
+ </dependency>
<!-- Test dependencies -->
<dependency>
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
index 526ff7bb24..c75b4ba439 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
@@ -34,8 +34,8 @@ import
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProv
import
org.apache.streampipes.extensions.connectors.opcua.config.OpcUaAdapterConfig;
import
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import
org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
-import org.apache.streampipes.extensions.connectors.opcua.model.OpcNode;
-import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.extensions.connectors.opcua.model.OpcUaNode;
+import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtils;
import
org.apache.streampipes.extensions.management.connect.PullAdapterScheduler;
import
org.apache.streampipes.extensions.management.connect.adapter.util.PollingSettings;
import org.apache.streampipes.model.AdapterType;
@@ -50,13 +50,11 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -68,7 +66,6 @@ import java.util.stream.Collectors;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULL_MODE;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.SUBSCRIPTION_MODE;
-import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil.getSchema;
public class OpcUaAdapter implements StreamPipesAdapter, IPullAdapter,
SupportsRuntimeConfig {
@@ -80,9 +77,8 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
private final OpcUaClientProvider clientProvider;
private ConnectedOpcUaClient connectedClient;
private OpcUaAdapterConfig opcUaAdapterConfig;
- private List<OpcNode> allNodes;
- private List<NodeId> allNodeIds;
- private int numberProperties;
+ private OpcUaNodeProvider nodeProvider;
+ private List<OpcUaNode> allNodes;
private final Map<String, Object> event;
private IEventCollector collector;
@@ -95,13 +91,11 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
public OpcUaAdapter(OpcUaClientProvider clientProvider) {
this.clientProvider = clientProvider;
- this.numberProperties = 0;
this.event = new HashMap<>();
this.nodeIdToLabelMapping = new HashMap<>();
}
private void prepareAdapter(IAdapterParameterExtractor extractor) throws
AdapterException {
- this.allNodeIds = new ArrayList<>();
List<String> deleteKeys = extractor
.getAdapterDescription()
.getSchemaRules()
@@ -114,21 +108,19 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
OpcUaNodeBrowser browserClient =
new OpcUaNodeBrowser(this.connectedClient.getClient(),
this.opcUaAdapterConfig);
- this.allNodes = browserClient.findNodes(deleteKeys);
-
-
- for (OpcNode node : this.allNodes) {
- this.allNodeIds.add(node.getNodeId());
- }
+ this.nodeProvider = browserClient.findNodes(deleteKeys);
+ this.allNodes = nodeProvider.getNodes();
if (opcUaAdapterConfig.inPullMode()) {
this.pullingIntervalMilliSeconds =
opcUaAdapterConfig.getPullIntervalMilliSeconds();
} else {
- this.numberProperties = this.allNodeIds.size();
- this.connectedClient.createListSubscription(this.allNodeIds, this);
+ var allNodeIds = this.allNodes.stream()
+ .map(node -> node.nodeInfo().getNodeId()).toList();
+ this.connectedClient.createListSubscription(allNodeIds, this);
}
- this.allNodes.forEach(node ->
this.nodeIdToLabelMapping.put(node.getNodeId().toString(), node.getLabel()));
+ this.allNodes.forEach(node -> this.nodeIdToLabelMapping
+ .put(node.nodeInfo().getNodeId().toString(),
node.nodeInfo().getDisplayName()));
} catch (Exception e) {
@@ -139,7 +131,10 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
@Override
public void pullData() throws ExecutionException, RuntimeException,
InterruptedException, TimeoutException {
var response =
- this.connectedClient.getClient().readValues(0,
TimestampsToReturn.Both, this.allNodeIds);
+ this.connectedClient.getClient().readValues(
+ 0,
+ TimestampsToReturn.Both,
+ this.allNodes.stream().map(o ->
o.nodeInfo().getNodeId()).toList());
boolean badStatusCodeReceived = false;
boolean emptyValueReceived = false;
List<DataValue> returnValues =
@@ -151,13 +146,13 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
for (int i = 0; i < returnValues.size(); i++) {
var status = returnValues.get(i).getStatusCode();
if (StatusCode.GOOD.equals(status)) {
- Object value = returnValues.get(i).getValue().getValue();
- this.event.put(this.allNodes.get(i).getLabel(), value);
+ var value = returnValues.get(i).getValue();
+ this.allNodes.get(i).addToEvent(connectedClient.getClient(),
this.event, value);
} else {
badStatusCodeReceived = true;
LOG.warn("Received status code {} for node label: {}",
status,
- this.allNodes.get(i).getLabel());
+ this.allNodes.get(i).nodeInfo().getDisplayName());
}
}
}
@@ -177,16 +172,15 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
String key =
this.nodeIdToLabelMapping.get(item.getReadValueId().getNodeId().toString());
- OpcNode currNode = this.allNodes.stream()
- .filter(node -> key.equals(node.getLabel()))
+ var currNode = this.allNodes.stream()
+ .filter(node -> key.equals(node.nodeInfo().getDisplayName()))
.findFirst()
.orElse(null);
if (currNode != null) {
- event.put(currNode.getLabel(), value.getValue().getValue());
-
+ currNode.addToEvent(connectedClient.getClient(), event,
value.getValue());
// ensure that event is complete and all opc ua subscriptions
transmitted at least one value
- if (event.keySet().size() >= this.numberProperties) {
+ if (event.size() >= nodeProvider.getNumberOfEventProperties()) {
Map<String, Object> newEvent = new HashMap<>();
// deep copy of event to prevent preprocessor error
for (String k : event.keySet()) {
@@ -210,7 +204,6 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
IAdapterRuntimeContext adapterRuntimeContext)
throws AdapterException {
this.opcUaAdapterConfig =
SpOpcUaConfigExtractor.extractAdapterConfig(extractor.getStaticPropertyExtractor());
- //this.connectedClient = clientProvider.getClient(this.opcUaAdapterConfig);
this.collector = collector;
this.prepareAdapter(extractor);
@@ -233,7 +226,7 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
@Override
public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor
extractor) throws SpConfigurationException {
- return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName,
extractor);
+ return OpcUaUtils.resolveConfig(clientProvider,
staticPropertyInternalName, extractor);
}
@Override
@@ -255,6 +248,6 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
@Override
public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor,
IAdapterGuessSchemaContext
adapterGuessSchemaContext) throws AdapterException {
- return getSchema(clientProvider, extractor);
+ return new OpcUaSchemaProvider().getSchema(clientProvider, extractor);
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
index 2a150fa355..c6fe76e28b 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeBrowser.java
@@ -19,13 +19,15 @@
package org.apache.streampipes.extensions.connectors.opcua.adapter;
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
-import org.apache.streampipes.extensions.connectors.opcua.model.OpcNode;
-import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaTypes;
+import
org.apache.streampipes.extensions.connectors.opcua.model.BasicVariableNodeInfo;
+import org.apache.streampipes.extensions.connectors.opcua.model.OpcUaNode;
+import
org.apache.streampipes.extensions.connectors.opcua.model.OpcUaNodeFactory;
import org.apache.streampipes.model.staticproperty.TreeInputNode;
import org.eclipse.milo.opcua.sdk.client.AddressSpace;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.UaClient;
+import
org.eclipse.milo.opcua.sdk.client.model.nodes.variables.BaseDataVariableTypeNode;
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
import org.eclipse.milo.opcua.stack.core.Identifiers;
@@ -33,7 +35,6 @@ import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.UaRuntimeException;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
-import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,35 +47,26 @@ import java.util.stream.Collectors;
public class OpcUaNodeBrowser {
- private final UaClient client;
+ private final OpcUaClient client;
private final OpcUaConfig spOpcConfig;
private static final Logger LOG =
LoggerFactory.getLogger(OpcUaNodeBrowser.class);
public OpcUaNodeBrowser(
- UaClient client,
+ OpcUaClient client,
OpcUaConfig spOpcUaClientConfig
) {
this.client = client;
this.spOpcConfig = spOpcUaClientConfig;
}
- public List<OpcNode> findNodes() throws UaException {
- var opcNodes = new ArrayList<OpcNode>();
+ public OpcUaNodeProvider findNodes(List<String> runtimeNameFilters) throws
UaException {
+ var opcNodes = new ArrayList<OpcUaNode>();
for (String selectedNodeName : this.spOpcConfig.getSelectedNodeNames()) {
- opcNodes.add(toOpcNode(selectedNodeName));
+ opcNodes.add(toOpcNode(selectedNodeName, runtimeNameFilters));
}
- return opcNodes;
- }
-
- public List<OpcNode> findNodes(List<String> runtimeNameFilters) throws
UaException {
- return findNodes()
- .stream()
- .filter(node -> runtimeNameFilters
- .stream()
- .noneMatch(f -> f.equals(node.getLabel())))
- .collect(Collectors.toList());
+ return new OpcUaNodeProvider(opcNodes);
}
public List<TreeInputNode> buildNodeTreeFromOrigin(String
nextBaseNodeToResolve)
@@ -87,7 +79,8 @@ public class OpcUaNodeBrowser {
return findChildren(client, currentNodeId);
}
- private OpcNode toOpcNode(String nodeName) throws UaException {
+ private OpcUaNode toOpcNode(String nodeName,
+ List<String> runtimeNamesToDelete) throws
UaException {
AddressSpace addressSpace = getAddressSpace();
NodeId nodeId;
@@ -115,11 +108,9 @@ public class OpcUaNodeBrowser {
.toString()
);
- if (node instanceof UaVariableNode) {
- UInteger value = (UInteger) ((UaVariableNode) node).getDataType()
- .getIdentifier();
- return new OpcNode(node.getDisplayName()
- .getText(), OpcUaTypes.getType(value),
node.getNodeId());
+ if (node instanceof BaseDataVariableTypeNode) {
+ var nodeInfo = new BasicVariableNodeInfo((BaseDataVariableTypeNode)
node);
+ return OpcUaNodeFactory.createOpcUaNode(nodeInfo, runtimeNamesToDelete);
}
LOG.warn("Node {} not of type UaVariableNode", node.getDisplayName());
@@ -138,9 +129,9 @@ public class OpcUaNodeBrowser {
.map(node -> {
TreeInputNode childNode = new TreeInputNode();
childNode.setNodeName(node.getDisplayName()
- .getText());
+ .getText());
childNode.setInternalNodeName(node.getNodeId()
- .toParseableString());
+ .toParseableString());
childNode.setDataNode(isDataNode(node));
childNode.setNodeMetadata(new OpcUaNodeMetadataExtractor(client,
node).extract());
return childNode;
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeProvider.java
similarity index 54%
copy from
streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
copy to
streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeProvider.java
index cd4a63bf48..3350494c11 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaNodeProvider.java
@@ -16,24 +16,29 @@
*
*/
-package org.apache.streampipes.extensions.connectors.opcua.utils;
+package org.apache.streampipes.extensions.connectors.opcua.adapter;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import org.apache.streampipes.extensions.connectors.opcua.model.OpcUaNode;
-public class OpcUaUtilTest {
+import java.util.List;
- private static final String SERVER_ADDRESS_WITH_OPC_PREFIX =
"opc.tcp://example.com";
+public class OpcUaNodeProvider {
- @Test
- public void testAddOpcPrefixIfNotExistsWithPrefix() {
- var result =
OpcUaUtil.addOpcPrefixIfNotExists(SERVER_ADDRESS_WITH_OPC_PREFIX);
- Assertions.assertEquals(SERVER_ADDRESS_WITH_OPC_PREFIX, result);
+ private final List<OpcUaNode> opcUaNodes;
+
+ public OpcUaNodeProvider(List<OpcUaNode> opcUaNodes) {
+ this.opcUaNodes = opcUaNodes;
+ }
+
+ public List<OpcUaNode> getNodes() {
+ return opcUaNodes;
+ }
+
+ public int getNumberOfNodes() {
+ return opcUaNodes.size();
}
- @Test
- public void testAddOpcPrefixIfNotExistsNoPrefix() {
- var result = OpcUaUtil.addOpcPrefixIfNotExists("example.com");
- Assertions.assertEquals(SERVER_ADDRESS_WITH_OPC_PREFIX, result);
+ public int getNumberOfEventProperties() {
+ return
opcUaNodes.stream().mapToInt(OpcUaNode::getNumberOfEventProperties).sum();
}
-}
\ No newline at end of file
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaSchemaProvider.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaSchemaProvider.java
new file mode 100644
index 0000000000..91fc016674
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaSchemaProvider.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.adapter;
+
+import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+import org.apache.streampipes.commons.exceptions.connect.ParseException;
+import
org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
+import
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
+import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
+import
org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
+import org.apache.streampipes.extensions.connectors.opcua.model.OpcUaNode;
+import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
+
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class OpcUaSchemaProvider {
+
+ /***
+ * OPC UA specific implementation of
+ * @param extractor
+ * @return guess schema
+ * @throws AdapterException
+ * @throws ParseException
+ */
+ public GuessSchema getSchema(OpcUaClientProvider clientProvider,
+ IAdapterParameterExtractor extractor)
+ throws AdapterException, ParseException {
+ var builder = GuessSchemaBuilder.create();
+ EventSchema eventSchema = new EventSchema();
+ Map<String, Object> eventPreview = new HashMap<>();
+ Map<String, FieldStatusInfo> fieldStatusInfos = new HashMap<>();
+ List<EventProperty> allProperties = new ArrayList<>();
+
+ var opcUaConfig = SpOpcUaConfigExtractor.extractSharedConfig(
+ extractor.getStaticPropertyExtractor(), new OpcUaConfig()
+ );
+ try {
+ var connectedClient = clientProvider.getClient(opcUaConfig);
+ OpcUaNodeBrowser nodeBrowser =
+ new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
+ var nodeProvider = nodeBrowser.findNodes(List.of());
+ var selectedNodes = nodeProvider.getNodes();
+
+ if (!selectedNodes.isEmpty()) {
+ for (OpcUaNode opcNode : selectedNodes) {
+ opcNode.addToSchema(connectedClient.getClient(), allProperties);
+ }
+ }
+
+ var nodeIds = selectedNodes.stream()
+ .map(node -> node.nodeInfo().getNodeId())
+ .collect(Collectors.toList());
+ var response = connectedClient.getClient()
+ .readValues(0, TimestampsToReturn.Both, nodeIds);
+
+ var returnValues = response.get();
+ makeEventPreview(connectedClient.getClient(), selectedNodes,
eventPreview, fieldStatusInfos, returnValues);
+
+
+ } catch (Exception e) {
+ throw new AdapterException("Could not guess schema for opc node: " +
e.getMessage(), e);
+ } finally {
+ clientProvider.releaseClient(opcUaConfig);
+ }
+
+ eventSchema.setEventProperties(allProperties);
+ builder.properties(allProperties);
+ builder.fieldStatusInfos(fieldStatusInfos);
+ builder.preview(eventPreview);
+
+ return builder.build();
+ }
+
+ private static void makeEventPreview(
+ OpcUaClient client,
+ List<OpcUaNode> selectedNodes,
+ Map<String, Object> eventPreview,
+ Map<String, FieldStatusInfo> fieldStatusInfos,
+ List<DataValue> dataValues
+ ) {
+
+ for (int i = 0; i < dataValues.size(); i++) {
+ var dv = dataValues.get(i);
+ var node = selectedNodes.get(i);
+ if (StatusCode.GOOD.equals(dv.getStatusCode())) {
+ var value = dv.getValue();
+ node.addToEventPreview(client, eventPreview, fieldStatusInfos, value,
FieldStatusInfo.good());
+ } else {
+ String additionalInfo = dv.getStatusCode() != null ? dv.getStatusCode()
+ .toString() : "Status code is null";
+ node.addToEventPreview(
+ client,
+ Map.of(),
+ fieldStatusInfos,
+ null,
+ FieldStatusInfo.bad(additionalInfo, false));
+ }
+ }
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
index d71d85646f..125f23b01d 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/ConnectedOpcUaClient.java
@@ -20,7 +20,7 @@ package
org.apache.streampipes.extensions.connectors.opcua.client;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
-import org.eclipse.milo.opcua.sdk.client.api.UaClient;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import
org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
@@ -48,10 +48,10 @@ import static
org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.
public class ConnectedOpcUaClient {
private static final Logger LOG =
LoggerFactory.getLogger(ConnectedOpcUaClient.class);
- private final UaClient client;
+ private final OpcUaClient client;
private static final AtomicLong clientHandles = new AtomicLong(1L);
- public ConnectedOpcUaClient(UaClient client) {
+ public ConnectedOpcUaClient(OpcUaClient client) {
this.client = client;
}
@@ -144,7 +144,7 @@ public class ConnectedOpcUaClient {
*
* @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
*/
- public UaClient getClient() {
+ public OpcUaClient getClient() {
return this.client;
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
index a17d373b66..04a3f93998 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/client/SpOpcUaClient.java
@@ -23,11 +23,11 @@ import
org.apache.streampipes.commons.exceptions.SpConfigurationException;
import
org.apache.streampipes.extensions.connectors.opcua.config.MiloOpcUaConfigurationProvider;
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
+import org.eclipse.milo.opcua.binaryschema.GenericBsdParser;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
+import
org.eclipse.milo.opcua.sdk.client.dtd.DataTypeDictionarySessionInitializer;
import org.eclipse.milo.opcua.stack.core.UaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;
@@ -37,9 +37,6 @@ import java.util.concurrent.ExecutionException;
*/
public class SpOpcUaClient<T extends OpcUaConfig> {
- private static final Logger LOG =
LoggerFactory.getLogger(SpOpcUaClient.class);
-
- private OpcUaClient client;
private final T spOpcConfig;
public SpOpcUaClient(T config) {
@@ -55,15 +52,8 @@ public class SpOpcUaClient<T extends OpcUaConfig> {
throws UaException, ExecutionException, InterruptedException,
SpConfigurationException, URISyntaxException {
OpcUaClientConfig clientConfig = new
MiloOpcUaConfigurationProvider().makeClientConfig(spOpcConfig);
var client = OpcUaClient.create(clientConfig);
+ client.addSessionInitializer(new DataTypeDictionarySessionInitializer(new
GenericBsdParser()));
client.connect().get();
return new ConnectedOpcUaClient(client);
}
-
-
-
-
-
- public T getSpOpcConfig() {
- return spOpcConfig;
- }
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
index 333dbadc06..3bb397fbd3 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
@@ -23,7 +23,7 @@ import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import
org.apache.streampipes.extensions.connectors.opcua.config.identity.AnonymousIdentityConfig;
import
org.apache.streampipes.extensions.connectors.opcua.config.identity.UsernamePasswordIdentityConfig;
import
org.apache.streampipes.extensions.connectors.opcua.config.security.SecurityConfig;
-import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtils;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MessageSecurityMode;
@@ -103,9 +103,9 @@ public class SpOpcUaConfigExtractor {
if (useURL) {
String serverAddress =
extractor.singleValueParameter(OPC_SERVER_URL.name(), String.class);
- config.setOpcServerURL(OpcUaUtil.addOpcPrefixIfNotExists(serverAddress));
+
config.setOpcServerURL(OpcUaUtils.addOpcPrefixIfNotExists(serverAddress));
} else {
- String serverAddress = OpcUaUtil.addOpcPrefixIfNotExists(
+ String serverAddress = OpcUaUtils.addOpcPrefixIfNotExists(
extractor.singleValueParameter(OPC_SERVER_HOST.name(), String.class)
);
int port = extractor.singleValueParameter(OPC_SERVER_PORT.name(),
int.class);
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/BasicVariableNodeInfo.java
similarity index 54%
copy from
streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
copy to
streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/BasicVariableNodeInfo.java
index cd4a63bf48..6d26b49d28 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/BasicVariableNodeInfo.java
@@ -16,24 +16,28 @@
*
*/
-package org.apache.streampipes.extensions.connectors.opcua.utils;
+package org.apache.streampipes.extensions.connectors.opcua.model;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import
org.eclipse.milo.opcua.sdk.client.model.nodes.variables.BaseDataVariableTypeNode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-public class OpcUaUtilTest {
+public class BasicVariableNodeInfo {
- private static final String SERVER_ADDRESS_WITH_OPC_PREFIX =
"opc.tcp://example.com";
+ private final BaseDataVariableTypeNode node;
- @Test
- public void testAddOpcPrefixIfNotExistsWithPrefix() {
- var result =
OpcUaUtil.addOpcPrefixIfNotExists(SERVER_ADDRESS_WITH_OPC_PREFIX);
- Assertions.assertEquals(SERVER_ADDRESS_WITH_OPC_PREFIX, result);
+ public BasicVariableNodeInfo(BaseDataVariableTypeNode node) {
+ this.node = node;
}
- @Test
- public void testAddOpcPrefixIfNotExistsNoPrefix() {
- var result = OpcUaUtil.addOpcPrefixIfNotExists("example.com");
- Assertions.assertEquals(SERVER_ADDRESS_WITH_OPC_PREFIX, result);
+ public BaseDataVariableTypeNode getNode() {
+ return node;
}
-}
\ No newline at end of file
+
+ public String getDisplayName() {
+ return node.getDisplayName().getText();
+ }
+
+ public NodeId getNodeId() {
+ return node.getNodeId();
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/ExtensionObjectOpcUaNode.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/ExtensionObjectOpcUaNode.java
new file mode 100644
index 0000000000..ea04b99170
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/ExtensionObjectOpcUaNode.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.model;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaTypes;
+import org.apache.streampipes.model.connect.guess.FieldStatus;
+import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+
+import org.eclipse.milo.opcua.binaryschema.Struct;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+
+import java.util.List;
+import java.util.Map;
+
+public class ExtensionObjectOpcUaNode implements OpcUaNode {
+
+ private final BasicVariableNodeInfo nodeInfo;
+ private final List<String> runtimeNamesToDelete;
+
+ public ExtensionObjectOpcUaNode(BasicVariableNodeInfo nodeInfo,
+ List<String> runtimeNamesToDelete) {
+ this.nodeInfo = nodeInfo;
+ this.runtimeNamesToDelete = runtimeNamesToDelete;
+ }
+
+ @Override
+ public BasicVariableNodeInfo nodeInfo() {
+ return nodeInfo;
+ }
+
+ @Override
+ public int getNumberOfEventProperties() {
+ return 0;
+ }
+
+ @Override
+ public void addToSchema(OpcUaClient client,
+ List<EventProperty> eventProperties) {
+ var struct = extractStruct(client,
nodeInfo.getNode().getValue().getValue());
+ struct.getMembers().forEach((key, member) -> {
+ eventProperties.add(
+
PrimitivePropertyBuilder.create(OpcUaTypes.getTypeFromValue(member.getValue()),
key)
+ .label(key)
+ .build()
+ );
+ });
+ }
+
+ @Override
+ public void addToEvent(OpcUaClient client,
+ Map<String, Object> event,
+ Variant variant) {
+ var struct = extractStruct(client, variant);
+
+ struct.getMembers().forEach((key, member) -> {
+ event.put(key, member.getValue());
+ });
+ }
+
+ @Override
+ public void addToEventPreview(OpcUaClient client,
+ Map<String, Object> eventPreview,
+ Map<String, FieldStatusInfo> fieldStatusInfos,
+ Variant variant,
+ FieldStatusInfo fieldStatusInfo) {
+ if (fieldStatusInfo.getFieldStatus() == FieldStatus.GOOD) {
+ var struct = extractStruct(client, variant);
+ struct.getMembers().forEach((key, member) -> {
+ eventPreview.put(key, member.getValue());
+ fieldStatusInfos.put(key, fieldStatusInfo);
+ });
+ } else {
+ throw new SpRuntimeException("Could not read value for " +
nodeInfo.getDisplayName());
+ }
+ }
+
+ private Struct extractStruct(OpcUaClient client,
+ Variant variant) {
+ if (variant.getValue() instanceof ExtensionObject extensionObject) {
+ var decoded =
extensionObject.decode(client.getDynamicSerializationContext());
+ if (decoded instanceof Struct struct) {
+ return struct;
+ }
+ }
+ throw new SpRuntimeException("Decoded value is not a Struct");
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/OpcNode.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/OpcNode.java
deleted file mode 100644
index f27b3321ff..0000000000
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/OpcNode.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.extensions.connectors.opcua.model;
-
-import org.apache.streampipes.sdk.utils.Datatypes;
-
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-
-/**
- * OpcNode is a StreamPipes internal model of an OPC UA node.
- * It's main purpose is to ease the handling of nodes within StreamPipes.
- */
-
-public class OpcNode {
- String label;
- Datatypes type;
- NodeId nodeId;
- int opcUnitId;
- private boolean readable;
-
- /**
- * Constructor for class OpcNode without an OPC UA unit identifier. <br>
- * Unit identifier is set to zero as default.
- *
- * @param label name of the OPC UA node
- * @param type datatype of the OPC UA node
- * @param nodeId identifier of the OPC UA node
- */
- public OpcNode(String label, Datatypes type, NodeId nodeId) {
- this.label = label;
- this.type = type;
- this.nodeId = nodeId;
- this.opcUnitId = 0;
- }
-
- /**
- * Constructor for class OpcNode with an OPC UA unit identifier. <br>
- * This identifier references to an OPC UA measurement unit, e.g. degree
celsius. <br>
- * With {@link OpcNode#getQudtURI()} the OPC UA internal ID is mapped to the
QUDT unit ontology <br>
- *
- * @param label name of the OPC UA node
- * @param type datatype of the OPC UA node
- * @param nodeId identifier of the OPC UA node
- * @param opcUnitId OPC UA internal unit identifier
- */
- public OpcNode(String label, Datatypes type, NodeId nodeId, Integer
opcUnitId) {
- this.label = label;
- this.type = type;
- this.nodeId = nodeId;
- this.opcUnitId = opcUnitId;
- }
-
- public String getLabel() {
- return label;
- }
-
- public void setLabel(String label) {
- this.label = label;
- }
-
- public Datatypes getType() {
- return type;
- }
-
- public void setType(Datatypes type) {
- this.type = type;
- }
-
- public NodeId getNodeId() {
- return nodeId;
- }
-
- public void setNodeId(NodeId nodeId) {
- this.nodeId = nodeId;
- }
-
- public int getOpcUnitId() {
- return this.opcUnitId;
- }
-
- public boolean hasUnitId() {
- // zero is the default case when no unit id is present
- return this.opcUnitId != 0;
- }
-
- public boolean isReadable() {
- return readable;
- }
-
- public void setReadable(boolean readable) {
- this.readable = readable;
- }
-
- /**
- * Returns the corresponding QUDT URI if the {@code opcUnitId} is given,
- * otherwise it returns an empty string. <br>
- * Currently, there are only two examples added. <br>
- * Other units have to be added manually, please have a look at the <a
href="http://opcfoundation.org/UA/EngineeringUnits/UNECE/UNECE_to_OPCUA.csv">
OPC UA unitID mapping table</a>. <br>
- *
- * @return QUDT URI as string of the given unit
- */
-
- public String getQudtURI() {
- switch (this.opcUnitId) {
- case 17476:
- return "http://qudt.org/vocab/unit#DEG";
- case 4408652:
- return "http://qudt.org/vocab/unit#DegreeCelsius";
- default:
- return "";
- }
- }
-}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/OpcUaNode.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/OpcUaNode.java
new file mode 100644
index 0000000000..f25efd4110
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/OpcUaNode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.model;
+
+import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
+import org.apache.streampipes.model.schema.EventProperty;
+
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+
+import java.util.List;
+import java.util.Map;
+
+public interface OpcUaNode {
+
+ BasicVariableNodeInfo nodeInfo();
+
+ int getNumberOfEventProperties();
+
+ void addToSchema(OpcUaClient client,
+ List<EventProperty> eventProperties);
+
+ void addToEvent(OpcUaClient client,
+ Map<String, Object> event,
+ Variant variant);
+
+ void addToEventPreview(OpcUaClient client,
+ Map<String, Object> eventPreview,
+ Map<String, FieldStatusInfo> fieldStatusInfos,
+ Variant variant,
+ FieldStatusInfo fieldStatusInfo);
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/OpcUaNodeFactory.java
similarity index 54%
copy from
streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
copy to
streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/OpcUaNodeFactory.java
index cd4a63bf48..2bfd8944a4 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/OpcUaNodeFactory.java
@@ -16,24 +16,22 @@
*
*/
-package org.apache.streampipes.extensions.connectors.opcua.utils;
+package org.apache.streampipes.extensions.connectors.opcua.model;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaTypes;
-public class OpcUaUtilTest {
+import
org.eclipse.milo.opcua.sdk.client.model.nodes.variables.BaseDataVariableTypeNode;
- private static final String SERVER_ADDRESS_WITH_OPC_PREFIX =
"opc.tcp://example.com";
+import java.util.List;
- @Test
- public void testAddOpcPrefixIfNotExistsWithPrefix() {
- var result =
OpcUaUtil.addOpcPrefixIfNotExists(SERVER_ADDRESS_WITH_OPC_PREFIX);
- Assertions.assertEquals(SERVER_ADDRESS_WITH_OPC_PREFIX, result);
- }
+public class OpcUaNodeFactory {
- @Test
- public void testAddOpcPrefixIfNotExistsNoPrefix() {
- var result = OpcUaUtil.addOpcPrefixIfNotExists("example.com");
- Assertions.assertEquals(SERVER_ADDRESS_WITH_OPC_PREFIX, result);
+ public static OpcUaNode createOpcUaNode(BasicVariableNodeInfo nodeInfo,
+ List<String> runtimeNamesToDelete) {
+ if (OpcUaTypes.isExtensionOrCustom(nodeInfo.getNode())) {
+ return new ExtensionObjectOpcUaNode(nodeInfo, runtimeNamesToDelete);
+ } else {
+ return new PrimitiveOpcUaNode(nodeInfo, runtimeNamesToDelete);
+ }
}
-}
\ No newline at end of file
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/PrimitiveOpcUaNode.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/PrimitiveOpcUaNode.java
new file mode 100644
index 0000000000..381d0bdf84
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/model/PrimitiveOpcUaNode.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.model;
+
+import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaTypes;
+import org.apache.streampipes.model.connect.guess.FieldStatus;
+import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.utils.Datatypes;
+
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+
+import java.util.List;
+import java.util.Map;
+
+public class PrimitiveOpcUaNode implements OpcUaNode {
+
+ private final BasicVariableNodeInfo nodeInfo;
+ private final List<String> runtimeNamesToDelete;
+
+ public PrimitiveOpcUaNode(BasicVariableNodeInfo nodeInfo,
+ List<String> runtimeNamesToDelete) {
+ this.nodeInfo = nodeInfo;
+ this.runtimeNamesToDelete = runtimeNamesToDelete;
+ }
+
+ @Override
+ public BasicVariableNodeInfo nodeInfo() {
+ return nodeInfo;
+ }
+
+ @Override
+ public int getNumberOfEventProperties() {
+ return 1;
+ }
+
+ @Override
+ public void addToSchema(OpcUaClient client,
+ List<EventProperty> eventProperties) {
+ eventProperties.add(PrimitivePropertyBuilder
+ .create(getType(), nodeInfo().getDisplayName())
+ .label(nodeInfo().getDisplayName())
+ .build());
+ }
+
+ @Override
+ public void addToEvent(OpcUaClient client,
+ Map<String, Object> event,
+ Variant variant) {
+ event.put(nodeInfo.getDisplayName(), variant.getValue());
+ }
+
+ @Override
+ public void addToEventPreview(OpcUaClient client,
+ Map<String, Object> eventPreview,
+ Map<String, FieldStatusInfo> fieldStatusInfos,
+ Variant variant,
+ FieldStatusInfo fieldStatusInfo) {
+ if (fieldStatusInfo.getFieldStatus() == FieldStatus.GOOD) {
+ eventPreview.put(nodeInfo().getDisplayName(), variant.getValue());
+ }
+ fieldStatusInfos.put(nodeInfo().getDisplayName(), fieldStatusInfo);
+ }
+
+ private Datatypes getType() {
+ UInteger value = (UInteger) nodeInfo.getNode().getDataType()
+ .getIdentifier();
+ return OpcUaTypes.getType(value);
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
index b81f20d49c..1674b2f677 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/sink/OpcUaSink.java
@@ -29,7 +29,7 @@ import
org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
import
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProvider;
import
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import
org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
-import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtils;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.runtime.Event;
@@ -114,6 +114,6 @@ public class OpcUaSink implements IStreamPipesDataSink,
SupportsRuntimeConfig {
@Override
public StaticProperty resolveConfiguration(String staticPropertyInternalName,
IStaticPropertyExtractor
extractor) throws SpConfigurationException {
- return OpcUaUtil.resolveConfig(clientProvider, staticPropertyInternalName,
extractor);
+ return OpcUaUtils.resolveConfig(clientProvider,
staticPropertyInternalName, extractor);
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaTypes.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaTypes.java
index 46692e96ef..e87cfc60f7 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaTypes.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaTypes.java
@@ -20,8 +20,12 @@ package
org.apache.streampipes.extensions.connectors.opcua.utils;
import org.apache.streampipes.sdk.utils.Datatypes;
+import
org.eclipse.milo.opcua.sdk.client.model.nodes.variables.BaseDataVariableTypeNode;
+import org.eclipse.milo.opcua.stack.core.BuiltinDataType;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
+import java.util.Objects;
+
public class OpcUaTypes {
/**
@@ -54,5 +58,32 @@ public class OpcUaTypes {
return Datatypes.String;
}
+ public static Datatypes getTypeFromValue(Object value) {
+ if (value instanceof Boolean) {
+ return Datatypes.Boolean;
+ } else if (value instanceof Integer) {
+ return Datatypes.Integer;
+ } else if (value instanceof Long) {
+ return Datatypes.Long;
+ } else if (value instanceof Float) {
+ return Datatypes.Float;
+ } else if (value instanceof Double) {
+ return Datatypes.Double;
+ } else {
+ return Datatypes.String;
+ }
+ }
+
+ /**
+ * Determines if the node is an extension data type or a custom data type
+ *
+ * @param node a data variable node
+ * @return true if the node is an ExtensionObject or custom data type
+ */
+ public static boolean isExtensionOrCustom(BaseDataVariableTypeNode node) {
+ return !BuiltinDataType.isBuiltin(node.getDataType())
+ || Objects.equals(node.getDataType(),
BuiltinDataType.ExtensionObject.getNodeId());
+ }
+
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java
similarity index 53%
rename from
streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
rename to
streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java
index 8cd76401f9..2d0babe581 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtil.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java
@@ -19,9 +19,6 @@
package org.apache.streampipes.extensions.connectors.opcua.utils;
import org.apache.streampipes.commons.exceptions.SpConfigurationException;
-import org.apache.streampipes.commons.exceptions.connect.AdapterException;
-import org.apache.streampipes.commons.exceptions.connect.ParseException;
-import
org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
import
org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaNodeBrowser;
@@ -29,37 +26,22 @@ import
org.apache.streampipes.extensions.connectors.opcua.client.OpcUaClientProv
import org.apache.streampipes.extensions.connectors.opcua.config.OpcUaConfig;
import
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
import
org.apache.streampipes.extensions.connectors.opcua.config.SpOpcUaConfigExtractor;
-import org.apache.streampipes.extensions.connectors.opcua.model.OpcNode;
-import org.apache.streampipes.model.connect.guess.FieldStatusInfo;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventSchema;
import
org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputStaticProperty;
-import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
-import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
import org.eclipse.milo.opcua.sdk.client.api.UaClient;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.UaException;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
-import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
/***
* Collection of several utility functions in context of OPC UA
*/
-public class OpcUaUtil {
+public class OpcUaUtils {
private static final String OPC_TCP_PREFIX = "opc.tcp://";
@@ -72,102 +54,6 @@ public class OpcUaUtil {
return serverAddress.startsWith(OPC_TCP_PREFIX) ? serverAddress :
OPC_TCP_PREFIX + serverAddress;
}
- /***
- * OPC UA specific implementation of
- * @param extractor
- * @return guess schema
- * @throws AdapterException
- * @throws ParseException
- */
- public static GuessSchema getSchema(OpcUaClientProvider clientProvider,
- IAdapterParameterExtractor extractor)
- throws AdapterException, ParseException {
- var builder = GuessSchemaBuilder.create();
- EventSchema eventSchema = new EventSchema();
- Map<String, Object> eventPreview = new HashMap<>();
- Map<String, FieldStatusInfo> fieldStatusInfos = new HashMap<>();
- List<EventProperty> allProperties = new ArrayList<>();
-
- var opcUaConfig = SpOpcUaConfigExtractor.extractSharedConfig(
- extractor.getStaticPropertyExtractor(), new OpcUaConfig()
- );
- try {
- var connectedClient = clientProvider.getClient(opcUaConfig);
- OpcUaNodeBrowser nodeBrowser =
- new OpcUaNodeBrowser(connectedClient.getClient(), opcUaConfig);
- List<OpcNode> selectedNodes = nodeBrowser.findNodes();
-
- if (!selectedNodes.isEmpty()) {
- for (OpcNode opcNode : selectedNodes) {
- if (opcNode.hasUnitId()) {
- allProperties.add(PrimitivePropertyBuilder
- .create(opcNode.getType(),
opcNode.getLabel())
- .label(opcNode.getLabel())
- .measurementUnit(new
URI(opcNode.getQudtURI()))
- .build());
- } else {
- allProperties.add(PrimitivePropertyBuilder
- .create(opcNode.getType(),
opcNode.getLabel())
- .label(opcNode.getLabel())
- .build());
- }
- }
- }
-
- var nodeIds = selectedNodes.stream()
- .map(OpcNode::getNodeId)
- .collect(Collectors.toList());
- var response = connectedClient.getClient()
- .readValues(0, TimestampsToReturn.Both,
nodeIds);
-
- var returnValues = response.get();
-
- //clientProvider.releaseClient(opcUaConfig);
-
- makeEventPreview(selectedNodes, eventPreview, fieldStatusInfos,
returnValues);
-
-
- } catch (Exception e) {
- throw new AdapterException("Could not guess schema for opc node: " +
e.getMessage(), e);
- } finally {
- // TODO
- //spOpcUaClient.disconnect();
- clientProvider.releaseClient(opcUaConfig);
- }
-
- eventSchema.setEventProperties(allProperties);
- builder.properties(allProperties);
- builder.fieldStatusInfos(fieldStatusInfos);
- builder.preview(eventPreview);
-
- return builder.build();
- }
-
- private static void makeEventPreview(
- List<OpcNode> selectedNodes,
- Map<String, Object> eventPreview,
- Map<String, FieldStatusInfo> fieldStatusInfos,
- List<DataValue> dataValues
- ) {
-
- for (int i = 0; i < dataValues.size(); i++) {
- var dv = dataValues.get(i);
- String label = selectedNodes.get(i)
- .getLabel();
- if (StatusCode.GOOD.equals(dv.getStatusCode())) {
- var value = dv.getValue()
- .getValue();
- eventPreview.put(label, value);
- fieldStatusInfos.put(label, FieldStatusInfo.good());
- } else {
- String additionalInfo = dv.getStatusCode() != null ? dv.getStatusCode()
- .toString() :
"Status code is null";
- fieldStatusInfos.put(label, FieldStatusInfo.bad(additionalInfo,
false));
- }
- }
- }
-
-
/***
* OPC UA specific implementation of
* {@link ResolvesContainerProvidedOptions resolveOptions(String,
StaticPropertyExtractor)}.
@@ -219,10 +105,6 @@ public class OpcUaUtil {
throw new SpConfigurationException("Could not connect to the OPC UA
server with the provided settings", e);
} finally {
clientProvider.releaseClient(opcUaConfig);
- // TODO
-// if (spOpcUaClient.getClient() != null) {
-// spOpcUaClient.disconnect();
-// }
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
b/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
index cd4a63bf48..4d000118e0 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/test/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtilTest.java
@@ -27,13 +27,13 @@ public class OpcUaUtilTest {
@Test
public void testAddOpcPrefixIfNotExistsWithPrefix() {
- var result =
OpcUaUtil.addOpcPrefixIfNotExists(SERVER_ADDRESS_WITH_OPC_PREFIX);
+ var result =
OpcUaUtils.addOpcPrefixIfNotExists(SERVER_ADDRESS_WITH_OPC_PREFIX);
Assertions.assertEquals(SERVER_ADDRESS_WITH_OPC_PREFIX, result);
}
@Test
public void testAddOpcPrefixIfNotExistsNoPrefix() {
- var result = OpcUaUtil.addOpcPrefixIfNotExists("example.com");
+ var result = OpcUaUtils.addOpcPrefixIfNotExists("example.com");
Assertions.assertEquals(SERVER_ADDRESS_WITH_OPC_PREFIX, result);
}
-}
\ No newline at end of file
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index 538f8243ae..e1e305e4a9 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -66,7 +66,6 @@ public class CouchDbInstallationStep extends InstallationStep
{
Utils.getCouchDbUserClient();
Utils.getCouchDbPipelineClient();
Utils.getCouchDbNotificationClient();
- Utils.getCouchDbPipelineCategoriesClient();
logSuccess(getTitle());
} catch (Exception e) {
diff --git
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
index 65a3d10c53..b6f9bf20ba 100644
---
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
+++
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
@@ -94,10 +94,6 @@ public class Utils {
return getCouchDbStandardSerializerClient("notification");
}
- public static CouchDbClient getCouchDbPipelineCategoriesClient() {
- return getCouchDbStandardSerializerClient("pipelinecategories");
- }
-
public static CouchDbClient getCouchDbGsonClient(String dbname) {
CouchDbClient dbClient = new CouchDbClient(props(dbname));
dbClient.setGsonBuilder(GsonSerializer.getGsonBuilder());