This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch opc-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e811890c2b8ee5510310d1c2945be1ec380741e0
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 20 16:43:01 2026 +0800

    complete
---
 .../sink/protocol/opcua/server/OpcUaNameSpace.java | 110 ++++++++++++++++++---
 1 file changed, 98 insertions(+), 12 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 43144cbdd15..4a1a3aead1f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -45,7 +45,7 @@ import 
org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode;
 import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode;
 import org.eclipse.milo.opcua.sdk.server.nodes.UaNode;
 import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
-import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
 import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
@@ -55,6 +55,7 @@ import 
org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,19 +69,22 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaNameSpace.class);
   public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
-  private final SubscriptionModel subscriptionModel;
   private final OpcUaServerBuilder builder;
 
+  // SubscriptionModel is deliberately not used because the original implementation has some bugs
+  private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new 
ConcurrentHashMap<>();
+
   public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder 
builder) {
     super(server, NAMESPACE_URI);
     this.builder = builder;
 
-    subscriptionModel = new SubscriptionModel(server, this);
-    getLifecycleManager().addLifecycle(subscriptionModel);
     getLifecycleManager()
         .addLifecycle(
             new Lifecycle() {
@@ -245,7 +249,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
         measurementNode =
             new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
                 .setNodeId(nodeId)
-                .setAccessLevel(AccessLevel.READ_WRITE)
+                .setAccessLevel(AccessLevel.READ_ONLY)
                 .setUserAccessLevel(AccessLevel.READ_ONLY)
                 .setBrowseName(newQualifiedName(nodeName))
                 .setDisplayName(LocalizedText.english(nodeName))
@@ -279,7 +283,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
         if (Objects.isNull(measurementNode.getValue())
             || Objects.isNull(measurementNode.getValue().getSourceTime())
             || measurementNode.getValue().getSourceTime().getUtcTime() < 
utcTimestamp) {
-          measurementNode.setValue(dataValue);
+          notifyNodeValueChange(nodeId, dataValue, measurementNode);
         }
       } else {
         valueNode = measurementNode;
@@ -291,9 +295,11 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       if (Objects.isNull(valueNode.getValue())
           || Objects.isNull(valueNode.getValue().getSourceTime())
           || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
-        valueNode.setValue(
+        notifyNodeValueChange(
+            valueNode.getNodeId(),
             new DataValue(
-                new Variant(value), currentQuality, new DateTime(timestamp), 
new DateTime()));
+                new Variant(value), currentQuality, new DateTime(timestamp), 
new DateTime()),
+            valueNode);
       }
     }
   }
@@ -451,24 +457,104 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     }
   }
 
+  /**
+   * Updates the node's value and proactively pushes it to all subscribers.
+   *
+   * @param nodeId NodeId of the node whose value is changing
+   * @param newValue new value of the node (a DataValue carrying the value,
+   *     status code, and timestamp)
+   * @param variableNode the corresponding UaVariableNode instance, whose
+   *     locally cached value is updated
+   */
+  public void notifyNodeValueChange(
+      NodeId nodeId, DataValue newValue, UaVariableNode variableNode) {
+    // 1. Update the local value of the node first, to ensure that the latest 
value can be obtained
+    // directly when the client calls the Read service
+    variableNode.setValue(newValue);
+
+    // 2. Proactively push the change to all subscribed clients
+    List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId);
+    if (subscribedItems != null && !subscribedItems.isEmpty()) {
+      for (DataItem item : subscribedItems) {
+        try {
+          // Push proactively so that the client receives the change
+          // notification immediately, without waiting for polling
+          item.setValue(newValue);
+        } catch (Exception e) {
+          // A push failure for one client must not affect the others; just log it
+          LOGGER.warn("Failed to push value change to subscription client, 
nodeId={}", nodeId, e);
+        }
+      }
+    }
+  }
+
   @Override
   public void onDataItemsCreated(final List<DataItem> dataItems) {
-    subscriptionModel.onDataItemsCreated(dataItems);
+    for (DataItem item : dataItems) {
+      final ReadValueId readValueId = item.getReadValueId();
+      // Only handle subscriptions to the Value attribute (aligned with the
+      // original SubscriptionModel logic); other attributes are ignored
+      if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
+        continue;
+      }
+      final NodeId nodeId = readValueId.getNodeId();
+
+      // 1. Add the new subscription item to the subscription mapping
+      nodeSubscriptions.compute(
+          nodeId,
+          (k, existingList) -> {
+            List<DataItem> list =
+                existingList != null ? existingList : new 
CopyOnWriteArrayList<>();
+            list.add(item);
+            return list;
+          });
+
+      // 2. [Key optimization] Proactively push the node's current value as the
+      // initial value when the new subscription item is created. This
+      // eliminates Bad_WaitingForInitialData; no need to wait for any polling
+      try {
+        UaVariableNode node = (UaVariableNode) 
getNodeManager().getNode(nodeId).orElse(null);
+        if (node != null && node.getValue() != null) {
+          // Immediately push the current value to the new subscriber so that
+          // the client gets the initial data right away
+          item.setValue(node.getValue());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Failed to send initial value to new subscription, 
nodeId={}", nodeId, e);
+      }
+    }
   }
 
   @Override
   public void onDataItemsModified(final List<DataItem> dataItems) {
-    subscriptionModel.onDataItemsModified(dataItems);
+    // Push mode: a client modifying subscription parameters (e.g. the sampling
+    // interval) has no effect on our active push, so no processing is needed
   }
 
   @Override
   public void onDataItemsDeleted(final List<DataItem> dataItems) {
-    subscriptionModel.onDataItemsDeleted(dataItems);
+    for (DataItem item : dataItems) {
+      final ReadValueId readValueId = item.getReadValueId();
+      if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
+        continue;
+      }
+      final NodeId nodeId = readValueId.getNodeId();
+
+      // When the client cancels the subscription, remove this subscription 
item from the mapping
+      nodeSubscriptions.computeIfPresent(
+          nodeId,
+          (k, existingList) -> {
+            existingList.remove(item);
+            // Remove the key once no subscribers remain, to save memory
+            return existingList.isEmpty() ? null : existingList;
+          });
+    }
   }
 
   @Override
   public void onMonitoringModeChanged(final List<MonitoredItem> 
monitoredItems) {
-    subscriptionModel.onMonitoringModeChanged(monitoredItems);
+    // Push mode: a monitoring-mode change has no effect on active push, so no
+    // additional processing is needed
   }
 
   /////////////////////////////// Conflict detection 
///////////////////////////////

Reply via email to