This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new d1249daa902 Pipe: Implemented OPC DA Sink for local COM & Fixed the
newest value of OPC UA Sink (#14964) (#14978)
d1249daa902 is described below
commit d1249daa902eaba92a7af0e5230c798b449ed1bf
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 28 11:23:53 2025 +0800
Pipe: Implemented OPC DA Sink for local COM & Fixed the newest value of OPC
UA Sink (#14964) (#14978)
Co-authored-by: Steve Yurong Su <[email protected]>
---
iotdb-core/datanode/pom.xml | 8 +
.../PipeDataRegionConnectorConstructor.java | 4 +
.../connector/protocol/opcda/OpcDaConnector.java | 136 ++++++++
.../pipe/connector/protocol/opcda/OpcDaHeader.java | 200 +++++++++++
.../protocol/opcda/OpcDaServerHandle.java | 387 +++++++++++++++++++++
.../connector/protocol/opcua/OpcUaConnector.java | 47 ++-
.../connector/protocol/opcua/OpcUaNameSpace.java | 2 +-
.../agent/plugin/builtin/BuiltinPipePlugin.java | 5 +
.../builtin/connector/opcda/OpcDaConnector.java | 30 ++
.../config/constant/PipeConnectorConstant.java | 6 +
pom.xml | 5 +
11 files changed, 815 insertions(+), 15 deletions(-)
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 6194b6272a4..ae4ae4c62b2 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -165,6 +165,14 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>net.java.dev.jna</groupId>
+ <artifactId>jna</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>net.java.dev.jna</groupId>
+ <artifactId>jna-platform</artifactId>
+ </dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
index e11e8dc5685..c95fec2b4bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConst
import
org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
import
org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBDataRegionAirGapConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
+import org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaConnector;
import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -65,6 +66,8 @@ class PipeDataRegionConnectorConstructor extends
PipeConnectorConstructor {
BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(),
WebSocketConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(),
OpcUaConnector::new);
+ pluginConstructors.put(
+ BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(),
OpcDaConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(),
DoNothingConnector::new);
pluginConstructors.put(
@@ -91,6 +94,7 @@ class PipeDataRegionConnectorConstructor extends
PipeConnectorConstructor {
pluginConstructors.put(
BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(),
WebSocketConnector::new);
pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(),
OpcUaConnector::new);
+ pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(),
OpcDaConnector::new);
pluginConstructors.put(
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(),
DoNothingConnector::new);
pluginConstructors.put(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaConnector.java
new file mode 100644
index 00000000000..8bd54d9c0a8
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaConnector.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.opcda;
+
+import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_DA_CLSID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_DA_PROGID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_DA_CLSID_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_DA_PROGID_KEY;
+
+/**
+ * Send data in IoTDB based on Opc Da protocol, using JNA. All data are
converted into tablets, and
+ * then push the newest value to the <b>local COM</b> server in another
process.
+ */
+public class OpcDaConnector implements PipeConnector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OpcDaConnector.class);
+ private static final Map<String, Pair<AtomicInteger, OpcDaServerHandle>>
+ CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP = new ConcurrentHashMap<>();
+ private String clsID;
+ private OpcDaServerHandle handle;
+
+ @Override
+ public void validate(final PipeParameterValidator validator) throws
Exception {
+ // TODO: upgrade this logic after "1 in 2" logic is supported
+ validator.validate(
+ args ->
+ (((boolean) args[1] || (boolean) args[2] || (boolean) args[3] ||
(boolean) args[4])),
+ String.format(
+ "One of '%s', '%s', '%s' and '%s' must be specified",
+ SINK_OPC_DA_CLSID_KEY,
+ CONNECTOR_OPC_DA_CLSID_KEY,
+ SINK_OPC_DA_PROGID_KEY,
+ CONNECTOR_OPC_DA_PROGID_KEY),
+ validator.getParameters().hasAttribute(SINK_OPC_DA_CLSID_KEY),
+ validator.getParameters().hasAttribute(CONNECTOR_OPC_DA_CLSID_KEY),
+ validator.getParameters().hasAttribute(SINK_OPC_DA_PROGID_KEY),
+ validator.getParameters().hasAttribute(CONNECTOR_OPC_DA_PROGID_KEY));
+ }
+
+ @Override
+ public void customize(
+ final PipeParameters parameters, final PipeConnectorRuntimeConfiguration
configuration)
+ throws Exception {
+ synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) {
+ clsID = parameters.getStringByKeys(CONNECTOR_OPC_DA_CLSID_KEY,
SINK_OPC_DA_CLSID_KEY);
+ if (Objects.isNull(clsID)) {
+ clsID =
+ OpcDaServerHandle.getClsIDFromProgID(
+ parameters.getStringByKeys(CONNECTOR_OPC_DA_PROGID_KEY,
SINK_OPC_DA_PROGID_KEY));
+ }
+ handle =
+ CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP
+ .computeIfAbsent(
+ clsID, key -> new Pair<>(new AtomicInteger(0), new
OpcDaServerHandle(clsID)))
+ .getRight();
+
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID).getLeft().incrementAndGet();
+ }
+ }
+
+ @Override
+ public void handshake() throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void heartbeat() throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
+ OpcUaConnector.transferByTablet(
+ tabletInsertionEvent, LOGGER, tablet -> handle.transfer(tablet));
+ }
+
+ @Override
+ public void transfer(final Event event) throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (Objects.isNull(clsID)) {
+ return;
+ }
+
+ synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) {
+ final Pair<AtomicInteger, OpcDaServerHandle> pair =
+ CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID);
+ if (pair == null) {
+ return;
+ }
+
+ if (pair.getLeft().decrementAndGet() <= 0) {
+ try {
+ pair.getRight().close();
+ } finally {
+ CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.remove(clsID);
+ }
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaHeader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaHeader.java
new file mode 100644
index 00000000000..878a719b12e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaHeader.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.opcda;
+
+import com.sun.jna.Pointer;
+import com.sun.jna.Structure;
+import com.sun.jna.WString;
+import com.sun.jna.platform.win32.COM.Unknown;
+import com.sun.jna.platform.win32.Guid;
+import com.sun.jna.platform.win32.Variant;
+import com.sun.jna.ptr.IntByReference;
+import com.sun.jna.ptr.PointerByReference;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** We define the OPC DA Classes and interfaces here like C's .h file. */
+public class OpcDaHeader {
+ // IOPCServer
+ static final Guid.IID IID_IOPCServer = new
Guid.IID("39C13A4D-011E-11D0-9675-0020AFD8ADB3");
+
+ // IOPCItemMgt
+ static final Guid.IID IID_IOPCItemMgt = new
Guid.IID("39C13A54-011E-11D0-9675-0020AFD8ADB3");
+
+ // IOPCSyncIO
+ static final Guid.IID IID_IOPCSyncIO = new
Guid.IID("39C13A52-011E-11D0-9675-0020AFD8ADB3");
+
+ // IUnknown
+ static final Guid.IID IID_IUNKNOWN = new
Guid.IID("00000000-0000-0000-C000-000000000046");
+
+ public static class IOPCServer extends Unknown {
+ public IOPCServer(final Pointer p) {
+ super(p);
+ }
+
+ // /* [string][in] */ LPCWSTR szName,
+ // /* [in] */ BOOL bActive,
+ // /* [in] */ DWORD dwRequestedUpdateRate,
+ // /* [in] */ OPCHANDLE hClientGroup,
+ // /* [in][unique] */ LONG *pTimeBias,
+ // /* [in][unique] */ FLOAT *pPercentDeadband,
+ // /* [in] */ DWORD dwLCID,
+ // /* [out] */ OPCHANDLE *phServerGroup,
+ // /* [out] */ DWORD *pRevisedUpdateRate,
+ // /* [in] */ REFIID riid,
+ // /* [iid_is][out] */ LPUNKNOWN *ppUnk) = 0;
+ public int addGroup(
+ final String szName, // Group name ("" means auto)
+ final boolean bActive, // Whether to activate the group
+ final int dwRequestedUpdateRate, // The update rate of request (ms)
+ final int hClientGroup, // The handle of client group
+ final Pointer pTimeBias, // Time zone bias
+ final Pointer pPercentDeadband, // Dead band
+ final int dwLCID, // Region ID
+ final PointerByReference phServerGroup, // Server group handler
+ final IntByReference pRevisedUpdateRate, // Real update rate
+ final Guid.GUID.ByReference riid, // Interface IID
+ final PointerByReference ppUnk // The OPC Group pointer returned
+ ) {
+ // Convert Java string into COM "bstr"
+ final WString wName = new WString(szName);
+
+ return this._invokeNativeInt(
+ 3,
+ new Object[] {
+ this.getPointer(),
+ wName,
+ bActive ? 1 : 0,
+ dwRequestedUpdateRate,
+ hClientGroup,
+ pTimeBias,
+ pPercentDeadband,
+ dwLCID,
+ phServerGroup,
+ pRevisedUpdateRate,
+ riid != null ? riid.getPointer() : null,
+ ppUnk
+ });
+ }
+ }
+
+ // IOPCItemMgt(
+ // /* [in] */ DWORD dwCount,
+ // /* [in] */ OPCITEMDEF *pItemArray,
+ // /* [out] */ OPCITEMRESULT **ppAddResults,
+ // /* [out] */ HRESULT **ppErrors) = 0;
+ public static class IOPCItemMgt extends Unknown {
+ public IOPCItemMgt(final Pointer p) {
+ super(p);
+ }
+
+ public int addItems(
+ final int dwCount, // Data count
+ final OPCITEMDEF[] pItemArray, // Items array to create
+ final PointerByReference pResults, // Results' handles
+ final PointerByReference pErrors // Error's pointers
+ ) {
+ return this._invokeNativeInt(
+ 3, new Object[] {this.getPointer(), dwCount, pItemArray, pResults,
pErrors});
+ }
+ }
+
+ public static class IOPCSyncIO extends Unknown {
+ public IOPCSyncIO(final Pointer p) {
+ super(p);
+ }
+
+ // /* [in] */ DWORD dwCount,
+ // /* [size_is][in] */ OPCHANDLE *phServer,
+ // /* [size_is][in] */ VARIANT *pItemValues,
+ // /* [size_is][size_is][out] */ HRESULT **ppErrors) = 0;
+ public int write(
+ final int dwCount, // Data count
+ final Pointer phServer, // Server handles of items
+ final Pointer pItemValues, // Values of items
+ final PointerByReference pErrors // Error codes
+ ) {
+ return this._invokeNativeInt(
+ 4,
+ new Object[] { // Write is the 4th method in vtable
+ this.getPointer(), dwCount, phServer, pItemValues, pErrors
+ });
+ }
+ }
+
+ // /* [string] */ LPWSTR szAccessPath;
+ // /* [string] */ LPWSTR szItemID;
+ // BOOL bActive;
+ // OPCHANDLE hClient;
+ // DWORD dwBlobSize;
+ // /* [size_is] */ BYTE *pBlob;
+ // VARTYPE vtRequestedDataType;
+ // WORD wReserved;
+ public static class OPCITEMDEF extends Structure {
+ public WString szAccessPath = new WString(""); // Access path (Usually
empty)
+ public WString szItemID; // Item ID(Like "Channel1.Device1.Tag1")
+ public int bActive; // Whether to activate this item(TRUE=1, FALSE=0)
+ public int hClient; // Client handle, Used in async callback and remove
item
+ public int dwBlobSize; // BLOB size
+ public Pointer pBlob; // BLOB's pointer
+ public short vtRequestedDataType = Variant.VT_UNKNOWN; // Requested
datatype
+ public short wReserved; // Reserved
+
+ // As C structure
+ @Override
+ protected List<String> getFieldOrder() {
+ return Arrays.asList(
+ "szAccessPath",
+ "szItemID",
+ "bActive",
+ "hClient",
+ "dwBlobSize",
+ "pBlob",
+ "vtRequestedDataType",
+ "wReserved");
+ }
+ }
+
+ // OPCHANDLE hServer;
+ // VARTYPE vtCanonicalDataType;
+ // WORD wReserved;
+ // DWORD dwAccessRights;
+ // DWORD dwBlobSize;
+ // /* [size_is] */ BYTE *pBlob;
+ public static class OPCITEMRESULT extends Structure {
+ public int hServer; // Server handle, Used to write
+ public short vtCanonicalDataType; // Data type (like Variant.VT_R8)
+ public short wReserved; // Reserved word
+ public int dwAccessRights; // Access right
+ public int dwBlobSize; // BLOB size
+ public Pointer pBlob; // BLOB pointer
+
+ public OPCITEMRESULT(final Pointer pointer) {
+ super(pointer);
+ }
+
+ @Override
+ protected List<String> getFieldOrder() {
+ return Arrays.asList(
+ "hServer", "vtCanonicalDataType", "wReserved", "dwAccessRights",
"dwBlobSize", "pBlob");
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
new file mode 100644
index 00000000000..f22379737d0
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.opcda;
+
+import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import com.sun.jna.Memory;
+import com.sun.jna.Native;
+import com.sun.jna.Pointer;
+import com.sun.jna.WString;
+import com.sun.jna.platform.win32.COM.IUnknown;
+import com.sun.jna.platform.win32.COM.Unknown;
+import com.sun.jna.platform.win32.Guid;
+import com.sun.jna.platform.win32.OaIdl;
+import com.sun.jna.platform.win32.Ole32;
+import com.sun.jna.platform.win32.OleAuto;
+import com.sun.jna.platform.win32.Variant;
+import com.sun.jna.platform.win32.WTypes;
+import com.sun.jna.platform.win32.WinDef;
+import com.sun.jna.platform.win32.WinError;
+import com.sun.jna.platform.win32.WinNT;
+import com.sun.jna.ptr.IntByReference;
+import com.sun.jna.ptr.PointerByReference;
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaHeader.IID_IOPCItemMgt;
+import static
org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaHeader.IID_IOPCServer;
+import static
org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaHeader.IID_IOPCSyncIO;
+import static
org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaHeader.IID_IUNKNOWN;
+
+public class OpcDaServerHandle implements Closeable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OpcDaServerHandle.class);
+
+ private final PointerByReference ppvServer = new PointerByReference();
+ private final OpcDaHeader.IOPCServer opcServer;
+ private final OpcDaHeader.IOPCItemMgt itemMgt;
+ private final OpcDaHeader.IOPCSyncIO syncIO;
+ private final Map<String, Integer> serverHandleMap = new
ConcurrentHashMap<>();
+ private final Map<String, Long> serverTimestampMap = new
ConcurrentHashMap<>();
+
+ // Save it here to avoid memory leakage
+ private WTypes.BSTR bstr;
+
+ OpcDaServerHandle(String clsOrProgID) {
+ final Guid.CLSID CLSID_OPC_SERVER = new Guid.CLSID(clsOrProgID);
+
+ Ole32.INSTANCE.CoInitializeEx(null, Ole32.COINIT_MULTITHREADED);
+ final PointerByReference ppvServer = new PointerByReference();
+
+ WinNT.HRESULT hr =
+ Ole32.INSTANCE.CoCreateInstance(CLSID_OPC_SERVER, null, 0x17,
IID_IOPCServer, ppvServer);
+
+ if (hr.intValue() != WinError.S_OK.intValue()) {
+ throw new PipeException(
+ "Failed to connect to server, error code: 0x" +
Integer.toHexString(hr.intValue()));
+ }
+
+ opcServer = new OpcDaHeader.IOPCServer(ppvServer.getValue());
+
+ // 3. Create group
+ final PointerByReference phServerGroup = new PointerByReference();
+ final PointerByReference phOPCGroup = new PointerByReference();
+ final IntByReference pRevisedUpdateRate = new IntByReference();
+ final int hr2 =
+ opcServer.addGroup(
+ "",
+ true,
+ 1000,
+ 0,
+ null,
+ null,
+ 0,
+ phServerGroup,
+ pRevisedUpdateRate,
+ new Guid.GUID.ByReference(IID_IUNKNOWN.getPointer()),
+ phOPCGroup);
+
+ if (hr2 == WinError.S_OK.intValue()) {
+ LOGGER.info(
+ "Create group successfully! Server handle: {}, update rate: {} ms",
+ phServerGroup.getValue(),
+ pRevisedUpdateRate.getValue());
+ } else {
+ throw new PipeException(
+ "Failed to create group,error code: 0x" +
Integer.toHexString(hr.intValue()));
+ }
+
+ final IUnknown groupUnknown = new Unknown(phOPCGroup.getValue());
+
+ // 4. Acquire IOPCItemMgt interface (To create Item)
+ final PointerByReference ppvItemMgt = new PointerByReference();
+ hr =
+ groupUnknown.QueryInterface(
+ new Guid.REFIID(new
Guid.GUID.ByReference(IID_IOPCItemMgt).getPointer()), ppvItemMgt);
+ if (hr.intValue() == WinError.S_OK.intValue()) {
+ LOGGER.info("Acquire IOPCItemMgt successfully! Interface address: {}",
ppvItemMgt.getValue());
+ } else {
+ throw new PipeException(
+ "Failed to acquire IOPCItemMgt, error code: 0x" +
Integer.toHexString(hr.intValue()));
+ }
+
+ itemMgt = new OpcDaHeader.IOPCItemMgt(ppvItemMgt.getValue());
+
+ // 5. Acquire IOPCSyncIO Interface
+ PointerByReference ppvSyncIO = new PointerByReference();
+ hr =
+ groupUnknown.QueryInterface(
+ new Guid.REFIID(new
Guid.GUID.ByReference(IID_IOPCSyncIO).getPointer()), ppvSyncIO);
+ if (hr.intValue() == WinError.S_OK.intValue()) {
+ LOGGER.info("Acquire IOPCSyncIO successfully! Interface address: {}",
ppvSyncIO.getValue());
+ } else {
+ throw new PipeException(
+ "Failed to acquire IOPCSyncIO, error code: 0x" +
Integer.toHexString(hr.intValue()));
+ }
+ syncIO = new OpcDaHeader.IOPCSyncIO(ppvSyncIO.getValue());
+ }
+
+ static String getClsIDFromProgID(final String progID) {
+ // To receive CLSID struct
+ final Guid.CLSID.ByReference pclsid = new Guid.CLSID.ByReference();
+
+ final WinNT.HRESULT hr = Ole32.INSTANCE.CLSIDFromProgID(progID, pclsid);
+
+ if (hr.intValue() == WinError.S_OK.intValue()) { // S_OK = 0
+ // Format CLSID (like "{CAE8D0E1-117B-11D5-924B-11C0F023E91C}")
+ final String clsidStr =
+ String.format(
+ "%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X",
+ pclsid.Data1,
+ pclsid.Data2,
+ pclsid.Data3,
+ pclsid.Data4[0],
+ pclsid.Data4[1],
+ pclsid.Data4[2],
+ pclsid.Data4[3],
+ pclsid.Data4[4],
+ pclsid.Data4[5],
+ pclsid.Data4[6],
+ pclsid.Data4[7]);
+ LOGGER.info("Successfully converted progID {} to CLSID: {{}}", progID,
clsidStr);
+ return clsidStr;
+ } else {
+ throw new PipeException(
+ "Error: ProgID is invalid or unregistered, (HRESULT=0x"
+ + Integer.toHexString(hr.intValue())
+ + ")");
+ }
+ }
+
+ void transfer(final Tablet tablet) {
+ new
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+ final List<MeasurementSchema> schemas = tablet.getSchemas();
+
+ for (int i = 0; i < schemas.size(); ++i) {
+ final String itemId =
+ tablet.deviceId + TsFileConstant.PATH_SEPARATOR +
schemas.get(i).getMeasurementId();
+ if (!serverHandleMap.containsKey(itemId)) {
+ addItem(itemId, schemas.get(i).getType());
+ }
+ for (int j = tablet.getMaxRowNumber() - 1; j >= 0; --j) {
+ if (Objects.isNull(tablet.bitMaps)
+ || Objects.isNull(tablet.bitMaps[i])
+ || !tablet.bitMaps[i].isMarked(j)) {
+ if (serverTimestampMap.get(itemId) <= tablet.timestamps[j]) {
+ writeData(
+ itemId, getTabletObjectValue4Opc(tablet.values[i], j,
schemas.get(i).getType()));
+ serverTimestampMap.put(itemId, tablet.timestamps[j]);
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ private void addItem(final String itemId, final TSDataType type) {
+ final OpcDaHeader.OPCITEMDEF[] itemDefs = new OpcDaHeader.OPCITEMDEF[1];
+ itemDefs[0] = new OpcDaHeader.OPCITEMDEF();
+ itemDefs[0].szAccessPath = new WString("");
+ itemDefs[0].szItemID = new WString(itemId + "\0");
+ itemDefs[0].bActive = 1;
+ itemDefs[0].hClient = 0;
+ itemDefs[0].dwBlobSize = 0;
+ itemDefs[0].pBlob = Pointer.NULL;
+ itemDefs[0].vtRequestedDataType = convertTsDataType2VariantType(type);
+ itemDefs[0].wReserved = 0;
+ itemDefs[0].write();
+
+ final PointerByReference ppItemResults = new PointerByReference();
+ final PointerByReference ppErrors = new PointerByReference();
+ final int hr = itemMgt.addItems(1, itemDefs, ppItemResults, ppErrors);
+
+ final Pointer pErrors = ppErrors.getValue();
+ if (Objects.nonNull(pErrors)) {
+ // Read errors
+ final int[] errors =
+ pErrors.getIntArray(0, 1); // Pick 1 element because only 1 element
is added
+ final int itemError = errors[0];
+
+ try {
+ if (itemError == WinError.S_OK.intValue()) {
+ LOGGER.debug("Successfully added item {}.", itemId);
+ } else {
+ throw new PipeException(
+ "Failed to add item "
+ + itemId
+ + ", opc error code: 0x"
+ + Integer.toHexString(itemError));
+ }
+ } finally {
+ Ole32.INSTANCE.CoTaskMemFree(pErrors);
+ }
+ }
+
+ if (hr != WinError.S_OK.intValue()) {
+ throw new PipeException("Failed to add item, win error code: 0x" +
Integer.toHexString(hr));
+ }
+
+ final Pointer pItemResults = ppItemResults.getValue();
+
+ final OpcDaHeader.OPCITEMRESULT[] itemResults = new
OpcDaHeader.OPCITEMRESULT[1];
+ itemResults[0] = new OpcDaHeader.OPCITEMRESULT(pItemResults);
+ itemResults[0].read();
+
+ serverHandleMap.put(itemId, itemResults[0].hServer);
+ serverTimestampMap.put(itemId, Long.MIN_VALUE);
+ }
+
+ private void writeData(final String itemId, final Variant.VARIANT value) {
+ final Pointer phServer = new Memory(Native.getNativeSize(int.class));
+ phServer.write(0, new int[] {serverHandleMap.get(itemId)}, 0, 1);
+
+ final PointerByReference ppErrors = new PointerByReference();
+ final int hr = syncIO.write(1, phServer, value.getPointer(), ppErrors);
+ // Free after write
+ if (Objects.nonNull(bstr)) {
+ OleAuto.INSTANCE.SysFreeString(bstr);
+ }
+
+ final Pointer pErrors = ppErrors.getValue();
+ if (Objects.nonNull(pErrors)) {
+ // Read error code array, each for a result
+ final int[] errors =
+ pErrors.getIntArray(0, 1); // Read 1 element because only 1 point is
written
+ final int itemError = errors[0];
+
+ try {
+ if (itemError != WinError.S_OK.intValue()) {
+ throw new PipeException(
+ "Failed to write "
+ + itemId
+ + ", value: "
+ + value
+ + ", opc error code: 0x"
+ + Integer.toHexString(itemError));
+ }
+ } finally {
+ Ole32.INSTANCE.CoTaskMemFree(pErrors);
+ }
+ }
+
+ if (hr != WinError.S_OK.intValue()) {
+ throw new PipeException("Failed to write, win error code: 0x" +
Integer.toHexString(hr));
+ }
+ }
+
+ private short convertTsDataType2VariantType(final TSDataType dataType) {
+ switch (dataType) {
+ case BOOLEAN:
+ return Variant.VT_BOOL;
+ case INT32:
+ return Variant.VT_I4;
+ case INT64:
+ return Variant.VT_I8;
+ case DATE:
+ case TIMESTAMP:
+ return Variant.VT_DATE;
+ case FLOAT:
+ return Variant.VT_R4;
+ case DOUBLE:
+ return Variant.VT_R8;
+ case TEXT:
+ case STRING:
+ // Note that "Variant" does not support "VT_BLOB" data, and not all
the DA server
+ // support this, thus we use "VT_BSTR" to substitute
+ case BLOB:
+ return Variant.VT_BSTR;
+ default:
+ throw new UnSupportedDataTypeException("UnSupported dataType " +
dataType);
+ }
+ }
+
+ private Variant.VARIANT getTabletObjectValue4Opc(
+ final Object column, final int rowIndex, final TSDataType type) {
+ final Variant.VARIANT value = new Variant.VARIANT();
+ switch (type) {
+ case BOOLEAN:
+ value.setValue(Variant.VT_BOOL, new OaIdl.VARIANT_BOOL(((boolean[])
column)[rowIndex]));
+ break;
+ case INT32:
+ value.setValue(Variant.VT_I4, new WinDef.LONG(((int[])
column)[rowIndex]));
+ break;
+ case DATE:
+ value.setValue(
+ Variant.VT_DATE, new OaIdl.DATE((Date.valueOf(((LocalDate[])
column)[rowIndex]))));
+ break;
+ case INT64:
+ value.setValue(Variant.VT_I8, new WinDef.LONGLONG(((long[])
column)[rowIndex]));
+ break;
+ case TIMESTAMP:
+ value.setValue(
+ Variant.VT_DATE, new OaIdl.DATE(new java.util.Date(((long[])
column)[rowIndex])));
+ break;
+ case FLOAT:
+ value.setValue(Variant.VT_R4, ((float[]) column)[rowIndex]);
+ break;
+ case DOUBLE:
+ value.setValue(Variant.VT_R8, ((double[]) column)[rowIndex]);
+ break;
+ case TEXT:
+ case STRING:
+ case BLOB:
+ bstr = OleAuto.INSTANCE.SysAllocString(((Binary[])
column)[rowIndex].toString());
+ value.setValue(Variant.VT_BSTR, bstr);
+ break;
+ default:
+ throw new UnSupportedDataTypeException("UnSupported dataType " + type);
+ }
+ return value;
+ }
+
+ @Override
+ public void close() {
+ // Help gc
+ serverTimestampMap.clear();
+ serverHandleMap.clear();
+
+ // Release resource
+ if (Objects.nonNull(ppvServer.getValue())) {
+ Ole32.INSTANCE.CoTaskMemFree(ppvServer.getValue());
+ }
+ if (Objects.nonNull(syncIO)) {
+ syncIO.Release();
+ }
+ if (Objects.nonNull(itemMgt)) {
+ itemMgt.Release();
+ }
+ if (Objects.nonNull(opcServer)) {
+ opcServer.Release();
+ }
+ // Unload COM
+ Ole32.INSTANCE.CoUninitialize();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index cfd898b62c1..78c1d8fa75f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
-import org.eclipse.milo.opcua.stack.core.UaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,8 +71,11 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
/**
* Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data
are converted into
- * tablets, then eventNodes to send to the subscriber clients. Notice that
there is no namespace
- * since the eventNodes do not need to be saved.
+ * tablets, and then:
+ *
+ * <p>1. In pub-sub mode, converted to eventNodes to send to the subscriber
clients.
+ *
+ * <p>2. In client-server mode, push the newest value to the local server.
*/
public class OpcUaConnector implements PipeConnector {
@@ -207,11 +209,19 @@ public class OpcUaConnector implements PipeConnector {
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
+ transferByTablet(tabletInsertionEvent, LOGGER, tablet ->
nameSpace.transfer(tablet));
+ }
+
+ public static void transferByTablet(
+ final TabletInsertionEvent tabletInsertionEvent,
+ final Logger logger,
+ final ThrowingConsumer<Tablet, Exception> transferTablet)
+ throws Exception {
// PipeProcessor can change the type of TabletInsertionEvent
if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
&& !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
- LOGGER.warn(
- "OpcUaConnector only support "
+ logger.warn(
+ "This Connector only support "
+ "PipeInsertNodeTabletInsertionEvent and
PipeRawTabletInsertionEvent. "
+ "Ignore {}.",
tabletInsertionEvent);
@@ -219,15 +229,17 @@ public class OpcUaConnector implements PipeConnector {
}
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
- transferTabletWrapper((PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent);
+ transferTabletWrapper(
+ (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent,
transferTablet);
} else {
- transferTabletWrapper((PipeRawTabletInsertionEvent)
tabletInsertionEvent);
+ transferTabletWrapper((PipeRawTabletInsertionEvent)
tabletInsertionEvent, transferTablet);
}
}
- private void transferTabletWrapper(
- final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
- throws UaException {
+ private static void transferTabletWrapper(
+ final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent,
+ final ThrowingConsumer<Tablet, Exception> transferTablet)
+ throws Exception {
// We increase the reference count for this event to determine if the
event may be released.
if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
OpcUaConnector.class.getName())) {
@@ -235,7 +247,7 @@ public class OpcUaConnector implements PipeConnector {
}
try {
for (final Tablet tablet :
pipeInsertNodeTabletInsertionEvent.convertToTablets()) {
- nameSpace.transfer(tablet);
+ transferTablet.accept(tablet);
}
} finally {
pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
@@ -243,19 +255,26 @@ public class OpcUaConnector implements PipeConnector {
}
}
- private void transferTabletWrapper(final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
- throws UaException {
+ private static void transferTabletWrapper(
+ final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent,
+ final ThrowingConsumer<Tablet, Exception> transferTablet)
+ throws Exception {
// We increase the reference count for this event to determine if the
event may be released.
if
(!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcUaConnector.class.getName()))
{
return;
}
try {
- nameSpace.transfer(pipeRawTabletInsertionEvent.convertToTablet());
+ transferTablet.accept(pipeRawTabletInsertionEvent.convertToTablet());
} finally {
pipeRawTabletInsertionEvent.decreaseReferenceCount(OpcUaConnector.class.getName(),
false);
}
}
+ @FunctionalInterface
+ public interface ThrowingConsumer<T, E extends Exception> {
+ void accept(final T t) throws E;
+ }
+
@Override
public void close() throws Exception {
if (serverKey == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
index 104d9120d86..87b285f1905 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
@@ -182,7 +182,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
}
int lastNonnullIndex = -1;
- for (int j = 0; j < tablet.rowSize; ++j) {
+ for (int j = tablet.rowSize - 1; j >= 0; --j) {
if (!tablet.bitMaps[i].isMarked(j)) {
lastNonnullIndex = j;
break;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index 67f3a0265af..deefd1c89ee 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSyncConnector;
+import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.opcda.OpcDaConnector;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.opcua.OpcUaConnector;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.websocket.WebSocketConnector;
import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.writeback.WriteBackConnector;
@@ -86,6 +87,7 @@ public enum BuiltinPipePlugin {
WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class),
OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class),
+ OPC_DA_CONNECTOR("opc-da-connector", OpcDaConnector.class),
WRITE_BACK_CONNECTOR("write-back-connector", WriteBackConnector.class),
DO_NOTHING_SINK("do-nothing-sink", DoNothingConnector.class),
@@ -97,6 +99,7 @@ public enum BuiltinPipePlugin {
IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapConnector.class),
WEBSOCKET_SINK("websocket-sink", WebSocketConnector.class),
OPC_UA_SINK("opc-ua-sink", OpcUaConnector.class),
+ OPC_DA_SINK("opc-da-sink", OpcDaConnector.class),
WRITE_BACK_SINK("write-back-sink", WriteBackConnector.class),
SUBSCRIPTION_SINK("subscription-sink", DoNothingConnector.class),
PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink",
PipeConsensusAsyncConnector.class),
@@ -153,6 +156,7 @@ public enum BuiltinPipePlugin {
IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(),
WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(),
OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(),
+ OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(),
WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
// Sinks
@@ -161,6 +165,7 @@ public enum BuiltinPipePlugin {
IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
WEBSOCKET_SINK.getPipePluginName().toUpperCase(),
OPC_UA_SINK.getPipePluginName().toUpperCase(),
+ OPC_DA_SINK.getPipePluginName().toUpperCase(),
WRITE_BACK_SINK.getPipePluginName().toUpperCase(),
SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(),
PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase())));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/opcda/OpcDaConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/opcda/OpcDaConnector.java
new file mode 100644
index 00000000000..25df1d46fb8
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/opcda/OpcDaConnector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.opcda;
+
+import
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.PlaceholderConnector;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents
the OPC DA connector.
+ * There is a real implementation in the server module but cannot be imported
here. The pipe agent
+ * in the server module will replace this class with the real implementation
when initializing the
+ * OPC DA connector.
+ */
+public class OpcDaConnector extends PlaceholderConnector {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 1c5b442c3a1..13a9cd8a7f9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -247,6 +247,12 @@ public class PipeConnectorConstant {
public static final String SINK_MARK_AS_PIPE_REQUEST_KEY =
"sink.mark-as-pipe-request";
public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE =
true;
+ public static final String CONNECTOR_OPC_DA_CLSID_KEY =
"connector.opcda.clsid";
+ public static final String SINK_OPC_DA_CLSID_KEY = "sink.opcda.clsid";
+
+ public static final String CONNECTOR_OPC_DA_PROGID_KEY =
"connector.opcda.progid";
+ public static final String SINK_OPC_DA_PROGID_KEY = "sink.opcda.progid";
+
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git a/pom.xml b/pom.xml
index e30e2954e08..b205b7eb619 100644
--- a/pom.xml
+++ b/pom.xml
@@ -226,6 +226,11 @@
<artifactId>jna</artifactId>
<version>${jna.version}</version>
</dependency>
+ <dependency>
+ <groupId>net.java.dev.jna</groupId>
+ <artifactId>jna-platform</artifactId>
+ <version>${jna.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>