This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch opc-fix-master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 979a331474107470cc74ef6c4b25d614e7b5e627
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 20 16:43:01 2026 +0800

    complete
---
 .../sink/protocol/opcua/server/OpcUaNameSpace.java | 108 ++++++++++++++++++---
 1 file changed, 97 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 3c79a3aa304..e1ff701f088 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -47,7 +47,7 @@ import 
org.eclipse.milo.opcua.sdk.server.model.nodes.objects.BaseEventTypeNode;
 import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode;
 import org.eclipse.milo.opcua.sdk.server.nodes.UaNode;
 import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
-import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel;
+import org.eclipse.milo.opcua.stack.core.AttributeId;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
 import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
@@ -57,6 +57,7 @@ import 
org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,20 +72,23 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
 public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcUaNameSpace.class);
   public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
-  private final SubscriptionModel subscriptionModel;
   private final OpcUaServerBuilder builder;
 
+  // Do not use subscription model because the original subscription model has 
some bugs
+  private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new 
ConcurrentHashMap<>();
+
   public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder 
builder) {
     super(server, NAMESPACE_URI);
     this.builder = builder;
 
-    subscriptionModel = new SubscriptionModel(server, this);
-    getLifecycleManager().addLifecycle(subscriptionModel);
     getLifecycleManager()
         .addLifecycle(
             new Lifecycle() {
@@ -291,7 +295,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
         if (Objects.isNull(measurementNode.getValue())
             || Objects.isNull(measurementNode.getValue().getSourceTime())
             || measurementNode.getValue().getSourceTime().getUtcTime() < 
utcTimestamp) {
-          measurementNode.setValue(dataValue);
+          notifyNodeValueChange(measurementNode.getNodeId(), dataValue, 
measurementNode);
         }
       } else {
         value = values.get(i);
@@ -311,9 +315,11 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       if (Objects.isNull(valueNode.getValue())
           || Objects.isNull(valueNode.getValue().getSourceTime())
           || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
-        valueNode.setValue(
+        notifyNodeValueChange(
+            valueNode.getNodeId(),
             new DataValue(
-                new Variant(value), currentQuality, new DateTime(timestamp), 
new DateTime()));
+                new Variant(value), currentQuality, new DateTime(timestamp), 
new DateTime()),
+            valueNode);
       }
     }
   }
@@ -546,24 +552,104 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
     }
   }
 
+  /**
+   * On point value changing, notify all subscribed clients proactively
+   *
+   * @param nodeId NodeId of the changing node
+   * @param newValue New value of the node (DataValue object containing value, 
status code, and
+   *     timestamp)
+   * @param variableNode Corresponding UaVariableNode instance, used to update 
the local cached
+   *     value of the node
+   */
+  public void notifyNodeValueChange(
+      NodeId nodeId, DataValue newValue, UaVariableNode variableNode) {
+    // 1. Update the local value of the node first, to ensure that the latest 
value can be obtained
+    // directly when the client calls the Read service
+    variableNode.setValue(newValue);
+
+    // 2. Proactively push the change to all subscribed clients
+    List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId);
+    if (subscribedItems != null && !subscribedItems.isEmpty()) {
+      for (DataItem item : subscribedItems) {
+        try {
+          // Proactively push, the client will immediately receive the change 
notification, no need
+          // to wait for polling
+          item.setValue(newValue);
+        } catch (Exception e) {
+          // Single client push failure does not affect other clients, just 
log it
+          LOGGER.warn("Failed to push value change to subscription client, 
nodeId={}", nodeId, e);
+        }
+      }
+    }
+  }
+
   @Override
   public void onDataItemsCreated(final List<DataItem> dataItems) {
-    subscriptionModel.onDataItemsCreated(dataItems);
+    for (DataItem item : dataItems) {
+      final ReadValueId readValueId = item.getReadValueId();
+      // Only handle Value attribute subscription (align with the original 
SubscriptionModel logic,
+      // ignore other attribute subscriptions)
+      if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
+        continue;
+      }
+      final NodeId nodeId = readValueId.getNodeId();
+
+      // 1. Add the new subscription item to the subscription mapping
+      nodeSubscriptions.compute(
+          nodeId,
+          (k, existingList) -> {
+            List<DataItem> list =
+                existingList != null ? existingList : new 
CopyOnWriteArrayList<>();
+            list.add(item);
+            return list;
+          });
+
+      // 2. 【Key Optimization】Proactively push the current node's initial 
value when the new
+      // subscription item is created
+      // Eliminate Bad_WaitingForInitialData, no need to wait for any polling
+      try {
+        UaVariableNode node = (UaVariableNode) 
getNodeManager().getNode(nodeId).orElse(null);
+        if (node != null && node.getValue() != null) {
+          // Immediately push the current value to the new subscriber, the 
client will instantly be
+          // able to get the initial data
+          item.setValue(node.getValue());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Failed to send initial value to new subscription, 
nodeId={}", nodeId, e);
+      }
+    }
   }
 
   @Override
   public void onDataItemsModified(final List<DataItem> dataItems) {
-    subscriptionModel.onDataItemsModified(dataItems);
+    // Push mode, client modifies subscription parameters (e.g. sampling 
interval) has no effect on
+    // our active push, no additional processing is needed
   }
 
   @Override
   public void onDataItemsDeleted(final List<DataItem> dataItems) {
-    subscriptionModel.onDataItemsDeleted(dataItems);
+    for (DataItem item : dataItems) {
+      final ReadValueId readValueId = item.getReadValueId();
+      if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
+        continue;
+      }
+      final NodeId nodeId = readValueId.getNodeId();
+
+      // When the client cancels the subscription, remove this subscription 
item from the mapping
+      nodeSubscriptions.computeIfPresent(
+          nodeId,
+          (k, existingList) -> {
+            existingList.remove(item);
+            // Automatically clean up the key when there are no subscribers, 
save memory
+            return existingList.isEmpty() ? null : existingList;
+          });
+    }
   }
 
   @Override
   public void onMonitoringModeChanged(final List<MonitoredItem> 
monitoredItems) {
-    subscriptionModel.onMonitoringModeChanged(monitoredItems);
+    // Push mode, monitoring mode change has no effect on active push, no 
additional processing is
+    // needed
   }
 
   /////////////////////////////// Conflict detection 
///////////////////////////////

Reply via email to