This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new d1249daa902 Pipe: Implemented OPC DA Sink for local COM & Fixed the 
newest value of OPC UA Sink (#14964) (#14978)
d1249daa902 is described below

commit d1249daa902eaba92a7af0e5230c798b449ed1bf
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 28 11:23:53 2025 +0800

    Pipe: Implemented OPC DA Sink for local COM & Fixed the newest value of OPC 
UA Sink (#14964) (#14978)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 iotdb-core/datanode/pom.xml                        |   8 +
 .../PipeDataRegionConnectorConstructor.java        |   4 +
 .../connector/protocol/opcda/OpcDaConnector.java   | 136 ++++++++
 .../pipe/connector/protocol/opcda/OpcDaHeader.java | 200 +++++++++++
 .../protocol/opcda/OpcDaServerHandle.java          | 387 +++++++++++++++++++++
 .../connector/protocol/opcua/OpcUaConnector.java   |  47 ++-
 .../connector/protocol/opcua/OpcUaNameSpace.java   |   2 +-
 .../agent/plugin/builtin/BuiltinPipePlugin.java    |   5 +
 .../builtin/connector/opcda/OpcDaConnector.java    |  30 ++
 .../config/constant/PipeConnectorConstant.java     |   6 +
 pom.xml                                            |   5 +
 11 files changed, 815 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 6194b6272a4..ae4ae4c62b2 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -165,6 +165,14 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>net.java.dev.jna</groupId>
+            <artifactId>jna</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>net.java.dev.jna</groupId>
+            <artifactId>jna-platform</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.jsonwebtoken</groupId>
             <artifactId>jjwt-api</artifactId>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
index e11e8dc5685..c95fec2b4bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionConnectorConstructor.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeConnectorConst
 import 
org.apache.iotdb.commons.pipe.agent.plugin.meta.DataNodePipePluginMetaKeeper;
 import 
org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBDataRegionAirGapConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
+import org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaConnector;
 import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -65,6 +66,8 @@ class PipeDataRegionConnectorConstructor extends 
PipeConnectorConstructor {
         BuiltinPipePlugin.WEBSOCKET_CONNECTOR.getPipePluginName(), 
WebSocketConnector::new);
     pluginConstructors.put(
         BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), 
OpcUaConnector::new);
+    pluginConstructors.put(
+        BuiltinPipePlugin.OPC_DA_CONNECTOR.getPipePluginName(), 
OpcDaConnector::new);
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), 
DoNothingConnector::new);
     pluginConstructors.put(
@@ -91,6 +94,7 @@ class PipeDataRegionConnectorConstructor extends 
PipeConnectorConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.WEBSOCKET_SINK.getPipePluginName(), 
WebSocketConnector::new);
     pluginConstructors.put(BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), 
OpcUaConnector::new);
+    pluginConstructors.put(BuiltinPipePlugin.OPC_DA_SINK.getPipePluginName(), 
OpcDaConnector::new);
     pluginConstructors.put(
         BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), 
DoNothingConnector::new);
     pluginConstructors.put(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaConnector.java
new file mode 100644
index 00000000000..8bd54d9c0a8
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaConnector.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.opcda;
+
+import org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_DA_CLSID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_OPC_DA_PROGID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_DA_CLSID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_OPC_DA_PROGID_KEY;
+
+/**
+ * Send data in IoTDB based on Opc Da protocol, using JNA. All data are 
converted into tablets, and
+ * then push the newest value to the <b>local COM</b> server in another 
process.
+ */
+public class OpcDaConnector implements PipeConnector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcDaConnector.class);
+  private static final Map<String, Pair<AtomicInteger, OpcDaServerHandle>>
+      CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP = new ConcurrentHashMap<>();
+  private String clsID;
+  private OpcDaServerHandle handle;
+
+  @Override
+  public void validate(final PipeParameterValidator validator) throws 
Exception {
+    // TODO: upgrade this logic after "1 in 2" logic is supported
+    validator.validate(
+        args ->
+            (((boolean) args[1] || (boolean) args[2] || (boolean) args[3] || 
(boolean) args[4])),
+        String.format(
+            "One of '%s', '%s', '%s' and '%s' must be specified",
+            SINK_OPC_DA_CLSID_KEY,
+            CONNECTOR_OPC_DA_CLSID_KEY,
+            SINK_OPC_DA_PROGID_KEY,
+            CONNECTOR_OPC_DA_PROGID_KEY),
+        validator.getParameters().hasAttribute(SINK_OPC_DA_CLSID_KEY),
+        validator.getParameters().hasAttribute(CONNECTOR_OPC_DA_CLSID_KEY),
+        validator.getParameters().hasAttribute(SINK_OPC_DA_PROGID_KEY),
+        validator.getParameters().hasAttribute(CONNECTOR_OPC_DA_PROGID_KEY));
+  }
+
+  @Override
+  public void customize(
+      final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
+      throws Exception {
+    synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) {
+      clsID = parameters.getStringByKeys(CONNECTOR_OPC_DA_CLSID_KEY, 
SINK_OPC_DA_CLSID_KEY);
+      if (Objects.isNull(clsID)) {
+        clsID =
+            OpcDaServerHandle.getClsIDFromProgID(
+                parameters.getStringByKeys(CONNECTOR_OPC_DA_PROGID_KEY, 
SINK_OPC_DA_PROGID_KEY));
+      }
+      handle =
+          CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP
+              .computeIfAbsent(
+                  clsID, key -> new Pair<>(new AtomicInteger(0), new 
OpcDaServerHandle(clsID)))
+              .getRight();
+      
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID).getLeft().incrementAndGet();
+    }
+  }
+
+  @Override
+  public void handshake() throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void heartbeat() throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+    OpcUaConnector.transferByTablet(
+        tabletInsertionEvent, LOGGER, tablet -> handle.transfer(tablet));
+  }
+
+  @Override
+  public void transfer(final Event event) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (Objects.isNull(clsID)) {
+      return;
+    }
+
+    synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) {
+      final Pair<AtomicInteger, OpcDaServerHandle> pair =
+          CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID);
+      if (pair == null) {
+        return;
+      }
+
+      if (pair.getLeft().decrementAndGet() <= 0) {
+        try {
+          pair.getRight().close();
+        } finally {
+          CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.remove(clsID);
+        }
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaHeader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaHeader.java
new file mode 100644
index 00000000000..878a719b12e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaHeader.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.opcda;
+
+import com.sun.jna.Pointer;
+import com.sun.jna.Structure;
+import com.sun.jna.WString;
+import com.sun.jna.platform.win32.COM.Unknown;
+import com.sun.jna.platform.win32.Guid;
+import com.sun.jna.platform.win32.Variant;
+import com.sun.jna.ptr.IntByReference;
+import com.sun.jna.ptr.PointerByReference;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** We define the OPC DA Classes and interfaces here like C's .h file. */
+public class OpcDaHeader {
+  // IOPCServer
+  static final Guid.IID IID_IOPCServer = new 
Guid.IID("39C13A4D-011E-11D0-9675-0020AFD8ADB3");
+
+  // IOPCItemMgt
+  static final Guid.IID IID_IOPCItemMgt = new 
Guid.IID("39C13A54-011E-11D0-9675-0020AFD8ADB3");
+
+  // IOPCSyncIO
+  static final Guid.IID IID_IOPCSyncIO = new 
Guid.IID("39C13A52-011E-11D0-9675-0020AFD8ADB3");
+
+  // IUnknown
+  static final Guid.IID IID_IUNKNOWN = new 
Guid.IID("00000000-0000-0000-C000-000000000046");
+
+  public static class IOPCServer extends Unknown {
+    public IOPCServer(final Pointer p) {
+      super(p);
+    }
+
+    // /* [string][in] */ LPCWSTR szName,
+    // /* [in] */ BOOL bActive,
+    // /* [in] */ DWORD dwRequestedUpdateRate,
+    // /* [in] */ OPCHANDLE hClientGroup,
+    // /* [in][unique] */ LONG *pTimeBias,
+    // /* [in][unique] */ FLOAT *pPercentDeadband,
+    // /* [in] */ DWORD dwLCID,
+    // /* [out] */ OPCHANDLE *phServerGroup,
+    // /* [out] */ DWORD *pRevisedUpdateRate,
+    // /* [in] */ REFIID riid,
+    // /* [iid_is][out] */ LPUNKNOWN *ppUnk) = 0;
+    public int addGroup(
+        final String szName, // Group name ("" means auto)
+        final boolean bActive, // Whether to activate the group
+        final int dwRequestedUpdateRate, // The update rate of request (ms)
+        final int hClientGroup, // The handle of client group
+        final Pointer pTimeBias, // Time zone bias
+        final Pointer pPercentDeadband, // Dead band
+        final int dwLCID, // Region ID
+        final PointerByReference phServerGroup, // Server group handler
+        final IntByReference pRevisedUpdateRate, // Real update rate
+        final Guid.GUID.ByReference riid, // Interface IID
+        final PointerByReference ppUnk // The OPC Group pointer returned
+        ) {
+      // Convert Java string into COM "bstr"
+      final WString wName = new WString(szName);
+
+      return this._invokeNativeInt(
+          3,
+          new Object[] {
+            this.getPointer(),
+            wName,
+            bActive ? 1 : 0,
+            dwRequestedUpdateRate,
+            hClientGroup,
+            pTimeBias,
+            pPercentDeadband,
+            dwLCID,
+            phServerGroup,
+            pRevisedUpdateRate,
+            riid != null ? riid.getPointer() : null,
+            ppUnk
+          });
+    }
+  }
+
+  // IOPCItemMgt(
+  // /* [in] */ DWORD dwCount,
+  // /* [in] */ OPCITEMDEF *pItemArray,
+  // /* [out] */ OPCITEMRESULT **ppAddResults,
+  // /* [out] */ HRESULT **ppErrors) = 0;
+  public static class IOPCItemMgt extends Unknown {
+    public IOPCItemMgt(final Pointer p) {
+      super(p);
+    }
+
+    public int addItems(
+        final int dwCount, // Data count
+        final OPCITEMDEF[] pItemArray, // Items array to create
+        final PointerByReference pResults, // Results' handles
+        final PointerByReference pErrors // Error's pointers
+        ) {
+      return this._invokeNativeInt(
+          3, new Object[] {this.getPointer(), dwCount, pItemArray, pResults, 
pErrors});
+    }
+  }
+
+  public static class IOPCSyncIO extends Unknown {
+    public IOPCSyncIO(final Pointer p) {
+      super(p);
+    }
+
+    // /* [in] */ DWORD dwCount,
+    // /* [size_is][in] */ OPCHANDLE *phServer,
+    // /* [size_is][in] */ VARIANT *pItemValues,
+    // /* [size_is][size_is][out] */ HRESULT **ppErrors) = 0;
+    public int write(
+        final int dwCount, // Data count
+        final Pointer phServer, // Server handles of items
+        final Pointer pItemValues, // Values of items
+        final PointerByReference pErrors // Error codes
+        ) {
+      return this._invokeNativeInt(
+          4,
+          new Object[] { // Write is the 4th method in vtable
+            this.getPointer(), dwCount, phServer, pItemValues, pErrors
+          });
+    }
+  }
+
+  // /* [string] */ LPWSTR szAccessPath;
+  // /* [string] */ LPWSTR szItemID;
+  // BOOL bActive;
+  // OPCHANDLE hClient;
+  // DWORD dwBlobSize;
+  // /* [size_is] */ BYTE *pBlob;
+  // VARTYPE vtRequestedDataType;
+  // WORD wReserved;
+  public static class OPCITEMDEF extends Structure {
+    public WString szAccessPath = new WString(""); // Access path (Usually 
empty)
+    public WString szItemID; // Item ID(Like "Channel1.Device1.Tag1")
+    public int bActive; // Whether to activate this item(TRUE=1, FALSE=0)
+    public int hClient; // Client handle, Used in async callback and remove 
item
+    public int dwBlobSize; // BLOB size
+    public Pointer pBlob; // BLOB's pointer
+    public short vtRequestedDataType = Variant.VT_UNKNOWN; // Requested 
datatype
+    public short wReserved; // Reserved
+
+    // As C structure
+    @Override
+    protected List<String> getFieldOrder() {
+      return Arrays.asList(
+          "szAccessPath",
+          "szItemID",
+          "bActive",
+          "hClient",
+          "dwBlobSize",
+          "pBlob",
+          "vtRequestedDataType",
+          "wReserved");
+    }
+  }
+
+  // OPCHANDLE hServer;
+  // VARTYPE vtCanonicalDataType;
+  // WORD wReserved;
+  // DWORD dwAccessRights;
+  // DWORD dwBlobSize;
+  // /* [size_is] */ BYTE *pBlob;
+  public static class OPCITEMRESULT extends Structure {
+    public int hServer; // Server handle, Used to write
+    public short vtCanonicalDataType; // Data type (like Variant.VT_R8)
+    public short wReserved; // Reserved word
+    public int dwAccessRights; // Access right
+    public int dwBlobSize; // BLOB size
+    public Pointer pBlob; // BLOB pointer
+
+    public OPCITEMRESULT(final Pointer pointer) {
+      super(pointer);
+    }
+
+    @Override
+    protected List<String> getFieldOrder() {
+      return Arrays.asList(
+          "hServer", "vtCanonicalDataType", "wReserved", "dwAccessRights", 
"dwBlobSize", "pBlob");
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
new file mode 100644
index 00000000000..f22379737d0
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcda/OpcDaServerHandle.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.opcda;
+
+import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import com.sun.jna.Memory;
+import com.sun.jna.Native;
+import com.sun.jna.Pointer;
+import com.sun.jna.WString;
+import com.sun.jna.platform.win32.COM.IUnknown;
+import com.sun.jna.platform.win32.COM.Unknown;
+import com.sun.jna.platform.win32.Guid;
+import com.sun.jna.platform.win32.OaIdl;
+import com.sun.jna.platform.win32.Ole32;
+import com.sun.jna.platform.win32.OleAuto;
+import com.sun.jna.platform.win32.Variant;
+import com.sun.jna.platform.win32.WTypes;
+import com.sun.jna.platform.win32.WinDef;
+import com.sun.jna.platform.win32.WinError;
+import com.sun.jna.platform.win32.WinNT;
+import com.sun.jna.ptr.IntByReference;
+import com.sun.jna.ptr.PointerByReference;
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaHeader.IID_IOPCItemMgt;
+import static 
org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaHeader.IID_IOPCServer;
+import static 
org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaHeader.IID_IOPCSyncIO;
+import static 
org.apache.iotdb.db.pipe.connector.protocol.opcda.OpcDaHeader.IID_IUNKNOWN;
+
+public class OpcDaServerHandle implements Closeable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcDaServerHandle.class);
+
+  private final PointerByReference ppvServer = new PointerByReference();
+  private final OpcDaHeader.IOPCServer opcServer;
+  private final OpcDaHeader.IOPCItemMgt itemMgt;
+  private final OpcDaHeader.IOPCSyncIO syncIO;
+  private final Map<String, Integer> serverHandleMap = new 
ConcurrentHashMap<>();
+  private final Map<String, Long> serverTimestampMap = new 
ConcurrentHashMap<>();
+
+  // Save it here to avoid memory leakage
+  private WTypes.BSTR bstr;
+
+  OpcDaServerHandle(String clsOrProgID) {
+    final Guid.CLSID CLSID_OPC_SERVER = new Guid.CLSID(clsOrProgID);
+
+    Ole32.INSTANCE.CoInitializeEx(null, Ole32.COINIT_MULTITHREADED);
+    final PointerByReference ppvServer = new PointerByReference();
+
+    WinNT.HRESULT hr =
+        Ole32.INSTANCE.CoCreateInstance(CLSID_OPC_SERVER, null, 0x17, 
IID_IOPCServer, ppvServer);
+
+    if (hr.intValue() != WinError.S_OK.intValue()) {
+      throw new PipeException(
+          "Failed to connect to server, error code: 0x" + 
Integer.toHexString(hr.intValue()));
+    }
+
+    opcServer = new OpcDaHeader.IOPCServer(ppvServer.getValue());
+
+    // 3. Create group
+    final PointerByReference phServerGroup = new PointerByReference();
+    final PointerByReference phOPCGroup = new PointerByReference();
+    final IntByReference pRevisedUpdateRate = new IntByReference();
+    final int hr2 =
+        opcServer.addGroup(
+            "",
+            true,
+            1000,
+            0,
+            null,
+            null,
+            0,
+            phServerGroup,
+            pRevisedUpdateRate,
+            new Guid.GUID.ByReference(IID_IUNKNOWN.getPointer()),
+            phOPCGroup);
+
+    if (hr2 == WinError.S_OK.intValue()) {
+      LOGGER.info(
+          "Create group successfully! Server handle: {}, update rate: {} ms",
+          phServerGroup.getValue(),
+          pRevisedUpdateRate.getValue());
+    } else {
+      throw new PipeException(
+          "Failed to create group,error code: 0x" + 
Integer.toHexString(hr.intValue()));
+    }
+
+    final IUnknown groupUnknown = new Unknown(phOPCGroup.getValue());
+
+    // 4. Acquire IOPCItemMgt interface (To create Item)
+    final PointerByReference ppvItemMgt = new PointerByReference();
+    hr =
+        groupUnknown.QueryInterface(
+            new Guid.REFIID(new 
Guid.GUID.ByReference(IID_IOPCItemMgt).getPointer()), ppvItemMgt);
+    if (hr.intValue() == WinError.S_OK.intValue()) {
+      LOGGER.info("Acquire IOPCItemMgt successfully! Interface address: {}", 
ppvItemMgt.getValue());
+    } else {
+      throw new PipeException(
+          "Failed to acquire IOPCItemMgt, error code: 0x" + 
Integer.toHexString(hr.intValue()));
+    }
+
+    itemMgt = new OpcDaHeader.IOPCItemMgt(ppvItemMgt.getValue());
+
+    // 5. Acquire IOPCSyncIO Interface
+    PointerByReference ppvSyncIO = new PointerByReference();
+    hr =
+        groupUnknown.QueryInterface(
+            new Guid.REFIID(new 
Guid.GUID.ByReference(IID_IOPCSyncIO).getPointer()), ppvSyncIO);
+    if (hr.intValue() == WinError.S_OK.intValue()) {
+      LOGGER.info("Acquire IOPCSyncIO successfully! Interface address: {}", 
ppvSyncIO.getValue());
+    } else {
+      throw new PipeException(
+          "Failed to acquire IOPCSyncIO, error code: 0x" + 
Integer.toHexString(hr.intValue()));
+    }
+    syncIO = new OpcDaHeader.IOPCSyncIO(ppvSyncIO.getValue());
+  }
+
+  static String getClsIDFromProgID(final String progID) {
+    // To receive CLSID struct
+    final Guid.CLSID.ByReference pclsid = new Guid.CLSID.ByReference();
+
+    final WinNT.HRESULT hr = Ole32.INSTANCE.CLSIDFromProgID(progID, pclsid);
+
+    if (hr.intValue() == WinError.S_OK.intValue()) { // S_OK = 0
+      // Format CLSID (like "{CAE8D0E1-117B-11D5-924B-11C0F023E91C}")
+      final String clsidStr =
+          String.format(
+              "%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X",
+              pclsid.Data1,
+              pclsid.Data2,
+              pclsid.Data3,
+              pclsid.Data4[0],
+              pclsid.Data4[1],
+              pclsid.Data4[2],
+              pclsid.Data4[3],
+              pclsid.Data4[4],
+              pclsid.Data4[5],
+              pclsid.Data4[6],
+              pclsid.Data4[7]);
+      LOGGER.info("Successfully converted progID {} to CLSID: {{}}", progID, 
clsidStr);
+      return clsidStr;
+    } else {
+      throw new PipeException(
+          "Error: ProgID is invalid or unregistered, (HRESULT=0x"
+              + Integer.toHexString(hr.intValue())
+              + ")");
+    }
+  }
+
+  void transfer(final Tablet tablet) {
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+    final List<MeasurementSchema> schemas = tablet.getSchemas();
+
+    for (int i = 0; i < schemas.size(); ++i) {
+      final String itemId =
+          tablet.deviceId + TsFileConstant.PATH_SEPARATOR + 
schemas.get(i).getMeasurementId();
+      if (!serverHandleMap.containsKey(itemId)) {
+        addItem(itemId, schemas.get(i).getType());
+      }
+      for (int j = tablet.getMaxRowNumber() - 1; j >= 0; --j) {
+        if (Objects.isNull(tablet.bitMaps)
+            || Objects.isNull(tablet.bitMaps[i])
+            || !tablet.bitMaps[i].isMarked(j)) {
+          if (serverTimestampMap.get(itemId) <= tablet.timestamps[j]) {
+            writeData(
+                itemId, getTabletObjectValue4Opc(tablet.values[i], j, 
schemas.get(i).getType()));
+            serverTimestampMap.put(itemId, tablet.timestamps[j]);
+          }
+          break;
+        }
+      }
+    }
+  }
+
+  private void addItem(final String itemId, final TSDataType type) {
+    final OpcDaHeader.OPCITEMDEF[] itemDefs = new OpcDaHeader.OPCITEMDEF[1];
+    itemDefs[0] = new OpcDaHeader.OPCITEMDEF();
+    itemDefs[0].szAccessPath = new WString("");
+    itemDefs[0].szItemID = new WString(itemId + "\0");
+    itemDefs[0].bActive = 1;
+    itemDefs[0].hClient = 0;
+    itemDefs[0].dwBlobSize = 0;
+    itemDefs[0].pBlob = Pointer.NULL;
+    itemDefs[0].vtRequestedDataType = convertTsDataType2VariantType(type);
+    itemDefs[0].wReserved = 0;
+    itemDefs[0].write();
+
+    final PointerByReference ppItemResults = new PointerByReference();
+    final PointerByReference ppErrors = new PointerByReference();
+    final int hr = itemMgt.addItems(1, itemDefs, ppItemResults, ppErrors);
+
+    final Pointer pErrors = ppErrors.getValue();
+    if (Objects.nonNull(pErrors)) {
+      // Read errors
+      final int[] errors =
+          pErrors.getIntArray(0, 1); // Pick 1 element because only 1 element 
is added
+      final int itemError = errors[0];
+
+      try {
+        if (itemError == WinError.S_OK.intValue()) {
+          LOGGER.debug("Successfully added item {}.", itemId);
+        } else {
+          throw new PipeException(
+              "Failed to add item "
+                  + itemId
+                  + ", opc error code: 0x"
+                  + Integer.toHexString(itemError));
+        }
+      } finally {
+        Ole32.INSTANCE.CoTaskMemFree(pErrors);
+      }
+    }
+
+    if (hr != WinError.S_OK.intValue()) {
+      throw new PipeException("Failed to add item, win error code: 0x" + 
Integer.toHexString(hr));
+    }
+
+    final Pointer pItemResults = ppItemResults.getValue();
+
+    final OpcDaHeader.OPCITEMRESULT[] itemResults = new 
OpcDaHeader.OPCITEMRESULT[1];
+    itemResults[0] = new OpcDaHeader.OPCITEMRESULT(pItemResults);
+    itemResults[0].read();
+
+    serverHandleMap.put(itemId, itemResults[0].hServer);
+    serverTimestampMap.put(itemId, Long.MIN_VALUE);
+  }
+
+  private void writeData(final String itemId, final Variant.VARIANT value) {
+    final Pointer phServer = new Memory(Native.getNativeSize(int.class));
+    phServer.write(0, new int[] {serverHandleMap.get(itemId)}, 0, 1);
+
+    final PointerByReference ppErrors = new PointerByReference();
+    final int hr = syncIO.write(1, phServer, value.getPointer(), ppErrors);
+    // Free after write
+    if (Objects.nonNull(bstr)) {
+      OleAuto.INSTANCE.SysFreeString(bstr);
+    }
+
+    final Pointer pErrors = ppErrors.getValue();
+    if (Objects.nonNull(pErrors)) {
+      // Read error code array, each for a result
+      final int[] errors =
+          pErrors.getIntArray(0, 1); // Read 1 element because only 1 point is 
written
+      final int itemError = errors[0];
+
+      try {
+        if (itemError != WinError.S_OK.intValue()) {
+          throw new PipeException(
+              "Failed to write "
+                  + itemId
+                  + ", value: "
+                  + value
+                  + ", opc error code: 0x"
+                  + Integer.toHexString(itemError));
+        }
+      } finally {
+        Ole32.INSTANCE.CoTaskMemFree(pErrors);
+      }
+    }
+
+    if (hr != WinError.S_OK.intValue()) {
+      throw new PipeException("Failed to write, win error code: 0x" + 
Integer.toHexString(hr));
+    }
+  }
+
+  private short convertTsDataType2VariantType(final TSDataType dataType) {
+    switch (dataType) {
+      case BOOLEAN:
+        return Variant.VT_BOOL;
+      case INT32:
+        return Variant.VT_I4;
+      case INT64:
+        return Variant.VT_I8;
+      case DATE:
+      case TIMESTAMP:
+        return Variant.VT_DATE;
+      case FLOAT:
+        return Variant.VT_R4;
+      case DOUBLE:
+        return Variant.VT_R8;
+      case TEXT:
+      case STRING:
+        // Note that "Variant" does not support "VT_BLOB" data, and not all 
the DA server
+        // support this, thus we use "VT_BSTR" to substitute
+      case BLOB:
+        return Variant.VT_BSTR;
+      default:
+        throw new UnSupportedDataTypeException("UnSupported dataType " + 
dataType);
+    }
+  }
+
+  private Variant.VARIANT getTabletObjectValue4Opc(
+      final Object column, final int rowIndex, final TSDataType type) {
+    final Variant.VARIANT value = new Variant.VARIANT();
+    switch (type) {
+      case BOOLEAN:
+        value.setValue(Variant.VT_BOOL, new OaIdl.VARIANT_BOOL(((boolean[]) 
column)[rowIndex]));
+        break;
+      case INT32:
+        value.setValue(Variant.VT_I4, new WinDef.LONG(((int[]) 
column)[rowIndex]));
+        break;
+      case DATE:
+        value.setValue(
+            Variant.VT_DATE, new OaIdl.DATE((Date.valueOf(((LocalDate[]) 
column)[rowIndex]))));
+        break;
+      case INT64:
+        value.setValue(Variant.VT_I8, new WinDef.LONGLONG(((long[]) 
column)[rowIndex]));
+        break;
+      case TIMESTAMP:
+        value.setValue(
+            Variant.VT_DATE, new OaIdl.DATE(new java.util.Date(((long[]) 
column)[rowIndex])));
+        break;
+      case FLOAT:
+        value.setValue(Variant.VT_R4, ((float[]) column)[rowIndex]);
+        break;
+      case DOUBLE:
+        value.setValue(Variant.VT_R8, ((double[]) column)[rowIndex]);
+        break;
+      case TEXT:
+      case STRING:
+      case BLOB:
+        bstr = OleAuto.INSTANCE.SysAllocString(((Binary[]) 
column)[rowIndex].toString());
+        value.setValue(Variant.VT_BSTR, bstr);
+        break;
+      default:
+        throw new UnSupportedDataTypeException("UnSupported dataType " + type);
+    }
+    return value;
+  }
+
+  @Override
+  public void close() {
+    // Help gc
+    serverTimestampMap.clear();
+    serverHandleMap.clear();
+
+    // Release resource
+    if (Objects.nonNull(ppvServer.getValue())) {
+      Ole32.INSTANCE.CoTaskMemFree(ppvServer.getValue());
+    }
+    if (Objects.nonNull(syncIO)) {
+      syncIO.Release();
+    }
+    if (Objects.nonNull(itemMgt)) {
+      itemMgt.Release();
+    }
+    if (Objects.nonNull(opcServer)) {
+      opcServer.Release();
+    }
+    // Unload COM
+    Ole32.INSTANCE.CoUninitialize();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index cfd898b62c1..78c1d8fa75f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.record.Tablet;
 import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
-import org.eclipse.milo.opcua.stack.core.UaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,8 +71,11 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 
 /**
  * Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data 
are converted into
- * tablets, then eventNodes to send to the subscriber clients. Notice that 
there is no namespace
- * since the eventNodes do not need to be saved.
+ * tablets, and then:
+ *
+ * <p>1. In pub-sub mode, converted to eventNodes to send to the subscriber 
clients.
+ *
+ * <p>2. In client-server mode, push the newest value to the local server.
  */
 public class OpcUaConnector implements PipeConnector {
 
@@ -207,11 +209,19 @@ public class OpcUaConnector implements PipeConnector {
 
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+    transferByTablet(tabletInsertionEvent, LOGGER, tablet -> 
nameSpace.transfer(tablet));
+  }
+
+  public static void transferByTablet(
+      final TabletInsertionEvent tabletInsertionEvent,
+      final Logger logger,
+      final ThrowingConsumer<Tablet, Exception> transferTablet)
+      throws Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
     if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
         && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
-      LOGGER.warn(
-          "OpcUaConnector only support "
+      logger.warn(
+          "This Connector only support "
               + "PipeInsertNodeTabletInsertionEvent and 
PipeRawTabletInsertionEvent. "
               + "Ignore {}.",
           tabletInsertionEvent);
@@ -219,15 +229,17 @@ public class OpcUaConnector implements PipeConnector {
     }
 
     if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-      transferTabletWrapper((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
+      transferTabletWrapper(
+          (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent, 
transferTablet);
     } else {
-      transferTabletWrapper((PipeRawTabletInsertionEvent) 
tabletInsertionEvent);
+      transferTabletWrapper((PipeRawTabletInsertionEvent) 
tabletInsertionEvent, transferTablet);
     }
   }
 
-  private void transferTabletWrapper(
-      final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
-      throws UaException {
+  private static void transferTabletWrapper(
+      final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent,
+      final ThrowingConsumer<Tablet, Exception> transferTablet)
+      throws Exception {
     // We increase the reference count for this event to determine if the 
event may be released.
     if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
         OpcUaConnector.class.getName())) {
@@ -235,7 +247,7 @@ public class OpcUaConnector implements PipeConnector {
     }
     try {
       for (final Tablet tablet : 
pipeInsertNodeTabletInsertionEvent.convertToTablets()) {
-        nameSpace.transfer(tablet);
+        transferTablet.accept(tablet);
       }
     } finally {
       pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
@@ -243,19 +255,26 @@ public class OpcUaConnector implements PipeConnector {
     }
   }
 
-  private void transferTabletWrapper(final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
-      throws UaException {
+  private static void transferTabletWrapper(
+      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent,
+      final ThrowingConsumer<Tablet, Exception> transferTablet)
+      throws Exception {
     // We increase the reference count for this event to determine if the 
event may be released.
     if 
(!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcUaConnector.class.getName()))
 {
       return;
     }
     try {
-      nameSpace.transfer(pipeRawTabletInsertionEvent.convertToTablet());
+      transferTablet.accept(pipeRawTabletInsertionEvent.convertToTablet());
     } finally {
       
pipeRawTabletInsertionEvent.decreaseReferenceCount(OpcUaConnector.class.getName(),
 false);
     }
   }
 
+  @FunctionalInterface
+  public interface ThrowingConsumer<T, E extends Exception> {
+    void accept(final T t) throws E;
+  }
+
   @Override
   public void close() throws Exception {
     if (serverKey == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
index 104d9120d86..87b285f1905 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaNameSpace.java
@@ -182,7 +182,7 @@ public class OpcUaNameSpace extends 
ManagedNamespaceWithLifecycle {
       }
 
       int lastNonnullIndex = -1;
-      for (int j = 0; j < tablet.rowSize; ++j) {
+      for (int j = tablet.rowSize - 1; j >= 0; --j) {
         if (!tablet.bitMaps[i].isMarked(j)) {
           lastNonnullIndex = j;
           break;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index 67f3a0265af..deefd1c89ee 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSyncConnector;
+import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.opcda.OpcDaConnector;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.opcua.OpcUaConnector;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.websocket.WebSocketConnector;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.writeback.WriteBackConnector;
@@ -86,6 +87,7 @@ public enum BuiltinPipePlugin {
 
   WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class),
   OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class),
+  OPC_DA_CONNECTOR("opc-da-connector", OpcDaConnector.class),
   WRITE_BACK_CONNECTOR("write-back-connector", WriteBackConnector.class),
 
   DO_NOTHING_SINK("do-nothing-sink", DoNothingConnector.class),
@@ -97,6 +99,7 @@ public enum BuiltinPipePlugin {
   IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapConnector.class),
   WEBSOCKET_SINK("websocket-sink", WebSocketConnector.class),
   OPC_UA_SINK("opc-ua-sink", OpcUaConnector.class),
+  OPC_DA_SINK("opc-da-sink", OpcDaConnector.class),
   WRITE_BACK_SINK("write-back-sink", WriteBackConnector.class),
   SUBSCRIPTION_SINK("subscription-sink", DoNothingConnector.class),
   PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", 
PipeConsensusAsyncConnector.class),
@@ -153,6 +156,7 @@ public enum BuiltinPipePlugin {
                   IOTDB_AIR_GAP_CONNECTOR.getPipePluginName().toUpperCase(),
                   WEBSOCKET_CONNECTOR.getPipePluginName().toUpperCase(),
                   OPC_UA_CONNECTOR.getPipePluginName().toUpperCase(),
+                  OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(),
                   WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(),
                   
PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(),
                   // Sinks
@@ -161,6 +165,7 @@ public enum BuiltinPipePlugin {
                   IOTDB_LEGACY_PIPE_SINK.getPipePluginName().toUpperCase(),
                   WEBSOCKET_SINK.getPipePluginName().toUpperCase(),
                   OPC_UA_SINK.getPipePluginName().toUpperCase(),
+                  OPC_DA_SINK.getPipePluginName().toUpperCase(),
                   WRITE_BACK_SINK.getPipePluginName().toUpperCase(),
                   SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(),
                   
PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase())));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/opcda/OpcDaConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/opcda/OpcDaConnector.java
new file mode 100644
index 00000000000..25df1d46fb8
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/connector/opcda/OpcDaConnector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.opcda;
+
+import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.PlaceholderConnector;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents 
the OPC DA connector.
+ * There is a real implementation in the server module but cannot be imported 
here. The pipe agent
+ * in the server module will replace this class with the real implementation 
when initializing the
+ * OPC DA connector.
+ */
+public class OpcDaConnector extends PlaceholderConnector {}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 1c5b442c3a1..13a9cd8a7f9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -247,6 +247,12 @@ public class PipeConnectorConstant {
   public static final String SINK_MARK_AS_PIPE_REQUEST_KEY = 
"sink.mark-as-pipe-request";
   public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE = 
true;
 
+  public static final String CONNECTOR_OPC_DA_CLSID_KEY = 
"connector.opcda.clsid";
+  public static final String SINK_OPC_DA_CLSID_KEY = "sink.opcda.clsid";
+
+  public static final String CONNECTOR_OPC_DA_PROGID_KEY = 
"connector.opcda.progid";
+  public static final String SINK_OPC_DA_PROGID_KEY = "sink.opcda.progid";
+
   private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
   }
diff --git a/pom.xml b/pom.xml
index e30e2954e08..b205b7eb619 100644
--- a/pom.xml
+++ b/pom.xml
@@ -226,6 +226,11 @@
                 <artifactId>jna</artifactId>
                 <version>${jna.version}</version>
             </dependency>
+            <dependency>
+                <groupId>net.java.dev.jna</groupId>
+                <artifactId>jna-platform</artifactId>
+                <version>${jna.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-csv</artifactId>


Reply via email to