This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 83447eb30b1 [IOTDB-6235] Pipe: Implement write-back-sink to transfer 
data back to the sender (#11466)
83447eb30b1 is described below

commit 83447eb30b1c4eb1c83fa45457df50ae91aed345
Author: Caideyipi <[email protected]>
AuthorDate: Fri Nov 3 19:11:28 2023 +0800

    [IOTDB-6235] Pipe: Implement write-back-sink to transfer data back to the 
sender (#11466)
---
 .../request/PipeTransferTabletInsertNodeReq.java   |  14 +-
 .../request/PipeTransferTabletRawReq.java          |  11 ++
 .../protocol/airgap/IoTDBAirGapConnector.java      |  21 ++-
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  |   3 +-
 .../thrift/async/IoTDBThriftAsyncConnector.java    |   3 +-
 .../thrift/sync/IoTDBThriftSyncConnector.java      |   3 +-
 .../protocol/websocket/WebSocketConnector.java     |   4 +-
 .../protocol/writeback/WriteBackConnector.java     | 176 +++++++++++++++++++++
 .../connector/PipeConnectorSubtaskManager.java     |   5 +
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |   3 +
 .../builtin/connector/WebSocketConnector.java      |   4 +-
 ...ocketConnector.java => WriteBackConnector.java} |   6 +-
 12 files changed, 230 insertions(+), 23 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
index b819ddace09..80b9f92042b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
@@ -25,7 +25,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -49,7 +49,7 @@ public class PipeTransferTabletInsertNodeReq extends 
TPipeTransferReq {
     return insertNode;
   }
 
-  public Statement constructStatement() {
+  public InsertBaseStatement constructStatement() {
     if (insertNode instanceof InsertRowNode) {
       final InsertRowNode node = (InsertRowNode) insertNode;
 
@@ -87,6 +87,16 @@ public class PipeTransferTabletInsertNodeReq extends 
TPipeTransferReq {
             insertNode));
   }
 
+  /////////////////////////////// WriteBack ///////////////////////////////
+
+  public static PipeTransferTabletInsertNodeReq 
toTPipeTransferRawReq(InsertNode insertNode) {
+    final PipeTransferTabletInsertNodeReq req = new 
PipeTransferTabletInsertNodeReq();
+
+    req.insertNode = insertNode;
+
+    return req;
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferTabletInsertNodeReq toTPipeTransferReq(InsertNode 
insertNode) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index 4908d26d695..8540a9a5a85 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -200,6 +200,17 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
     return sortedBitMap;
   }
 
+  /////////////////////////////// WriteBack ///////////////////////////////
+
+  public static PipeTransferTabletRawReq toTPipeTransferRawReq(Tablet tablet, 
boolean isAligned) {
+    final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
+
+    tabletReq.tablet = tablet;
+    tabletReq.isAligned = isAligned;
+
+    return tabletReq;
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferTabletRawReq toTPipeTransferReq(Tablet tablet, 
boolean isAligned)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 854355ad916..547fe8f60f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -100,17 +100,14 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
         empty -> {
           try {
             // Ensure the sink doesn't point to the air gap receiver on 
DataNode itself
-
-            if (!pipeConfig.getPipeAirGapReceiverEnabled()) {
-              return true;
-            }
-
-            return !NodeUrlUtils.containsLocalAddress(
-                givenNodeUrls.stream()
-                    .filter(
-                        tEndPoint -> tEndPoint.getPort() == 
pipeConfig.getPipeAirGapReceiverPort())
-                    .map(TEndPoint::getIp)
-                    .collect(Collectors.toList()));
+            return !(pipeConfig.getPipeAirGapReceiverEnabled()
+                && NodeUrlUtils.containsLocalAddress(
+                    givenNodeUrls.stream()
+                        .filter(
+                            tEndPoint ->
+                                tEndPoint.getPort() == 
pipeConfig.getPipeAirGapReceiverPort())
+                        .map(TEndPoint::getIp)
+                        .collect(Collectors.toList())));
           } catch (UnknownHostException e) {
             LOGGER.warn("Unknown host when checking pipe sink IP.", e);
             return false;
@@ -318,7 +315,7 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
   @Override
   public void transfer(Event event) {
     if (!(event instanceof PipeHeartbeatEvent)) {
-      LOGGER.warn("IoTDBAirGapConnector does not support transfer generic 
event: {}.", event);
+      LOGGER.warn("IoTDBAirGapConnector does not support transferring generic 
event: {}.", event);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index e61dd3428ef..cf46b975e01 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -278,7 +278,8 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
   @Override
   public void transfer(Event event) throws Exception {
     if (!(event instanceof PipeHeartbeatEvent)) {
-      LOGGER.warn("IoTDBLegacyPipeConnector does not support transfer generic 
event: {}.", event);
+      LOGGER.warn(
+          "IoTDBLegacyPipeConnector does not support transferring generic 
event: {}.", event);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 82966915c7d..b3e43b86101 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -360,7 +360,8 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
     transferBatchedEventsIfNecessary();
 
     if (!(event instanceof PipeHeartbeatEvent)) {
-      LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic 
event: {}.", event);
+      LOGGER.warn(
+          "IoTDBThriftAsyncConnector does not support transferring generic 
event: {}.", event);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index 15041bdf09d..b6129f4acaa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -307,7 +307,8 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
     }
 
     if (!(event instanceof PipeHeartbeatEvent)) {
-      LOGGER.warn("IoTDBThriftSyncConnector does not support transfer generic 
event: {}.", event);
+      LOGGER.warn(
+          "IoTDBThriftSyncConnector does not support transferring generic 
event: {}.", event);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 3329fd3bcf1..0bdc67498ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -62,7 +62,9 @@ public class WebSocketConnector implements PipeConnector {
       new PriorityQueue<>(Comparator.comparing(o -> o.left));
 
   @Override
-  public void validate(PipeParameterValidator validator) throws Exception {}
+  public void validate(PipeParameterValidator validator) throws Exception {
+    // Do nothing
+  }
 
   @Override
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
new file mode 100644
index 00000000000..eb262704c68
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.writeback;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+
+public class WriteBackConnector implements PipeConnector {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(WriteBackConnector.class);
+
+  @Override
+  public void validate(PipeParameterValidator validator) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void handshake() throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void heartbeat() throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+    // PipeProcessor can change the type of TabletInsertionEvent
+    if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+        && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+      LOGGER.warn(
+          "WriteBackConnector only support "
+              + "PipeInsertNodeTabletInsertionEvent and 
PipeRawTabletInsertionEvent. "
+              + "Ignore {}.",
+          tabletInsertionEvent);
+      return;
+    }
+
+    if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
+      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+        transfer(
+            ((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent).parseEventWithPattern());
+      } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
+        transfer(((PipeRawTabletInsertionEvent) 
tabletInsertionEvent).parseEventWithPattern());
+      }
+      return;
+    }
+
+    if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+      doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+    } else {
+      doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
+    }
+  }
+
+  @Override
+  public void transfer(Event event) throws Exception {
+    if (!(event instanceof PipeHeartbeatEvent)) {
+      LOGGER.warn("WriteBackConnector does not support transferring generic 
event: {}.", event);
+    }
+  }
+
+  private void doTransfer(PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
+      throws PipeException, WALPipeException {
+    final TSStatus status =
+        pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() 
== null
+            ? PipeAgent.receiver()
+                .thrift()
+                .receive(
+                    PipeTransferTabletBinaryReq.toTPipeTransferReq(
+                        pipeInsertNodeTabletInsertionEvent.getByteBuffer()),
+                    ClusterPartitionFetcher.getInstance(),
+                    ClusterSchemaFetcher.getInstance())
+                .getStatus()
+            : executeStatement(
+                PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
+                        pipeInsertNodeTabletInsertionEvent.getInsertNode())
+                    .constructStatement());
+
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Transfer PipeInsertNodeTabletInsertionEvent %s error, result 
status %s",
+              pipeInsertNodeTabletInsertionEvent, status));
+    }
+  }
+
+  private void doTransfer(PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
+      throws PipeException {
+    final TSStatus status =
+        executeStatement(
+            PipeTransferTabletRawReq.toTPipeTransferRawReq(
+                    pipeRawTabletInsertionEvent.convertToTablet(),
+                    pipeRawTabletInsertionEvent.isAligned())
+                .constructStatement());
+
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(
+          String.format(
+              "Transfer PipeRawTabletInsertionEvent %s error, result status 
%s",
+              pipeRawTabletInsertionEvent, status));
+    }
+  }
+
+  private TSStatus executeStatement(InsertBaseStatement statement) {
+    return Coordinator.getInstance()
+        .execute(
+            new PipeEnrichedInsertBaseStatement(statement),
+            SessionManager.getInstance().requestQueryId(),
+            new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault().getId()),
+            "",
+            ClusterPartitionFetcher.getInstance(),
+            ClusterSchemaFetcher.getInstance(),
+            
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+        .status;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Do nothing
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 52fbf18aef4..1c07e78f029 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.websocket.WebSocketConnector;
+import 
org.apache.iotdb.db.pipe.connector.protocol.writeback.WriteBackConnector;
 import 
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -202,6 +203,8 @@ public class PipeConnectorSubtaskManager {
         BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(), 
OpcUaConnector::new);
     CONNECTOR_CONSTRUCTORS.put(
         BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(), 
DoNothingConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName(), 
WriteBackConnector::new);
 
     CONNECTOR_CONSTRUCTORS.put(
         BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName(), 
IoTDBThriftAsyncConnector::new);
@@ -222,6 +225,8 @@ public class PipeConnectorSubtaskManager {
         BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(), 
OpcUaConnector::new);
     CONNECTOR_CONSTRUCTORS.put(
         BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(), 
DoNothingConnector::new);
+    CONNECTOR_CONSTRUCTORS.put(
+        BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName(), 
WriteBackConnector::new);
   }
 
   private static class PipeSubtaskManagerHolder {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index f702179004a..70814c34b45 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnect
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector;
 import org.apache.iotdb.commons.pipe.plugin.builtin.connector.OpcUaConnector;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector;
+import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.WriteBackConnector;
 import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor;
 import 
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
 
@@ -52,6 +53,7 @@ public enum BuiltinPipePlugin {
   IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", 
IoTDBAirGapConnector.class),
   WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class),
   OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class),
+  WRITE_BACK_CONNECTOR("write-back-connector", WriteBackConnector.class),
 
   DO_NOTHING_SINK("do-nothing-sink", DoNothingConnector.class),
   IOTDB_THRIFT_SINK("iotdb-thrift-sink", IoTDBThriftConnector.class),
@@ -61,6 +63,7 @@ public enum BuiltinPipePlugin {
   IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapConnector.class),
   WEBSOCKET_SINK("websocket-sink", WebSocketConnector.class),
   OPC_UA_SINK("opc-ua-sink", OpcUaConnector.class),
+  WRITE_BACK_SINK("write-back-sink", WriteBackConnector.class),
   ;
 
   private final String pipePluginName;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
index a9d3a8ccf9e..dd69ff3b484 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
 /**
- * This class is a placeholder and should not be initialized. It represents 
the IoTDB WebSocket
+ * This class is a placeholder and should not be initialized. It represents 
the Web Socket
  * connector. There is a real implementation in the server module but cannot 
be imported here. The
  * pipe agent in the server module will replace this class with the real 
implementation when
- * initializing the IoTDB Thrift connector.
+ * initializing the Web Socket connector.
  */
 public class WebSocketConnector extends PlaceholderConnector {}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WriteBackConnector.java
similarity index 89%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WriteBackConnector.java
index a9d3a8ccf9e..44efbd9ab74 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WriteBackConnector.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
 
 /**
- * This class is a placeholder and should not be initialized. It represents 
the IoTDB WebSocket
+ * This class is a placeholder and should not be initialized. It represents 
the Write Back
  * connector. There is a real implementation in the server module but cannot 
be imported here. The
  * pipe agent in the server module will replace this class with the real 
implementation when
- * initializing the IoTDB Thrift connector.
+ * initializing the Write Back connector.
  */
-public class WebSocketConnector extends PlaceholderConnector {}
+public class WriteBackConnector extends PlaceholderConnector {}

Reply via email to