This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch client-opc
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/client-opc by this push:
new fd395599241 fix
fd395599241 is described below
commit fd395599241f9e7bbd923f0e235a821655f5155d
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 22 16:34:18 2025 +0800
fix
---
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 2 +
.../protocol/opcua/client/IoTDBOpcUaClient.java | 68 ----------------------
2 files changed, 2 insertions(+), 68 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 2856d2d0433..6928385eca9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
@@ -321,6 +322,7 @@ public class OpcUaSink implements PipeConnector {
parameters.getStringByKeys(CONNECTOR_IOTDB_PASSWORD_KEY,
SINK_IOTDB_PASSWORD_KEY))
: new AnonymousProvider();
client = new IoTDBOpcUaClient(nodeUrl, policy, provider);
+ new ClientRunner(client).run();
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index a46f2153e95..120039d5600 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -68,74 +68,6 @@ public class IoTDBOpcUaClient {
public void run(OpcUaClient client) throws Exception {
// synchronous connect
client.connect().get();
-
- // create a subscription and a monitored item
- final UaSubscription subscription =
- client.getSubscriptionManager().createSubscription(200.0).get();
-
- final ReadValueId readValueId =
- new ReadValueId(
- Identifiers.Server, AttributeId.EventNotifier.uid(), null, QualifiedName.NULL_VALUE);
-
- // client handle must be unique per item
- final UInteger clientHandle = uint(clientHandles.getAndIncrement());
-
- final EventFilter eventFilter =
- new EventFilter(
- new SimpleAttributeOperand[] {
- new SimpleAttributeOperand(
- Identifiers.BaseEventType,
- new QualifiedName[] {new QualifiedName(0, "Time")},
- AttributeId.Value.uid(),
- null),
- new SimpleAttributeOperand(
- Identifiers.BaseEventType,
- new QualifiedName[] {new QualifiedName(0, "Message")},
- AttributeId.Value.uid(),
- null),
- new SimpleAttributeOperand(
- Identifiers.BaseEventType,
- new QualifiedName[] {new QualifiedName(0, "SourceName")},
- AttributeId.Value.uid(),
- null),
- new SimpleAttributeOperand(
- Identifiers.BaseEventType,
- new QualifiedName[] {new QualifiedName(0, "SourceNode")},
- AttributeId.Value.uid(),
- null)
- },
- new ContentFilter(null));
-
- final MonitoringParameters parameters =
- new MonitoringParameters(
- clientHandle,
- 0.0,
- ExtensionObject.encode(client.getStaticSerializationContext(), eventFilter),
- uint(10000),
- true);
-
- final MonitoredItemCreateRequest request =
- new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
-
- final List<UaMonitoredItem> items =
- subscription
- .createMonitoredItems(TimestampsToReturn.Both, Collections.singletonList(request))
- .get();
-
- // do something with the value updates
- final UaMonitoredItem monitoredItem = items.get(0);
-
- final AtomicInteger eventCount = new AtomicInteger(0);
-
- monitoredItem.setEventConsumer(
- (item, vs) -> {
- eventCount.incrementAndGet();
- System.out.println("Event Received from " + item.getReadValueId().getNodeId());
-
- for (int i = 0; i < vs.length; i++) {
- System.out.println(("\tvariant[" + i + "]: " + vs[i].getValue()));
- }
- });
}
/////////////////////////////// Getter ///////////////////////////////