This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7e4e01cc632 Load & Pipe: Support Active Load Table Model TsFiles & Support Async Load in SQL & Support Async Load Strategy in Pipe (#15208)
7e4e01cc632 is described below
commit 7e4e01cc6327a3163432c05b97e46dd70df923a2
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Mar 28 23:21:08 2025 +0800
Load & Pipe: Support Active Load Table Model TsFiles & Support Async Load in SQL & Support Async Load Strategy in Pipe (#15208)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 20 ++--
.../plan/analyze/load/LoadTsFileAnalyzer.java | 76 +++++++++++++++
.../plan/relational/sql/ast/LoadTsFile.java | 6 ++
.../plan/statement/crud/LoadTsFileStatement.java | 10 ++
.../load/active/ActiveLoadTsFileLoader.java | 102 +++++++++++++--------
.../load/config/LoadTsFileConfigurator.java | 20 ++++
.../org/apache/iotdb/commons/utils/FileUtils.java | 49 ++++++++++
7 files changed, 239 insertions(+), 44 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 082655b45fe..ea2574f47ae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -567,27 +567,31 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
private TSStatus loadTsFileAsync(final String dataBaseName, final
List<String> absolutePaths)
throws IOException {
- if (Objects.nonNull(dataBaseName)) {
- throw new PipeException(
- "Async load tsfile does not support table model tsfile. Given database name: "
- + dataBaseName);
- }
-
final String loadActiveListeningPipeDir =
IOTDB_CONFIG.getLoadActiveListeningPipeDir();
if (Objects.isNull(loadActiveListeningPipeDir)) {
throw new PipeException("Load active listening pipe dir is not set.");
}
+ if (Objects.nonNull(dataBaseName)) {
+ final File targetDir = new File(loadActiveListeningPipeDir,
dataBaseName);
+ return this.loadTsFileAsyncToTargetDir(targetDir, absolutePaths);
+ }
+
+ return loadTsFileAsyncToTargetDir(new File(loadActiveListeningPipeDir),
absolutePaths);
+ }
+
+ private TSStatus loadTsFileAsyncToTargetDir(
+ final File targetDir, final List<String> absolutePaths) throws
IOException {
for (final String absolutePath : absolutePaths) {
if (absolutePath == null) {
continue;
}
final File sourceFile = new File(absolutePath);
if (!Objects.equals(
- loadActiveListeningPipeDir,
sourceFile.getParentFile().getAbsolutePath())) {
+ targetDir.getAbsolutePath(),
sourceFile.getParentFile().getAbsolutePath())) {
RetryUtils.retryOnException(
() -> {
- FileUtils.moveFileWithMD5Check(sourceFile, new
File(loadActiveListeningPipeDir));
+ FileUtils.moveFileWithMD5Check(sourceFile, targetDir);
return null;
});
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 3da8922f4e1..498e678b4bc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
@@ -80,6 +81,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
+import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
import static
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED;
import static
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.validateDatabaseName;
import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
@@ -117,6 +120,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
// User specified configs
private final int databaseLevel;
private String databaseForTableData;
+ private final boolean isAsyncLoad;
private final boolean isVerifySchema;
private final boolean isAutoCreateDatabase;
private final boolean isDeleteAfterLoad;
@@ -143,6 +147,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
this.databaseForTableData = loadTsFileStatement.getDatabase();
+ this.isAsyncLoad = loadTsFileStatement.isAsyncLoad();
this.isVerifySchema = loadTsFileStatement.isVerifySchema();
this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad();
@@ -166,6 +171,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
this.databaseForTableData = loadTsFileTableStatement.getDatabase();
+ this.isAsyncLoad = loadTsFileTableStatement.isAsyncLoad();
this.isVerifySchema = loadTsFileTableStatement.isVerifySchema();
this.isAutoCreateDatabase =
loadTsFileTableStatement.isAutoCreateDatabase();
this.isDeleteAfterLoad = loadTsFileTableStatement.isDeleteAfterLoad();
@@ -199,6 +205,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
return analysis;
}
+ if (isAsyncLoad && doAsyncLoad(analysis)) {
+ return analysis;
+ }
+
try {
if (!doAnalyzeFileByFile(analysis)) {
return analysis;
@@ -268,6 +278,72 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
return true;
}
+ private boolean doAsyncLoad(final IAnalysis analysis) {
+ final String[] loadActiveListeningDirs =
+ IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
+ String targetFilePath = null;
+ for (int i = 0, size = loadActiveListeningDirs == null ? 0 :
loadActiveListeningDirs.length;
+ i < size;
+ i++) {
+ if (loadActiveListeningDirs[i] != null) {
+ targetFilePath = loadActiveListeningDirs[i];
+ break;
+ }
+ }
+ if (targetFilePath == null) {
+ LOGGER.warn("Load active listening dir is not set. Will try sync load instead.");
+ return false;
+ }
+
+ try {
+ if (Objects.nonNull(databaseForTableData)) {
+ loadTsFilesAsyncToTargetDir(new File(targetFilePath,
databaseForTableData), tsFiles);
+ } else {
+ loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles);
+ }
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to async load tsfiles {} to target dir {}. Will try sync load instead.",
+ tsFiles,
+ targetFilePath,
+ e);
+ return false;
+ }
+
+ analysis.setFinishQueryAfterAnalyze(true);
+ setRealStatement(analysis);
+ return true;
+ }
+
+ private void loadTsFilesAsyncToTargetDir(final File targetDir, final
List<File> files)
+ throws IOException {
+ for (final File file : files) {
+ if (file == null) {
+ continue;
+ }
+
+ loadTsFileAsyncToTargetDir(targetDir, file);
+ loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() +
".resource"));
+ loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() +
".mods"));
+ }
+ }
+
+ private void loadTsFileAsyncToTargetDir(final File targetDir, final File
file)
+ throws IOException {
+ if (!file.exists()) {
+ return;
+ }
+ RetryUtils.retryOnException(
+ () -> {
+ if (isDeleteAfterLoad) {
+ moveFileWithMD5Check(file, targetDir);
+ } else {
+ copyFileWithMD5Check(file, targetDir);
+ }
+ return null;
+ });
+ }
+
private boolean doAnalyzeFileByFile(IAnalysis analysis) {
// analyze tsfile metadata file by file
for (int i = 0, tsfileNum = tsFiles.size(); i < tsfileNum; i++) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
index f42ac22002e..180056c774e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
@@ -46,6 +46,7 @@ public class LoadTsFile extends Statement {
private long tabletConversionThresholdBytes = -1;
private boolean autoCreateDatabase = true;
private boolean verify = true;
+ private boolean isAsyncLoad = false;
private boolean isGeneratedByPipe = false;
@@ -138,6 +139,10 @@ public class LoadTsFile extends Statement {
return this;
}
+ public boolean isAsyncLoad() {
+ return isAsyncLoad;
+ }
+
public void markIsGeneratedByPipe() {
isGeneratedByPipe = true;
}
@@ -183,6 +188,7 @@ public class LoadTsFile extends Statement {
this.tabletConversionThresholdBytes =
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
this.verify =
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
+ this.isAsyncLoad =
LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
}
public boolean reconstructStatementIfMiniFileConverted(final List<Boolean>
isMiniTsFile) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 2acd906cfa4..abb42b74417 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -43,6 +43,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ASYNC_LOAD_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_NAME_KEY;
@@ -62,6 +63,7 @@ public class LoadTsFileStatement extends Statement {
private long tabletConversionThresholdBytes = -1;
private boolean autoCreateDatabase = true;
private boolean isGeneratedByPipe = false;
+ private boolean isAsyncLoad = false;
private Map<String, String> loadAttributes;
@@ -249,6 +251,10 @@ public class LoadTsFileStatement extends Statement {
initAttributes();
}
+ public boolean isAsyncLoad() {
+ return isAsyncLoad;
+ }
+
private void initAttributes() {
this.databaseLevel =
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
@@ -258,6 +264,7 @@ public class LoadTsFileStatement extends Statement {
this.tabletConversionThresholdBytes =
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
this.verifySchema =
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
+ this.isAsyncLoad =
LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
}
public boolean reconstructStatementIfMiniFileConverted(final List<Boolean>
isMiniTsFile) {
@@ -326,6 +333,7 @@ public class LoadTsFileStatement extends Statement {
loadAttributes.put(CONVERT_ON_TYPE_MISMATCH_KEY,
String.valueOf(convertOnTypeMismatch));
loadAttributes.put(
TABLET_CONVERSION_THRESHOLD_KEY,
String.valueOf(tabletConversionThresholdBytes));
+ loadAttributes.put(ASYNC_LOAD_KEY, String.valueOf(isAsyncLoad));
return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
}
@@ -350,6 +358,8 @@ public class LoadTsFileStatement extends Statement {
+ convertOnTypeMismatch
+ ", tablet-conversion-threshold="
+ tabletConversionThresholdBytes
+ + ", async-load="
+ + isAsyncLoad
+ ", tsFiles size="
+ tsFiles.size()
+ '}';
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 65665f5e3d4..2bae3175ca0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -24,12 +24,14 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
-import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
@@ -54,6 +56,7 @@ import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.ZoneId;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
@@ -67,6 +70,8 @@ public class ActiveLoadTsFileLoader {
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
private static final int MAX_PENDING_SIZE = 1000;
private final ActiveLoadPendingQueue pendingQueue = new
ActiveLoadPendingQueue();
@@ -149,30 +154,43 @@ public class ActiveLoadTsFileLoader {
}
private void tryLoadPendingTsFiles() {
- while (true) {
- final Optional<Pair<String, Boolean>> filePair = tryGetNextPendingFile();
- if (!filePair.isPresent()) {
- return;
- }
+ final IClientSession session =
+ new InternalClientSession(
+ String.format(
+ "%s_%s",
+ ActiveLoadTsFileLoader.class.getSimpleName(),
Thread.currentThread().getName()));
+ session.setUsername(AuthorityChecker.SUPER_USER);
+ session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
+ session.setZoneId(ZoneId.systemDefault());
+
+ try {
+ while (true) {
+ final Optional<Pair<String, Boolean>> filePair =
tryGetNextPendingFile();
+ if (!filePair.isPresent()) {
+ return;
+ }
- try {
- final TSStatus result = loadTsFile(filePair.get());
- if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- || result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- LOGGER.info(
- "Successfully auto load tsfile {} (isGeneratedByPipe = {})",
- filePair.get().getLeft(),
- filePair.get().getRight());
- } else {
- handleLoadFailure(filePair.get(), result);
+ try {
+ final TSStatus result = loadTsFile(filePair.get(), session);
+ if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ LOGGER.info(
+ "Successfully auto load tsfile {} (isGeneratedByPipe = {})",
+ filePair.get().getLeft(),
+ filePair.get().getRight());
+ } else {
+ handleLoadFailure(filePair.get(), result);
+ }
+ } catch (final FileNotFoundException e) {
+ handleFileNotFoundException(filePair.get());
+ } catch (final Exception e) {
+ handleOtherException(filePair.get(), e);
+ } finally {
+ pendingQueue.removeFromLoading(filePair.get().getLeft());
}
- } catch (final FileNotFoundException e) {
- handleFileNotFoundException(filePair.get());
- } catch (final Exception e) {
- handleOtherException(filePair.get(), e);
- } finally {
- pendingQueue.removeFromLoading(filePair.get().getLeft());
}
+ } finally {
+ SESSION_MANAGER.closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution);
}
}
@@ -195,27 +213,39 @@ public class ActiveLoadTsFileLoader {
}
}
- private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws
FileNotFoundException {
+ private TSStatus loadTsFile(final Pair<String, Boolean> filePair, final
IClientSession session)
+ throws FileNotFoundException {
final LoadTsFileStatement statement = new
LoadTsFileStatement(filePair.getLeft());
+ final List<File> files = statement.getTsFiles();
+ if (!files.isEmpty()) {
+ final File parentFile = files.get(0).getParentFile();
+ statement.setDatabase(parentFile == null ? "null" :
parentFile.getName());
+ }
statement.setDeleteAfterLoad(true);
statement.setConvertOnTypeMismatch(true);
statement.setVerifySchema(isVerify);
statement.setAutoCreateDatabase(false);
- return executeStatement(filePair.getRight() ? new
PipeEnrichedStatement(statement) : statement);
+ return executeStatement(
+ filePair.getRight() ? new PipeEnrichedStatement(statement) :
statement, session);
}
- private TSStatus executeStatement(final Statement statement) {
- return Coordinator.getInstance()
- .executeForTreeModel(
- statement,
- SessionManager.getInstance().requestQueryId(),
- new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
- "",
- ClusterPartitionFetcher.getInstance(),
- ClusterSchemaFetcher.getInstance(),
- IOTDB_CONFIG.getQueryTimeoutThreshold(),
- false)
- .status;
+ private TSStatus executeStatement(final Statement statement, final
IClientSession session) {
+ SESSION_MANAGER.registerSession(session);
+ try {
+ return Coordinator.getInstance()
+ .executeForTreeModel(
+ statement,
+ SESSION_MANAGER.requestQueryId(),
+ SESSION_MANAGER.getSessionInfo(session),
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ ClusterSchemaFetcher.getInstance(),
+ IOTDB_CONFIG.getQueryTimeoutThreshold(),
+ false)
+ .status;
+ } finally {
+ SESSION_MANAGER.removeCurrSession();
+ }
}
private void handleLoadFailure(final Pair<String, Boolean> filePair, final
TSStatus status) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 0a21c82be27..2c594b868c0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -53,6 +53,9 @@ public class LoadTsFileConfigurator {
case VERIFY_KEY:
validateVerifyParam(value);
break;
+ case ASYNC_LOAD_KEY:
+ validateAsyncLoadParam(value);
+ break;
default:
throw new SemanticException("Invalid parameter '" + key + "' for LOAD TSFILE command.");
}
@@ -164,6 +167,23 @@ public class LoadTsFileConfigurator {
loadAttributes.getOrDefault(VERIFY_KEY,
String.valueOf(VERIFY_DEFAULT_VALUE)));
}
+ public static final String ASYNC_LOAD_KEY = "async";
+ private static final boolean ASYNC_LOAD_DEFAULT_VALUE = false;
+
+ public static void validateAsyncLoadParam(final String asyncLoad) {
+ if (!"true".equalsIgnoreCase(asyncLoad) &&
!"false".equalsIgnoreCase(asyncLoad)) {
+ throw new SemanticException(
+ String.format(
+ "Given %s value '%s' is not supported, please input a valid boolean value.",
+ ASYNC_LOAD_KEY, asyncLoad));
+ }
+ }
+
+ public static boolean parseOrGetDefaultAsyncLoad(final Map<String, String>
loadAttributes) {
+ return Boolean.parseBoolean(
+ loadAttributes.getOrDefault(ASYNC_LOAD_KEY,
String.valueOf(ASYNC_LOAD_DEFAULT_VALUE)));
+ }
+
private LoadTsFileConfigurator() {
throw new IllegalStateException("Utility class");
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index fbc0621caaa..f8de487c365 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -405,6 +405,35 @@ public class FileUtils {
}
}
+ public static void copyFileWithMD5Check(final File sourceFile, final File
targetDir)
+ throws IOException {
+ final String sourceFileName = sourceFile.getName();
+ final File targetFile = new File(targetDir, sourceFileName);
+ if (targetFile.exists()) {
+ if (!haveSameMD5(sourceFile, targetFile)) {
+ final String renameFile = copyFileRenameWithMD5(sourceFile, targetDir);
+ LOGGER.info(
+ "Copy file {} to {} because it already exists in the target directory: {}",
+ sourceFile.getName(),
+ renameFile,
+ targetDir.getAbsolutePath());
+ }
+ } else {
+ if (!(targetDir.exists() || targetDir.mkdirs())) {
+ final String log =
+ String.format("failed to create target directory: %s",
targetDir.getAbsolutePath());
+ LOGGER.warn(log);
+ throw new IOException(log);
+ }
+
+ Files.copy(
+ sourceFile.toPath(),
+ targetFile.toPath(),
+ StandardCopyOption.REPLACE_EXISTING,
+ StandardCopyOption.COPY_ATTRIBUTES);
+ }
+ }
+
private static boolean haveSameMD5(final File file1, final File file2) {
try (final InputStream is1 = Files.newInputStream(file1.toPath());
final InputStream is2 = Files.newInputStream(file2.toPath())) {
@@ -428,4 +457,24 @@ public class FileUtils {
sourceFile, targetFile, StandardCopyOption.REPLACE_EXISTING);
}
}
+
+ private static String copyFileRenameWithMD5(final File sourceFile, final
File targetDir)
+ throws IOException {
+ try (final InputStream is = Files.newInputStream(sourceFile.toPath())) {
+ final String sourceFileBaseName =
FilenameUtils.getBaseName(sourceFile.getName());
+ final String sourceFileExtension =
FilenameUtils.getExtension(sourceFile.getName());
+ final String sourceFileMD5 = DigestUtils.md5Hex(is);
+
+ final String targetFileName =
+ sourceFileBaseName + "-" + sourceFileMD5.substring(0, 16) + "." +
sourceFileExtension;
+ final File targetFile = new File(targetDir, targetFileName);
+
+ Files.copy(
+ sourceFile.toPath(),
+ targetFile.toPath(),
+ StandardCopyOption.REPLACE_EXISTING,
+ StandardCopyOption.COPY_ATTRIBUTES);
+ return targetFileName;
+ }
+ }
}