This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new c442bf7ca1f Pipe: Support async tsfile load when data syncing between
clusters (#13311)
c442bf7ca1f is described below
commit c442bf7ca1fba6e8612a1a74219c91f438585858
Author: YC27 <[email protected]>
AuthorDate: Tue Aug 27 16:44:26 2024 +0800
Pipe: Support async tsfile load when data syncing between clusters (#13311)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 97 ++++++++++++++++++++++
.../client/IoTDBConfigNodeSyncClientManager.java | 6 +-
.../protocol/IoTDBConfigRegionAirGapConnector.java | 2 +
.../protocol/IoTDBConfigRegionConnector.java | 6 +-
.../client/IoTDBDataNodeAsyncClientManager.java | 32 ++++---
.../client/IoTDBDataNodeSyncClientManager.java | 6 +-
.../airgap/IoTDBDataNodeAirGapConnector.java | 2 +
.../async/IoTDBDataRegionAsyncConnector.java | 3 +-
.../thrift/sync/IoTDBDataNodeSyncConnector.java | 6 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 32 ++++++-
.../load/active/ActiveLoadTsFileLoader.java | 57 +------------
.../config/constant/PipeConnectorConstant.java | 11 +++
.../connector/client/IoTDBSyncClientManager.java | 8 +-
.../common/PipeTransferHandshakeConstant.java | 1 +
.../pipe/connector/protocol/IoTDBConnector.java | 20 +++++
.../connector/protocol/IoTDBSslSyncConnector.java | 6 +-
.../commons/pipe/receiver/IoTDBFileReceiver.java | 14 ++++
.../org/apache/iotdb/commons/utils/FileUtils.java | 53 ++++++++++++
18 files changed, 278 insertions(+), 84 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 7fda1250b1b..26c76bf0042 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -351,4 +351,101 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
Collections.singleton("root.ln.wf01.wt02,true,null,INF,"));
}
}
+
+ @Test
+ public void testSyncLoadTsFile() throws Exception {
+ testReceiverLoadTsFile("sync");
+ }
+
+ @Test
+ public void testAsyncLoadTsFile() throws Exception {
+ testReceiverLoadTsFile("async");
+ }
+
+ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws
Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)",
"flush"))) {
+ return;
+ }
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("extractor.realtime.mode", "forced-log");
+
+ connectorAttributes.put("sink", "iotdb-thrift-sink");
+ connectorAttributes.put("sink.batch.enable", "false");
+ connectorAttributes.put("sink.ip", receiverIp);
+ connectorAttributes.put("sink.port", Integer.toString(receiverPort));
+ connectorAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes))
+ .getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.vehicle.d0(time, s1) values (2, 1)",
"flush"))) {
+ return;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.vehicle.d0.s1,",
+ Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,",
"2,1.0,"))));
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipe("testPipe").getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes))
+ .getCode());
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.vehicle.d0(time, s1) values (4, 1)",
+ "insert into root.vehicle.d0(time, s1) values (3, 1), (0, 1)",
+ "flush"))) {
+ return;
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select * from root.**",
+ "Time,root.vehicle.d0.s1,",
+ Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList("0,1.0,", "1,1.0,", "2,1.0,",
"3,1.0,", "4,1.0,"))));
+ }
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
index fd5b3c9ddb9..b2967e3bdf8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/client/IoTDBConfigNodeSyncClientManager.java
@@ -39,7 +39,8 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
String trustStorePath,
String trustStorePwd,
String loadBalanceStrategy,
- boolean shouldReceiverConvertOnTypeMismatch) {
+ boolean shouldReceiverConvertOnTypeMismatch,
+ String loadTsFileStrategy) {
super(
endPoints,
useSSL,
@@ -47,7 +48,8 @@ public class IoTDBConfigNodeSyncClientManager extends
IoTDBSyncClientManager {
trustStorePwd,
false,
loadBalanceStrategy,
- shouldReceiverConvertOnTypeMismatch);
+ shouldReceiverConvertOnTypeMismatch,
+ loadTsFileStrategy);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index ba151351c39..0d466605d8f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -71,6 +71,8 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 96059980c38..1e2f3c19e5a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -63,14 +63,16 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
final String trustStorePwd,
final boolean useLeaderCache,
final String loadBalanceStrategy,
- final boolean shouldReceiverConvertOnTypeMismatch) {
+ final boolean shouldReceiverConvertOnTypeMismatch,
+ final String loadTsFileStrategy) {
return new IoTDBConfigNodeSyncClientManager(
nodeUrls,
useSSL,
trustStorePath,
trustStorePwd,
loadBalanceStrategy,
- shouldReceiverConvertOnTypeMismatch);
+ shouldReceiverConvertOnTypeMismatch,
+ loadTsFileStrategy);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 6814f8151b9..bfe5360e749 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -43,8 +43,10 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -60,35 +62,38 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
private final Set<TEndPoint> endPointSet;
- private static final AtomicReference<
- IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
- ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
+ private static final Map<String, IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient>>
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new
ConcurrentHashMap<>();
private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
endPoint2Client;
private final LoadBalancer loadBalancer;
private final boolean shouldReceiverConvertOnTypeMismatch;
+ private final String loadTsFileStrategy;
+
public IoTDBDataNodeAsyncClientManager(
List<TEndPoint> endPoints,
boolean useLeaderCache,
String loadBalanceStrategy,
- boolean shouldReceiverConvertOnTypeMismatch) {
+ boolean shouldReceiverConvertOnTypeMismatch,
+ String loadTsFileStrategy) {
super(endPoints, useLeaderCache);
endPointSet = new HashSet<>(endPoints);
- if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
+ final String receiverAttributes =
+ String.format("%s-%s", shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
+ if
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
{
synchronized (IoTDBDataRegionAsyncConnector.class) {
- if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
- ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(
- new IClientManager.Factory<TEndPoint,
AsyncPipeDataTransferServiceClient>()
- .createClientManager(
- new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
- }
+ ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
+ receiverAttributes,
+ new IClientManager.Factory<TEndPoint,
AsyncPipeDataTransferServiceClient>()
+ .createClientManager(
+ new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
}
}
- endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
+ endPoint2Client =
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
switch (loadBalanceStrategy) {
case CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY:
@@ -108,6 +113,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
}
this.shouldReceiverConvertOnTypeMismatch =
shouldReceiverConvertOnTypeMismatch;
+ this.loadTsFileStrategy = loadTsFileStrategy;
}
public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
@@ -219,6 +225,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index df9ae2a0e29..5a341dc7574 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -49,7 +49,8 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
String trustStorePwd,
boolean useLeaderCache,
String loadBalanceStrategy,
- boolean shouldReceiverConvertOnTypeMismatch) {
+ boolean shouldReceiverConvertOnTypeMismatch,
+ String loadTsFileStrategy) {
super(
endPoints,
useSSL,
@@ -57,7 +58,8 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
trustStorePwd,
useLeaderCache,
loadBalanceStrategy,
- shouldReceiverConvertOnTypeMismatch);
+ shouldReceiverConvertOnTypeMismatch,
+ loadTsFileStrategy);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index f0491431196..221fa516448 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -103,6 +103,8 @@ public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 3f37b247869..394e2e95d2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -125,7 +125,8 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY,
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
loadBalanceStrategy,
- shouldReceiverConvertOnTypeMismatch);
+ shouldReceiverConvertOnTypeMismatch,
+ loadTsFileStrategy);
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 32c886fbe56..9580346d25b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -87,7 +87,8 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
final String trustStorePwd,
final boolean useLeaderCache,
final String loadBalanceStrategy,
- final boolean shouldReceiverConvertOnTypeMismatch) {
+ final boolean shouldReceiverConvertOnTypeMismatch,
+ final String loadTsFileStrategy) {
clientManager =
new IoTDBDataNodeSyncClientManager(
nodeUrls,
@@ -96,7 +97,8 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
trustStorePwd,
useLeaderCache,
loadBalanceStrategy,
- shouldReceiverConvertOnTypeMismatch);
+ shouldReceiverConvertOnTypeMismatch,
+ loadTsFileStrategy);
return clientManager;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 20a34319d35..52bbe1f68bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransf
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferSliceReq;
import org.apache.iotdb.commons.pipe.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -87,12 +88,14 @@ import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -405,8 +408,10 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
@Override
protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final
String fileAbsolutePath)
- throws FileNotFoundException {
- return loadTsFile(fileAbsolutePath);
+ throws IOException {
+ return isUsingAsyncLoadTsFileStrategy.get()
+ ? loadTsFileAsync(Collections.singletonList(fileAbsolutePath))
+ : loadTsFileSync(fileAbsolutePath);
}
@Override
@@ -415,11 +420,30 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
throws IOException, IllegalPathException {
return req instanceof PipeTransferTsFileSealWithModReq
// TsFile's absolute path will be the second element
- ? loadTsFile(fileAbsolutePaths.get(1))
+ ? (isUsingAsyncLoadTsFileStrategy.get()
+ ? loadTsFileAsync(fileAbsolutePaths)
+ : loadTsFileSync(fileAbsolutePaths.get(1)))
: loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
}
- private TSStatus loadTsFile(final String fileAbsolutePath) throws
FileNotFoundException {
+ private TSStatus loadTsFileAsync(final List<String> absolutePaths) throws
IOException {
+ final String loadActiveListeningPipeDir =
IOTDB_CONFIG.getLoadActiveListeningPipeDir();
+
+ for (final String absolutePath : absolutePaths) {
+ if (absolutePath == null) {
+ continue;
+ }
+ final File sourceFile = new File(absolutePath);
+ if (!Objects.equals(
+ loadActiveListeningPipeDir,
sourceFile.getParentFile().getAbsolutePath())) {
+ FileUtils.moveFileWithMD5Check(sourceFile, new
File(loadActiveListeningPipeDir));
+ }
+ }
+
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ private TSStatus loadTsFileSync(final String fileAbsolutePath) throws
FileNotFoundException {
final LoadTsFileStatement statement = new
LoadTsFileStatement(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index a5b45146067..b130e58e2f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -36,9 +36,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement
import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,12 +44,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
-import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.ZoneId;
import java.util.Objects;
@@ -231,7 +227,6 @@ public class ActiveLoadTsFileLoader {
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to
memory constraints, will retry later.",
filePair.getLeft(),
filePair.getRight());
- pendingQueue.enqueue(filePair.getLeft(), filePair.getRight());
} else {
LOGGER.warn(
"Failed to auto load tsfile {} (isGeneratedByPipe = {}) because of
an unexpected exception. File will be moved to fail directory.",
@@ -257,62 +252,12 @@ public class ActiveLoadTsFileLoader {
final File targetDir = new File(failDir.get());
try {
- moveFileWithMD5Check(sourceFile, targetDir);
+
org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check(sourceFile,
targetDir);
} catch (final IOException e) {
LOGGER.warn("Error occurred during moving file {} to fail directory.",
filePath, e);
}
}
- private static void moveFileWithMD5Check(final File sourceFile, final File
targetDir)
- throws IOException {
- final String sourceFileName = sourceFile.getName();
- final File targetFile = new File(targetDir, sourceFileName);
-
- if (targetFile.exists()) {
- if (haveSameMD5(sourceFile, targetFile)) {
- FileUtils.forceDelete(sourceFile);
- LOGGER.info(
- "Deleted the file {} because it already exists in the fail
directory: {}",
- sourceFile.getName(),
- targetDir.getAbsolutePath());
- } else {
- renameWithMD5(sourceFile, targetDir);
- LOGGER.info(
- "Renamed file {} to {} because it already exists in the fail
directory: {}",
- sourceFile.getName(),
- targetFile.getName(),
- targetDir.getAbsolutePath());
- }
- } else {
- FileUtils.moveFileToDirectory(sourceFile, targetDir, true);
- LOGGER.info(
- "Moved file {} to fail directory {}.", sourceFile.getName(),
targetDir.getAbsolutePath());
- }
- }
-
- private static boolean haveSameMD5(final File file1, final File file2) {
- try (final InputStream is1 = Files.newInputStream(file1.toPath());
- final InputStream is2 = Files.newInputStream(file2.toPath())) {
- return DigestUtils.md5Hex(is1).equals(DigestUtils.md5Hex(is2));
- } catch (final Exception e) {
- return false;
- }
- }
-
- private static void renameWithMD5(File sourceFile, File targetDir) throws
IOException {
- try (final InputStream is = Files.newInputStream(sourceFile.toPath())) {
- final String sourceFileBaseName =
FilenameUtils.getBaseName(sourceFile.getName());
- final String sourceFileExtension =
FilenameUtils.getExtension(sourceFile.getName());
- final String sourceFileMD5 = DigestUtils.md5Hex(is);
-
- final String targetFileName =
- sourceFileBaseName + "-" + sourceFileMD5.substring(0, 16) + "." +
sourceFileExtension;
- final File targetFile = new File(targetDir, targetFileName);
-
- FileUtils.moveFile(sourceFile, targetFile,
StandardCopyOption.REPLACE_EXISTING);
- }
- }
-
// Metrics
public long countAndReportFailedFileNumber() {
final long[] fileCount = {0};
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 87ec0aaaeae..77eccfb4069 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -225,6 +225,17 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_CONSENSUS_GROUP_ID_KEY =
"connector.consensus.group-id";
public static final String CONNECTOR_CONSENSUS_PIPE_NAME =
"connector.consensus.pipe-name";
+ public static final String CONNECTOR_LOAD_TSFILE_STRATEGY_KEY =
"connector.load-tsfile-strategy";
+ public static final String SINK_LOAD_TSFILE_STRATEGY_KEY =
"sink.load-tsfile-strategy";
+ public static final String CONNECTOR_LOAD_TSFILE_STRATEGY_ASYNC_VALUE =
"async";
+ public static final String CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE =
"sync";
+ public static final Set<String> CONNECTOR_LOAD_TSFILE_STRATEGY_SET =
+ Collections.unmodifiableSet(
+ new HashSet<>(
+ Arrays.asList(
+ CONNECTOR_LOAD_TSFILE_STRATEGY_ASYNC_VALUE,
+ CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE)));
+
private PipeConnectorConstant() {
throw new IllegalStateException("Utility class");
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index 57b9a4fede4..81b5194621c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -62,6 +62,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
private final boolean shouldReceiverConvertOnTypeMismatch;
+ private final String loadTsFileStrategy;
+
protected IoTDBSyncClientManager(
List<TEndPoint> endPoints,
boolean useSSL,
@@ -69,7 +71,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
String trustStorePwd,
boolean useLeaderCache,
String loadBalanceStrategy,
- boolean shouldReceiverConvertOnTypeMismatch) {
+ boolean shouldReceiverConvertOnTypeMismatch,
+ String loadTsFileStrategy) {
super(endPoints, useLeaderCache);
this.useSSL = useSSL;
@@ -98,6 +101,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
}
this.shouldReceiverConvertOnTypeMismatch =
shouldReceiverConvertOnTypeMismatch;
+ this.loadTsFileStrategy = loadTsFileStrategy;
}
public void checkClientStatusAndTryReconstructIfNecessary() {
@@ -177,6 +181,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
loadTsFileStrategy);
// Try to handshake by PipeTransferHandshakeV2Req.
TPipeTransferResp resp =
client.pipeTransfer(buildHandshakeV2Req(params));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
index b6864706aaa..ec8818072d2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/thrift/common/PipeTransferHandshakeConstant.java
@@ -24,6 +24,7 @@ public class PipeTransferHandshakeConstant {
public static final String HANDSHAKE_KEY_TIME_PRECISION =
"timestampPrecision";
public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";
public static final String HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH =
"convertOnTypeMismatch";
+ public static final String HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY =
"loadTsFileStrategy";
private PipeTransferHandshakeConstant() {
// Utility class
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 55231032e5a..eb66a638672 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -83,6 +83,9 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SET;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
@@ -101,6 +104,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
public abstract class IoTDBConnector implements PipeConnector {
@@ -116,6 +120,8 @@ public abstract class IoTDBConnector implements
PipeConnector {
protected String loadBalanceStrategy;
+ protected String loadTsFileStrategy;
+
private boolean isRpcCompressionEnabled;
private final List<PipeCompressor> compressors = new ArrayList<>();
@@ -183,6 +189,20 @@ public abstract class IoTDBConnector implements
PipeConnector {
CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
loadBalanceStrategy);
+ loadTsFileStrategy =
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_LOAD_TSFILE_STRATEGY_KEY,
SINK_LOAD_TSFILE_STRATEGY_KEY),
+ CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE)
+ .trim()
+ .toLowerCase();
+ validator.validate(
+ arg -> CONNECTOR_LOAD_TSFILE_STRATEGY_SET.contains(loadTsFileStrategy),
+ String.format(
+ "Load tsfile strategy should be one of %s, but got %s.",
+ CONNECTOR_LOAD_TSFILE_STRATEGY_SET, loadTsFileStrategy),
+ loadTsFileStrategy);
+
final int zstdCompressionLevel =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY,
SINK_COMPRESSOR_ZSTD_LEVEL_KEY),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 271596cf549..b1a0174e75c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -122,7 +122,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
trustStorePwd,
useLeaderCache,
loadBalanceStrategy,
- shouldReceiverConvertOnTypeMismatch);
+ shouldReceiverConvertOnTypeMismatch,
+ loadTsFileStrategy);
}
protected abstract IoTDBSyncClientManager constructClient(
@@ -132,7 +133,8 @@ public abstract class IoTDBSslSyncConnector extends
IoTDBConnector {
String trustStorePwd,
boolean useLeaderCache,
String loadBalanceStrategy,
- boolean shouldReceiverConvertOnTypeMismatch);
+ boolean shouldReceiverConvertOnTypeMismatch,
+ String loadTsFileStrategy);
@Override
public void handshake() throws Exception {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 7197e112526..4b37cf9aabd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
@@ -46,6 +47,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -71,6 +73,9 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
protected boolean shouldConvertDataTypeOnTypeMismatch =
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
+ // Used to determine whether the current load tsfile strategy is sync or async
+ protected final AtomicBoolean isUsingAsyncLoadTsFileStrategy = new
AtomicBoolean(false);
+
@Override
public IoTDBConnectorRequestVersion getVersion() {
return IoTDBConnectorRequestVersion.VERSION_1;
@@ -228,6 +233,15 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
Boolean.parseBoolean(shouldConvertDataTypeOnTypeMismatchString);
}
+ final String loadTsFileStrategyString =
+
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY);
+ if (loadTsFileStrategyString != null) {
+ isUsingAsyncLoadTsFileStrategy.set(
+ Objects.equals(
+ PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_ASYNC_VALUE,
+ loadTsFileStrategyString));
+ }
+
// Handle the handshake request as a v1 request.
// Here we construct a fake "dataNode" request to valid from v1 validation
logic, though
// it may not require the actual type of the v1 request.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index ee2f5098642..71498bbe220 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.commons.utils;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,11 +32,13 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import java.util.Arrays;
@@ -304,4 +308,53 @@ public class FileUtils {
}
return String.format("%.2f %cB", bytes / 1000.0, ci.current());
}
+
+  /**
+   * Moves {@code sourceFile} into {@code targetDir}, resolving file name collisions by MD5.
+   *
+   * <ul>
+   *   <li>No file with the same name exists in {@code targetDir}: the source is moved there
+   *       (the directory is created if it is missing).
+   *   <li>A file with the same name and the same MD5 exists: the source is a duplicate and is
+   *       deleted.
+   *   <li>A file with the same name but a different MD5 exists: the source is moved under a new
+   *       name that embeds the first 16 hex characters of its MD5.
+   * </ul>
+   *
+   * @param sourceFile the file to move
+   * @param targetDir the directory to move the file into
+   * @throws IOException if the source cannot be read, moved or deleted
+   */
+  public static void moveFileWithMD5Check(final File sourceFile, final File targetDir)
+      throws IOException {
+    final String sourceFileName = sourceFile.getName();
+    final File targetFile = new File(targetDir, sourceFileName);
+
+    if (!targetFile.exists()) {
+      org.apache.commons.io.FileUtils.moveFileToDirectory(sourceFile, targetDir, true);
+      return;
+    }
+
+    // The source already lives in the target directory. Without this guard the MD5s would
+    // trivially match and the only copy of the file would be deleted below.
+    if (Files.isSameFile(sourceFile.toPath(), targetFile.toPath())) {
+      return;
+    }
+
+    if (haveSameMD5(sourceFile, targetFile)) {
+      org.apache.commons.io.FileUtils.forceDelete(sourceFile);
+      LOGGER.info(
+          "Deleted the file {} because it already exists in the target directory: {}",
+          sourceFileName,
+          targetDir.getAbsolutePath());
+      return;
+    }
+
+    final File renamedFile = renameWithMD5(sourceFile, targetDir);
+    LOGGER.info(
+        "Renamed file {} to {} because it already exists in the target directory: {}",
+        sourceFileName,
+        renamedFile.getName(),
+        targetDir.getAbsolutePath());
+  }
+
+  /**
+   * Compares the MD5 digests of two files.
+   *
+   * @return {@code true} if both files could be read and their digests are equal; {@code false}
+   *     if the digests differ or either file could not be read
+   */
+  private static boolean haveSameMD5(final File file1, final File file2) {
+    try (final InputStream is1 = Files.newInputStream(file1.toPath());
+        final InputStream is2 = Files.newInputStream(file2.toPath())) {
+      return DigestUtils.md5Hex(is1).equals(DigestUtils.md5Hex(is2));
+    } catch (final Exception e) {
+      // Best effort: an unreadable file is treated as different so that the caller keeps the
+      // source under a new name instead of deleting it.
+      LOGGER.warn(
+          "Failed to compare the MD5 of {} and {}, treating them as different.", file1, file2, e);
+      return false;
+    }
+  }
+
+  /**
+   * Moves {@code sourceFile} into {@code targetDir} under the name {@code
+   * <base>-<first 16 hex chars of MD5>[.<extension>]}, replacing any existing file of that name.
+   *
+   * @return the file the source was moved to
+   * @throws IOException if the source cannot be read or moved
+   */
+  private static File renameWithMD5(final File sourceFile, final File targetDir)
+      throws IOException {
+    // Close the stream before moving: an open handle on the source makes the move (or the
+    // delete step of a copy-and-delete move) fail on platforms with mandatory file locks.
+    final String sourceFileMD5;
+    try (final InputStream is = Files.newInputStream(sourceFile.toPath())) {
+      sourceFileMD5 = DigestUtils.md5Hex(is);
+    }
+
+    final String sourceFileBaseName = FilenameUtils.getBaseName(sourceFile.getName());
+    final String sourceFileExtension = FilenameUtils.getExtension(sourceFile.getName());
+    // Avoid a dangling "." when the source has no extension.
+    final String extensionSuffix = sourceFileExtension.isEmpty() ? "" : "." + sourceFileExtension;
+
+    final String targetFileName =
+        sourceFileBaseName + "-" + sourceFileMD5.substring(0, 16) + extensionSuffix;
+    final File targetFile = new File(targetDir, targetFileName);
+
+    org.apache.commons.io.FileUtils.moveFile(
+        sourceFile, targetFile, StandardCopyOption.REPLACE_EXISTING);
+    return targetFile;
+  }
}