This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 83447eb30b1 [IOTDB-6235] Pipe: Implement write-back-sink to transfer
data back to the sender (#11466)
83447eb30b1 is described below
commit 83447eb30b1c4eb1c83fa45457df50ae91aed345
Author: Caideyipi <[email protected]>
AuthorDate: Fri Nov 3 19:11:28 2023 +0800
[IOTDB-6235] Pipe: Implement write-back-sink to transfer data back to the
sender (#11466)
---
.../request/PipeTransferTabletInsertNodeReq.java | 14 +-
.../request/PipeTransferTabletRawReq.java | 11 ++
.../protocol/airgap/IoTDBAirGapConnector.java | 21 ++-
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 3 +-
.../thrift/async/IoTDBThriftAsyncConnector.java | 3 +-
.../thrift/sync/IoTDBThriftSyncConnector.java | 3 +-
.../protocol/websocket/WebSocketConnector.java | 4 +-
.../protocol/writeback/WriteBackConnector.java | 176 +++++++++++++++++++++
.../connector/PipeConnectorSubtaskManager.java | 5 +
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 3 +
.../builtin/connector/WebSocketConnector.java | 4 +-
...ocketConnector.java => WriteBackConnector.java} | 6 +-
12 files changed, 230 insertions(+), 23 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
index b819ddace09..80b9f92042b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
@@ -25,7 +25,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -49,7 +49,7 @@ public class PipeTransferTabletInsertNodeReq extends
TPipeTransferReq {
return insertNode;
}
- public Statement constructStatement() {
+ public InsertBaseStatement constructStatement() {
if (insertNode instanceof InsertRowNode) {
final InsertRowNode node = (InsertRowNode) insertNode;
@@ -87,6 +87,16 @@ public class PipeTransferTabletInsertNodeReq extends
TPipeTransferReq {
insertNode));
}
+ /////////////////////////////// WriteBack ///////////////////////////////
+
+ public static PipeTransferTabletInsertNodeReq
toTPipeTransferRawReq(InsertNode insertNode) {
+ final PipeTransferTabletInsertNodeReq req = new
PipeTransferTabletInsertNodeReq();
+
+ req.insertNode = insertNode;
+
+ return req;
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletInsertNodeReq toTPipeTransferReq(InsertNode
insertNode) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index 4908d26d695..8540a9a5a85 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -200,6 +200,17 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
return sortedBitMap;
}
+ /////////////////////////////// WriteBack ///////////////////////////////
+
+ public static PipeTransferTabletRawReq toTPipeTransferRawReq(Tablet tablet,
boolean isAligned) {
+ final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
+
+ tabletReq.tablet = tablet;
+ tabletReq.isAligned = isAligned;
+
+ return tabletReq;
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletRawReq toTPipeTransferReq(Tablet tablet,
boolean isAligned)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 854355ad916..547fe8f60f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -100,17 +100,14 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
empty -> {
try {
// Ensure the sink doesn't point to the air gap receiver on
DataNode itself
-
- if (!pipeConfig.getPipeAirGapReceiverEnabled()) {
- return true;
- }
-
- return !NodeUrlUtils.containsLocalAddress(
- givenNodeUrls.stream()
- .filter(
- tEndPoint -> tEndPoint.getPort() ==
pipeConfig.getPipeAirGapReceiverPort())
- .map(TEndPoint::getIp)
- .collect(Collectors.toList()));
+ return !(pipeConfig.getPipeAirGapReceiverEnabled()
+ && NodeUrlUtils.containsLocalAddress(
+ givenNodeUrls.stream()
+ .filter(
+ tEndPoint ->
+ tEndPoint.getPort() ==
pipeConfig.getPipeAirGapReceiverPort())
+ .map(TEndPoint::getIp)
+ .collect(Collectors.toList())));
} catch (UnknownHostException e) {
LOGGER.warn("Unknown host when checking pipe sink IP.", e);
return false;
@@ -318,7 +315,7 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
@Override
public void transfer(Event event) {
if (!(event instanceof PipeHeartbeatEvent)) {
- LOGGER.warn("IoTDBAirGapConnector does not support transfer generic
event: {}.", event);
+ LOGGER.warn("IoTDBAirGapConnector does not support transferring generic
event: {}.", event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index e61dd3428ef..cf46b975e01 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -278,7 +278,8 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
@Override
public void transfer(Event event) throws Exception {
if (!(event instanceof PipeHeartbeatEvent)) {
- LOGGER.warn("IoTDBLegacyPipeConnector does not support transfer generic
event: {}.", event);
+ LOGGER.warn(
+ "IoTDBLegacyPipeConnector does not support transferring generic
event: {}.", event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 82966915c7d..b3e43b86101 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -360,7 +360,8 @@ public class IoTDBThriftAsyncConnector extends
IoTDBConnector {
transferBatchedEventsIfNecessary();
if (!(event instanceof PipeHeartbeatEvent)) {
- LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic
event: {}.", event);
+ LOGGER.warn(
+ "IoTDBThriftAsyncConnector does not support transferring generic
event: {}.", event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index 15041bdf09d..b6129f4acaa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -307,7 +307,8 @@ public class IoTDBThriftSyncConnector extends
IoTDBConnector {
}
if (!(event instanceof PipeHeartbeatEvent)) {
- LOGGER.warn("IoTDBThriftSyncConnector does not support transfer generic
event: {}.", event);
+ LOGGER.warn(
+ "IoTDBThriftSyncConnector does not support transferring generic
event: {}.", event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 3329fd3bcf1..0bdc67498ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -62,7 +62,9 @@ public class WebSocketConnector implements PipeConnector {
new PriorityQueue<>(Comparator.comparing(o -> o.left));
@Override
- public void validate(PipeParameterValidator validator) throws Exception {}
+ public void validate(PipeParameterValidator validator) throws Exception {
+ // Do nothing
+ }
@Override
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
new file mode 100644
index 00000000000..eb262704c68
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.writeback;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.pipe.api.PipeConnector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+
+public class WriteBackConnector implements PipeConnector {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(WriteBackConnector.class);
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
+ throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void handshake() throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void heartbeat() throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {
+ // PipeProcessor can change the type of TabletInsertionEvent
+ if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+ && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+ LOGGER.warn(
+ "WriteBackConnector only support "
+ + "PipeInsertNodeTabletInsertionEvent and
PipeRawTabletInsertionEvent. "
+ + "Ignore {}.",
+ tabletInsertionEvent);
+ return;
+ }
+
+ if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ transfer(
+ ((PipeInsertNodeTabletInsertionEvent)
tabletInsertionEvent).parseEventWithPattern());
+ } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
+ transfer(((PipeRawTabletInsertionEvent)
tabletInsertionEvent).parseEventWithPattern());
+ }
+ return;
+ }
+
+ if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+ doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+ } else {
+ doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
+ }
+ }
+
+ @Override
+ public void transfer(Event event) throws Exception {
+ if (!(event instanceof PipeHeartbeatEvent)) {
+ LOGGER.warn("WriteBackConnector does not support transferring generic
event: {}.", event);
+ }
+ }
+
+ private void doTransfer(PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
+ throws PipeException, WALPipeException {
+ final TSStatus status =
+ pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible()
== null
+ ? PipeAgent.receiver()
+ .thrift()
+ .receive(
+ PipeTransferTabletBinaryReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer()),
+ ClusterPartitionFetcher.getInstance(),
+ ClusterSchemaFetcher.getInstance())
+ .getStatus()
+ : executeStatement(
+ PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
+ pipeInsertNodeTabletInsertionEvent.getInsertNode())
+ .constructStatement());
+
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format(
+ "Transfer PipeInsertNodeTabletInsertionEvent %s error, result
status %s",
+ pipeInsertNodeTabletInsertionEvent, status));
+ }
+ }
+
+ private void doTransfer(PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
+ throws PipeException {
+ final TSStatus status =
+ executeStatement(
+ PipeTransferTabletRawReq.toTPipeTransferRawReq(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned())
+ .constructStatement());
+
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(
+ String.format(
+ "Transfer PipeRawTabletInsertionEvent %s error, result status
%s",
+ pipeRawTabletInsertionEvent, status));
+ }
+ }
+
+ private TSStatus executeStatement(InsertBaseStatement statement) {
+ return Coordinator.getInstance()
+ .execute(
+ new PipeEnrichedInsertBaseStatement(statement),
+ SessionManager.getInstance().requestQueryId(),
+ new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault().getId()),
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ ClusterSchemaFetcher.getInstance(),
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+ .status;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Do nothing
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 52fbf18aef4..1c07e78f029 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.pipe.connector.protocol.opcua.OpcUaConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.websocket.WebSocketConnector;
+import
org.apache.iotdb.db.pipe.connector.protocol.writeback.WriteBackConnector;
import
org.apache.iotdb.db.pipe.execution.executor.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -202,6 +203,8 @@ public class PipeConnectorSubtaskManager {
BuiltinPipePlugin.OPC_UA_CONNECTOR.getPipePluginName(),
OpcUaConnector::new);
CONNECTOR_CONSTRUCTORS.put(
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName(),
DoNothingConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.WRITE_BACK_CONNECTOR.getPipePluginName(),
WriteBackConnector::new);
CONNECTOR_CONSTRUCTORS.put(
BuiltinPipePlugin.IOTDB_THRIFT_SINK.getPipePluginName(),
IoTDBThriftAsyncConnector::new);
@@ -222,6 +225,8 @@ public class PipeConnectorSubtaskManager {
BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName(),
OpcUaConnector::new);
CONNECTOR_CONSTRUCTORS.put(
BuiltinPipePlugin.DO_NOTHING_SINK.getPipePluginName(),
DoNothingConnector::new);
+ CONNECTOR_CONSTRUCTORS.put(
+ BuiltinPipePlugin.WRITE_BACK_SINK.getPipePluginName(),
WriteBackConnector::new);
}
private static class PipeSubtaskManagerHolder {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index f702179004a..70814c34b45 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnect
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftSyncConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.OpcUaConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.WebSocketConnector;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.WriteBackConnector;
import org.apache.iotdb.commons.pipe.plugin.builtin.extractor.IoTDBExtractor;
import
org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
@@ -52,6 +53,7 @@ public enum BuiltinPipePlugin {
IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector",
IoTDBAirGapConnector.class),
WEBSOCKET_CONNECTOR("websocket-connector", WebSocketConnector.class),
OPC_UA_CONNECTOR("opc-ua-connector", OpcUaConnector.class),
+ WRITE_BACK_CONNECTOR("write-back-connector", WriteBackConnector.class),
DO_NOTHING_SINK("do-nothing-sink", DoNothingConnector.class),
IOTDB_THRIFT_SINK("iotdb-thrift-sink", IoTDBThriftConnector.class),
@@ -61,6 +63,7 @@ public enum BuiltinPipePlugin {
IOTDB_AIR_GAP_SINK("iotdb-air-gap-sink", IoTDBAirGapConnector.class),
WEBSOCKET_SINK("websocket-sink", WebSocketConnector.class),
OPC_UA_SINK("opc-ua-sink", OpcUaConnector.class),
+ WRITE_BACK_SINK("write-back-sink", WriteBackConnector.class),
;
private final String pipePluginName;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
index a9d3a8ccf9e..dd69ff3b484 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
/**
- * This class is a placeholder and should not be initialized. It represents
the IoTDB WebSocket
+ * This class is a placeholder and should not be initialized. It represents
the Web Socket
* connector. There is a real implementation in the server module but cannot
be imported here. The
* pipe agent in the server module will replace this class with the real
implementation when
- * initializing the IoTDB Thrift connector.
+ * initializing the Web Socket connector.
*/
public class WebSocketConnector extends PlaceholderConnector {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WriteBackConnector.java
similarity index 89%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WriteBackConnector.java
index a9d3a8ccf9e..44efbd9ab74 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WebSocketConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/WriteBackConnector.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
/**
- * This class is a placeholder and should not be initialized. It represents
the IoTDB WebSocket
+ * This class is a placeholder and should not be initialized. It represents
the Write Back
* connector. There is a real implementation in the server module but cannot
be imported here. The
* pipe agent in the server module will replace this class with the real
implementation when
- * initializing the IoTDB Thrift connector.
+ * initializing the Write Back connector.
*/
-public class WebSocketConnector extends PlaceholderConnector {}
+public class WriteBackConnector extends PlaceholderConnector {}