This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch opc-da
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3dbae5cb2d7da75c9fb35f6ba7f4879bfcebb33f
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 23:33:10 2026 +0800

    da
---
 .../sink/protocol/opcda/OpcDaServerHandle.java     | 240 +++++++++++++++++++++
 .../db/pipe/sink/protocol/opcda/OpcDaSink.java     | 166 +++++++++++++-
 .../sink/protocol/opcda/OpcDaServerHandleTest.java | 184 ++++++++++++++++
 .../agent/plugin/builtin/sink/opcda/OpcDaSink.java |   2 +
 .../pipe/config/constant/PipeSinkConstant.java     |  22 ++
 5 files changed, 603 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandle.java
index 816d757391b..2d089155c39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandle.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.opcda;
 
+import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
 import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
@@ -40,6 +41,7 @@ import com.sun.jna.platform.win32.WinNT;
 import com.sun.jna.ptr.IntByReference;
 import com.sun.jna.ptr.PointerByReference;
 import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -51,6 +53,8 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.sql.Date;
 import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -60,6 +64,10 @@ import static 
org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaHeader.IID_IOPCI
 import static 
org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaHeader.IID_IOPCServer;
 import static 
org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaHeader.IID_IOPCSyncIO;
 import static 
org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaHeader.IID_IUNKNOWN;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE;
 
 public class OpcDaServerHandle implements Closeable {
 
@@ -70,11 +78,19 @@ public class OpcDaServerHandle implements Closeable {
   private final OpcDaHeader.IOPCSyncIO syncIO;
   private final Map<String, Integer> serverHandleMap = new 
ConcurrentHashMap<>();
   private final Map<String, Long> serverTimestampMap = new 
ConcurrentHashMap<>();
+  private final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig;
 
   // Save it here to avoid memory leakage
   private WTypes.BSTR bstr;
 
   OpcDaServerHandle(String clsOrProgID) {
+    this(clsOrProgID, TableModelItemIdEncodingConfig.humanReadableDefaults());
+  }
+
+  OpcDaServerHandle(
+      final String clsOrProgID,
+      final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig) {
+    this.tableModelItemIdEncodingConfig = tableModelItemIdEncodingConfig;
     final Guid.CLSID CLSID_OPC_SERVER = new Guid.CLSID(clsOrProgID);
 
     Ole32.INSTANCE.CoInitializeEx(null, Ole32.COINIT_MULTITHREADED);
@@ -181,6 +197,20 @@ public class OpcDaServerHandle implements Closeable {
   }
 
   void transfer(final Tablet tablet) {
+    transfer(tablet, false, null);
+  }
+
+  void transfer(
+      final Tablet tablet, final boolean isTableModel, final String 
tableModelDatabaseName) {
+    if (!isTableModel) {
+      transferTreeModelTablet(tablet);
+      return;
+    }
+
+    transferTableModelTablet(tablet, tableModelDatabaseName);
+  }
+
+  private void transferTreeModelTablet(final Tablet tablet) {
     new 
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
     final List<IMeasurementSchema> schemas = tablet.getSchemas();
 
@@ -206,6 +236,216 @@ public class OpcDaServerHandle implements Closeable {
     }
   }
 
+  private void transferTableModelTablet(final Tablet tablet, final String 
tableModelDatabaseName) {
+    final List<IMeasurementSchema> schemas = tablet.getSchemas();
+    final Map<String, ItemValuePosition> itemIdToValuePosition =
+        collectLatestTableModelItemPositions(
+            tablet, tableModelDatabaseName, tableModelItemIdEncodingConfig);
+
+    for (final Map.Entry<String, ItemValuePosition> entry : 
itemIdToValuePosition.entrySet()) {
+      final String itemId = entry.getKey();
+      final ItemValuePosition itemValuePosition = entry.getValue();
+      final IMeasurementSchema schema = 
schemas.get(itemValuePosition.getColumnIndex());
+
+      if (!serverHandleMap.containsKey(itemId)) {
+        addItem(itemId, schema.getType());
+      }
+
+      final long timestamp = 
tablet.getTimestamp(itemValuePosition.getRowIndex());
+      if (serverTimestampMap.get(itemId) <= timestamp) {
+        writeData(
+            itemId,
+            getTabletObjectValue4Opc(
+                tablet.getValues()[itemValuePosition.getColumnIndex()],
+                itemValuePosition.getRowIndex(),
+                schema.getType()));
+        serverTimestampMap.put(itemId, timestamp);
+      }
+    }
+  }
+
+  static Map<String, ItemValuePosition> collectLatestTableModelItemPositions(
+      final Tablet tablet, final String tableModelDatabaseName) {
+    return collectLatestTableModelItemPositions(
+        tablet, tableModelDatabaseName, 
TableModelItemIdEncodingConfig.humanReadableDefaults());
+  }
+
+  static Map<String, ItemValuePosition> collectLatestTableModelItemPositions(
+      final Tablet tablet,
+      final String tableModelDatabaseName,
+      final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig) {
+    if (Objects.isNull(tableModelDatabaseName)) {
+      throw new PipeException("The table model database name must exist when 
transferring tablet.");
+    }
+
+    new 
PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp();
+    final List<IMeasurementSchema> schemas = tablet.getSchemas();
+    final Map<String, ItemValuePosition> itemIdToValuePosition = new 
LinkedHashMap<>();
+
+    for (int rowIndex = tablet.getRowSize() - 1; rowIndex >= 0; --rowIndex) {
+      for (int columnIndex = 0; columnIndex < schemas.size(); ++columnIndex) {
+        if 
(!tablet.getColumnTypes().get(columnIndex).equals(ColumnCategory.FIELD)
+            || tablet.isNull(rowIndex, columnIndex)) {
+          continue;
+        }
+
+        itemIdToValuePosition.putIfAbsent(
+            generateTableModelItemId(
+                tableModelDatabaseName,
+                tablet,
+                rowIndex,
+                schemas.get(columnIndex).getMeasurementName(),
+                tableModelItemIdEncodingConfig),
+            new ItemValuePosition(rowIndex, columnIndex));
+      }
+    }
+
+    return itemIdToValuePosition;
+  }
+
+  static String generateTableModelItemId(
+      final String tableModelDatabaseName,
+      final Tablet tablet,
+      final int rowIndex,
+      final String measurementName) {
+    return generateTableModelItemId(
+        tableModelDatabaseName,
+        tablet,
+        rowIndex,
+        measurementName,
+        TableModelItemIdEncodingConfig.humanReadableDefaults());
+  }
+
+  static String generateTableModelItemId(
+      final String tableModelDatabaseName,
+      final Tablet tablet,
+      final int rowIndex,
+      final String measurementName,
+      final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig) {
+    final StringBuilder builder =
+        new 
StringBuilder(tableModelItemIdEncodingConfig.encodeSegment(tableModelDatabaseName));
+    for (final Object segment : tablet.getDeviceID(rowIndex).getSegments()) {
+      builder
+          .append(TsFileConstant.PATH_SEPARATOR)
+          .append(tableModelItemIdEncodingConfig.encodeSegment(segment));
+    }
+    return builder
+        .append(TsFileConstant.PATH_SEPARATOR)
+        .append(tableModelItemIdEncodingConfig.encodeSegment(measurementName))
+        .toString();
+  }
+
+  static final class ItemValuePosition {
+    private final int rowIndex;
+    private final int columnIndex;
+
+    private ItemValuePosition(final int rowIndex, final int columnIndex) {
+      this.rowIndex = rowIndex;
+      this.columnIndex = columnIndex;
+    }
+
+    int getRowIndex() {
+      return rowIndex;
+    }
+
+    int getColumnIndex() {
+      return columnIndex;
+    }
+  }
+
+  static final class TableModelItemIdEncodingConfig {
+    private final String nullTagSentinel;
+    private final String segmentEscape;
+    private final String escapedSegmentEscape;
+    private final String escapedPathSeparator;
+
+    static TableModelItemIdEncodingConfig humanReadableDefaults() {
+      return new TableModelItemIdEncodingConfig(
+          CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE,
+          CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE,
+          CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE,
+          CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE);
+    }
+
+    TableModelItemIdEncodingConfig(
+        final String nullTagSentinel,
+        final String segmentEscape,
+        final String escapedSegmentEscape,
+        final String escapedPathSeparator) {
+      validate(
+          nullTagSentinel, segmentEscape, escapedSegmentEscape, 
escapedPathSeparator);
+      this.nullTagSentinel = nullTagSentinel;
+      this.segmentEscape = segmentEscape;
+      this.escapedSegmentEscape = escapedSegmentEscape;
+      this.escapedPathSeparator = escapedPathSeparator;
+    }
+
+    private static void validate(
+        final String nullTagSentinel,
+        final String segmentEscape,
+        final String escapedSegmentEscape,
+        final String escapedPathSeparator) {
+      final String pathSeparator = 
String.valueOf(TsFileConstant.PATH_SEPARATOR);
+      final List<String> tokens =
+          Arrays.asList(
+              nullTagSentinel, segmentEscape, escapedSegmentEscape, 
escapedPathSeparator);
+      if (tokens.stream().anyMatch(token -> Objects.isNull(token) || 
token.isEmpty())) {
+        throw new PipeException("The OPC DA table model item id encoding 
tokens must be non-empty.");
+      }
+      if (tokens.stream().anyMatch(token -> token.contains(pathSeparator))) {
+        throw new PipeException(
+            "The OPC DA table model item id encoding tokens must not contain 
path separator '.'.");
+      }
+      if (nullTagSentinel.contains(segmentEscape)
+          || escapedSegmentEscape.contains(segmentEscape)
+          || escapedPathSeparator.contains(segmentEscape)) {
+        throw new PipeException(
+            "The OPC DA table model item id encoding tokens must not contain 
the segment escape token.");
+      }
+      if (tokens.stream().distinct().count() != tokens.size()) {
+        throw new PipeException(
+            "The OPC DA table model item id encoding tokens must be pairwise 
different.");
+      }
+    }
+
+    String encodeSegment(final Object segment) {
+      if (Objects.isNull(segment)) {
+        return nullTagSentinel;
+      }
+
+      return segment
+          .toString()
+          .replace(segmentEscape, escapedMarker(escapedSegmentEscape))
+          .replace(nullTagSentinel, escapedMarker(nullTagSentinel))
+          .replace(TsFileConstant.PATH_SEPARATOR, 
escapedMarker(escapedPathSeparator));
+    }
+
+    private String escapedMarker(final String marker) {
+      return segmentEscape + marker + segmentEscape;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof TableModelItemIdEncodingConfig)) {
+        return false;
+      }
+      final TableModelItemIdEncodingConfig that = 
(TableModelItemIdEncodingConfig) obj;
+      return Objects.equals(nullTagSentinel, that.nullTagSentinel)
+          && Objects.equals(segmentEscape, that.segmentEscape)
+          && Objects.equals(escapedSegmentEscape, that.escapedSegmentEscape)
+          && Objects.equals(escapedPathSeparator, that.escapedPathSeparator);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          nullTagSentinel, segmentEscape, escapedSegmentEscape, 
escapedPathSeparator);
+    }
+  }
+
   private void addItem(final String itemId, final TSDataType type) {
     final OpcDaHeader.OPCITEMDEF[] itemDefs = new OpcDaHeader.OPCITEMDEF[1];
     itemDefs[0] = new OpcDaHeader.OPCITEMDEF();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaSink.java
index 322943de402..fdc6f2fb51f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaSink.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.db.pipe.sink.protocol.opcda;
 
-import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -30,6 +32,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,22 +40,36 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Arrays;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_CLSID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_PROGID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_SEGMENT_ESCAPE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_CLSID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_NULL_TAG_SENTINEL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_PROGID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_SEGMENT_ESCAPE_KEY;
 
 /**
  * Send data in IoTDB based on Opc Da protocol, using JNA. All data are 
converted into tablets, and
  * then push the newest value to the <b>local COM</b> server in another 
process.
  */
 @TreeModel
+@TableModel
 public class OpcDaSink implements PipeConnector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpcDaSink.class);
-  private static final Map<String, Pair<AtomicInteger, OpcDaServerHandle>>
+  private static final Map<ServerHandleKey, Pair<AtomicInteger, 
OpcDaServerHandle>>
       CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP = new ConcurrentHashMap<>();
-  private String clsID;
+  private ServerHandleKey serverHandleKey;
   private OpcDaServerHandle handle;
 
   @Override
@@ -74,6 +91,8 @@ public class OpcDaSink implements PipeConnector {
     if (!System.getProperty("os.name").toLowerCase().startsWith("windows")) {
       throw new PipeParameterNotValidException("opc-da-sink must run on 
windows system.");
     }
+
+    createTableModelItemIdEncodingConfig(validator.getParameters());
   }
 
   @Override
@@ -81,18 +100,25 @@ public class OpcDaSink implements PipeConnector {
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
       throws Exception {
     synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) {
-      clsID = parameters.getStringByKeys(CONNECTOR_OPC_DA_CLSID_KEY, 
SINK_OPC_DA_CLSID_KEY);
+      String clsID = parameters.getStringByKeys(CONNECTOR_OPC_DA_CLSID_KEY, 
SINK_OPC_DA_CLSID_KEY);
       if (Objects.isNull(clsID)) {
         clsID =
             OpcDaServerHandle.getClsIDFromProgID(
                 parameters.getStringByKeys(CONNECTOR_OPC_DA_PROGID_KEY, 
SINK_OPC_DA_PROGID_KEY));
       }
+      serverHandleKey =
+          new ServerHandleKey(clsID, 
createTableModelItemIdEncodingConfig(parameters));
       handle =
           CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP
               .computeIfAbsent(
-                  clsID, key -> new Pair<>(new AtomicInteger(0), new 
OpcDaServerHandle(clsID)))
+                  serverHandleKey,
+                  key ->
+                      new Pair<>(
+                          new AtomicInteger(0),
+                          new OpcDaServerHandle(
+                              key.clsID, key.tableModelItemIdEncodingConfig)))
               .getRight();
-      
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID).getLeft().incrementAndGet();
+      
CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(serverHandleKey).getLeft().incrementAndGet();
     }
   }
 
@@ -108,8 +134,10 @@ public class OpcDaSink implements PipeConnector {
 
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
-    OpcUaSink.transferByTablet(
-        tabletInsertionEvent, LOGGER, (tablet, isTableModel) -> 
handle.transfer(tablet));
+    transferByTablet(
+        tabletInsertionEvent,
+        (tablet, isTableModel, tableModelDatabaseName) ->
+            handle.transfer(tablet, isTableModel, tableModelDatabaseName));
   }
 
   @Override
@@ -119,13 +147,13 @@ public class OpcDaSink implements PipeConnector {
 
   @Override
   public void close() throws Exception {
-    if (Objects.isNull(clsID)) {
+    if (Objects.isNull(serverHandleKey)) {
       return;
     }
 
     synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) {
       final Pair<AtomicInteger, OpcDaServerHandle> pair =
-          CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID);
+          CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(serverHandleKey);
       if (pair == null) {
         return;
       }
@@ -134,9 +162,125 @@ public class OpcDaSink implements PipeConnector {
         try {
           pair.getRight().close();
         } finally {
-          CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.remove(clsID);
+          CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.remove(serverHandleKey);
         }
       }
     }
   }
+
+  private static OpcDaServerHandle.TableModelItemIdEncodingConfig
+      createTableModelItemIdEncodingConfig(final PipeParameters parameters) {
+    return new OpcDaServerHandle.TableModelItemIdEncodingConfig(
+        parameters.getStringOrDefault(
+            Arrays.asList(
+                CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_KEY, 
SINK_OPC_DA_NULL_TAG_SENTINEL_KEY),
+            CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE),
+        parameters.getStringOrDefault(
+            Arrays.asList(CONNECTOR_OPC_DA_SEGMENT_ESCAPE_KEY, 
SINK_OPC_DA_SEGMENT_ESCAPE_KEY),
+            CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE),
+        parameters.getStringOrDefault(
+            Arrays.asList(
+                CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY,
+                SINK_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY),
+            CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE),
+        parameters.getStringOrDefault(
+            Arrays.asList(
+                CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY,
+                SINK_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY),
+            CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE));
+  }
+
+  private static void transferByTablet(
+      final TabletInsertionEvent tabletInsertionEvent,
+      final ThrowingTriConsumer<Tablet, Boolean, String, Exception> 
transferTablet)
+      throws Exception {
+    if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+        && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+      LOGGER.warn(
+          "This Connector only support "
+              + "PipeInsertNodeTabletInsertionEvent and 
PipeRawTabletInsertionEvent. "
+              + "Ignore {}.",
+          tabletInsertionEvent);
+      return;
+    }
+
+    if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+      transferTabletWrapper(
+          (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent, 
transferTablet);
+    } else {
+      transferTabletWrapper((PipeRawTabletInsertionEvent) 
tabletInsertionEvent, transferTablet);
+    }
+  }
+
+  private static void transferTabletWrapper(
+      final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent,
+      final ThrowingTriConsumer<Tablet, Boolean, String, Exception> 
transferTablet)
+      throws Exception {
+    if 
(!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(OpcDaSink.class.getName()))
 {
+      return;
+    }
+    try {
+      final boolean isTableModel = 
pipeInsertNodeTabletInsertionEvent.isTableModelEvent();
+      final String tableModelDatabaseName =
+          isTableModel ? 
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() : null;
+      for (final Tablet tablet : 
pipeInsertNodeTabletInsertionEvent.convertToTablets()) {
+        transferTablet.accept(tablet, isTableModel, tableModelDatabaseName);
+      }
+    } finally {
+      
pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(OpcDaSink.class.getName(),
 false);
+    }
+  }
+
+  private static void transferTabletWrapper(
+      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent,
+      final ThrowingTriConsumer<Tablet, Boolean, String, Exception> 
transferTablet)
+      throws Exception {
+    if 
(!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcDaSink.class.getName()))
 {
+      return;
+    }
+    try {
+      final boolean isTableModel = 
pipeRawTabletInsertionEvent.isTableModelEvent();
+      transferTablet.accept(
+          pipeRawTabletInsertionEvent.convertToTablet(),
+          isTableModel,
+          isTableModel ? 
pipeRawTabletInsertionEvent.getTableModelDatabaseName() : null);
+    } finally {
+      
pipeRawTabletInsertionEvent.decreaseReferenceCount(OpcDaSink.class.getName(), 
false);
+    }
+  }
+
+  @FunctionalInterface
+  private interface ThrowingTriConsumer<T, U, V, E extends Exception> {
+    void accept(final T t, final U u, final V v) throws E;
+  }
+
+  private static final class ServerHandleKey {
+    private final String clsID;
+    private final OpcDaServerHandle.TableModelItemIdEncodingConfig 
tableModelItemIdEncodingConfig;
+
+    private ServerHandleKey(
+        final String clsID,
+        final OpcDaServerHandle.TableModelItemIdEncodingConfig 
tableModelItemIdEncodingConfig) {
+      this.clsID = clsID;
+      this.tableModelItemIdEncodingConfig = tableModelItemIdEncodingConfig;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof ServerHandleKey)) {
+        return false;
+      }
+      final ServerHandleKey that = (ServerHandleKey) obj;
+      return Objects.equals(clsID, that.clsID)
+          && Objects.equals(tableModelItemIdEncodingConfig, 
that.tableModelItemIdEncodingConfig);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(clsID, tableModelItemIdEncodingConfig);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandleTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandleTest.java
new file mode 100644
index 00000000000..2f5136772d8
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandleTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcda;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class OpcDaServerHandleTest {
+
+  @Test
+  public void testCollectLatestTableModelItemPositions() {
+    final Tablet tablet = generateTableModelTablet();
+
+    final Map<String, OpcDaServerHandle.ItemValuePosition> itemPositions =
+        OpcDaServerHandle.collectLatestTableModelItemPositions(tablet, 
"factory");
+
+    Assert.assertEquals(6, itemPositions.size());
+    Assert.assertFalse(itemPositions.containsKey("factory.status.d1.site"));
+
+    assertItem(itemPositions, tablet, "factory.status.d1.s1", 13L, 2L);
+    assertItem(itemPositions, tablet, "factory.status.d1.s2", false, 2L);
+    assertItem(itemPositions, tablet, "factory.status.d2.s1", 21L, 1L);
+    assertItem(itemPositions, tablet, "factory.status.d2.s2", true, 1L);
+    assertItem(itemPositions, tablet, "factory.status.__NULL__.s1", 31L, 3L);
+    assertItem(itemPositions, tablet, "factory.status.__NULL__.s2", false, 3L);
+  }
+
+  @Test
+  public void testGenerateTableModelItemIdShouldEscapeSpecialSegments() {
+    final Tablet tablet = generateEscapedTableModelTablet();
+
+    final Map<String, OpcDaServerHandle.ItemValuePosition> itemPositions =
+        OpcDaServerHandle.collectLatestTableModelItemPositions(tablet, 
"factory");
+
+    Assert.assertEquals(4, itemPositions.size());
+    assertItem(itemPositions, tablet, 
"factory.status.a.b__ESC__DOT__ESC__c.s1", 11L, 1L);
+    assertItem(itemPositions, tablet, 
"factory.status.a__ESC__DOT__ESC__b.c.s1", 22L, 2L);
+    assertItem(itemPositions, tablet, "factory.status.__NULL__.c.s1", 33L, 3L);
+    assertItem(itemPositions, tablet, "factory.status.null.c.s1", 44L, 4L);
+  }
+
+  @Test
+  public void testGenerateTableModelItemIdShouldHonorCustomEncodingConfig() {
+    final Tablet tablet = generateCustomEncodingTablet();
+    final Map<String, OpcDaServerHandle.ItemValuePosition> itemPositions =
+        OpcDaServerHandle.collectLatestTableModelItemPositions(
+            tablet,
+            "factory",
+            new OpcDaServerHandle.TableModelItemIdEncodingConfig(
+                "[NULL]", "[ESC]", "ESC", "DOT"));
+
+    Assert.assertEquals(3, itemPositions.size());
+    assertItem(itemPositions, tablet, "factory.status.[NULL].c.s1", 33L, 1L);
+    assertItem(itemPositions, tablet, "factory.status.[ESC][NULL][ESC].c.s1", 
55L, 2L);
+    assertItem(itemPositions, tablet, "factory.status.[ESC]ESC[ESC].c.s1", 
66L, 3L);
+  }
+
+  private static void assertItem(
+      final Map<String, OpcDaServerHandle.ItemValuePosition> itemPositions,
+      final Tablet tablet,
+      final String itemId,
+      final Object expectedValue,
+      final long expectedTimestamp) {
+    final OpcDaServerHandle.ItemValuePosition itemValuePosition = 
itemPositions.get(itemId);
+    Assert.assertNotNull(itemValuePosition);
+    Assert.assertEquals(expectedTimestamp, 
tablet.getTimestamp(itemValuePosition.getRowIndex()));
+    Assert.assertEquals(
+        expectedValue,
+        tablet.getValue(itemValuePosition.getRowIndex(), 
itemValuePosition.getColumnIndex()));
+  }
+
+  private static Tablet generateTableModelTablet() {
+    final List<String> measurementNames = Arrays.asList("site", "s1", "s2");
+    final List<TSDataType> dataTypes =
+        Arrays.asList(TSDataType.STRING, TSDataType.INT64, TSDataType.BOOLEAN);
+    final List<ColumnCategory> columnTypes =
+        Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, 
ColumnCategory.FIELD);
+
+    final Tablet tablet = new Tablet("status", measurementNames, dataTypes, 
columnTypes, 5);
+    tablet.initBitMaps();
+
+    addRow(tablet, 0, 2L, "d1", 12L, true);
+    addRow(tablet, 1, 3L, null, 31L, false);
+    addRow(tablet, 2, 1L, "d1", 11L, null);
+    addRow(tablet, 3, 1L, "d2", 21L, true);
+    addRow(tablet, 4, 2L, "d1", 13L, false);
+    return tablet;
+  }
+
+  private static Tablet generateEscapedTableModelTablet() {
+    final List<String> measurementNames = Arrays.asList("tag1", "tag2", "s1");
+    final List<TSDataType> dataTypes =
+        Arrays.asList(TSDataType.STRING, TSDataType.STRING, TSDataType.INT64);
+    final List<ColumnCategory> columnTypes =
+        Arrays.asList(ColumnCategory.TAG, ColumnCategory.TAG, 
ColumnCategory.FIELD);
+
+    final Tablet tablet = new Tablet("status", measurementNames, dataTypes, 
columnTypes, 4);
+    tablet.initBitMaps();
+
+    addRow(tablet, 0, 1L, new String[] {"a", "b.c"}, 11L);
+    addRow(tablet, 1, 2L, new String[] {"a.b", "c"}, 22L);
+    addRow(tablet, 2, 3L, new String[] {null, "c"}, 33L);
+    addRow(tablet, 3, 4L, new String[] {"null", "c"}, 44L);
+    return tablet;
+  }
+
+  private static Tablet generateCustomEncodingTablet() {
+    final List<String> measurementNames = Arrays.asList("tag1", "tag2", "s1");
+    final List<TSDataType> dataTypes =
+        Arrays.asList(TSDataType.STRING, TSDataType.STRING, TSDataType.INT64);
+    final List<ColumnCategory> columnTypes =
+        Arrays.asList(ColumnCategory.TAG, ColumnCategory.TAG, 
ColumnCategory.FIELD);
+
+    final Tablet tablet = new Tablet("status", measurementNames, dataTypes, 
columnTypes, 3);
+    tablet.initBitMaps();
+
+    addRow(tablet, 0, 1L, new String[] {null, "c"}, 33L);
+    addRow(tablet, 1, 2L, new String[] {"[NULL]", "c"}, 55L);
+    addRow(tablet, 2, 3L, new String[] {"[ESC]", "c"}, 66L);
+    return tablet;
+  }
+
+  private static void addRow(
+      final Tablet tablet,
+      final int rowIndex,
+      final long timestamp,
+      final String site,
+      final Long s1,
+      final Boolean s2) {
+    tablet.addTimestamp(rowIndex, timestamp);
+    tablet.addValue(
+        "site",
+        rowIndex,
+        site == null ? null : new 
Binary(site.getBytes(StandardCharsets.UTF_8)));
+    tablet.addValue("s1", rowIndex, s1);
+    tablet.addValue("s2", rowIndex, s2);
+    tablet.setRowSize(rowIndex + 1);
+  }
+
+  private static void addRow(
+      final Tablet tablet,
+      final int rowIndex,
+      final long timestamp,
+      final String[] tags,
+      final Long s1) {
+    tablet.addTimestamp(rowIndex, timestamp);
+    tablet.addValue(
+        "tag1",
+        rowIndex,
+        tags[0] == null ? null : new 
Binary(tags[0].getBytes(StandardCharsets.UTF_8)));
+    tablet.addValue(
+        "tag2",
+        rowIndex,
+        tags[1] == null ? null : new 
Binary(tags[1].getBytes(StandardCharsets.UTF_8)));
+    tablet.addValue("s1", rowIndex, s1);
+    tablet.setRowSize(rowIndex + 1);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/sink/opcda/OpcDaSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/sink/opcda/OpcDaSink.java
index 76dd6b42072..ccb462f09cd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/sink/opcda/OpcDaSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/sink/opcda/OpcDaSink.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcda;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.PlaceholderSink;
+import org.apache.iotdb.pipe.api.annotation.TableModel;
 import org.apache.iotdb.pipe.api.annotation.TreeModel;
 
 /**
@@ -29,4 +30,5 @@ import org.apache.iotdb.pipe.api.annotation.TreeModel;
  * OPC DA connector.
  */
 @TreeModel
+@TableModel
 public class OpcDaSink extends PlaceholderSink {}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
index e6f14911e66..af4517d657f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java
@@ -351,6 +351,28 @@ public class PipeSinkConstant {
   public static final String CONNECTOR_OPC_DA_PROGID_KEY = 
"connector.opcda.progid";
   public static final String SINK_OPC_DA_PROGID_KEY = "sink.opcda.progid";
 
+  public static final String CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_KEY =
+      "connector.opcda.null-tag-sentinel";
+  public static final String SINK_OPC_DA_NULL_TAG_SENTINEL_KEY = 
"sink.opcda.null-tag-sentinel";
+  public static final String CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE 
= "__NULL__";
+
+  public static final String CONNECTOR_OPC_DA_SEGMENT_ESCAPE_KEY =
+      "connector.opcda.segment-escape";
+  public static final String SINK_OPC_DA_SEGMENT_ESCAPE_KEY = 
"sink.opcda.segment-escape";
+  public static final String CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE = 
"__ESC__";
+
+  public static final String CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY =
+      "connector.opcda.escaped-segment-escape";
+  public static final String SINK_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY =
+      "sink.opcda.escaped-segment-escape";
+  public static final String 
CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE = "ESC";
+
+  public static final String CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY =
+      "connector.opcda.escaped-path-separator";
+  public static final String SINK_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY =
+      "sink.opcda.escaped-path-separator";
+  public static final String 
CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE = "DOT";
+
   public static final String CONNECTOR_USE_EVENT_USER_NAME_KEY = 
"connector.use-event-user-name";
   public static final String SINK_USE_EVENT_USER_NAME_KEY = 
"sink.use-event-user-name";
   public static final boolean CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE = 
false;


Reply via email to