This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new ee0b463e189 [IOTDB-6118] Pipe: Support transfer data through air gap
(IoTDBAirGapConnector) (#10883)
ee0b463e189 is described below
commit ee0b463e189aead00ee5399a89b57b3975f2dfac
Author: Caideyipi <[email protected]>
AuthorDate: Thu Aug 17 14:53:26 2023 +0800
[IOTDB-6118] Pipe: Support transfer data through air gap
(IoTDBAirGapConnector) (#10883)
---
.../src/main/java/org/apache/iotdb/SSLClient.java | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 +-
.../db/pipe/agent/receiver/PipeReceiverAgent.java | 70 ++---
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 6 +-
.../config/constant/PipeConnectorConstant.java | 4 +
.../payload/airgap/AirGapOneByteResponse.java} | 8 +-
.../airgap/AirGapPseudoTPipeTransferRequest.java} | 6 +-
.../request/PipeTransferFilePieceReq.java | 22 +-
.../evolvable/request/PipeTransferFileSealReq.java | 22 +-
.../request/PipeTransferHandshakeReq.java | 20 +-
.../request/PipeTransferInsertNodeReq.java | 24 +-
.../evolvable/request/PipeTransferTabletReq.java | 111 ++++----
...TDBThriftConnector.java => IoTDBConnector.java} | 6 +-
...sion.java => IoTDBConnectorRequestVersion.java} | 6 +-
.../protocol/airgap/IoTDBAirGapConnector.java | 315 ++++++++++++++++++++-
.../thrift/async/IoTDBThriftAsyncConnector.java | 9 +-
.../thrift/sync/IoTDBThriftSyncConnector.java | 33 ++-
.../pipe/receiver/airgap/IoTDBAirGapReceiver.java | 180 +++++++++++-
.../receiver/airgap/IoTDBAirGapReceiverAgent.java | 97 +++++++
...iver.java => IoTDBLegacyPipeReceiverAgent.java} | 20 +-
.../pipe/receiver/thrift/IoTDBThriftReceiver.java | 4 +-
.../thrift/IoTDBThriftReceiverAgent.java} | 38 +--
.../receiver/thrift/IoTDBThriftReceiverV1.java | 22 +-
.../connector/PipeConnectorSubtaskManager.java | 4 +
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 14 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 4 +
.../resources/conf/iotdb-common.properties | 7 +
.../iotdb/commons/concurrent/ThreadName.java | 2 +
.../apache/iotdb/commons/conf/CommonConfig.java | 19 ++
.../iotdb/commons/conf/CommonDescriptor.java | 11 +
.../iotdb/commons/pipe/config/PipeConfig.java | 20 ++
.../pipe/plugin/builtin/BuiltinPipePlugin.java | 2 +
.../builtin/connector/IoTDBAirGapConnector.java} | 23 +-
.../connector/IoTDBLegacyPipeConnector.java | 54 +---
.../builtin/connector/IoTDBThriftConnector.java | 54 +---
...ipeConnector.java => PlaceholderConnector.java} | 12 +-
.../apache/iotdb/commons/service/ServiceType.java | 1 +
.../org/apache/iotdb/tsfile/utils/BytesUtils.java | 4 +-
.../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 1 +
39 files changed, 917 insertions(+), 362 deletions(-)
diff --git
a/example/rest-java-example/src/main/java/org/apache/iotdb/SSLClient.java
b/example/rest-java-example/src/main/java/org/apache/iotdb/SSLClient.java
index 8366fa507b3..e4b9f393db6 100644
--- a/example/rest-java-example/src/main/java/org/apache/iotdb/SSLClient.java
+++ b/example/rest-java-example/src/main/java/org/apache/iotdb/SSLClient.java
@@ -40,7 +40,7 @@ import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
public class SSLClient {
- private static Logger logger = LoggerFactory.getLogger(SSLClient.class);
+ private static final Logger logger =
LoggerFactory.getLogger(SSLClient.class);
private static SSLConnectionSocketFactory sslConnectionSocketFactory = null;
private static PoolingHttpClientConnectionManager
poolingHttpClientConnectionManager = null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c85043f011c..061c290f12d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -92,22 +92,22 @@ public class IoTDBConfig {
public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER);
public static final String SYSTEM_DATABASE = "root.__system";
- /** whether to enable the mqtt service. */
+ /** Whether to enable the mqtt service. */
private boolean enableMQTTService = false;
- /** the mqtt service binding host. */
+ /** The mqtt service binding host. */
private String mqttHost = "127.0.0.1";
- /** the mqtt service binding port. */
+ /** The mqtt service binding port. */
private int mqttPort = 1883;
- /** the handler pool size for handing the mqtt messages. */
+ /** The handler pool size for handling the mqtt messages. */
private int mqttHandlerPoolSize = 1;
- /** the mqtt message payload formatter. */
+ /** The mqtt message payload formatter. */
private String mqttPayloadFormatter = "json";
- /** max mqtt message size. Unit: byte */
+ /** Max mqtt message size. Unit: byte */
private int mqttMaxMessageSize = 1048576;
/** Rpc binding address. */
@@ -1075,7 +1075,7 @@ public class IoTDBConfig {
private double maxMemoryRatioForQueue = 0.6;
/** Pipe related */
- private String pipeReceiveFileDir =
+ private String pipeReceiverFileDir =
systemDir + File.separator + "pipe" + File.separator + "receiver";
/** Resource control */
@@ -1209,7 +1209,7 @@ public class IoTDBConfig {
triggerTemporaryLibDir = addDataHomeDir(triggerTemporaryLibDir);
pipeDir = addDataHomeDir(pipeDir);
pipeTemporaryLibDir = addDataHomeDir(pipeTemporaryLibDir);
- pipeReceiveFileDir = addDataHomeDir(pipeReceiveFileDir);
+ pipeReceiverFileDir = addDataHomeDir(pipeReceiverFileDir);
mqttDir = addDataHomeDir(mqttDir);
extPipeDir = addDataHomeDir(extPipeDir);
queryDir = addDataHomeDir(queryDir);
@@ -3681,12 +3681,12 @@ public class IoTDBConfig {
return modeMapSizeThreshold;
}
- public void setPipeReceiverFileDir(String pipeReceiveFileDir) {
- this.pipeReceiveFileDir = pipeReceiveFileDir;
+ public void setPipeReceiverFileDir(String pipeReceiverFileDir) {
+ this.pipeReceiverFileDir = pipeReceiverFileDir;
}
public String getPipeReceiverFileDir() {
- return pipeReceiveFileDir;
+ return pipeReceiverFileDir;
}
public boolean isQuotaEnable() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
index 5b5d0b2bd42..53967e17757 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
@@ -20,15 +20,9 @@
package org.apache.iotdb.db.pipe.agent.receiver;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiver;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1;
-import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.db.pipe.receiver.airgap.IoTDBAirGapReceiverAgent;
+import org.apache.iotdb.db.pipe.receiver.legacy.IoTDBLegacyPipeReceiverAgent;
+import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -37,61 +31,31 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+/** PipeReceiverAgent is the entry point of all pipe receivers' logic. */
public class PipeReceiverAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeReceiverAgent.class);
- private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new
ThreadLocal<>();
+ private final IoTDBThriftReceiverAgent thriftAgent;
+ private final IoTDBAirGapReceiverAgent airGapAgent;
+ private final IoTDBLegacyPipeReceiverAgent legacyAgent;
- public TPipeTransferResp receive(
- TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher) {
- final byte reqVersion = req.getVersion();
- if (reqVersion ==
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
- return getReceiver(reqVersion).receive(req, partitionFetcher,
schemaFetcher);
- } else {
- return new TPipeTransferResp(
- RpcUtils.getStatus(
- TSStatusCode.PIPE_VERSION_ERROR,
- String.format("Unsupported pipe version %d", reqVersion)));
- }
+ public PipeReceiverAgent() {
+ thriftAgent = new IoTDBThriftReceiverAgent();
+ airGapAgent = new IoTDBAirGapReceiverAgent();
+ legacyAgent = new IoTDBLegacyPipeReceiverAgent();
}
- private IoTDBThriftReceiver getReceiver(byte reqVersion) {
- if (receiverThreadLocal.get() == null) {
- return setAndGetReceiver(reqVersion);
- }
-
- final byte receiverThreadLocalVersion =
receiverThreadLocal.get().getVersion().getVersion();
- if (receiverThreadLocalVersion != reqVersion) {
- LOGGER.warn(
- "The receiver version {} is different from the sender version {},"
- + " the receiver will be reset to the sender version.",
- receiverThreadLocalVersion,
- reqVersion);
- receiverThreadLocal.get().handleExit();
- receiverThreadLocal.remove();
- return setAndGetReceiver(reqVersion);
- }
-
- return receiverThreadLocal.get();
+ public IoTDBThriftReceiverAgent thrift() {
+ return thriftAgent;
}
- private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
- if (reqVersion ==
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
- receiverThreadLocal.set(new IoTDBThriftReceiverV1());
- } else {
- throw new UnsupportedOperationException(
- String.format("Unsupported pipe version %d", reqVersion));
- }
- return receiverThreadLocal.get();
+ public IoTDBAirGapReceiverAgent airGap() {
+ return airGapAgent;
}
- public void handleClientExit() {
- final IoTDBThriftReceiver receiver = receiverThreadLocal.get();
- if (receiver != null) {
- receiver.handleExit();
- receiverThreadLocal.remove();
- }
+ public IoTDBLegacyPipeReceiverAgent legacy() {
+ return legacyAgent;
}
public void cleanPipeReceiverDir() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 5112d1e730d..d21b7e270df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -52,8 +52,12 @@ public class PipeRuntimeAgent implements IService {
public synchronized void preparePipeResources(
ResourcesInformationHolder resourcesInformationHolder) throws
StartupException {
- PipeAgent.receiver().cleanPipeReceiverDir();
+ // clean sender (connector) hardlink file dir
PipeHardlinkFileDirStartupCleaner.clean();
+
+ // clean receiver file dir
+ PipeAgent.receiver().cleanPipeReceiverDir();
+
PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
simpleConsensusProgressIndexAssigner.start();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 24bd5a51e75..96477c3a3aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -33,6 +33,10 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_PASSWORD_KEY =
"connector.password";
public static final String CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE = "root";
+ public static final String CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY =
+ "connector.air-gap.handshake-timeout-ms";
+ public static final int CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE
= 5000;
+
public static final String CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY =
"connector.version";
public static final String
CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE = "1.1";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java
similarity index 79%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java
index 2c70bd57107..b9b3bbaf086 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapOneByteResponse.java
@@ -17,6 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.receiver.airgap;
+package org.apache.iotdb.db.pipe.connector.payload.airgap;
-public class IoTDBAirGapReceiver {}
+public class AirGapOneByteResponse {
+
+ public static final byte[] OK = new byte[] {0};
+ public static final byte[] FAIL = new byte[] {(byte) 0xFF};
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapPseudoTPipeTransferRequest.java
similarity index 80%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapPseudoTPipeTransferRequest.java
index 2c70bd57107..5441176077d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/airgap/AirGapPseudoTPipeTransferRequest.java
@@ -17,6 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.receiver.airgap;
+package org.apache.iotdb.db.pipe.connector.payload.airgap;
-public class IoTDBAirGapReceiver {}
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+public class AirGapPseudoTPipeTransferRequest extends TPipeTransferReq {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
index ead6139d4e8..12f7843d906 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -54,6 +54,8 @@ public class PipeTransferFilePieceReq extends
TPipeTransferReq {
return filePiece;
}
+ /////////////////////////////// Thrift ///////////////////////////////
+
public static PipeTransferFilePieceReq toTPipeTransferReq(
String fileName, long startWritingOffset, byte[] filePiece) throws
IOException {
final PipeTransferFilePieceReq filePieceReq = new
PipeTransferFilePieceReq();
@@ -62,7 +64,7 @@ public class PipeTransferFilePieceReq extends
TPipeTransferReq {
filePieceReq.startWritingOffset = startWritingOffset;
filePieceReq.filePiece = filePiece;
- filePieceReq.version =
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
+ filePieceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
filePieceReq.type = PipeRequestType.TRANSFER_FILE_PIECE.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
@@ -90,6 +92,22 @@ public class PipeTransferFilePieceReq extends
TPipeTransferReq {
return filePieceReq;
}
+ /////////////////////////////// Air Gap ///////////////////////////////
+ public static byte[] toTPipeTransferBytes(
+ String fileName, long startWritingOffset, byte[] filePiece) throws
IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_FILE_PIECE.getType(),
outputStream);
+ ReadWriteIOUtils.write(fileName, outputStream);
+ ReadWriteIOUtils.write(startWritingOffset, outputStream);
+ ReadWriteIOUtils.write(new Binary(filePiece), outputStream);
+ return byteArrayOutputStream.toByteArray();
+ }
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
index b77fa735045..e9dc715bdd1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -47,6 +47,8 @@ public class PipeTransferFileSealReq extends TPipeTransferReq
{
return fileLength;
}
+ /////////////////////////////// Thrift ///////////////////////////////
+
public static PipeTransferFileSealReq toTPipeTransferReq(String fileName,
long fileLength)
throws IOException {
final PipeTransferFileSealReq fileSealReq = new PipeTransferFileSealReq();
@@ -54,7 +56,7 @@ public class PipeTransferFileSealReq extends TPipeTransferReq
{
fileSealReq.fileName = fileName;
fileSealReq.fileLength = fileLength;
- fileSealReq.version =
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
+ fileSealReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
fileSealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
@@ -80,6 +82,22 @@ public class PipeTransferFileSealReq extends
TPipeTransferReq {
return fileSealReq;
}
+ /////////////////////////////// Air Gap ///////////////////////////////
+
+ public static byte[] toTPipeTransferFileSealBytes(String fileName, long
fileLength)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_FILE_SEAL.getType(),
outputStream);
+ ReadWriteIOUtils.write(fileName, outputStream);
+ ReadWriteIOUtils.write(fileLength, outputStream);
+ return byteArrayOutputStream.toByteArray();
+ }
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
index 1fe2123a425..11d6ce7ef4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -42,13 +42,15 @@ public class PipeTransferHandshakeReq extends
TPipeTransferReq {
return timestampPrecision;
}
+ /////////////////////////////// Thrift ///////////////////////////////
+
public static PipeTransferHandshakeReq toTPipeTransferReq(String
timestampPrecision)
throws IOException {
final PipeTransferHandshakeReq handshakeReq = new
PipeTransferHandshakeReq();
handshakeReq.timestampPrecision = timestampPrecision;
- handshakeReq.version =
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
+ handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
@@ -72,6 +74,20 @@ public class PipeTransferHandshakeReq extends
TPipeTransferReq {
return handshakeReq;
}
+ /////////////////////////////// Air Gap ///////////////////////////////
+
+ public static byte[] toTransferHandshakeBytes(String timestampPrecision)
throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE.getType(),
outputStream);
+ ReadWriteIOUtils.write(timestampPrecision, outputStream);
+ return byteArrayOutputStream.toByteArray();
+ }
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
index 7c375dbc6d4..c3da0985b45 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -29,7 +29,12 @@ import
org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.Objects;
public class PipeTransferInsertNodeReq extends TPipeTransferReq {
@@ -82,12 +87,14 @@ public class PipeTransferInsertNodeReq extends
TPipeTransferReq {
insertNode));
}
+ /////////////////////////////// Thrift ///////////////////////////////
+
public static PipeTransferInsertNodeReq toTPipeTransferReq(InsertNode
insertNode) {
final PipeTransferInsertNodeReq req = new PipeTransferInsertNodeReq();
req.insertNode = insertNode;
- req.version = IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
+ req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
req.type = PipeRequestType.TRANSFER_INSERT_NODE.getType();
req.body = insertNode.serializeToByteBuffer();
@@ -106,6 +113,19 @@ public class PipeTransferInsertNodeReq extends
TPipeTransferReq {
return insertNodeReq;
}
+ /////////////////////////////// Air Gap ///////////////////////////////
+ public static byte[] toTransferInsertNodeBytes(InsertNode insertNode) throws
IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_INSERT_NODE.getType(),
outputStream);
+ return BytesUtils.concatByteArray(
+ byteArrayOutputStream.toByteArray(),
insertNode.serializeToByteBuffer().array());
+ }
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
index 654aac9cb53..58e64520133 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
@@ -22,7 +22,7 @@ package
org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -36,7 +36,6 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,23 +53,32 @@ public class PipeTransferTabletReq extends TPipeTransferReq
{
private Tablet tablet;
private boolean isAligned;
- public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet,
boolean isAligned)
- throws IOException {
- final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+ public InsertTabletStatement constructStatement() {
+ if (!checkSorted(tablet)) {
+ sortTablet(tablet);
+ }
- tabletReq.tablet = tablet;
+ try {
+ final TSInsertTabletReq request = new TSInsertTabletReq();
- tabletReq.version =
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion();
- tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
- try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
- final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- tablet.serialize(outputStream);
- ReadWriteIOUtils.write(isAligned, outputStream);
- tabletReq.body =
- ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
- }
+ for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
+ request.addToMeasurements(measurementSchema.getMeasurementId());
+ request.addToTypes(measurementSchema.getType().ordinal());
+ }
- return tabletReq;
+ request.setPrefixPath(tablet.deviceId);
+ request.setIsAligned(isAligned);
+ request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
+ request.setValues(SessionUtils.getValueBuffer(tablet));
+ request.setSize(tablet.rowSize);
+ request.setMeasurements(
+
PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
+
+ return StatementGenerator.createStatement(request);
+ } catch (MetadataException e) {
+ LOGGER.warn(String.format("Generate Statement from tablet %s error.",
tablet), e);
+ return null;
+ }
}
private static boolean checkSorted(Tablet tablet) {
@@ -97,25 +105,12 @@ public class PipeTransferTabletReq extends
TPipeTransferReq {
int columnIndex = 0;
for (int i = 0; i < tablet.getSchemas().size(); i++) {
IMeasurementSchema schema = tablet.getSchemas().get(i);
- if (schema instanceof MeasurementSchema) {
+ if (schema != null) {
tablet.values[columnIndex] = sortList(tablet.values[columnIndex],
schema.getType(), index);
if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
tablet.bitMaps[columnIndex] =
sortBitMap(tablet.bitMaps[columnIndex], index);
}
columnIndex++;
- } else {
- int measurementSize = schema.getSubMeasurementsList().size();
- for (int j = 0; j < measurementSize; j++) {
- tablet.values[columnIndex] =
- sortList(
- tablet.values[columnIndex],
- schema.getSubMeasurementsTSDataTypeList().get(j),
- index);
- if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
- tablet.bitMaps[columnIndex] =
sortBitMap(tablet.bitMaps[columnIndex], index);
- }
- columnIndex++;
- }
}
}
}
@@ -196,6 +191,27 @@ public class PipeTransferTabletReq extends
TPipeTransferReq {
return sortedBitMap;
}
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ public static PipeTransferTabletReq toTPipeTransferReq(Tablet tablet,
boolean isAligned)
+ throws IOException {
+ final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
+
+ tabletReq.tablet = tablet;
+
+ tabletReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+ tabletReq.type = PipeRequestType.TRANSFER_TABLET.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ tablet.serialize(outputStream);
+ ReadWriteIOUtils.write(isAligned, outputStream);
+ tabletReq.body =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ return tabletReq;
+ }
+
public static PipeTransferTabletReq fromTPipeTransferReq(TPipeTransferReq
transferReq) {
final PipeTransferTabletReq tabletReq = new PipeTransferTabletReq();
@@ -209,31 +225,16 @@ public class PipeTransferTabletReq extends
TPipeTransferReq {
return tabletReq;
}
- public InsertTabletStatement constructStatement() {
- if (!checkSorted(tablet)) {
- sortTablet(tablet);
- }
-
- try {
- final TSInsertTabletReq request = new TSInsertTabletReq();
-
- for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
- request.addToMeasurements(measurementSchema.getMeasurementId());
- request.addToTypes(measurementSchema.getType().ordinal());
- }
-
- request.setPrefixPath(tablet.deviceId);
- request.setIsAligned(isAligned);
- request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
- request.setValues(SessionUtils.getValueBuffer(tablet));
- request.setSize(tablet.rowSize);
- request.setMeasurements(
-
PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
-
- return StatementGenerator.createStatement(request);
- } catch (MetadataException e) {
- LOGGER.warn(String.format("Generate Statement from tablet %s error.",
tablet), e);
- return null;
+ /////////////////////////////// Air Gap ///////////////////////////////
+ public static byte[] toTPipeTransferTabletBytes(Tablet tablet, boolean
isAligned)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET.getType(),
outputStream);
+ tablet.serialize(outputStream);
+ ReadWriteIOUtils.write(isAligned, outputStream);
+ return byteArrayOutputStream.toByteArray();
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
similarity index 95%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnector.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index f369d80840b..3b218c60e04 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector.protocol.thrift;
+package org.apache.iotdb.db.pipe.connector.protocol;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -39,9 +39,9 @@ import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
-public abstract class IoTDBThriftConnector implements PipeConnector {
+public abstract class IoTDBConnector implements PipeConnector {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftConnector.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConnector.class);
protected final List<TEndPoint> nodeUrls = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
similarity index 85%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
index 29e59abc304..941a372a498 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
@@ -17,15 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector.protocol.thrift;
+package org.apache.iotdb.db.pipe.connector.protocol;
-public enum IoTDBThriftConnectorRequestVersion {
+public enum IoTDBConnectorRequestVersion {
VERSION_1((byte) 1),
;
private final byte version;
- IoTDBThriftConnectorRequestVersion(byte type) {
+ IoTDBConnectorRequestVersion(byte type) {
this.version = type;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 928e8eebbb1..8a0e11f5ebf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -19,39 +19,332 @@
package org.apache.iotdb.db.pipe.connector.protocol.airgap;
-import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class IoTDBAirGapConnector implements PipeConnector {
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.CRC32;
+
+import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE;
+import static
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY;
+
+public class IoTDBAirGapConnector extends IoTDBConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBAirGapConnector.class);
- @Override
- public void validate(PipeParameterValidator validator) throws Exception {}
+ private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
+
+ private final List<Socket> sockets = new ArrayList<>();
+ private final List<Boolean> isSocketAlive = new ArrayList<>();
+
+ private int handshakeTimeoutMs;
+
+ private long currentClientIndex = 0;
@Override
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
- throws Exception {}
+ throws Exception {
+ super.customize(parameters, configuration);
+ for (int i = 0; i < nodeUrls.size(); i++) {
+ isSocketAlive.add(false);
+ sockets.add(null);
+ }
+
+ handshakeTimeoutMs =
+ parameters.getIntOrDefault(
+ CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_KEY,
+ CONNECTOR_AIR_GAP_HANDSHAKE_TIMEOUT_MS_DEFAULT_VALUE);
+ LOGGER.info(
+ "IoTDBAirGapConnector is customized with handshakeTimeoutMs: {}.",
handshakeTimeoutMs);
+ }
@Override
- public void handshake() throws Exception {}
+  public void handshake() throws Exception {
+    // (Re)connects every endpoint whose socket is not marked alive and performs the
+    // application-level handshake on it. A failure on a single endpoint (connect error, I/O
+    // error or a rejected handshake) is logged and skipped so that the remaining endpoints are
+    // still tried, which is the policy IoTDBThriftSyncConnector#handshake follows as well.
+    // A PipeConnectionException is thrown only if no endpoint is alive at the end.
+    for (int i = 0; i < sockets.size(); i++) {
+      if (Boolean.TRUE.equals(isSocketAlive.get(i))) {
+        continue;
+      }
+
+      final String ip = nodeUrls.get(i).getIp();
+      final int port = nodeUrls.get(i).getPort();
+
+      // close the socket left over from a previous attempt if necessary
+      if (sockets.get(i) != null) {
+        try {
+          sockets.set(i, null).close();
+        } catch (Exception e) {
+          LOGGER.warn(
+              "Failed to close socket with target server ip: {}, port: {}, because: {}. Ignore it.",
+              ip,
+              port,
+              e.getMessage());
+        }
+      }
+
+      final Socket socket = new Socket();
+
+      try {
+        socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs);
+        socket.setKeepAlive(true);
+        // The handshake reply is awaited with the (usually shorter) handshake timeout.
+        socket.setSoTimeout(handshakeTimeoutMs);
+        sockets.set(i, socket);
+        LOGGER.info("Successfully connected to target server ip: {}, port: {}.", ip, port);
+      } catch (Exception e) {
+        LOGGER.warn(
+            "Failed to connect to target server ip: {}, port: {}, because: {}. Ignore it.",
+            ip,
+            port,
+            e.getMessage());
+        // The socket was never stored in the list, so release it here.
+        try {
+          socket.close();
+        } catch (IOException ignored) {
+          // Nothing more can be done for a socket that is being discarded anyway.
+        }
+        continue;
+      }
+
+      try {
+        final boolean accepted =
+            send(
+                socket,
+                PipeTransferHandshakeReq.toTransferHandshakeBytes(
+                    CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+        if (!accepted) {
+          // The socket stays in the list as "not alive"; the next handshake() closes it.
+          LOGGER.warn("Handshake error with target server ip: {}, port: {}.", ip, port);
+          continue;
+        }
+
+        // Switch to the regular transfer timeout before the socket is handed out for transfers.
+        socket.setSoTimeout((int) PIPE_CONFIG.getPipeConnectorTimeoutMs());
+        isSocketAlive.set(i, true);
+        LOGGER.info("Handshake success. Target server ip: {}, port: {}", ip, port);
+      } catch (IOException e) {
+        LOGGER.warn(
+            "Handshake error with target server ip: {}, port: {}, because: {}.",
+            ip,
+            port,
+            e.getMessage());
+      }
+    }
+
+    if (!isSocketAlive.contains(Boolean.TRUE)) {
+      throw new PipeConnectionException(
+          String.format("All target servers %s are not available.", nodeUrls));
+    }
+  }
@Override
- public void heartbeat() throws Exception {}
+  public void heartbeat() {
+    // A heartbeat re-runs the handshake, which reconnects every endpoint that is not alive.
+    // Failures are only logged here; the next heartbeat tries again.
+    try {
+      handshake();
+    } catch (Exception reconnectFailure) {
+      final String reason = reconnectFailure.getMessage();
+      LOGGER.warn(
+          "Failed to reconnect to target server, because: {}. Try to reconnect later.",
+          reason,
+          reconnectFailure);
+    }
+  }
@Override
- public void transfer(TabletInsertionEvent tabletInsertionEvent) throws
Exception {}
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
+    // A PipeProcessor may hand over any TabletInsertionEvent implementation, so the concrete
+    // type is checked here. The socket is picked first, exactly once per call.
+    final int index = nextSocketIndex();
+    final Socket target = sockets.get(index);
+
+    try {
+      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+        doTransfer(target, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+        return;
+      }
+      if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+        doTransfer(target, (PipeRawTabletInsertionEvent) tabletInsertionEvent);
+        return;
+      }
+
+      // Unknown event types are dropped with a warning rather than failing the pipe.
+      LOGGER.warn(
+          "IoTDBAirGapConnector only support "
+              + "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+              + "Ignore {}.",
+          tabletInsertionEvent);
+    } catch (IOException e) {
+      // An I/O failure means this socket can no longer be trusted until the next handshake.
+      isSocketAlive.set(index, false);
+
+      final String message =
+          String.format(
+              "Network error when transfer tablet insertion event %s, because %s.",
+              tabletInsertionEvent, e.getMessage());
+      throw new PipeConnectionException(message, e);
+    }
+  }
@Override
- public void transfer(Event event) throws Exception {}
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
+    // PipeProcessor can change the type of tsFileInsertionEvent
+    final boolean supported = tsFileInsertionEvent instanceof PipeTsFileInsertionEvent;
+    if (!supported) {
+      LOGGER.warn(
+          "IoTDBAirGapConnector only support PipeTsFileInsertionEvent. Ignore {}.",
+          tsFileInsertionEvent);
+      return;
+    }
+
+    final int index = nextSocketIndex();
+    final Socket target = sockets.get(index);
+
+    try {
+      doTransfer(target, (PipeTsFileInsertionEvent) tsFileInsertionEvent);
+    } catch (IOException e) {
+      // An I/O failure means this socket can no longer be trusted until the next handshake.
+      isSocketAlive.set(index, false);
+
+      final String message =
+          String.format(
+              "Network error when transfer tsfile insertion event %s, because %s.",
+              tsFileInsertionEvent, e.getMessage());
+      throw new PipeConnectionException(message, e);
+    }
+  }
+
+ @Override
+ public void transfer(Event event) {
+ // Generic events are never sent to the receiver by this connector; they are only logged.
+ LOGGER.warn("IoTDBAirGapConnector does not support transfer generic event:
{}.", event);
+ }
+
+  /**
+   * Sends the insert node of the given event over {@code socket} as one frame.
+   *
+   * @throws PipeException if {@code send} reports that the frame was not acknowledged with OK
+   * @throws IOException if writing to or reading from the socket fails
+   */
+  private void doTransfer(
+      Socket socket, PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
+      throws PipeException, WALPipeException, IOException {
+    final byte[] payload =
+        PipeTransferInsertNodeReq.toTransferInsertNodeBytes(
+            pipeInsertNodeTabletInsertionEvent.getInsertNode());
+    if (send(socket, payload)) {
+      return;
+    }
+
+    throw new PipeException(
+        String.format(
+            "Transfer PipeInsertNodeTabletInsertionEvent %s error. Socket: %s",
+            pipeInsertNodeTabletInsertionEvent, socket));
+  }
+
+  /**
+   * Converts the raw tablet event to a tablet and sends it over {@code socket} as one frame.
+   *
+   * @throws PipeException if {@code send} reports that the frame was not acknowledged with OK
+   * @throws IOException if serialization or socket I/O fails
+   */
+  private void doTransfer(Socket socket, PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
+      throws PipeException, IOException {
+    final byte[] payload =
+        PipeTransferTabletReq.toTPipeTransferTabletBytes(
+            pipeRawTabletInsertionEvent.convertToTablet(),
+            pipeRawTabletInsertionEvent.isAligned());
+    if (send(socket, payload)) {
+      return;
+    }
+
+    throw new PipeException(
+        String.format(
+            "Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.",
+            pipeRawTabletInsertionEvent, socket));
+  }
+
+  /**
+   * Streams the event's tsfile over {@code socket}: first as a sequence of file pieces, each
+   * tagged with the file name and its start offset, then a seal request carrying the file name
+   * and the file length.
+   *
+   * @throws PipeException if any piece or the seal request is not acknowledged with OK
+   * @throws InterruptedException if interrupted while waiting for the tsfile to be closed
+   * @throws IOException if reading the file or socket I/O fails
+   */
+  private void doTransfer(Socket socket, PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
+      throws PipeException, InterruptedException, IOException {
+    pipeTsFileInsertionEvent.waitForTsFileClose();
+
+    final File tsFile = pipeTsFileInsertionEvent.getTsFile();
+
+    // 1. Transfer file piece by piece
+    final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final byte[] readBuffer = new byte[readFileBufferSize];
+    long position = 0;
+    try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
+      int readLength;
+      while ((readLength = reader.read(readBuffer)) != -1) {
+        // A partially filled buffer is trimmed so that only the bytes actually read are sent.
+        final byte[] piece;
+        if (readLength == readFileBufferSize) {
+          piece = readBuffer;
+        } else {
+          piece = Arrays.copyOfRange(readBuffer, 0, readLength);
+        }
+
+        final byte[] payload =
+            PipeTransferFilePieceReq.toTPipeTransferBytes(tsFile.getName(), position, piece);
+        if (!send(socket, payload)) {
+          throw new PipeException(
+              String.format("Transfer file %s error. Socket %s.", tsFile, socket));
+        }
+        position += readLength;
+      }
+    }
+
+    // 2. Transfer file seal signal, which means the file is transferred completely
+    final byte[] sealPayload =
+        PipeTransferFileSealReq.toTPipeTransferFileSealBytes(tsFile.getName(), tsFile.length());
+    if (!send(socket, sealPayload)) {
+      throw new PipeException(String.format("Seal file %s error. Socket %s.", tsFile, socket));
+    }
+    LOGGER.info("Successfully transferred file {}.", tsFile);
+  }
+
+  /**
+   * Picks the next alive socket in round-robin order.
+   *
+   * @return the index of an alive socket
+   * @throws PipeConnectionException if no socket is alive after one full round
+   */
+  private int nextSocketIndex() {
+    final int total = sockets.size();
+    int attempts = 0;
+    while (attempts < total) {
+      // The cursor advances on every probe, so consecutive calls start at different sockets.
+      final int candidate = (int) (currentClientIndex++ % total);
+      if (Boolean.TRUE.equals(isSocketAlive.get(candidate))) {
+        return candidate;
+      }
+      ++attempts;
+    }
+    throw new PipeConnectionException(
+        "All sockets are dead, please check the connection to the receiver.");
+  }
+
+  /**
+   * Writes {@code bytes} as one framed message and waits for the one-byte response.
+   *
+   * @return true only if a response byte was read and it equals AirGapOneByteResponse.OK; false
+   *     if the socket is not connected, the stream ended, or another response was received
+   * @throws IOException if writing the frame or reading the response fails
+   */
+  private boolean send(Socket socket, byte[] bytes) throws IOException {
+    if (socket.isConnected()) {
+      final BufferedOutputStream out = new BufferedOutputStream(socket.getOutputStream());
+      out.write(enrichWithLengthAndChecksum(bytes));
+      out.flush();
+
+      final byte[] reply = new byte[1];
+      final int readCount = socket.getInputStream().read(reply);
+      return readCount > 0 && Arrays.equals(AirGapOneByteResponse.OK, reply);
+    }
+    return false;
+  }
+
+  /**
+   * Wraps {@code bytes} into a frame: length, the same length again, the CRC32 of the payload,
+   * then the payload itself. The length value covers the checksum plus the payload.
+   */
+  private byte[] enrichWithLengthAndChecksum(byte[] bytes) {
+    final CRC32 checksum = new CRC32();
+    checksum.update(bytes, 0, bytes.length);
+    final byte[] checksumField = BytesUtils.longToBytes(checksum.getValue());
+
+    // length of checksum and bytes payload
+    final byte[] lengthField = BytesUtils.intToBytes(bytes.length + LONG_LEN);
+
+    // The length is written twice; the duplicate serves as a simple checksum of the length.
+    return BytesUtils.concatByteArrayList(
+        Arrays.asList(lengthField, lengthField, checksumField, bytes));
+  }
@Override
- public void close() throws Exception {}
+  public void close() {
+    // Close and forget every socket; a slot is marked dead even if closing its socket fails.
+    for (int slot = 0; slot < sockets.size(); ++slot) {
+      try {
+        final Socket open = sockets.get(slot);
+        if (open != null) {
+          sockets.set(slot, null);
+          open.close();
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Failed to close client {}.", slot, e);
+      } finally {
+        isSocketAlive.set(slot, false);
+      }
+    }
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 28f6e9b10f8..a7ee768226b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnector;
+import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferInsertNodeTabletInsertionEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferRawTabletInsertionEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
@@ -63,13 +63,14 @@ import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-public class IoTDBThriftAsyncConnector extends IoTDBThriftConnector {
+public class IoTDBThriftAsyncConnector extends IoTDBConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftAsyncConnector.class);
@@ -87,8 +88,8 @@ public class IoTDBThriftAsyncConnector extends
IoTDBThriftConnector {
private static final int RETRY_TRIGGER_INTERVAL_MINUTES = 1;
private final AtomicReference<Future<?>> retryTriggerFuture = new
AtomicReference<>();
private final IoTDBThriftSyncConnector retryConnector = new
IoTDBThriftSyncConnector();
- private final PriorityQueue<Pair<Long, Event>> retryEventQueue =
- new PriorityQueue<>(Comparator.comparing(o -> o.left));
+ private final PriorityBlockingQueue<Pair<Long, Event>> retryEventQueue =
+ new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
private final AtomicLong commitIdGenerator = new AtomicLong(0);
private final AtomicLong lastCommitId = new AtomicLong(0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index a025f1742ae..e682247cc66 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -29,7 +28,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
-import org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnector;
+import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -55,7 +54,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-public class IoTDBThriftSyncConnector extends IoTDBThriftConnector {
+public class IoTDBThriftSyncConnector extends IoTDBConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftSyncConnector.class);
@@ -70,10 +69,6 @@ public class IoTDBThriftSyncConnector extends
IoTDBThriftConnector {
// Do nothing
}
- public IoTDBThriftSyncConnector(String ipAddress, int port) {
- nodeUrls.add(new TEndPoint(ipAddress, port));
- }
-
@Override
public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
throws Exception {
@@ -87,7 +82,7 @@ public class IoTDBThriftSyncConnector extends
IoTDBThriftConnector {
@Override
public void handshake() throws Exception {
for (int i = 0; i < clients.size(); i++) {
- if (isClientAlive.get(i)) {
+ if (Boolean.TRUE.equals(isClientAlive.get(i))) {
continue;
}
@@ -126,22 +121,26 @@ public class IoTDBThriftSyncConnector extends
IoTDBThriftConnector {
PipeTransferHandshakeReq.toTPipeTransferReq(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(String.format("Handshake error, result
status %s.", resp.status));
+ LOGGER.warn(
+ "Handshake error with target server ip: {}, port: {}, because:
{}.",
+ ip,
+ port,
+ resp.status);
} else {
isClientAlive.set(i, true);
LOGGER.info("Handshake success. Target server ip: {}, port: {}", ip,
port);
}
} catch (TException e) {
- throw new PipeConnectionException(
- String.format(
- "Handshake error with target server ip: %s, port: %s, because:
%s",
- ip, port, e.getMessage()),
- e);
+ LOGGER.warn(
+ "Handshake error with target server ip: {}, port: {}, because:
{}.",
+ ip,
+ port,
+ e.getMessage());
}
}
for (int i = 0; i < clients.size(); i++) {
- if (isClientAlive.get(i)) {
+ if (Boolean.TRUE.equals(isClientAlive.get(i))) {
return;
}
}
@@ -193,7 +192,7 @@ public class IoTDBThriftSyncConnector extends
IoTDBThriftConnector {
@Override
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws
Exception {
- // PipeProcessor can change the type of TabletInsertionEvent
+ // PipeProcessor can change the type of tsFileInsertionEvent
if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
LOGGER.warn(
"IoTDBThriftSyncConnector only support PipeTsFileInsertionEvent.
Ignore {}.",
@@ -320,7 +319,7 @@ public class IoTDBThriftSyncConnector extends
IoTDBThriftConnector {
// Round-robin, find the next alive client
for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
final int clientIndex = (int) (currentClientIndex++ % clientSize);
- if (isClientAlive.get(clientIndex)) {
+ if (Boolean.TRUE.equals(isClientAlive.get(clientIndex))) {
return clientIndex;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
index 2c70bd57107..dd7a7d83556 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.java
@@ -19,4 +19,182 @@
package org.apache.iotdb.db.pipe.receiver.airgap;
-public class IoTDBAirGapReceiver {}
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
+import
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.CRC32;
+
+import static org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
+
+/**
+ * Serves a single air gap connection.
+ *
+ * <p>Each incoming frame is read as: a 4-byte length, the same 4-byte length repeated as a
+ * simple integrity check, a LONG_LEN-byte CRC32 value, and the payload the CRC32 was computed
+ * over. The length counts the checksum plus the payload. The payload starts with a version byte
+ * and a type short, followed by the request body. Every frame is answered with a one-byte
+ * response ({@link AirGapOneByteResponse#OK} or {@link AirGapOneByteResponse#FAIL}).
+ */
+public class IoTDBAirGapReceiver extends WrappedRunnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBAirGapReceiver.class);
+
+  /** Size in bytes of one copy of the frame length field. */
+  private static final int LENGTH_FIELD_SIZE = Integer.BYTES;
+
+  private final Socket socket;
+  private final long receiverId;
+
+  private final IoTDBThriftReceiverAgent agent;
+  private final IPartitionFetcher partitionFetcher;
+  private final ISchemaFetcher schemaFetcher;
+
+  /**
+   * @param socket the accepted connection this receiver reads from and answers on
+   * @param receiverId identifier used in log messages only
+   */
+  public IoTDBAirGapReceiver(Socket socket, long receiverId) {
+    this.socket = socket;
+    this.receiverId = receiverId;
+
+    agent = PipeAgent.receiver().thrift();
+    partitionFetcher = ClusterPartitionFetcher.getInstance();
+    schemaFetcher = ClusterSchemaFetcher.getInstance();
+  }
+
+  /**
+   * Receives frames until the socket is closed, then notifies the thrift receiver agent that
+   * the client exited and closes the socket.
+   */
+  @Override
+  public void runMayThrow() throws Throwable {
+    socket.setSoTimeout((int) PipeConfig.getInstance().getPipeConnectorTimeoutMs());
+    socket.setKeepAlive(true);
+
+    LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, socket);
+
+    try {
+      while (!socket.isClosed()) {
+        receive();
+      }
+      LOGGER.info(
+          "Pipe air gap receiver {} closed because socket is closed. Socket: {}",
+          receiverId,
+          socket);
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Pipe air gap receiver {} closed because of exception. Socket: {}",
+          receiverId,
+          socket,
+          e);
+      throw e;
+    } finally {
+      // Close the socket even if the exit notification itself throws.
+      try {
+        PipeAgent.receiver().thrift().handleClientExit();
+      } finally {
+        socket.close();
+      }
+    }
+  }
+
+  /**
+   * Reads one frame, dispatches it to the thrift receiver agent and writes the one-byte
+   * response. If the peer has closed the connection, the socket is closed instead so that the
+   * receive loop terminates.
+   */
+  private void receive() throws IOException {
+    // A fresh buffered stream per frame: read-ahead left over from a broken frame is dropped.
+    final InputStream inputStream = new BufferedInputStream(socket.getInputStream());
+
+    try {
+      final byte[] data = readData(inputStream);
+
+      if (!checkSum(data)) {
+        LOGGER.warn("Checksum failed, receiverId: {}", receiverId);
+        fail();
+        return;
+      }
+
+      // Removed the used checksum
+      final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, data.length - LONG_LEN);
+      // Pseudo request, to reuse logic in IoTDBThriftReceiverAgent
+      final AirGapPseudoTPipeTransferRequest req =
+          (AirGapPseudoTPipeTransferRequest)
+              new AirGapPseudoTPipeTransferRequest()
+                  .setVersion(ReadWriteIOUtils.readByte(byteBuffer))
+                  .setType(ReadWriteIOUtils.readShort(byteBuffer))
+                  .setBody(byteBuffer.slice());
+      final TPipeTransferResp resp = agent.receive(req, partitionFetcher, schemaFetcher);
+
+      if (resp.getStatus().code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        ok();
+      } else {
+        LOGGER.warn(
+            "Handle data failed, receiverId: {}, status: {}, req: {}",
+            receiverId,
+            resp.getStatus(),
+            req);
+        fail();
+      }
+    } catch (ConnectionClosedException e) {
+      // Socket#isClosed() only reflects a local close, so a peer-side close has to be turned
+      // into a local close here; otherwise the loop would keep reading from a finished stream.
+      LOGGER.info(
+          "Pipe air gap receiver {} reached the end of the stream: {}. Socket: {}",
+          receiverId,
+          e.getMessage(),
+          socket);
+      socket.close();
+    } catch (Exception e) {
+      LOGGER.warn("Exception during handling receiving, receiverId: {}", receiverId, e);
+      fail();
+    }
+  }
+
+  /** Answers the current frame with the OK byte. */
+  private void ok() throws IOException {
+    respond(AirGapOneByteResponse.OK);
+  }
+
+  /** Answers the current frame with the FAIL byte. */
+  private void fail() throws IOException {
+    respond(AirGapOneByteResponse.FAIL);
+  }
+
+  /** Writes a response and flushes it so the sender is unblocked immediately. */
+  private void respond(byte[] response) throws IOException {
+    final OutputStream outputStream = socket.getOutputStream();
+    outputStream.write(response);
+    outputStream.flush();
+  }
+
+  /**
+   * Verifies the CRC32 stored in the first LONG_LEN bytes against the remaining bytes.
+   *
+   * @return false if {@code bytes} is null, too short to hold a checksum, or does not match
+   */
+  private boolean checkSum(byte[] bytes) {
+    if (bytes == null || bytes.length < LONG_LEN) {
+      return false;
+    }
+
+    final CRC32 crc32 = new CRC32();
+    crc32.update(bytes, LONG_LEN, bytes.length - LONG_LEN);
+    return BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN)) == crc32.getValue();
+  }
+
+  /**
+   * Reads the checksum-plus-payload part of one frame.
+   *
+   * @return the bytes announced by the length header, or an empty array when the header is
+   *     unusable (the caller then rejects the frame in checkSum())
+   * @throws ConnectionClosedException if the stream ends before the frame is complete
+   */
+  private byte[] readData(InputStream inputStream) throws IOException {
+    final int length = readLength(inputStream);
+
+    if (length <= 0) {
+      // Mismatching length copies (0) or a corrupt negative length. Will fail() after checkSum()
+      return new byte[0];
+    }
+
+    final byte[] data = new byte[length];
+    readFully(inputStream, data);
+    return data;
+  }
+
+  /**
+   * Read the length of the following data. The thread may typically block here when there is no
+   * data to read.
+   *
+   * @return the announced length, or 0 if the two copies of the length differ
+   * @throws ConnectionClosedException if the stream ends before both copies are read
+   */
+  private int readLength(InputStream inputStream) throws IOException {
+    final byte[] lengthBytes0 = new byte[LENGTH_FIELD_SIZE];
+    readFully(inputStream, lengthBytes0);
+
+    // for double check
+    final byte[] lengthBytes1 = new byte[LENGTH_FIELD_SIZE];
+    readFully(inputStream, lengthBytes1);
+
+    return Arrays.equals(lengthBytes0, lengthBytes1) ? BytesUtils.bytesToInt(lengthBytes0) : 0;
+  }
+
+  /**
+   * Fills {@code target} completely. A single read() may legitimately return fewer bytes than
+   * requested, so reading continues until the array is full or the stream ends.
+   */
+  private void readFully(InputStream inputStream, byte[] target) throws IOException {
+    int filled = 0;
+    while (filled < target.length) {
+      final int read = inputStream.read(target, filled, target.length - filled);
+      if (read < 0) {
+        throw new ConnectionClosedException(
+            "stream ended after " + filled + " of " + target.length + " expected bytes");
+      }
+      filled += read;
+    }
+  }
+
+  /** Signals that the peer closed the connection while a frame was being read. */
+  private static final class ConnectionClosedException extends IOException {
+
+    private static final long serialVersionUID = 1L;
+
+    ConnectionClosedException(String message) {
+      super(message);
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiverAgent.java
new file mode 100644
index 00000000000..497ae6ce834
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiverAgent.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.airgap;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Service that accepts air gap pipe connections on a {@link ServerSocket} and hands every accepted
+ * {@link Socket} to a new {@link IoTDBAirGapReceiver} running on its own thread.
+ *
+ * <p>NOTE(review): the listen executor is created once and shut down in {@link #stop()}, so a
+ * stopped agent presumably cannot be started again -- confirm against the service lifecycle.
+ */
+public class IoTDBAirGapReceiverAgent implements IService {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBAirGapReceiverAgent.class);
+
+  private final ExecutorService listenExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.PIPE_RECEIVER_AIR_GAP_AGENT.getName());
+  // True between start() and stop(); gates both re-submission of listen() and accept() warnings.
+  private final AtomicBoolean allowSubmitListen = new AtomicBoolean(false);
+
+  private ServerSocket serverSocket;
+
+  private final AtomicLong receiverId = new AtomicLong(0);
+
+  /**
+   * Accepts one connection, starts a receiver thread for it, and re-submits itself to the listen
+   * executor for as long as the agent is running.
+   */
+  public void listen() {
+    try {
+      final Socket socket = serverSocket.accept();
+      new Thread(new IoTDBAirGapReceiver(socket, receiverId.incrementAndGet())).start();
+    } catch (IOException e) {
+      // stop() closes the server socket to unblock accept(); that failure is the expected
+      // shutdown signal and is not worth a warning.
+      if (allowSubmitListen.get()) {
+        LOGGER.warn("Unhandled exception during pipe air gap receiver listening", e);
+      }
+    }
+
+    if (allowSubmitListen.get()) {
+      listenExecutor.submit(this::listen);
+    }
+  }
+
+  /**
+   * Opens the server socket on the configured air gap receiver port and submits the first listen
+   * task.
+   *
+   * @throws StartupException if the server socket cannot be opened
+   */
+  @Override
+  public void start() throws StartupException {
+    try {
+      serverSocket = new ServerSocket(PipeConfig.getInstance().getPipeAirGapReceiverPort());
+    } catch (IOException e) {
+      throw new StartupException(e);
+    }
+
+    allowSubmitListen.set(true);
+    listenExecutor.submit(this::listen);
+
+    LOGGER.info("IoTDBAirGapReceiverAgent {} started.", serverSocket);
+  }
+
+  /** Stops accepting new connections: closes the server socket and shuts the executor down. */
+  @Override
+  public void stop() {
+    // Forbid re-submission first, so that the listener treats the accept() failure caused by the
+    // close below as a regular shutdown instead of logging it and re-submitting itself.
+    allowSubmitListen.set(false);
+
+    // The socket is still null when start() was never called or failed to open it.
+    if (serverSocket != null) {
+      try {
+        serverSocket.close();
+      } catch (IOException e) {
+        LOGGER.warn("Failed to close IoTDBAirGapReceiverAgent's server socket", e);
+      }
+    }
+
+    listenExecutor.shutdown();
+
+    LOGGER.info("IoTDBAirGapReceiverAgent {} stopped.", serverSocket);
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.AIR_GAP_SERVICE;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
similarity index 96%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiver.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
index a3c7020c71c..19dc3d51781 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -52,9 +52,9 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-public class IoTDBLegacyPipeReceiver {
+public class IoTDBLegacyPipeReceiverAgent {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBLegacyPipeReceiver.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBLegacyPipeReceiverAgent.class);
private static final String PATCH_SUFFIX = ".patch";
@@ -162,7 +162,7 @@ public class IoTDBLegacyPipeReceiver {
* @return {@link TSStatusCode#PIPESERVER_ERROR} if fail to receive or load;
{@link
* TSStatusCode#SUCCESS_STATUS} if load successfully.
* @throws TException The connection between the sender and the receiver has
not been established
- * by {@link IoTDBLegacyPipeReceiver#handshake}
+ * by {@link IoTDBLegacyPipeReceiverAgent#handshake}
*/
public TSStatus transportPipeData(ByteBuffer buff) throws TException {
// step1. check connection
@@ -260,7 +260,7 @@ public class IoTDBLegacyPipeReceiver {
* TSStatusCode#SYNC_FILE_REDIRECTION_ERROR} if startIndex needs to
rollback because
* mismatched; {@link TSStatusCode#SYNC_FILE_ERROR} if fail to receive
file.
* @throws TException The connection between the sender and the receiver has
not been established
- * by {@link IoTDBLegacyPipeReceiver#handshake}
+ * by {@link IoTDBLegacyPipeReceiverAgent#handshake}
*/
public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer
buff)
throws TException {
@@ -439,16 +439,4 @@ public class IoTDBLegacyPipeReceiver {
return index;
}
}
-
- ///////////////////////// singleton /////////////////////////
-
- private IoTDBLegacyPipeReceiver() {}
-
- public static IoTDBLegacyPipeReceiver getInstance() {
- return IoTDBSyncReceiverHolder.INSTANCE;
- }
-
- private static class IoTDBSyncReceiverHolder {
- private static final IoTDBLegacyPipeReceiver INSTANCE = new
IoTDBLegacyPipeReceiver();
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
index d898355bfd1..b34b5c123da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.receiver.thrift;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -27,7 +27,7 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
public interface IoTDBThriftReceiver {
- IoTDBThriftConnectorRequestVersion getVersion();
+ IoTDBConnectorRequestVersion getVersion();
TPipeTransferResp receive(
TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
similarity index 68%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
index 5b5d0b2bd42..848e31d3400 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
@@ -17,12 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.agent.receiver;
+package org.apache.iotdb.db.pipe.receiver.thrift;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiver;
-import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1;
+import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.rpc.RpcUtils;
@@ -30,23 +27,19 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
+public class IoTDBThriftReceiverAgent {
-public class PipeReceiverAgent {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeReceiverAgent.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftReceiverAgent.class);
private final ThreadLocal<IoTDBThriftReceiver> receiverThreadLocal = new
ThreadLocal<>();
public TPipeTransferResp receive(
TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher) {
final byte reqVersion = req.getVersion();
- if (reqVersion ==
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
+ if (reqVersion == IoTDBConnectorRequestVersion.VERSION_1.getVersion()) {
return getReceiver(reqVersion).receive(req, partitionFetcher,
schemaFetcher);
} else {
return new TPipeTransferResp(
@@ -77,7 +70,7 @@ public class PipeReceiverAgent {
}
private IoTDBThriftReceiver setAndGetReceiver(byte reqVersion) {
- if (reqVersion ==
IoTDBThriftConnectorRequestVersion.VERSION_1.getVersion()) {
+ if (reqVersion == IoTDBConnectorRequestVersion.VERSION_1.getVersion()) {
receiverThreadLocal.set(new IoTDBThriftReceiverV1());
} else {
throw new UnsupportedOperationException(
@@ -93,23 +86,4 @@ public class PipeReceiverAgent {
receiverThreadLocal.remove();
}
}
-
- public void cleanPipeReceiverDir() {
- final File receiverFileDir =
- new
File(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDir());
-
- try {
- FileUtils.deleteDirectory(receiverFileDir);
- LOGGER.info("Clean pipe receiver dir {} successfully.", receiverFileDir);
- } catch (Exception e) {
- LOGGER.warn("Clean pipe receiver dir {} failed.", receiverFileDir, e);
- }
-
- try {
- FileUtils.forceMkdir(receiverFileDir);
- LOGGER.info("Create pipe receiver dir {} successfully.",
receiverFileDir);
- } catch (IOException e) {
- LOGGER.warn("Create pipe receiver dir {} failed.", receiverFileDir, e);
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 1f79ebcdf5d..2aebbd7a3e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
@@ -30,7 +31,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
-import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
+import
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -84,7 +85,9 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
return handleTransferTablet(
PipeTransferTabletReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
case TRANSFER_FILE_PIECE:
- return
handleTransferFilePiece(PipeTransferFilePieceReq.fromTPipeTransferReq(req));
+ return handleTransferFilePiece(
+ PipeTransferFilePieceReq.fromTPipeTransferReq(req),
+ req instanceof AirGapPseudoTPipeTransferRequest);
case TRANSFER_FILE_SEAL:
return handleTransferFileSeal(
PipeTransferFileSealReq.fromTPipeTransferReq(req),
partitionFetcher, schemaFetcher);
@@ -181,11 +184,20 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
: executeStatement(statement, partitionFetcher, schemaFetcher));
}
- private TPipeTransferResp handleTransferFilePiece(PipeTransferFilePieceReq
req) {
+ private TPipeTransferResp handleTransferFilePiece(
+ PipeTransferFilePieceReq req, boolean isRequestThroughAirGap) {
try {
updateWritingFileIfNeeded(req.getFileName());
if (!isWritingFileOffsetCorrect(req.getStartWritingOffset())) {
+ if (isRequestThroughAirGap) {
+ // If the request is through air gap, the sender will resend the
file piece from the
+ // beginning of the file.
+ // So the receiver should reset the offset of the writing file to
the beginning of the
+ // file.
+ writingFileWriter.setLength(0);
+ }
+
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET,
@@ -490,7 +502,7 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
}
@Override
- public IoTDBThriftConnectorRequestVersion getVersion() {
- return IoTDBThriftConnectorRequestVersion.VERSION_1;
+ public IoTDBConnectorRequestVersion getVersion() {
+ return IoTDBConnectorRequestVersion.VERSION_1;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 6fd0d776219..fb06743874f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
import
org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import org.apache.iotdb.db.pipe.connector.protocol.airgap.IoTDBAirGapConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.legacy.IoTDBLegacyPipeConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
@@ -74,6 +75,9 @@ public class PipeConnectorSubtaskManager {
} else if (connectorKey.equals(
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName())) {
pipeConnector = new IoTDBLegacyPipeConnector();
+ } else if (connectorKey.equals(
+ BuiltinPipePlugin.IOTDB_AIR_GAP_CONNECTOR.getPipePluginName())) {
+ pipeConnector = new IoTDBAirGapConnector();
} else {
pipeConnector =
PipeAgent.plugin().reflectConnector(pipeConnectorParameters);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 25e7d3db2c6..b50766b8f08 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -43,7 +43,6 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.receiver.legacy.IoTDBLegacyPipeReceiver;
import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
@@ -2528,7 +2527,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
@Override
public TSStatus handshake(TSyncIdentityInfo info) throws TException {
- return IoTDBLegacyPipeReceiver.getInstance()
+ return PipeAgent.receiver()
+ .legacy()
.handshake(
info,
SESSION_MANAGER.getCurrSession().getClientAddress(),
@@ -2538,17 +2538,17 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
@Override
public TSStatus sendPipeData(ByteBuffer buff) throws TException {
- return IoTDBLegacyPipeReceiver.getInstance().transportPipeData(buff);
+ return PipeAgent.receiver().legacy().transportPipeData(buff);
}
@Override
public TSStatus sendFile(TSyncTransportMetaInfo metaInfo, ByteBuffer buff)
throws TException {
- return IoTDBLegacyPipeReceiver.getInstance().transportFile(metaInfo, buff);
+ return PipeAgent.receiver().legacy().transportFile(metaInfo, buff);
}
@Override
public TPipeTransferResp pipeTransfer(TPipeTransferReq req) {
- return PipeAgent.receiver().receive(req, partitionFetcher, schemaFetcher);
+ return PipeAgent.receiver().thrift().receive(req, partitionFetcher,
schemaFetcher);
}
@Override
@@ -2688,7 +2688,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
TSCloseSessionReq req = new TSCloseSessionReq();
closeSession(req);
}
- IoTDBLegacyPipeReceiver.getInstance().handleClientExit();
- PipeAgent.receiver().handleClientExit();
+ PipeAgent.receiver().thrift().handleClientExit();
+ PipeAgent.receiver().legacy().handleClientExit();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 1230d9be8d2..37b4bb4a8d6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
@@ -886,6 +887,9 @@ public class DataNode implements DataNodeMBean {
if
(IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
registerManager.register(RestService.getInstance());
}
+ if (PipeConfig.getInstance().getPipeAirGapReceiverEnabled()) {
+ registerManager.register(PipeAgent.receiver().airGap());
+ }
}
private void deactivate() {
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 22b9083f14e..d3279721947 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -960,6 +960,13 @@ cluster_name=defaultCluster
# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16
+# Whether to enable receiving pipe data through air gap.
+# The receiver can only return 0 or 1 in TCP mode to indicate whether the data is received successfully.
+# pipe_air_gap_receiver_enabled=false
+
+# The port for the server to receive pipe data through air gap.
+# pipe_air_gap_receiver_port=9780
+
####################
### RatisConsensus Configuration
####################
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 3d26b9e8ee7..ebc7a2ef511 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -134,6 +134,7 @@ public enum ThreadName {
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER("Pipe-Async-Connector-Retry-Trigger"),
PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
+ PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
STATEFUL_TRIGGER_INFORMATION_UPDATER("Stateful-Trigger-Information-Updater"),
// -------------------------- JVM --------------------------
@@ -270,6 +271,7 @@ public enum ThreadName {
PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER,
PIPE_WAL_RESOURCE_TTL_CHECKER,
+ PIPE_RECEIVER_AIR_GAP_AGENT,
WINDOW_EVALUATION_SERVICE,
STATEFUL_TRIGGER_INFORMATION_UPDATER));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 97932889caa..abc39f11ee6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -180,6 +180,9 @@ public class CommonConfig {
private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
private boolean pipeAutoRestartEnabled = true;
+ private boolean pipeAirGapReceiverEnabled = false;
+ private int pipeAirGapReceiverPort = 9780;
+
/** Whether to use persistent schema mode. */
private String schemaEngineMode = "Memory";
@@ -702,6 +705,22 @@ public class CommonConfig {
pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
}
+ /** Sets whether receiving pipe data through air gap is enabled. */
+ public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
+ this.pipeAirGapReceiverEnabled = pipeAirGapReceiverEnabled;
+ }
+
+ /** Returns whether receiving pipe data through air gap is enabled (the field defaults to false). */
+ public boolean getPipeAirGapReceiverEnabled() {
+ return pipeAirGapReceiverEnabled;
+ }
+
+ /** Sets the port on which the server receives pipe data through air gap. */
+ public void setPipeAirGapReceiverPort(int pipeAirGapReceiverPort) {
+ this.pipeAirGapReceiverPort = pipeAirGapReceiverPort;
+ }
+
+ /** Returns the port for receiving pipe data through air gap (the field defaults to 9780). */
+ public int getPipeAirGapReceiverPort() {
+ return pipeAirGapReceiverPort;
+ }
+
public String getSchemaEngineMode() {
return schemaEngineMode;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 8ca5dbbd85e..0922b96452d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -377,6 +377,17 @@ public class CommonDescriptor {
Boolean.parseBoolean(
properties.getProperty(
"pipe_auto_restart_enabled",
String.valueOf(config.getPipeAutoRestartEnabled()))));
+
+ config.setPipeAirGapReceiverEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "pipe_air_gap_receiver_enabled",
+ Boolean.toString(config.getPipeAirGapReceiverEnabled()))));
+ config.setPipeAirGapReceiverPort(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_air_gap_receiver_port",
+ Integer.toString(config.getPipeAirGapReceiverPort()))));
}
public void loadGlobalConfig(TGlobalConfig globalConfig) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 355d0851207..4b46ac5e0fa 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -149,6 +149,16 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAutoRestartEnabled();
}
+ /////////////////////////////// Air Gap Receiver ///////////////////////////////
+
+ /** Returns the air gap receiver switch, read from the common configuration. */
+ public boolean getPipeAirGapReceiverEnabled() {
+ return COMMON_CONFIG.getPipeAirGapReceiverEnabled();
+ }
+
+ /** Returns the air gap receiver port, read from the common configuration. */
+ public int getPipeAirGapReceiverPort() {
+ return COMMON_CONFIG.getPipeAirGapReceiverPort();
+ }
+
/////////////////////////////// Utils ///////////////////////////////
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfig.class);
@@ -184,6 +194,13 @@ public class PipeConfig {
LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());
LOGGER.info("PipeConnectorRetryIntervalMs: {}",
getPipeConnectorRetryIntervalMs());
LOGGER.info("PipeConnectorPendingQueueSize: {}",
getPipeConnectorPendingQueueSize());
+ LOGGER.info(
+ "PipeConnectorRPCThriftCompressionEnabled: {}",
+ isPipeConnectorRPCThriftCompressionEnabled());
+
+ LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
+ LOGGER.info("PipeAsyncConnectorCoreClientNumber: {}",
getPipeAsyncConnectorCoreClientNumber());
+ LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
LOGGER.info("SeperatedPipeHeartbeatEnabled: {}",
isSeperatedPipeHeartbeatEnabled());
LOGGER.info(
@@ -196,6 +213,9 @@ public class PipeConfig {
"PipeMetaSyncerAutoRestartPipeCheckIntervalRound: {}",
getPipeMetaSyncerAutoRestartPipeCheckIntervalRound());
LOGGER.info("PipeAutoRestartEnabled: {}", getPipeAutoRestartEnabled());
+
+ LOGGER.info("PipeAirGapReceiverEnabled: {}",
getPipeAirGapReceiverEnabled());
+ LOGGER.info("PipeAirGapReceiverPort: {}", getPipeAirGapReceiverPort());
}
/////////////////////////////// Singleton ///////////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
index a2f1795ed27..ef973b1cc30 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/BuiltinPipePlugin.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.plugin.builtin;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.DoNothingConnector;
+import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBAirGapConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBLegacyPipeConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftAsyncConnector;
import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.IoTDBThriftConnector;
@@ -41,6 +42,7 @@ public enum BuiltinPipePlugin {
IOTDB_THRIFT_SYNC_CONNECTOR("iotdb-thrift-sync-connector",
IoTDBThriftSyncConnector.class),
IOTDB_THRIFT_ASYNC_CONNECTOR("iotdb-thrift-async-connector",
IoTDBThriftAsyncConnector.class),
IOTDB_LEGACY_PIPE_CONNECTOR("iotdb-legacy-pipe-connector",
IoTDBLegacyPipeConnector.class),
+ IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector",
IoTDBAirGapConnector.class),
;
private final String pipePluginName;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBAirGapConnector.java
similarity index 63%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBAirGapConnector.java
index 29e59abc304..81fa0b03a3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/IoTDBThriftConnectorRequestVersion.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBAirGapConnector.java
@@ -17,19 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.db.pipe.connector.protocol.thrift;
+package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
-public enum IoTDBThriftConnectorRequestVersion {
- VERSION_1((byte) 1),
- ;
-
- private final byte version;
-
- IoTDBThriftConnectorRequestVersion(byte type) {
- this.version = type;
- }
-
- public byte getVersion() {
- return version;
- }
-}
+/**
+ * This class is a placeholder and should not be instantiated. It represents the IoTDB Air Gap
+ * connector. The real implementation lives in the server module and cannot be imported here. The
+ * pipe agent in the server module will replace this class with the real implementation when
+ * initializing the IoTDB Air Gap connector.
+ */
+public class IoTDBAirGapConnector extends PlaceholderConnector {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
index e527f1c72d3..59b0d12ccdd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
@@ -19,62 +19,10 @@
package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
-import org.apache.iotdb.pipe.api.PipeConnector;
-import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-
/**
* This class is a placeholder and should not be initialized. It represents
the IoTDB legacy pipe
* connector (for IoTDB v1.1). There is a real implementation in the server
module but cannot be
* imported here. The pipe agent in the server module will replace this class
with the real
* implementation when initializing the IoTDB legacy pipe connector.
*/
-public class IoTDBLegacyPipeConnector implements PipeConnector {
- private static final String PLACEHOLDER_ERROR_MSG =
- "This class is a placeholder and should not be used.";
-
- @Override
- public void validate(PipeParameterValidator validator) {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public void customize(
- PipeParameters parameters, PipeConnectorRuntimeConfiguration
configuration) {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public void handshake() {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public void heartbeat() {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public void transfer(TabletInsertionEvent tabletInsertionEvent) {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public void transfer(Event event) {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public void close() {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-}
+public class IoTDBLegacyPipeConnector extends PlaceholderConnector {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
index b3772d361d3..23f4065fe52 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBThriftConnector.java
@@ -19,62 +19,10 @@
package org.apache.iotdb.commons.pipe.plugin.builtin.connector;
-import org.apache.iotdb.pipe.api.PipeConnector;
-import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-
/**
* This class is a placeholder and should not be initialized. It represents
the IoTDB Thrift
* connector. There is a real implementation in the server module but cannot
be imported here. The
* pipe agent in the server module will replace this class with the real
implementation when
* initializing the IoTDB Thrift connector.
*/
-public class IoTDBThriftConnector implements PipeConnector {
- private static final String PLACEHOLDER_ERROR_MSG =
- "This class is a placeholder and should not be used.";
-
- @Override
- public final void validate(PipeParameterValidator validator) {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public final void customize(
- PipeParameters parameters, PipeConnectorRuntimeConfiguration
configuration) {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public final void handshake() {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public final void heartbeat() {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public final void transfer(TabletInsertionEvent tabletInsertionEvent) {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public final void transfer(TsFileInsertionEvent tsFileInsertionEvent) {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public final void transfer(Event event) {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-
- @Override
- public final void close() {
- throw new UnsupportedOperationException(PLACEHOLDER_ERROR_MSG);
- }
-}
+public class IoTDBThriftConnector extends PlaceholderConnector {}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/PlaceholderConnector.java
similarity index 86%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/PlaceholderConnector.java
index e527f1c72d3..dc8f4d25579 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/plugin/builtin/connector/PlaceholderConnector.java
@@ -28,12 +28,14 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
/**
- * This class is a placeholder and should not be initialized. It represents
the IoTDB legacy pipe
- * connector (for IoTDB v1.1). There is a real implementation in the server
module but cannot be
- * imported here. The pipe agent in the server module will replace this class
with the real
- * implementation when initializing the IoTDB legacy pipe connector.
+ * This class is a placeholder and should not be initialized. It represents
all the IoTDB pipe
+ * connectors that cannot be implemented in the node-commons module. Each
IoTDB pipe connector has
+ * a real implementation in the server module but cannot be imported here. The
pipe agent in the
+ * server module will replace this class with the real implementation when
initializing the IoTDB
+ * pipe connector.
*/
-public class IoTDBLegacyPipeConnector implements PipeConnector {
+public class PlaceholderConnector implements PipeConnector {
+
private static final String PLACEHOLDER_ERROR_MSG =
"This class is a placeholder and should not be used.";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 5e43452898b..74be88f288c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -26,6 +26,7 @@ public enum ServiceType {
RPC_SERVICE("RPC ServerService", "RPCService"),
INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"),
MQTT_SERVICE("MQTTService", "MqttService"),
+ AIR_GAP_SERVICE("AirGapService", "AirGapService"),
MONITOR_SERVICE("Monitor ServerService", "Monitor"),
STAT_MONITOR_SERVICE("Statistics ServerService", "StatMonitorService"),
WAL_SERVICE("WAL ServerService", "WalService"),
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
index 418f7d136ab..321799d0d4a 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BytesUtils.java
@@ -688,10 +688,10 @@ public class BytesUtils {
*/
public static byte[] subBytes(byte[] src, int start, int length) {
if ((start + length) > src.length) {
- return null;
+ return new byte[0];
}
if (length <= 0) {
- return null;
+ return new byte[0];
}
byte[] result = new byte[length];
System.arraycopy(src, start, result, 0, length);
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 90c2d0f22cf..18a459a6ee0 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.tsfile.utils;
import java.io.ByteArrayOutputStream;