This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/client-opc by this push:
     new fd395599241 fix
fd395599241 is described below

commit fd395599241f9e7bbd923f0e235a821655f5155d
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 16:34:18 2025 +0800

    fix
---
 .../db/pipe/sink/protocol/opcua/OpcUaSink.java     |  2 +
 .../protocol/opcua/client/IoTDBOpcUaClient.java    | 68 ----------------------
 2 files changed, 2 insertions(+), 68 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 2856d2d0433..6928385eca9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
 import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
@@ -321,6 +322,7 @@ public class OpcUaSink implements PipeConnector {
                 parameters.getStringByKeys(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY))
             : new AnonymousProvider();
     client = new IoTDBOpcUaClient(nodeUrl, policy, provider);
+    new ClientRunner(client).run();
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index a46f2153e95..120039d5600 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -68,74 +68,6 @@ public class IoTDBOpcUaClient {
   public void run(OpcUaClient client) throws Exception {
     // synchronous connect
     client.connect().get();
-
-    // create a subscription and a monitored item
-    final UaSubscription subscription =
-        client.getSubscriptionManager().createSubscription(200.0).get();
-
-    final ReadValueId readValueId =
-        new ReadValueId(
-            Identifiers.Server, AttributeId.EventNotifier.uid(), null, QualifiedName.NULL_VALUE);
-
-    // client handle must be unique per item
-    final UInteger clientHandle = uint(clientHandles.getAndIncrement());
-
-    final EventFilter eventFilter =
-        new EventFilter(
-            new SimpleAttributeOperand[] {
-              new SimpleAttributeOperand(
-                  Identifiers.BaseEventType,
-                  new QualifiedName[] {new QualifiedName(0, "Time")},
-                  AttributeId.Value.uid(),
-                  null),
-              new SimpleAttributeOperand(
-                  Identifiers.BaseEventType,
-                  new QualifiedName[] {new QualifiedName(0, "Message")},
-                  AttributeId.Value.uid(),
-                  null),
-              new SimpleAttributeOperand(
-                  Identifiers.BaseEventType,
-                  new QualifiedName[] {new QualifiedName(0, "SourceName")},
-                  AttributeId.Value.uid(),
-                  null),
-              new SimpleAttributeOperand(
-                  Identifiers.BaseEventType,
-                  new QualifiedName[] {new QualifiedName(0, "SourceNode")},
-                  AttributeId.Value.uid(),
-                  null)
-            },
-            new ContentFilter(null));
-
-    final MonitoringParameters parameters =
-        new MonitoringParameters(
-            clientHandle,
-            0.0,
-            ExtensionObject.encode(client.getStaticSerializationContext(), eventFilter),
-            uint(10000),
-            true);
-
-    final MonitoredItemCreateRequest request =
-        new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
-
-    final List<UaMonitoredItem> items =
-        subscription
-            .createMonitoredItems(TimestampsToReturn.Both, Collections.singletonList(request))
-            .get();
-
-    // do something with the value updates
-    final UaMonitoredItem monitoredItem = items.get(0);
-
-    final AtomicInteger eventCount = new AtomicInteger(0);
-
-    monitoredItem.setEventConsumer(
-        (item, vs) -> {
-          eventCount.incrementAndGet();
-          System.out.println("Event Received from " + item.getReadValueId().getNodeId());
-
-          for (int i = 0; i < vs.length; i++) {
-            System.out.println(("\tvariant[" + i + "]: " + vs[i].getValue()));
-          }
-        });
   }
 
   /////////////////////////////// Getter ///////////////////////////////

Reply via email to