This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch opc-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e811890c2b8ee5510310d1c2945be1ec380741e0 Author: Caideyipi <[email protected]> AuthorDate: Mon Apr 20 16:43:01 2026 +0800 complete --- .../sink/protocol/opcua/server/OpcUaNameSpace.java | 110 ++++++++++++++++++--- 1 file changed, 98 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java index 43144cbdd15..4a1a3aead1f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java @@ -45,7 +45,7 @@ import org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode; import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode; import org.eclipse.milo.opcua.sdk.server.nodes.UaNode; import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode; -import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel; +import org.eclipse.milo.opcua.stack.core.AttributeId; import org.eclipse.milo.opcua.stack.core.Identifiers; import org.eclipse.milo.opcua.stack.core.UaException; import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; @@ -55,6 +55,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,19 +69,22 @@ import java.util.List; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class); public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server"; - private final SubscriptionModel subscriptionModel; private final OpcUaServerBuilder builder; + // Do not use subscription model because the original subscription model has some bugs + private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new ConcurrentHashMap<>(); + public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) { super(server, NAMESPACE_URI); this.builder = builder; - subscriptionModel = new SubscriptionModel(server, this); - getLifecycleManager().addLifecycle(subscriptionModel); getLifecycleManager() .addLifecycle( new Lifecycle() { @@ -245,7 +249,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { measurementNode = new UaVariableNode.UaVariableNodeBuilder(getNodeContext()) .setNodeId(nodeId) - .setAccessLevel(AccessLevel.READ_WRITE) + .setAccessLevel(AccessLevel.READ_ONLY) .setUserAccessLevel(AccessLevel.READ_ONLY) .setBrowseName(newQualifiedName(nodeName)) .setDisplayName(LocalizedText.english(nodeName)) @@ -279,7 +283,7 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { if (Objects.isNull(measurementNode.getValue()) || Objects.isNull(measurementNode.getValue().getSourceTime()) || measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) { - measurementNode.setValue(dataValue); + notifyNodeValueChange(nodeId, dataValue, measurementNode); } } else { valueNode = measurementNode; @@ -291,9 +295,11 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { if (Objects.isNull(valueNode.getValue()) || Objects.isNull(valueNode.getValue().getSourceTime()) || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) { - valueNode.setValue( + notifyNodeValueChange( + valueNode.getNodeId(), new DataValue( - new Variant(value), currentQuality, new DateTime(timestamp), new DateTime())); + new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()), + valueNode); } } } @@ -451,24 +457,104 @@ public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { } } + /** + * On point value changing, notify all subscribed clients proactively + * + * @param nodeId NodeId of the changing node + * @param newValue New value of the node (DataValue object containing value, status code, and + * timestamp) + * @param variableNode Corresponding UaVariableNode instance, used to update the local cached + * value of the node + */ + public void notifyNodeValueChange( + NodeId nodeId, DataValue newValue, UaVariableNode variableNode) { + // 1. Update the local value of the node first, to ensure that the latest value can be obtained + // directly when the client calls the Read service + variableNode.setValue(newValue); + + // 2. Proactively push the change to all subscribed clients + List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId); + if (subscribedItems != null && !subscribedItems.isEmpty()) { + for (DataItem item : subscribedItems) { + try { + // Proactively push, the client will immediately receive the change notification, no need + // to wait for polling + item.setValue(newValue); + } catch (Exception e) { + // Single client push failure does not affect other clients, just log it + LOGGER.warn("Failed to push value change to subscription client, nodeId={}", nodeId, e); + } + } + } + } + @Override public void onDataItemsCreated(final List<DataItem> dataItems) { - subscriptionModel.onDataItemsCreated(dataItems); + for (DataItem item : dataItems) { + final ReadValueId readValueId = item.getReadValueId(); + // Only handle Value attribute subscription (align with the original SubscriptionModel logic, + // ignore other attribute subscriptions) + if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) { + continue; + } + final NodeId nodeId = readValueId.getNodeId(); + + // 1. Add the new subscription item to the subscription mapping + nodeSubscriptions.compute( + nodeId, + (k, existingList) -> { + List<DataItem> list = + existingList != null ? existingList : new CopyOnWriteArrayList<>(); + list.add(item); + return list; + }); + + // 2. 【Key Optimization】Proactively push the current node's initial value when the new + // subscription item is created + // Eliminate Bad_WaitingForInitialData, no need to wait for any polling + try { + UaVariableNode node = (UaVariableNode) getNodeManager().getNode(nodeId).orElse(null); + if (node != null && node.getValue() != null) { + // Immediately push the current value to the new subscriber, the client will instantly be + // able to get the initial data + item.setValue(node.getValue()); + } + } catch (Exception e) { + LOGGER.warn("Failed to send initial value to new subscription, nodeId={}", nodeId, e); + } + } } @Override public void onDataItemsModified(final List<DataItem> dataItems) { - subscriptionModel.onDataItemsModified(dataItems); + // Push mode, client modifies subscription parameters (e.g. sampling interval) has no effect on + // our active push, no additional processing is needed } @Override public void onDataItemsDeleted(final List<DataItem> dataItems) { - subscriptionModel.onDataItemsDeleted(dataItems); + for (DataItem item : dataItems) { + final ReadValueId readValueId = item.getReadValueId(); + if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) { + continue; + } + final NodeId nodeId = readValueId.getNodeId(); + + // When the client cancels the subscription, remove this subscription item from the mapping + nodeSubscriptions.computeIfPresent( + nodeId, + (k, existingList) -> { + existingList.remove(item); + // Automatically clean up the key when there are no subscribers, save memory + return existingList.isEmpty() ? null : existingList; + }); + } } @Override public void onMonitoringModeChanged(final List<MonitoredItem> monitoredItems) { - subscriptionModel.onMonitoringModeChanged(monitoredItems); + // Push mode, monitoring mode change has no effect on active push, no additional processing is + // needed } /////////////////////////////// Conflict detection ///////////////////////////////
