This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 28bb9b8cd6c Pipe: Support pipe's endpoint & global connector transfer
rate limit & Fix RPC compression not enabled in stream batch mode (#12543)
28bb9b8cd6c is described below
commit 28bb9b8cd6cf54506d90eb477384023f581293d0
Author: Itami Sho <[email protected]>
AuthorDate: Mon May 27 12:22:11 2024 +0800
Pipe: Support pipe's endpoint & global connector transfer rate limit & Fix
RPC compression not enabled in stream batch mode (#12543)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../protocol/IoTDBConfigRegionAirGapConnector.java | 51 +++++----
.../protocol/IoTDBConfigRegionConnector.java | 52 +++++----
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 14 ++-
.../payload/evolvable/builder/PipeEventBatch.java | 18 +++
.../airgap/IoTDBDataNodeAirGapConnector.java | 20 ++--
.../airgap/IoTDBDataRegionAirGapConnector.java | 55 +++++----
.../airgap/IoTDBSchemaRegionAirGapConnector.java | 32 +++---
.../async/IoTDBDataRegionAsyncConnector.java | 11 +-
.../PipeTransferTabletBatchEventHandler.java | 19 ++-
.../PipeTransferTabletInsertionEventHandler.java | 5 +
.../PipeTransferTsFileInsertionEventHandler.java | 35 ++++--
.../thrift/sync/IoTDBDataNodeSyncConnector.java | 17 +--
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 127 ++++++++++++---------
.../thrift/sync/IoTDBSchemaRegionConnector.java | 36 +++---
.../execution/load/LoadTsFileRateLimiter.java | 32 +++---
.../queryengine/plan/planner/TreeModelPlanner.java | 4 -
.../resources/conf/iotdb-common.properties | 10 ++
.../async/AsyncPipeDataTransferServiceClient.java | 4 +
.../apache/iotdb/commons/conf/CommonConfig.java | 10 ++
.../iotdb/commons/conf/CommonDescriptor.java | 6 +
.../iotdb/commons/pipe/config/PipeConfig.java | 8 ++
.../config/constant/PipeConnectorConstant.java | 4 +
.../pipe/connector/client/IoTDBSyncClient.java | 7 ++
.../pipe/connector/limiter/GlobalRateLimiter.java} | 55 ++++-----
.../connector/limiter/PipeEndPointRateLimiter.java | 59 ++++++++++
.../connector/protocol/IoTDBAirGapConnector.java | 48 ++++++--
.../pipe/connector/protocol/IoTDBConnector.java | 45 +++++++-
.../connector/protocol/IoTDBSslSyncConnector.java | 24 ++--
29 files changed, 536 insertions(+), 274 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index ec09b546648..9ea34939864 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.net.Socket;
import java.util.HashMap;
import java.util.Objects;
@@ -56,9 +55,8 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
@Override
protected byte[] generateHandShakeV1Payload() throws IOException {
- return compressIfNeeded(
- PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+ return PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes(
+ CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
}
@Override
@@ -71,7 +69,7 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
- return
compressIfNeeded(PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params));
+ return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
@Override
@@ -107,7 +105,7 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
@Override
public void transfer(final Event event) throws Exception {
final int socketIndex = nextSocketIndex();
- final Socket socket = sockets.get(socketIndex);
+ final AirGapSocket socket = sockets.get(socketIndex);
try {
if (event instanceof PipeConfigRegionWritePlanEvent) {
@@ -131,7 +129,8 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
}
private void doTransferWrapper(
- final Socket socket, final PipeConfigRegionWritePlanEvent
pipeConfigRegionWritePlanEvent)
+ final AirGapSocket socket,
+ final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
@@ -147,13 +146,14 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
}
private void doTransfer(
- final Socket socket, final PipeConfigRegionWritePlanEvent
pipeConfigRegionWritePlanEvent)
+ final AirGapSocket socket,
+ final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
if (!send(
+ pipeConfigRegionWritePlanEvent.getPipeName(),
socket,
- compressIfNeeded(
- PipeTransferConfigPlanReq.toTPipeTransferBytes(
- pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())))) {
+ PipeTransferConfigPlanReq.toTPipeTransferBytes(
+ pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
final String errorMessage =
String.format(
"Transfer config region write plan %s error. Socket: %s.",
@@ -170,7 +170,7 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
}
private void doTransferWrapper(
- final Socket socket, final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
+ final AirGapSocket socket, final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
@@ -186,29 +186,30 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
}
private void doTransfer(
- final Socket socket, final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
+ final AirGapSocket socket, final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
+ final String pipeName = pipeConfigRegionSnapshotEvent.getPipeName();
final File snapshot = pipeConfigRegionSnapshotEvent.getSnapshotFile();
final File templateFile = pipeConfigRegionSnapshotEvent.getTemplateFile();
// 1. Transfer snapshotFile, and template file if exists
- transferFilePieces(snapshot, socket, true);
+ transferFilePieces(pipeName, snapshot, socket, true);
if (Objects.nonNull(templateFile)) {
- transferFilePieces(templateFile, socket, true);
+ transferFilePieces(pipeName, templateFile, socket, true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred completely
if (!send(
+ pipeName,
socket,
- compressIfNeeded(
- PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
- // The pattern is surely Non-null
- pipeConfigRegionSnapshotEvent.getPatternString(),
- snapshot.getName(),
- snapshot.length(),
- Objects.nonNull(templateFile) ? templateFile.getName() : null,
- Objects.nonNull(templateFile) ? templateFile.length() : 0,
- pipeConfigRegionSnapshotEvent.getFileType(),
- pipeConfigRegionSnapshotEvent.toSealTypeString())))) {
+ PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
+ // The pattern is surely Non-null
+ pipeConfigRegionSnapshotEvent.getPatternString(),
+ snapshot.getName(),
+ snapshot.length(),
+ Objects.nonNull(templateFile) ? templateFile.getName() : null,
+ Objects.nonNull(templateFile) ? templateFile.length() : 0,
+ pipeConfigRegionSnapshotEvent.getFileType(),
+ pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
final String errorMessage =
String.format("Seal config region snapshot %s error. Socket %s.",
snapshot, socket);
// Send handshake because we don't know whether the receiver side configNode
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index b199df4c390..00279bf22d9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.Pair;
@@ -123,13 +124,15 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final TPipeTransferResp resp;
try {
- resp =
- clientAndStatus
- .getLeft()
- .pipeTransfer(
- compressIfNeeded(
- PipeTransferConfigPlanReq.toTPipeTransferReq(
-
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())));
+ final TPipeTransferReq req =
+ compressIfNeeded(
+ PipeTransferConfigPlanReq.toTPipeTransferReq(
+ pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));
+ rateLimitIfNeeded(
+ pipeConfigRegionWritePlanEvent.getPipeName(),
+ clientAndStatus.getLeft().getEndPoint(),
+ req.getBody().length);
+ resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
@@ -177,32 +180,35 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
throws PipeException, IOException {
+ final String pipeName = snapshotEvent.getPipeName();
final File snapshotFile = snapshotEvent.getSnapshotFile();
final File templateFile = snapshotEvent.getTemplateFile();
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
// 1. Transfer snapshotFile, and template File if exists
- transferFilePieces(snapshotFile, clientAndStatus, true);
+ transferFilePieces(pipeName, snapshotFile, clientAndStatus, true);
if (Objects.nonNull(templateFile)) {
- transferFilePieces(templateFile, clientAndStatus, true);
+ transferFilePieces(pipeName, templateFile, clientAndStatus, true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred completely
final TPipeTransferResp resp;
try {
- resp =
- clientAndStatus
- .getLeft()
- .pipeTransfer(
- compressIfNeeded(
- PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
- // The pattern is surely Non-null
- snapshotEvent.getPatternString(),
- snapshotFile.getName(),
- snapshotFile.length(),
- Objects.nonNull(templateFile) ?
templateFile.getName() : null,
- Objects.nonNull(templateFile) ?
templateFile.length() : 0,
- snapshotEvent.getFileType(),
- snapshotEvent.toSealTypeString())));
+ final TPipeTransferReq req =
+ compressIfNeeded(
+ PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
+ // The pattern is surely Non-null
+ snapshotEvent.getPatternString(),
+ snapshotFile.getName(),
+ snapshotFile.length(),
+ Objects.nonNull(templateFile) ? templateFile.getName() :
null,
+ Objects.nonNull(templateFile) ? templateFile.length() : 0,
+ snapshotEvent.getFileType(),
+ snapshotEvent.toSealTypeString()));
+ rateLimitIfNeeded(
+ snapshotEvent.getPipeName(),
+ clientAndStatus.getLeft().getEndPoint(),
+ req.getBody().length);
+ resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5e481330d09..2f4714d0cc5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1107,7 +1107,7 @@ public class IoTDBConfig {
private long loadCleanupTaskExecutionDelayTimeSeconds = 1800L; // 30 min
- private double loadWriteThroughputBytesPerSecond = Double.MAX_VALUE; // Bytes/s
+ private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s
/** Pipe related */
/** initialized as empty, updated based on the latest `systemDir` during querying */
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 82e2e09fbfa..6ac64b6c482 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -922,7 +922,7 @@ public class IoTDBDescriptor {
conf.setLoadWriteThroughputBytesPerSecond(
Double.parseDouble(
properties.getProperty(
- "load_write_throughput_bytes_per_sec",
+ "load_write_throughput_bytes_per_second",
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
conf.setExtPipeDir(properties.getProperty("ext_pipe_dir",
conf.getExtPipeDir()).trim());
@@ -1718,9 +1718,19 @@ public class IoTDBDescriptor {
conf.setLoadWriteThroughputBytesPerSecond(
Double.parseDouble(
properties.getProperty(
- "load_write_throughput_bytes_per_sec",
+ "load_write_throughput_bytes_per_second",
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
+ // update pipe config
+ commonDescriptor
+ .getConfig()
+ .setPipeAllSinksRateLimitBytesPerSecond(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_all_sinks_rate_limit_bytes_per_second",
+ String.valueOf(
+
commonDescriptor.getConfig().getPipeAllSinksRateLimitBytesPerSecond()))));
+
// update merge_threshold_of_explain_analyze
conf.setMergeThresholdOfExplainAnalyze(
Integer.parseInt(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
index 74b49ce7272..44d72b1e1c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
@@ -39,7 +39,9 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
public class PipeEventBatch implements AutoCloseable {
@@ -61,6 +63,9 @@ public class PipeEventBatch implements AutoCloseable {
private final PipeMemoryBlock allocatedMemoryBlock;
private long totalBufferSize = 0;
+ // Used to rate limit when transferring data
+ private final Map<String, Long> pipeName2BytesAccumulated = new HashMap<>();
+
public PipeEventBatch(int maxDelayInMs, long requestMaxBatchSizeInBytes) {
this.maxDelayInMs = maxDelayInMs;
this.allocatedMemoryBlock =
@@ -112,6 +117,10 @@ public class PipeEventBatch implements AutoCloseable {
final int bufferSize = buildTabletInsertionBuffer(event);
totalBufferSize += bufferSize;
+ pipeName2BytesAccumulated.compute(
+ ((EnrichedEvent) event).getPipeName(),
+ (pipeName, bytesAccumulated) ->
+ bytesAccumulated == null ? bufferSize : bytesAccumulated +
bufferSize);
if (firstEventProcessingTime == Long.MIN_VALUE) {
firstEventProcessingTime = System.currentTimeMillis();
@@ -137,6 +146,7 @@ public class PipeEventBatch implements AutoCloseable {
firstEventProcessingTime = Long.MIN_VALUE;
totalBufferSize = 0;
+ pipeName2BytesAccumulated.clear();
}
public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
@@ -160,6 +170,14 @@ public class PipeEventBatch implements AutoCloseable {
return new ArrayList<>(requestCommitIds);
}
+ public Map<String, Long> deepCopyPipeName2BytesAccumulated() {
+ return new HashMap<>(pipeName2BytesAccumulated);
+ }
+
+ public Map<String, Long> getPipeName2BytesAccumulated() {
+ return pipeName2BytesAccumulated;
+ }
+
private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
throws IOException, WALPipeException {
final ByteBuffer buffer;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 01c7de76da2..f6e80a28917 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Set;
@@ -88,9 +87,8 @@ public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector
@Override
protected byte[] generateHandShakeV1Payload() throws IOException {
- return compressIfNeeded(
- PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+ return PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes(
+ CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
}
@Override
@@ -103,11 +101,12 @@ public abstract class IoTDBDataNodeAirGapConnector
extends IoTDBAirGapConnector
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
- return
compressIfNeeded(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params));
+ return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
protected void doTransferWrapper(
- final Socket socket, final PipeSchemaRegionWritePlanEvent
pipeSchemaRegionWritePlanEvent)
+ final AirGapSocket socket,
+ final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
@@ -123,13 +122,14 @@ public abstract class IoTDBDataNodeAirGapConnector
extends IoTDBAirGapConnector
}
private void doTransfer(
- final Socket socket, final PipeSchemaRegionWritePlanEvent
pipeSchemaRegionWritePlanEvent)
+ final AirGapSocket socket,
+ final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
if (!send(
+ pipeSchemaRegionWritePlanEvent.getPipeName(),
socket,
- compressIfNeeded(
- PipeTransferPlanNodeReq.toTPipeTransferBytes(
- pipeSchemaRegionWritePlanEvent.getPlanNode())))) {
+ PipeTransferPlanNodeReq.toTPipeTransferBytes(
+ pipeSchemaRegionWritePlanEvent.getPlanNode()))) {
final String errorMessage =
String.format(
"Transfer data node write plan %s error. Socket: %s.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 84ca0b0ccdb..e83cd623fed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.net.Socket;
import java.util.Objects;
public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector {
@@ -69,7 +68,7 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
}
final int socketIndex = nextSocketIndex();
- final Socket socket = sockets.get(socketIndex);
+ final AirGapSocket socket = sockets.get(socketIndex);
try {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
@@ -106,7 +105,7 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
}
final int socketIndex = nextSocketIndex();
- final Socket socket = sockets.get(socketIndex);
+ final AirGapSocket socket = sockets.get(socketIndex);
try {
doTransferWrapper(socket, (PipeTsFileInsertionEvent)
tsFileInsertionEvent);
@@ -125,7 +124,7 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
@Override
public void transfer(final Event event) throws Exception {
final int socketIndex = nextSocketIndex();
- final Socket socket = sockets.get(socketIndex);
+ final AirGapSocket socket = sockets.get(socketIndex);
try {
if (event instanceof PipeSchemaRegionWritePlanEvent) {
@@ -147,7 +146,7 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
}
private void doTransferWrapper(
- final Socket socket,
+ final AirGapSocket socket,
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
throws PipeException, WALPipeException, IOException {
try {
@@ -164,19 +163,18 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
}
private void doTransfer(
- final Socket socket,
+ final AirGapSocket socket,
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
throws PipeException, WALPipeException, IOException {
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
final byte[] bytes =
- compressIfNeeded(
- Objects.isNull(insertNode)
- ? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
- pipeInsertNodeTabletInsertionEvent.getByteBuffer())
- :
PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode));
+ Objects.isNull(insertNode)
+ ? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer())
+ : PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode);
- if (!send(socket, bytes)) {
+ if (!send(pipeInsertNodeTabletInsertionEvent.getPipeName(), socket,
bytes)) {
final String errorMessage =
String.format(
"Transfer PipeInsertNodeTabletInsertionEvent %s error. Socket:
%s",
@@ -190,7 +188,7 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
}
private void doTransferWrapper(
- final Socket socket, final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
+ final AirGapSocket socket, final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
@@ -206,14 +204,14 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
}
private void doTransfer(
- final Socket socket, final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
+ final AirGapSocket socket, final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
throws PipeException, IOException {
if (!send(
+ pipeRawTabletInsertionEvent.getPipeName(),
socket,
- compressIfNeeded(
- PipeTransferTabletRawReq.toTPipeTransferBytes(
- pipeRawTabletInsertionEvent.convertToTablet(),
- pipeRawTabletInsertionEvent.isAligned())))) {
+ PipeTransferTabletRawReq.toTPipeTransferBytes(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned()))) {
final String errorMessage =
String.format(
"Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.",
@@ -227,7 +225,7 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
}
private void doTransferWrapper(
- final Socket socket, final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
+ final AirGapSocket socket, final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
@@ -243,22 +241,23 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
}
private void doTransfer(
- final Socket socket, final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
+ final AirGapSocket socket, final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
throws PipeException, IOException {
+ final String pipeName = pipeTsFileInsertionEvent.getPipeName();
final File tsFile = pipeTsFileInsertionEvent.getTsFile();
final String errorMessage = String.format("Seal file %s error. Socket
%s.", tsFile, socket);
// 1. Transfer file piece by piece, and mod if needed
if (pipeTsFileInsertionEvent.isWithMod() &&
supportModsIfIsDataNodeReceiver) {
final File modFile = pipeTsFileInsertionEvent.getModFile();
- transferFilePieces(modFile, socket, true);
- transferFilePieces(tsFile, socket, true);
+ transferFilePieces(pipeName, modFile, socket, true);
+ transferFilePieces(pipeName, tsFile, socket, true);
// 2. Transfer file seal signal with mod, which means the file is transferred completely
if (!send(
+ pipeName,
socket,
- compressIfNeeded(
- PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
- modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length())))) {
+ PipeTransferTsFileSealWithModReq.toTPipeTransferBytes(
+ modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length()))) {
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
@@ -268,12 +267,12 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
LOGGER.info("Successfully transferred file {}.", tsFile);
}
} else {
- transferFilePieces(tsFile, socket, false);
+ transferFilePieces(pipeName, tsFile, socket, false);
// 2. Transfer file seal signal without mod, which means the file is transferred completely
if (!send(
+ pipeName,
socket,
- compressIfNeeded(
- PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(),
tsFile.length())))) {
+ PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(),
tsFile.length()))) {
receiverStatusHandler.handle(
new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
index f390994e4f0..a97408266b7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.net.Socket;
import java.util.Objects;
public class IoTDBSchemaRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector {
@@ -61,7 +60,7 @@ public class IoTDBSchemaRegionAirGapConnector extends
IoTDBDataNodeAirGapConnect
@Override
public void transfer(final Event event) throws Exception {
final int socketIndex = nextSocketIndex();
- final Socket socket = sockets.get(socketIndex);
+ final AirGapSocket socket = sockets.get(socketIndex);
try {
if (event instanceof PipeSchemaRegionWritePlanEvent) {
@@ -85,7 +84,7 @@ public class IoTDBSchemaRegionAirGapConnector extends
IoTDBDataNodeAirGapConnect
}
private void doTransferWrapper(
- final Socket socket, final PipeSchemaRegionSnapshotEvent
pipeSchemaRegionSnapshotEvent)
+ final AirGapSocket socket, final PipeSchemaRegionSnapshotEvent
pipeSchemaRegionSnapshotEvent)
throws PipeException, IOException {
try {
// We increase the reference count for this event to determine if the event may be released.
@@ -101,29 +100,30 @@ public class IoTDBSchemaRegionAirGapConnector extends
IoTDBDataNodeAirGapConnect
}
private void doTransfer(
- final Socket socket, final PipeSchemaRegionSnapshotEvent
pipeSchemaRegionSnapshotEvent)
+ final AirGapSocket socket, final PipeSchemaRegionSnapshotEvent
pipeSchemaRegionSnapshotEvent)
throws PipeException, IOException {
+ final String pipeName = pipeSchemaRegionSnapshotEvent.getPipeName();
final File mtreeSnapshotFile =
pipeSchemaRegionSnapshotEvent.getMTreeSnapshotFile();
final File tagLogSnapshotFile =
pipeSchemaRegionSnapshotEvent.getTagLogSnapshotFile();
// 1. Transfer mTreeSnapshotFile, and tLog file if exists
- transferFilePieces(mtreeSnapshotFile, socket, true);
+ transferFilePieces(pipeName, mtreeSnapshotFile, socket, true);
if (Objects.nonNull(tagLogSnapshotFile)) {
- transferFilePieces(tagLogSnapshotFile, socket, true);
+ transferFilePieces(pipeName, tagLogSnapshotFile, socket, true);
}
// 2. Transfer file seal signal, which means the snapshots is transferred completely
if (!send(
+ pipeName,
socket,
- compressIfNeeded(
- PipeTransferSchemaSnapshotSealReq.toTPipeTransferBytes(
- // The pattern is surely Non-null
- pipeSchemaRegionSnapshotEvent.getPatternString(),
- mtreeSnapshotFile.getName(),
- mtreeSnapshotFile.length(),
- Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.getName() : null,
- Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.length() : 0,
- pipeSchemaRegionSnapshotEvent.getDatabaseName(),
- pipeSchemaRegionSnapshotEvent.toSealTypeString())))) {
+ PipeTransferSchemaSnapshotSealReq.toTPipeTransferBytes(
+ // The pattern is surely Non-null
+ pipeSchemaRegionSnapshotEvent.getPatternString(),
+ mtreeSnapshotFile.getName(),
+ mtreeSnapshotFile.length(),
+ Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.getName()
: null,
+ Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.length()
: 0,
+ pipeSchemaRegionSnapshotEvent.getDatabaseName(),
+ pipeSchemaRegionSnapshotEvent.toSealTypeString()))) {
final String errorMessage =
String.format(
"Seal schema region snapshot file %s and %s error. Socket %s.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 632d929e372..3edcedc31d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -196,11 +196,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
transfer(
- // insertNode.getDevicePath() is null for InsertRowsNode
- Objects.nonNull(insertNode) &&
Objects.nonNull(insertNode.getDevicePath())
- ? insertNode.getDevicePath().getFullPath()
- : null,
- pipeTransferInsertNodeReqHandler);
+ // getDeviceId() may return null for InsertRowsNode
+ pipeInsertNodeTabletInsertionEvent.getDeviceId(),
pipeTransferInsertNodeReqHandler);
} else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) tabletInsertionEvent;
@@ -491,7 +488,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
@Override
// synchronized to avoid close connector when transfer event
- public synchronized void close() throws Exception {
+ public synchronized void close() {
isClosed.set(true);
retryConnector.close();
@@ -500,6 +497,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
if (tabletBatchBuilder != null) {
tabletBatchBuilder.close();
}
+
+ super.close();
}
//////////////////////////// APIs provided for metric framework
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 88e41343950..7407b1b9554 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<TPipeTransferResp> {
@@ -50,7 +51,10 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
private final List<Long> requestCommitIds;
private final List<Event> events;
+ private final Map<String, Long> pipeName2BytesAccumulated;
+
private final TPipeTransferReq req;
+ private final double reqCompressionRatio;
private final IoTDBDataRegionAsyncConnector connector;
@@ -60,16 +64,25 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
// Deep copy to keep Ids' and events' reference
requestCommitIds = batch.deepCopyRequestCommitIds();
events = batch.deepCopyEvents();
+ pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated();
+
+ final TPipeTransferReq uncompressedReq = batch.toTPipeTransferReq();
req =
connector.isRpcCompressionEnabled()
- ? batch.toTPipeTransferReq()
- : PipeTransferCompressedReq.toTPipeTransferReq(
- batch.toTPipeTransferReq(), connector.getCompressors());
+ ? PipeTransferCompressedReq.toTPipeTransferReq(
+ uncompressedReq, connector.getCompressors())
+ : uncompressedReq;
+ reqCompressionRatio = (double) req.getBody().length /
uncompressedReq.getBody().length;
this.connector = connector;
}
public void transfer(final AsyncPipeDataTransferServiceClient client) throws
TException {
+ for (final Map.Entry<String, Long> entry :
pipeName2BytesAccumulated.entrySet()) {
+ connector.rateLimitIfNeeded(
+ entry.getKey(), client.getEndPoint(), (long) (entry.getValue() *
reqCompressionRatio));
+ }
+
client.pipeTransfer(req, this);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 5d834440d7f..94fa6d5ac1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -53,6 +53,11 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
}
public void transfer(AsyncPipeDataTransferServiceClient client) throws
TException {
+ if (event instanceof EnrichedEvent) {
+ connector.rateLimitIfNeeded(
+ ((EnrichedEvent) event).getPipeName(), client.getEndPoint(),
req.getBody().length);
+ }
+
doTransfer(client, req);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 8d02aeb8fa8..e7e374443c6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -122,15 +122,22 @@ public class PipeTransferTsFileInsertionEventHandler
transfer(clientManager, client);
} else if (currentFile == tsFile) {
isSealSignalSent.set(true);
- client.pipeTransfer(
- PipeTransferCompressedReq.toTPipeTransferReq(
- transferMod
- ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
- modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length())
- : PipeTransferTsFileSealReq.toTPipeTransferReq(
- tsFile.getName(), tsFile.length()),
- connector.getCompressors()),
- this);
+
+ final TPipeTransferReq uncompressedReq =
+ transferMod
+ ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+ modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length())
+ :
PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
+ final TPipeTransferReq req =
+ connector.isRpcCompressionEnabled()
+ ? PipeTransferCompressedReq.toTPipeTransferReq(
+ uncompressedReq, connector.getCompressors())
+ : uncompressedReq;
+
+ connector.rateLimitIfNeeded(
+ event.getPipeName(), client.getEndPoint(), req.getBody().length);
+
+ client.pipeTransfer(req, this);
}
return;
}
@@ -145,12 +152,16 @@ public class PipeTransferTsFileInsertionEventHandler
currentFile.getName(), position, payload)
: PipeTransferTsFilePieceReq.toTPipeTransferReq(
currentFile.getName(), position, payload);
- client.pipeTransfer(
+ final TPipeTransferReq req =
connector.isRpcCompressionEnabled()
? PipeTransferCompressedReq.toTPipeTransferReq(
uncompressedReq, connector.getCompressors())
- : uncompressedReq,
- this);
+ : uncompressedReq;
+
+ connector.rateLimitIfNeeded(event.getPipeName(), client.getEndPoint(),
req.getBody().length);
+
+ client.pipeTransfer(req, this);
+
position += readLength;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 33c97ce27f2..aef655c3240 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.Pair;
@@ -113,13 +114,15 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
final TPipeTransferResp resp;
try {
- resp =
- clientAndStatus
- .getLeft()
- .pipeTransfer(
- compressIfNeeded(
- PipeTransferPlanNodeReq.toTPipeTransferReq(
- pipeSchemaRegionWritePlanEvent.getPlanNode())));
+ final TPipeTransferReq req =
+ compressIfNeeded(
+ PipeTransferPlanNodeReq.toTPipeTransferReq(
+ pipeSchemaRegionWritePlanEvent.getPlanNode()));
+ rateLimitIfNeeded(
+ pipeSchemaRegionWritePlanEvent.getPipeName(),
+ clientAndStatus.getLeft().getEndPoint(),
+ req.getBody().length);
+ resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 8ee65ef94ed..cf93bac6ced 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.Pair;
@@ -56,6 +57,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Map;
import java.util.Objects;
public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
@@ -176,10 +178,23 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
final TPipeTransferResp resp;
try {
- resp =
- clientAndStatus
- .getLeft()
-
.pipeTransfer(compressIfNeeded(batchToTransfer.toTPipeTransferReq()));
+ final TPipeTransferReq uncompressedReq =
batchToTransfer.toTPipeTransferReq();
+ final long uncompressedSize = uncompressedReq.getBody().length;
+
+ final TPipeTransferReq req = compressIfNeeded(uncompressedReq);
+ final long compressedSize = req.getBody().length;
+
+ final double compressionRatio = (double) compressedSize /
uncompressedSize;
+
+ for (final Map.Entry<String, Long> entry :
+ batchToTransfer.getPipeName2BytesAccumulated().entrySet()) {
+ rateLimitIfNeeded(
+ entry.getKey(),
+ clientAndStatus.getLeft().getEndPoint(),
+ (long) (entry.getValue() * compressionRatio));
+ }
+
+ resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
@@ -230,30 +245,26 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
private void doTransfer(
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
throws PipeException {
- final InsertNode insertNode;
- Pair<IoTDBSyncClient, Boolean> clientAndStatus = null;
final TPipeTransferResp resp;
+ Pair<IoTDBSyncClient, Boolean> clientAndStatus = null;
try {
- insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
// getDeviceId() may return null for InsertRowsNode, will be equal to
getClient(null)
clientAndStatus =
clientManager.getClient(pipeInsertNodeTabletInsertionEvent.getDeviceId());
- if (insertNode != null) {
- resp =
- clientAndStatus
- .getLeft()
- .pipeTransfer(
- compressIfNeeded(
-
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)));
- } else {
- resp =
- clientAndStatus
- .getLeft()
- .pipeTransfer(
- compressIfNeeded(
- PipeTransferTabletBinaryReq.toTPipeTransferReq(
-
pipeInsertNodeTabletInsertionEvent.getByteBuffer())));
- }
+
+ final InsertNode insertNode =
+ pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
+ final TPipeTransferReq req =
+ compressIfNeeded(
+ insertNode != null
+ ?
PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode)
+ : PipeTransferTabletBinaryReq.toTPipeTransferReq(
+ pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
+ rateLimitIfNeeded(
+ pipeInsertNodeTabletInsertionEvent.getPipeName(),
+ clientAndStatus.getLeft().getEndPoint(),
+ req.getBody().length);
+ resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
if (clientAndStatus != null) {
clientAndStatus.setRight(false);
@@ -276,10 +287,11 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status),
pipeInsertNodeTabletInsertionEvent.toString());
}
- // insertNode.getDevicePath() is null for InsertRowsNode
- if (insertNode != null && insertNode.getDevicePath() != null &&
status.isSetRedirectNode()) {
+ // pipeInsertNodeTabletInsertionEvent.getDeviceId() is null for
InsertRowsNode
+ if (Objects.nonNull(pipeInsertNodeTabletInsertionEvent.getDeviceId())
+ && status.isSetRedirectNode()) {
clientManager.updateLeaderCache(
- insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
+ pipeInsertNodeTabletInsertionEvent.getDeviceId(),
status.getRedirectNode());
}
}
@@ -305,14 +317,16 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
final TPipeTransferResp resp;
try {
- resp =
- clientAndStatus
- .getLeft()
- .pipeTransfer(
- compressIfNeeded(
- PipeTransferTabletRawReq.toTPipeTransferReq(
- pipeRawTabletInsertionEvent.convertToTablet(),
- pipeRawTabletInsertionEvent.isAligned())));
+ final TPipeTransferReq req =
+ compressIfNeeded(
+ PipeTransferTabletRawReq.toTPipeTransferReq(
+ pipeRawTabletInsertionEvent.convertToTablet(),
+ pipeRawTabletInsertionEvent.isAligned()));
+ rateLimitIfNeeded(
+ pipeRawTabletInsertionEvent.getPipeName(),
+ clientAndStatus.getLeft().getEndPoint(),
+ req.getBody().length);
+ resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
@@ -356,6 +370,7 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
private void doTransfer(final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
throws PipeException, IOException {
+ final String pipeName = pipeTsFileInsertionEvent.getPipeName();
final File tsFile = pipeTsFileInsertionEvent.getTsFile();
final File modFile = pipeTsFileInsertionEvent.getModFile();
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
@@ -363,20 +378,19 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
// 1. Transfer tsFile, and mod file if exists and receiver's version >= 2
if (pipeTsFileInsertionEvent.isWithMod() &&
clientManager.supportModsIfIsDataNodeReceiver()) {
- transferFilePieces(modFile, clientAndStatus, true);
- transferFilePieces(tsFile, clientAndStatus, true);
+ transferFilePieces(pipeName, modFile, clientAndStatus, true);
+ transferFilePieces(pipeName, tsFile, clientAndStatus, true);
// 2. Transfer file seal signal with mod, which means the file is
transferred completely
try {
- resp =
- clientAndStatus
- .getLeft()
- .pipeTransfer(
- compressIfNeeded(
- PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
- modFile.getName(),
- modFile.length(),
- tsFile.getName(),
- tsFile.length())));
+ final TPipeTransferReq req =
+ compressIfNeeded(
+ PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+ modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length()));
+ rateLimitIfNeeded(
+ pipeTsFileInsertionEvent.getPipeName(),
+ clientAndStatus.getLeft().getEndPoint(),
+ req.getBody().length);
+ resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
clientManager.adjustTimeoutIfNecessary(e);
@@ -385,16 +399,17 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
e);
}
} else {
- transferFilePieces(tsFile, clientAndStatus, false);
+ transferFilePieces(pipeName, tsFile, clientAndStatus, false);
// 2. Transfer file seal signal without mod, which means the file is
transferred completely
try {
- resp =
- clientAndStatus
- .getLeft()
- .pipeTransfer(
- compressIfNeeded(
- PipeTransferTsFileSealReq.toTPipeTransferReq(
- tsFile.getName(), tsFile.length())));
+ final TPipeTransferReq req =
+ compressIfNeeded(
+ PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(),
tsFile.length()));
+ rateLimitIfNeeded(
+ pipeTsFileInsertionEvent.getPipeName(),
+ clientAndStatus.getLeft().getEndPoint(),
+ req.getBody().length);
+ resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
clientManager.adjustTimeoutIfNecessary(e);
@@ -419,10 +434,10 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
@Override
public void close() {
- super.close();
-
if (tabletBatchBuilder != null) {
tabletBatchBuilder.close();
}
+
+ super.close();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index 2b2cba03f20..210bbec3dc0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.Pair;
@@ -87,32 +88,35 @@ public class IoTDBSchemaRegionConnector extends
IoTDBDataNodeSyncConnector {
private void doTransfer(final PipeSchemaRegionSnapshotEvent snapshotEvent)
throws PipeException, IOException {
+ final String pipeName = snapshotEvent.getPipeName();
final File mTreeSnapshotFile = snapshotEvent.getMTreeSnapshotFile();
final File tagLogSnapshotFile = snapshotEvent.getTagLogSnapshotFile();
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
final TPipeTransferResp resp;
// 1. Transfer mTreeSnapshotFile, and tLog file if exists
- transferFilePieces(mTreeSnapshotFile, clientAndStatus, true);
+ transferFilePieces(pipeName, mTreeSnapshotFile, clientAndStatus, true);
if (Objects.nonNull(tagLogSnapshotFile)) {
- transferFilePieces(tagLogSnapshotFile, clientAndStatus, true);
+ transferFilePieces(pipeName, tagLogSnapshotFile, clientAndStatus, true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred
completely
try {
- resp =
- clientAndStatus
- .getLeft()
- .pipeTransfer(
- compressIfNeeded(
- PipeTransferSchemaSnapshotSealReq.toTPipeTransferReq(
- // The pattern is surely Non-null
- snapshotEvent.getPatternString(),
- mTreeSnapshotFile.getName(),
- mTreeSnapshotFile.length(),
- Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.getName() : null,
- Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.length() : 0,
- snapshotEvent.getDatabaseName(),
- snapshotEvent.toSealTypeString())));
+ final TPipeTransferReq req =
+ compressIfNeeded(
+ PipeTransferSchemaSnapshotSealReq.toTPipeTransferReq(
+ // The pattern is surely Non-null
+ snapshotEvent.getPatternString(),
+ mTreeSnapshotFile.getName(),
+ mTreeSnapshotFile.length(),
+ Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.getName() : null,
+ Objects.nonNull(tagLogSnapshotFile) ?
tagLogSnapshotFile.length() : 0,
+ snapshotEvent.getDatabaseName(),
+ snapshotEvent.toSealTypeString()));
+ rateLimitIfNeeded(
+ snapshotEvent.getPipeName(),
+ clientAndStatus.getLeft().getEndPoint(),
+ req.getBody().length);
+ resp = clientAndStatus.getLeft().pipeTransfer(req);
} catch (final Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
index 8aa7841a8d0..ce7046bf5ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
@@ -33,22 +33,19 @@ public class LoadTsFileRateLimiter {
new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond());
private final RateLimiter loadWriteRateLimiter;
- private LoadTsFileRateLimiter() {
- final double throughputBytesPerSecondLimit =
throughputBytesPerSecond.get();
- loadWriteRateLimiter =
- // if throughput <= 0, disable rate limiting
- throughputBytesPerSecondLimit <= 0
- ? RateLimiter.create(Double.MAX_VALUE)
- : RateLimiter.create(throughputBytesPerSecondLimit);
- }
-
public void acquire(long bytes) {
- if (throughputBytesPerSecond.get() !=
CONFIG.getLoadWriteThroughputBytesPerSecond()) {
- final double newThroughputBytesPerSecond =
CONFIG.getLoadWriteThroughputBytesPerSecond();
- throughputBytesPerSecond.set(newThroughputBytesPerSecond);
+ final double throughputBytesPerSecondLimit =
CONFIG.getLoadWriteThroughputBytesPerSecond();
+
+ if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) {
+ throughputBytesPerSecond.set(throughputBytesPerSecondLimit);
loadWriteRateLimiter.setRate(
// if throughput <= 0, disable rate limiting
- newThroughputBytesPerSecond <= 0 ? Double.MAX_VALUE :
newThroughputBytesPerSecond);
+ throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE :
throughputBytesPerSecondLimit);
+ }
+
+ // For performance, we don't need to acquire rate limiter if throughput <= 0
+ if (throughputBytesPerSecondLimit <= 0) {
+ return;
}
while (bytes > 0) {
@@ -64,6 +61,15 @@ public class LoadTsFileRateLimiter {
//////////////////////////// Singleton ////////////////////////////
+ private LoadTsFileRateLimiter() {
+ final double throughputBytesPerSecondLimit =
throughputBytesPerSecond.get();
+ loadWriteRateLimiter =
+ // if throughput <= 0, disable rate limiting
+ throughputBytesPerSecondLimit <= 0
+ ? RateLimiter.create(Double.MAX_VALUE)
+ : RateLimiter.create(throughputBytesPerSecondLimit);
+ }
+
private static class LoadTsFileRateLimiterHolder {
private static final LoadTsFileRateLimiter INSTANCE = new
LoadTsFileRateLimiter();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index 4ea3048f2c3..843374ec49c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -46,16 +46,12 @@ import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
public class TreeModelPlanner implements IPlanner {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TreeModelPlanner.class);
private final Statement statement;
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 827eeca2ad7..099f76fb3b7 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -965,6 +965,11 @@ data_replication_factor=1
# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780
+# The total bytes that all pipe sinks can transfer per second.
+# When given a value less than or equal to 0, it means no limit.
+# Default value is -1, which means no limit.
+# pipe_all_sinks_rate_limit_bytes_per_second=-1
+
####################
### RatisConsensus Configuration
####################
@@ -1104,3 +1109,8 @@ data_replication_factor=1
# Load clean up task is used to clean up the unsuccessful loaded tsfile after
a certain period of time.
# The parameter is the delay time after an unsuccessful load operation (in
seconds).
# load_clean_up_task_execution_delay_time_seconds=1800
+
+# The maximum bytes per second of disk write throughput when loading tsfile.
+# When given a value less than or equal to 0, it means no limit.
+# Default value is -1, which means no limit.
+# load_write_throughput_bytes_per_second=-1
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 4572516fbee..91fa97fc82c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -165,6 +165,10 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
return endpoint.getPort();
}
+ public TEndPoint getEndPoint() {
+ return endpoint;
+ }
+
@Override
public String toString() {
return String.format("AsyncPipeDataTransferServiceClient{%s}, id = {%d}",
endpoint, id);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index fc437a3de97..c03cf539711 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -194,6 +194,8 @@ public class CommonConfig {
private int pipeAsyncConnectorSelectorNumber = 4;
private int pipeAsyncConnectorMaxClientNumber = 16;
+ private double pipeAllSinksRateLimitBytesPerSecond = -1;
+
private boolean isSeperatedPipeHeartbeatEnabled = true;
private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
@@ -1007,6 +1009,14 @@ public class CommonConfig {
this.pipeRemainingTimeCommitRateSmoothingFactor =
pipeRemainingTimeCommitRateSmoothingFactor;
}
+ public double getPipeAllSinksRateLimitBytesPerSecond() {
+ return pipeAllSinksRateLimitBytesPerSecond;
+ }
+
+ public void setPipeAllSinksRateLimitBytesPerSecond(double
pipeAllSinksRateLimitBytesPerSecond) {
+ this.pipeAllSinksRateLimitBytesPerSecond =
pipeAllSinksRateLimitBytesPerSecond;
+ }
+
public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
return twoStageAggregateMaxCombinerLiveTimeInMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index efb3420b62c..b472b155aef 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -377,6 +377,12 @@ public class CommonDescriptor {
"pipe_async_connector_max_client_number",
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber())))));
+ config.setPipeAllSinksRateLimitBytesPerSecond(
+ Double.parseDouble(
+ properties.getProperty(
+ "pipe_all_sinks_rate_limit_bytes_per_second",
+
String.valueOf(config.getPipeAllSinksRateLimitBytesPerSecond()))));
+
config.setSeperatedPipeHeartbeatEnabled(
Boolean.parseBoolean(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 41331a822dd..dfd48e7eccb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -123,6 +123,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
}
+ public double getPipeAllConnectorsRateLimitBytesPerSecond() {
+ return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
+ }
+
public float getPipeLeaderCacheMemoryUsagePercentage() {
return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
}
@@ -324,6 +328,10 @@ public class PipeConfig {
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
+ LOGGER.info(
+ "PipeAllConnectorsRateLimitBytesPerSecond: {}",
+ getPipeAllConnectorsRateLimitBytesPerSecond());
+
LOGGER.info("SeperatedPipeHeartbeatEnabled: {}",
isSeperatedPipeHeartbeatEnabled());
LOGGER.info(
"PipeHeartbeatIntervalSecondsForCollectingPipeMeta: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index c15fc4db189..5e73c4d54d8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -177,6 +177,10 @@ public class PipeConnectorConstant {
CONNECTOR_COMPRESSOR_ZSTD,
CONNECTOR_COMPRESSOR_LZMA2)));
+ public static final String CONNECTOR_RATE_LIMIT_KEY =
"connector.rate-limit-bytes-per-second";
+ public static final String SINK_RATE_LIMIT_KEY =
"sink.rate-limit-bytes-per-second";
+ public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1;
+
public static final String SINK_TOPIC_KEY = "sink.topic";
public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
index 3ab7a156b74..f15934afd5e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.pipe.connector.client;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
@@ -33,6 +34,7 @@ public class IoTDBSyncClient extends IClientRPCService.Client
private final String ipAddress;
private final int port;
+ private final TEndPoint endPoint;
public IoTDBSyncClient(
ThriftClientProperty property,
@@ -57,6 +59,7 @@ public class IoTDBSyncClient extends IClientRPCService.Client
ipAddress, port, property.getConnectionTimeoutMs())));
this.ipAddress = ipAddress;
this.port = port;
+ this.endPoint = new TEndPoint(ipAddress, port);
final TTransport transport = getInputProtocol().getTransport();
if (!transport.isOpen()) {
transport.open();
@@ -71,6 +74,10 @@ public class IoTDBSyncClient extends IClientRPCService.Client
return port;
}
+ public TEndPoint getEndPoint() {
+ return endPoint;
+ }
+
public void setTimeout(int timeout) {
((TimeoutChangeableTransport)
(getInputProtocol().getTransport())).setTimeout(timeout);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
similarity index 51%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
index 8aa7841a8d0..575731d04a5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
@@ -17,63 +17,54 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.execution.load;
+package org.apache.iotdb.commons.pipe.connector.limiter;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.RateLimiter;
-public class LoadTsFileRateLimiter {
+/** This is a global rate limiter for all connectors. */
+public class GlobalRateLimiter {
- private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private static final PipeConfig CONFIG = PipeConfig.getInstance();
private final AtomicDouble throughputBytesPerSecond =
- new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond());
- private final RateLimiter loadWriteRateLimiter;
+ new AtomicDouble(CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond());
+ private final RateLimiter rateLimiter;
- private LoadTsFileRateLimiter() {
+ public GlobalRateLimiter() {
final double throughputBytesPerSecondLimit =
throughputBytesPerSecond.get();
- loadWriteRateLimiter =
- // if throughput <= 0, disable rate limiting
+ rateLimiter =
throughputBytesPerSecondLimit <= 0
? RateLimiter.create(Double.MAX_VALUE)
: RateLimiter.create(throughputBytesPerSecondLimit);
}
public void acquire(long bytes) {
- if (throughputBytesPerSecond.get() !=
CONFIG.getLoadWriteThroughputBytesPerSecond()) {
- final double newThroughputBytesPerSecond =
CONFIG.getLoadWriteThroughputBytesPerSecond();
- throughputBytesPerSecond.set(newThroughputBytesPerSecond);
- loadWriteRateLimiter.setRate(
+ final double throughputBytesPerSecondLimit =
+ CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
+
+ if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) {
+ throughputBytesPerSecond.set(throughputBytesPerSecondLimit);
+ rateLimiter.setRate(
// if throughput <= 0, disable rate limiting
- newThroughputBytesPerSecond <= 0 ? Double.MAX_VALUE :
newThroughputBytesPerSecond);
+ throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE :
throughputBytesPerSecondLimit);
+ }
+
+ // For performance, we don't need to acquire rate limiter if throughput <= 0
+ if (throughputBytesPerSecondLimit <= 0) {
+ return;
}
while (bytes > 0) {
if (bytes > Integer.MAX_VALUE) {
- loadWriteRateLimiter.acquire(Integer.MAX_VALUE);
+ rateLimiter.acquire(Integer.MAX_VALUE);
bytes -= Integer.MAX_VALUE;
} else {
- loadWriteRateLimiter.acquire((int) bytes);
+ rateLimiter.acquire((int) bytes);
return;
}
}
}
-
- //////////////////////////// Singleton ////////////////////////////
-
- private static class LoadTsFileRateLimiterHolder {
-
- private static final LoadTsFileRateLimiter INSTANCE = new
LoadTsFileRateLimiter();
-
- private LoadTsFileRateLimiterHolder() {
- // Prevent instantiation
- }
- }
-
- public static LoadTsFileRateLimiter getInstance() {
- return LoadTsFileRateLimiterHolder.INSTANCE;
- }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
new file mode 100644
index 00000000000..8b151644576
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.limiter;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Keeps one Guava {@link RateLimiter} per {@link TEndPoint}. A limiter is created on the first
+ * {@link #acquire(TEndPoint, long)} for an endpoint; all limiters are created with the same rate.
+ */
+public class PipeEndPointRateLimiter {
+
+  /** Rate, in bytes per second, passed to {@link RateLimiter#create(double)} for each endpoint. */
+  private final double bytesPerSecondLimit;
+
+  /** Limiters created so far, keyed by the endpoint they throttle. */
+  private final ConcurrentMap<TEndPoint, RateLimiter> endPointRateLimiterMap;
+
+  public PipeEndPointRateLimiter(double bytesPerSecondLimit) {
+    this.endPointRateLimiterMap = new ConcurrentHashMap<>();
+    this.bytesPerSecondLimit = bytesPerSecondLimit;
+  }
+
+  /**
+   * Takes {@code bytes} permits from the limiter that belongs to {@code endPoint}.
+   *
+   * @param endPoint endpoint whose limiter is used; {@code null} turns the call into a no-op
+   * @param bytes number of permits to take; nothing is acquired when it is not positive
+   */
+  public void acquire(final TEndPoint endPoint, long bytes) {
+    if (endPoint == null) {
+      return;
+    }
+
+    final RateLimiter limiterOfEndPoint =
+        endPointRateLimiterMap.computeIfAbsent(
+            endPoint, ignored -> RateLimiter.create(bytesPerSecondLimit));
+
+    // RateLimiter#acquire takes an int, so a long amount is drained in int-sized chunks first
+    // and whatever is left (at most Integer.MAX_VALUE) is taken in one final call.
+    long remaining = bytes;
+    while (remaining > Integer.MAX_VALUE) {
+      limiterOfEndPoint.acquire(Integer.MAX_VALUE);
+      remaining -= Integer.MAX_VALUE;
+    }
+    if (remaining > 0) {
+      limiterOfEndPoint.acquire((int) remaining);
+    }
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index ec2d22ddfa6..7aac1ffc90d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.pipe.connector.protocol;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapELanguageConstant;
@@ -57,11 +58,29 @@ import static
org.apache.iotdb.commons.utils.BasicStructureSerDeUtil.LONG_LEN;
public abstract class IoTDBAirGapConnector extends IoTDBConnector {
+ /**
+  * A {@link Socket} that also records the {@link TEndPoint} (ip and port) it was created for.
+  * The constructor only stores the endpoint; the socket itself starts out unconnected.
+  */
+ protected static class AirGapSocket extends Socket {
+
+   private final TEndPoint endPoint;
+
+   public AirGapSocket(String ip, int port) {
+     endPoint = new TEndPoint(ip, port);
+   }
+
+   public TEndPoint getEndPoint() {
+     return endPoint;
+   }
+
+   /** Prefixes the default {@link Socket#toString()} text with the recorded endpoint. */
+   @Override
+   public String toString() {
+     final String socketDescription = super.toString();
+     return "AirGapSocket{endPoint=" + endPoint + "} (" + socketDescription + ")";
+   }
+ }
+
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBAirGapConnector.class);
protected static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
- protected final List<Socket> sockets = new ArrayList<>();
+ protected final List<AirGapSocket> sockets = new ArrayList<>();
protected final List<Boolean> isSocketAlive = new ArrayList<>();
private LoadBalancer loadBalancer;
@@ -147,7 +166,7 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
}
}
- final Socket socket = new Socket();
+ final AirGapSocket socket = new AirGapSocket(ip, port);
try {
socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs);
@@ -176,7 +195,7 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
String.format("All target servers %s are not available.", nodeUrls));
}
- protected void sendHandshakeReq(Socket socket) throws IOException {
+ protected void sendHandshakeReq(AirGapSocket socket) throws IOException {
socket.setSoTimeout(handshakeTimeoutMs);
// Try to handshake by PipeTransferHandshakeV2Req. If failed, retry to
handshake by
// PipeTransferHandshakeV1Req. If failed again, throw
PipeConnectionException.
@@ -208,7 +227,8 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
}
}
- protected void transferFilePieces(File file, Socket socket, boolean
isMultiFile)
+ protected void transferFilePieces(
+ String pipeName, File file, AirGapSocket socket, boolean isMultiFile)
throws PipeException, IOException {
final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
@@ -225,11 +245,11 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
? readBuffer
: Arrays.copyOfRange(readBuffer, 0, readLength);
if (!send(
+ pipeName,
socket,
- compressIfNeeded(
- isMultiFile
- ? getTransferMultiFilePieceBytes(file.getName(), position,
payload)
- : getTransferSingleFilePieceBytes(file.getName(),
position, payload)))) {
+ isMultiFile
+ ? getTransferMultiFilePieceBytes(file.getName(), position,
payload)
+ : getTransferSingleFilePieceBytes(file.getName(), position,
payload))) {
final String errorMessage =
String.format("Transfer file %s error. Socket %s.", file,
socket);
if (mayNeedHandshakeWhenFail()) {
@@ -261,11 +281,15 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
return loadBalancer.nextSocketIndex();
}
- protected boolean send(Socket socket, byte[] bytes) throws IOException {
+ protected boolean send(String pipeName, AirGapSocket socket, byte[] bytes)
throws IOException {
if (!socket.isConnected()) {
return false;
}
+ bytes = compressIfNeeded(bytes);
+
+ rateLimitIfNeeded(pipeName, socket.getEndPoint(), bytes.length);
+
final BufferedOutputStream outputStream = new
BufferedOutputStream(socket.getOutputStream());
bytes = enrichWithLengthAndChecksum(bytes);
outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes);
@@ -276,6 +300,10 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response);
}
+ /**
+ * Sends {@code bytes} over {@code socket} without a pipe name by forwarding to
+ * {@link #send(String, AirGapSocket, byte[])} with {@code null} as the pipe name. As
+ * implemented in IoTDBConnector#rateLimitIfNeeded, a null pipe name skips the per-pipe
+ * endpoint limiter, so only the global limiter applies to this transfer.
+ */
+ protected boolean send(AirGapSocket socket, byte[] bytes) throws IOException
{
+ return send(null, socket, bytes);
+ }
+
private byte[] enrichWithLengthAndChecksum(byte[] bytes) {
// Length of checksum and bytes payload
final byte[] length = BytesUtils.intToBytes(bytes.length + LONG_LEN);
@@ -309,6 +337,8 @@ public abstract class IoTDBAirGapConnector extends
IoTDBConnector {
isSocketAlive.set(i, false);
}
}
+
+ super.close();
}
/////////////////////// Strategies for load balance
//////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 751f0dec05f..f65daf86d4f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
import
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
+import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter;
+import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -40,8 +42,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
@@ -65,6 +69,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
@@ -77,6 +83,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
public abstract class IoTDBConnector implements PipeConnector {
@@ -91,8 +98,13 @@ public abstract class IoTDBConnector implements
PipeConnector {
protected String loadBalanceStrategy;
- protected boolean isRpcCompressionEnabled;
- protected final List<PipeCompressor> compressors = new ArrayList<>();
+ private boolean isRpcCompressionEnabled;
+ private final List<PipeCompressor> compressors = new ArrayList<>();
+
+ private static final Map<String, PipeEndPointRateLimiter>
PIPE_END_POINT_RATE_LIMITER_MAP =
+ new ConcurrentHashMap<>();
+ private double endPointRateLimitBytesPerSecond = -1;
+ private static final GlobalRateLimiter GLOBAL_RATE_LIMITER = new
GlobalRateLimiter();
protected boolean isTabletBatchModeEnabled = true;
@@ -169,6 +181,17 @@ public abstract class IoTDBConnector implements
PipeConnector {
compressors.size());
isRpcCompressionEnabled = !compressors.isEmpty();
+ endPointRateLimitBytesPerSecond =
+ parameters.getDoubleOrDefault(
+ Arrays.asList(CONNECTOR_RATE_LIMIT_KEY, SINK_RATE_LIMIT_KEY),
+ CONNECTOR_RATE_LIMIT_DEFAULT_VALUE);
+ validator.validate(
+ arg -> endPointRateLimitBytesPerSecond <= Double.MAX_VALUE,
+ String.format(
+ "Rate limit should be in the range (0, %f], but got %f.",
+ Double.MAX_VALUE, endPointRateLimitBytesPerSecond),
+ endPointRateLimitBytesPerSecond);
+
validator.validate(
arg -> arg.equals("retry") || arg.equals("ignore"),
String.format(
@@ -302,6 +325,12 @@ public abstract class IoTDBConnector implements
PipeConnector {
}
}
+ @Override
+ public void close() {
+ // TODO: Not all the limiters should be closed here, but it's fine for now.
+ // The map is static and keyed by pipe name, so clear() drops the limiters of every pipe,
+ // not only the ones used by this connector. rateLimitIfNeeded re-creates a dropped limiter
+ // through computeIfAbsent on its next call for that pipe.
+ PIPE_END_POINT_RATE_LIMITER_MAP.clear();
+ }
+
protected TPipeTransferReq compressIfNeeded(TPipeTransferReq req) throws
IOException {
return isRpcCompressionEnabled
? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors)
@@ -322,6 +351,18 @@ public abstract class IoTDBConnector implements
PipeConnector {
return compressors;
}
+ /**
+  * Applies the per-pipe endpoint limiter (when it is enabled) and then the global limiter to a
+  * transfer of {@code bytesLength} bytes.
+  *
+  * @param pipeName name of the pipe the transfer belongs to; {@code null} skips the per-pipe
+  *     limiter
+  * @param endPoint endpoint the bytes are sent to, forwarded to the per-pipe limiter
+  * @param bytesLength number of bytes about to be sent
+  */
+ public void rateLimitIfNeeded(
+     final String pipeName, final TEndPoint endPoint, final long bytesLength) {
+   final boolean perPipeLimitApplies = pipeName != null && endPointRateLimitBytesPerSecond > 0;
+   if (perPipeLimitApplies) {
+     // The map is static: the limiter of a pipe is created by whichever connector asks first,
+     // using that connector's rate, and is reused by later calls with the same pipe name.
+     final PipeEndPointRateLimiter limiterOfPipe =
+         PIPE_END_POINT_RATE_LIMITER_MAP.computeIfAbsent(
+             pipeName, name -> new PipeEndPointRateLimiter(endPointRateLimitBytesPerSecond));
+     limiterOfPipe.acquire(endPoint, bytesLength);
+   }
+
+   // The global limiter is consulted for every transfer, with or without a pipe name.
+   GLOBAL_RATE_LIMITER.acquire(bytesLength);
+ }
+
public PipeReceiverStatusHandler statusHandler() {
return receiverStatusHandler;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 4e1ab4d4789..713f25df29e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import com.google.common.collect.ImmutableList;
import org.apache.tsfile.utils.Pair;
@@ -143,7 +144,10 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
}
protected void transferFilePieces(
- File file, Pair<IoTDBSyncClient, Boolean> clientAndStatus, boolean
isMultiFile)
+ String pipeName,
+ File file,
+ Pair<IoTDBSyncClient, Boolean> clientAndStatus,
+ boolean isMultiFile)
throws PipeException, IOException {
final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
@@ -161,16 +165,16 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
: Arrays.copyOfRange(readBuffer, 0, readLength);
final PipeTransferFilePieceResp resp;
try {
+ final TPipeTransferReq req =
+ compressIfNeeded(
+ isMultiFile
+ ? getTransferMultiFilePieceReq(file.getName(), position,
payLoad)
+ : getTransferSingleFilePieceReq(file.getName(),
position, payLoad));
+ rateLimitIfNeeded(
+ pipeName, clientAndStatus.getLeft().getEndPoint(),
req.getBody().length);
resp =
PipeTransferFilePieceResp.fromTPipeTransferResp(
- clientAndStatus
- .getLeft()
- .pipeTransfer(
- compressIfNeeded(
- isMultiFile
- ?
getTransferMultiFilePieceReq(file.getName(), position, payLoad)
- : getTransferSingleFilePieceReq(
- file.getName(), position, payLoad))));
+ clientAndStatus.getLeft().pipeTransfer(req));
} catch (Exception e) {
clientAndStatus.setRight(false);
throw new PipeConnectionException(
@@ -219,5 +223,7 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
if (clientManager != null) {
clientManager.close();
}
+
+ super.close();
}
}