This is an automated email from the ASF dual-hosted git repository. heiming pushed a commit to branch tiered_storage in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fbe596118ae8df1e2e77b79d913eedbc9e0b6273 Author: HeimingZ <[email protected]> AuthorDate: Mon May 22 19:53:50 2023 +0800 add migration --- .../resources/conf/iotdb-common.properties | 18 ++ .../iotdb/commons/concurrent/ThreadName.java | 2 + .../apache/iotdb/commons/service/ServiceType.java | 4 +- .../org/apache/iotdb/os/fileSystem/OSFile.java | 13 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 55 ++++-- .../iotdb/db/conf/directories/FolderManager.java | 4 +- .../iotdb/db/conf/directories/TierManager.java | 127 ++++++++++++- .../impl/SizeTieredCompactionSelector.java | 2 +- .../utils/CrossSpaceCompactionCandidate.java | 5 +- .../db/engine/migration/LocalMigrationTask.java | 70 ++++++++ .../iotdb/db/engine/migration/MigrationCause.java | 24 +++ .../iotdb/db/engine/migration/MigrationTask.java | 93 ++++++++++ .../db/engine/migration/MigrationTaskManager.java | 196 +++++++++++++++++++++ .../db/engine/migration/RemoteMigrationTask.java | 60 +++++++ .../iotdb/db/engine/storagegroup/DataRegion.java | 4 +- .../engine/storagegroup/TsFileNameGenerator.java | 5 +- .../db/engine/storagegroup/TsFileResource.java | 30 +++- .../java/org/apache/iotdb/db/service/DataNode.java | 2 + .../tsfile/fileSystem/fsFactory/FSFactory.java | 10 +- .../tsfile/fileSystem/fsFactory/HDFSFactory.java | 3 + .../fileSystem/fsFactory/HybridFSFactory.java | 5 + .../fileSystem/fsFactory/LocalFSFactory.java | 5 + .../tsfile/fileSystem/fsFactory/OSFSFactory.java | 3 + .../org/apache/iotdb/tsfile/utils/FSUtils.java | 6 +- 24 files changed, 703 insertions(+), 43 deletions(-) diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties index d0e00cb5d38..a3c906e4cf0 100644 --- a/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -174,6 +174,14 @@ cluster_name=defaultCluster # Datatype: long # heartbeat_interval_in_ms=1000 
+#################### +### Disk management +#################### + +# thread pool size for migrate operation in the DataNode's data directories. +# Datatype: int +# migrate_thread_count=3 + # Disk remaining threshold at which DataNode is set to ReadOnly status # Datatype: double(percentage) # disk_space_warning_threshold=0.05 @@ -1125,3 +1133,13 @@ cluster_name=defaultCluster # Datatype: int # influxdb_rpc_port=8086 + +#################### +### Object storage management +#################### + +# object_storage_name=aws_s3 +# object_storage_bucket=iotdb +# object_storage_endpoiont=yourEndpoint +# object_storage_access_key=yourAccessKey +# object_storage_access_secret=yourAccessSecret \ No newline at end of file diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 5d6fa3b53b4..6a9360b93c6 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -71,6 +71,8 @@ public enum ThreadName { PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"), PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"), PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"), + MIGRATION_SCHEDULER("Migration-Scheduler"), + MIGRATION("Migration-Executor-Pool"), ; private final String name; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java index 94d524defcf..b7a6e3aceaa 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java @@ -75,7 +75,9 @@ public enum ServiceType { IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"), PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE( "Pipe 
Plugin Classloader Manager Service", "PipePluginClassLoader"), - MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService"); + MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService"), + MIGRATION_SERVICE("Migration Manager", "Migration Manager"); + private final String name; private final String jmxName; diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java index 31de138b9a5..d04eaa397ea 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java @@ -260,12 +260,12 @@ public class OSFile extends File { @Override public boolean mkdir() { - throw new UnsupportedOperationException(UNSUPPORT_OPERATION); + return true; } @Override public boolean mkdirs() { - throw new UnsupportedOperationException(UNSUPPORT_OPERATION); + return true; } @Override @@ -326,17 +326,20 @@ public class OSFile extends File { @Override public long getTotalSpace() { - throw new UnsupportedOperationException(UNSUPPORT_OPERATION); + // object storage has infinity space + return Long.MAX_VALUE; } @Override public long getFreeSpace() { - throw new UnsupportedOperationException(UNSUPPORT_OPERATION); + // object storage has infinity space + return Long.MAX_VALUE; } @Override public long getUsableSpace() { - throw new UnsupportedOperationException(UNSUPPORT_OPERATION); + // object storage has infinity space + return Long.MAX_VALUE; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index fed4db2feb9..1b042e42b25 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.ConsensusFactory; import 
org.apache.iotdb.db.audit.AuditLogOperation; import org.apache.iotdb.db.audit.AuditLogStorage; +import org.apache.iotdb.db.conf.directories.TierManager; import org.apache.iotdb.db.engine.compaction.execute.performer.constant.CrossCompactionPerformer; import org.apache.iotdb.db.engine.compaction.execute.performer.constant.InnerSeqCompactionPerformer; import org.apache.iotdb.db.engine.compaction.execute.performer.constant.InnerUnseqCompactionPerformer; @@ -1105,6 +1106,12 @@ public class IoTDBConfig { */ private String RateLimiterType = "FixedIntervalRateLimiter"; + private String objectStorageName = "aws_s3"; + private String objectStorageBucket = "iotdb"; + private String objectStorageEndpoiont = "yourEndpoint"; + private String objectStorageAccessKey = "yourAccessKey"; + private String objectStorageAccessSecret = "yourAccessSecret"; + IoTDBConfig() {} public float getUdfMemoryBudgetInMB() { @@ -1211,17 +1218,18 @@ public class IoTDBConfig { private void formulateDataDirs(String[][] tierDataDirs) { for (int i = 0; i < tierDataDirs.length; i++) { for (int j = 0; j < tierDataDirs[i].length; j++) { - switch (FSUtils.getFSType(tierDataDirs[i][j])) { - case HDFS: - tierDataDirs[i][j] = getHdfsDir() + File.separatorChar + tierDataDirs[i][j]; - break; - case OBJECT_STORAGE: - // TODO(zhm) 对象存储路径配置 - break; - case LOCAL: - default: - tierDataDirs[i][j] = addDataHomeDir(tierDataDirs[i][j]); - break; + if (tierDataDirs[i][j].equals("object_storage")) { + tierDataDirs[i][j] = FSUtils.getOSDefaultPath(objectStorageBucket, dataNodeId); + } else { + switch (FSUtils.getFSType(tierDataDirs[i][j])) { + case HDFS: + tierDataDirs[i][j] = getHdfsDir() + File.separatorChar + tierDataDirs[i][j]; + break; + case LOCAL: + default: + tierDataDirs[i][j] = addDataHomeDir(tierDataDirs[i][j]); + break; + } } } } @@ -1230,7 +1238,7 @@ public class IoTDBConfig { void reloadDataDirs(String[][] tierDataDirs) throws LoadConfigurationException { // format data directories 
formulateDataDirs(tierDataDirs); - // make sure old data directories not removed, TODO(zhm) 层级关系是否可以变化,当前实现仅支持在最后添加层级 + // make sure old data directories not removed for (int i = 0; i < this.tierDataDirs.length; ++i) { HashSet<String> newDirs = new HashSet<>(Arrays.asList(tierDataDirs[i])); for (String oldDir : this.tierDataDirs[i]) { @@ -1243,7 +1251,7 @@ public class IoTDBConfig { } } this.tierDataDirs = tierDataDirs; - // TierManager.getInstance().updateFileFolders(); + TierManager.getInstance().resetFolders(); } // if IOTDB_DATA_HOME is not set, then we keep dataHomeDir prefix being the same with IOTDB_HOME @@ -1313,6 +1321,7 @@ public class IoTDBConfig { } public void setTierDataDirs(String[][] tierDataDirs) { + formulateDataDirs(tierDataDirs); this.tierDataDirs = tierDataDirs; // TODO(szywilliam): rewrite the logic here when ratis supports complete snapshot semantic setRatisDataRegionSnapshotDir( @@ -3810,4 +3819,24 @@ public class IoTDBConfig { public String getSortTmpDir() { return sortTmpDir; } + + public String getObjectStorageName() { + return objectStorageName; + } + + public String getObjectStorageBucket() { + return objectStorageBucket; + } + + public String getObjectStorageEndpoiont() { + return objectStorageEndpoiont; + } + + public String getObjectStorageAccessKey() { + return objectStorageAccessKey; + } + + public String getObjectStorageAccessSecret() { + return objectStorageAccessSecret; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java index 571a07a9ad4..9fb73749bc0 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java @@ -61,7 +61,7 @@ public class FolderManager { try { this.selectStrategy.setFolders(folders); } catch (DiskSpaceInsufficientException e) { - logger.error("All disks of wal folders are full, 
change system mode to read-only.", e); + logger.error("All folders are full, change system mode to read-only.", e); CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly); throw e; } @@ -71,7 +71,7 @@ public class FolderManager { try { return folders.get(selectStrategy.nextFolderIndex()); } catch (DiskSpaceInsufficientException e) { - logger.error("All disks of wal folders are full, change system mode to read-only.", e); + logger.error("All folders are full, change system mode to read-only.", e); CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly); throw e; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java index 2daeb089ff4..027c3e35971 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java @@ -33,11 +33,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** The main class of multiple directories. Used to allocate folders to data files. 
*/ @@ -58,6 +65,8 @@ public class TierManager { private final Map<String, Integer> seqDir2TierLevel = new HashMap<>(); /** unSeq file folder's rawFsPath path -> tier level */ private final Map<String, Integer> unSeqDir2TierLevel = new HashMap<>(); + /** total space of each tier, Long.MAX_VALUE when one tier contains remote storage */ + private long[] tierDiskTotalSpace; private TierManager() { try { @@ -78,10 +87,26 @@ public class TierManager { public void resetFolders() { String[][] tierDirs = config.getTierDataDirs(); + for (int i = 0; i < tierDirs.length; ++i) { + for (int j = 0; j < tierDirs[i].length; ++j) { + if (FSUtils.isLocal(tierDirs[i][j])) { + try { + tierDirs[i][j] = new File(tierDirs[i][j]).getCanonicalPath(); + } catch (IOException e) { + logger.error("Fail to get canonical path of data dir {}", tierDirs[i][j], e); + } + } + } + } + for (int tierLevel = 0; tierLevel < tierDirs.length; ++tierLevel) { List<String> seqDirs = Arrays.stream(tierDirs[tierLevel]) - .map(v -> v + File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME) + .map( + v -> + FSFactoryProducer.getFSFactory() + .getFile(v, IoTDBConstant.SEQUENCE_FLODER_NAME) + .getPath()) .collect(Collectors.toList()); mkDataDirs(seqDirs); try { @@ -95,7 +120,11 @@ public class TierManager { List<String> unSeqDirs = Arrays.stream(tierDirs[tierLevel]) - .map(v -> v + File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME) + .map( + v -> + FSFactoryProducer.getFSFactory() + .getFile(v, IoTDBConstant.UNSEQUENCE_FLODER_NAME) + .getPath()) .collect(Collectors.toList()); mkDataDirs(unSeqDirs); try { @@ -107,6 +136,8 @@ public class TierManager { unSeqDir2TierLevel.put(dir, tierLevel); } } + + tierDiskTotalSpace = getTierDiskSpace(DiskSpaceType.TOTAL); } private void mkDataDirs(List<String> folders) { @@ -121,13 +152,11 @@ public class TierManager { } } - public String getNextFolderForSequenceFile(int tierLevel) throws DiskSpaceInsufficientException { - return seqTiers.get(tierLevel).getNextFolder(); - } - - 
public String getNextFolderForUnSequenceFile(int tierLevel) + public String getNextFolderForTsFile(int tierLevel, boolean sequence) throws DiskSpaceInsufficientException { - return unSeqTiers.get(tierLevel).getNextFolder(); + return sequence + ? seqTiers.get(tierLevel).getNextFolder() + : unSeqTiers.get(tierLevel).getNextFolder(); } public List<String> getAllFilesFolders() { @@ -162,6 +191,88 @@ public class TierManager { return seqTiers.size(); } + public int getFileTierLevel(File file) { + String filePath; + try { + filePath = file.getCanonicalPath(); + } catch (IOException e) { + logger.error("Fail to get canonical path of data dir {}", file, e); + filePath = file.getPath(); + } + + for (Map.Entry<String, Integer> entry : seqDir2TierLevel.entrySet()) { + if (filePath.startsWith(entry.getKey())) { + return entry.getValue(); + } + } + for (Map.Entry<String, Integer> entry : unSeqDir2TierLevel.entrySet()) { + if (filePath.startsWith(entry.getKey())) { + return entry.getValue(); + } + } + throw new RuntimeException(String.format("%s is not a legal TsFile path", file)); + } + + public long[] getTierDiskTotalSpace() { + return Arrays.copyOf(tierDiskTotalSpace, tierDiskTotalSpace.length); + } + + public long[] getTierDiskUsableSpace() { + return getTierDiskSpace(DiskSpaceType.USABLE); + } + + private long[] getTierDiskSpace(DiskSpaceType type) { + String[][] tierDirs = config.getTierDataDirs(); + long[] tierDiskSpace = new long[tierDirs.length]; + for (int tierLevel = 0; tierLevel < tierDirs.length; ++tierLevel) { + Set<FileStore> tierFileStores = new HashSet<>(); + for (String dir : tierDirs[tierLevel]) { + if (!FSUtils.isLocal(dir)) { + tierDiskSpace[tierLevel] = Long.MAX_VALUE; + break; + } + // get the FileStore of each local dir + Path path = Paths.get(dir); + FileStore fileStore = null; + try { + fileStore = Files.getFileStore(path); + } catch (IOException e) { + // check parent if path is not exists + path = path.getParent(); + try { + fileStore = 
Files.getFileStore(path); + } catch (IOException innerException) { + logger.error("Failed to get storage path of {}, because", dir, innerException); + } + } + // update space info + if (fileStore != null && !tierFileStores.contains(fileStore)) { + tierFileStores.add(fileStore); + try { + switch (type) { + case TOTAL: + tierDiskSpace[tierLevel] += fileStore.getTotalSpace(); + break; + case USABLE: + tierDiskSpace[tierLevel] += fileStore.getUsableSpace(); + break; + default: + break; + } + } catch (IOException e) { + logger.error("Failed to statistic the size of {}, because", fileStore, e); + } + } + } + } + return tierDiskSpace; + } + + private enum DiskSpaceType { + TOTAL, + USABLE, + } + public static TierManager getInstance() { return TierManagerHolder.INSTANCE; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java index df88248bf2e..bb0a2c8ee44 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java @@ -114,7 +114,7 @@ public class SizeTieredCompactionSelector selectedFileSize = 0L; continue; } - if (currentFile.getStatus() != TsFileResourceStatus.CLOSED) { + if (currentFile.getStatus() != TsFileResourceStatus.CLOSED || currentFile.isMigrating()) { selectedFileList.clear(); selectedFileSize = 0L; continue; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java index bedf2e2fbcc..329c778f120 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java +++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java @@ -142,7 +142,9 @@ public class CrossSpaceCompactionCandidate { private List<TsFileResourceCandidate> filterUnseqResource(List<TsFileResource> unseqResources) { List<TsFileResourceCandidate> ret = new ArrayList<>(); for (TsFileResource resource : unseqResources) { - if (resource.getStatus() != TsFileResourceStatus.CLOSED || !resource.getTsFile().exists()) { + if (resource.getStatus() != TsFileResourceStatus.CLOSED + || resource.isMigrating() + || !resource.getTsFile().exists()) { break; } else if (resource.stillLives(ttlLowerBound)) { ret.add(new TsFileResourceCandidate(resource)); @@ -199,6 +201,7 @@ public class CrossSpaceCompactionCandidate { // the status of file may be changed after the task is submitted to queue this.isValidCandidate = tsFileResource.getStatus() == TsFileResourceStatus.CLOSED + && !tsFileResource.isMigrating() && tsFileResource.getTsFile().exists(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java new file mode 100644 index 00000000000..347452e26ed --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.migration; + +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class LocalMigrationTask extends MigrationTask { + private static final Logger logger = LoggerFactory.getLogger(LocalMigrationTask.class); + + LocalMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) { + super(cause, tsFile, targetDir); + } + + @Override + public void migrate() { + // copy TsFile and resource file + tsFile.readLock(); + try { + fsFactory.copyFile(srcTsFile, destTsFile); + fsFactory.copyFile(srcResourceFile, destResourceFile); + } catch (IOException e) { + logger.error("Fail to copy TsFile {}", srcTsFile); + destTsFile.delete(); + destResourceFile.delete(); + return; + } finally { + tsFile.readUnlock(); + } + // close mods file and replace TsFile path + tsFile.writeLock(); + try { + tsFile.resetModFile(); + fsFactory.copyFile(srcModsFile, destModsFile); + tsFile.setFile(destTsFile); + } catch (IOException e) { + logger.error("Fail to copy mods file {}", srcModsFile); + destTsFile.delete(); + destResourceFile.delete(); + destModsFile.delete(); + return; + } finally { + tsFile.writeUnlock(); + } + // clear src files + srcTsFile.delete(); + srcResourceFile.delete(); + srcModsFile.delete(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationCause.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationCause.java new file mode 100644 index 
00000000000..dc1d270ba61 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationCause.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.migration; + +public enum MigrationCause { + TTL, + DISK_SPACE +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java new file mode 100644 index 00000000000..0571d9d47f7 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.migration; + +import org.apache.iotdb.db.engine.modification.ModificationFile; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; +import org.apache.iotdb.tsfile.utils.FSUtils; + +import java.io.File; + +public abstract class MigrationTask implements Runnable { + protected static final FSFactory fsFactory = FSFactoryProducer.getFSFactory(); + + protected final MigrationCause cause; + protected final TsFileResource tsFile; + protected final String targetDir; + + protected final File srcTsFile; + protected final File destTsFile; + protected final File srcResourceFile; + protected final File destResourceFile; + protected final File srcModsFile; + protected final File destModsFile; + + MigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) { + this.cause = cause; + this.tsFile = tsFile; + this.targetDir = targetDir; + this.srcTsFile = tsFile.getTsFile(); + this.destTsFile = fsFactory.getFile(targetDir, tsFile.getTsFile().getName()); + this.srcResourceFile = + fsFactory.getFile( + srcTsFile.getParentFile(), srcTsFile.getName() + TsFileResource.RESOURCE_SUFFIX); + this.destResourceFile = + fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + TsFileResource.RESOURCE_SUFFIX); + this.srcModsFile = + fsFactory.getFile( + srcTsFile.getParentFile(), srcTsFile.getName() + 
ModificationFile.FILE_SUFFIX); + this.destModsFile = + fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + ModificationFile.FILE_SUFFIX); + } + + public static MigrationTask newTask( + MigrationCause cause, TsFileResource sourceTsFile, String targetDir) { + if (FSUtils.isLocal(targetDir)) { + return new LocalMigrationTask(cause, sourceTsFile, targetDir); + } else { + return new RemoteMigrationTask(cause, sourceTsFile, targetDir); + } + } + + @Override + public void run() { + if (canMigrate()) { + tsFile.setIsMigrating(true); + if (!canMigrate()) { + tsFile.setIsMigrating(false); + return; + } + } else { + return; + } + + migrate(); + + tsFile.setIsMigrating(false); + } + + protected boolean canMigrate() { + return tsFile.getStatus() == TsFileResourceStatus.CLOSED; + } + + public abstract void migrate(); +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java new file mode 100644 index 00000000000..37957aa8b0a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.db.engine.migration; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.commons.service.IService; +import org.apache.iotdb.commons.service.ServiceType; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.directories.TierManager; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; +import org.apache.iotdb.db.utils.DateTimeUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class MigrationTaskManager implements IService { + private static final Logger logger = LoggerFactory.getLogger(MigrationTaskManager.class); + private static final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig(); + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + private static final TierManager tierManager = TierManager.getInstance(); + private static final long CHECK_INTERVAL_IN_SECONDS = 10 * 60; + private static final double TIER_DISK_SPACE_WARN_THRESHOLD = + commonConfig.getDiskSpaceWarningThreshold() + 0.1; + private static final double TIER_DISK_SPACE_SAFE_THRESHOLD = + commonConfig.getDiskSpaceWarningThreshold() + 0.2; + /** single thread to schedule */ 
+ private ScheduledExecutorService scheduler; + /** single thread to sync syncingBuffer to disk */ + private ExecutorService workers; + + @Override + public void start() throws StartupException { + if (iotdbConfig.getTierDataDirs().length == 1) { + return; + } + scheduler = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.MIGRATION_SCHEDULER.getName()); + workers = + IoTDBThreadPoolFactory.newFixedThreadPool( + iotdbConfig.getCompactionThreadCount(), ThreadName.MIGRATION.getName()); + ScheduledExecutorUtil.safelyScheduleAtFixedRate( + scheduler, + () -> new MigrationScheduleTask().run(), + CHECK_INTERVAL_IN_SECONDS, + CHECK_INTERVAL_IN_SECONDS, + TimeUnit.SECONDS); + } + + private class MigrationScheduleTask implements Runnable { + private final long[] tierDiskTotalSpace = tierManager.getTierDiskTotalSpace(); + private final long[] tierDiskUsableSpace = tierManager.getTierDiskUsableSpace(); + private final Set<Integer> needMigrationTiers = new HashSet<>(); + + public MigrationScheduleTask() { + for (int i = 0; i < tierManager.getTiersNum(); i++) { + double usage = tierDiskUsableSpace[i] * 1.0 / tierDiskTotalSpace[i]; + if (usage <= TIER_DISK_SPACE_WARN_THRESHOLD) { + needMigrationTiers.add(i); + } + } + } + + @Override + public void run() { + schedule(); + } + + private void schedule() { + // submit migration tasks + for (DataRegion dataRegion : StorageEngine.getInstance().getAllDataRegions()) { + List<TsFileResource> tsfiles = dataRegion.getSequenceFileList(); + tsfiles.addAll(dataRegion.getUnSequenceFileList()); + tsfiles.sort(this::compareMigrationPriority); + for (TsFileResource tsfile : tsfiles) { + try { + int tierLevel = tsfile.getTierLevel(); + // only migrate closed TsFiles not in the last tier + if (tsfile.getStatus() != TsFileResourceStatus.CLOSED + || tierLevel == iotdbConfig.getTierDataDirs().length - 1) { + continue; + } + // check tier ttl and disk space + long tierTTL = + DateTimeUtils.convertMilliTimeWithPrecision( + 
commonConfig.getTierTTLInMs()[tierLevel], iotdbConfig.getTimestampPrecision()); + if (tsfile.stillLives(tierTTL)) { + submitMigrationTask( + tierLevel, + MigrationCause.TTL, + tsfile, + tierManager.getNextFolderForTsFile(tierLevel, tsfile.isSeq())); + } else if (needMigrationTiers.contains(tierLevel)) { + submitMigrationTask( + tierLevel, + MigrationCause.DISK_SPACE, + tsfile, + tierManager.getNextFolderForTsFile(tierLevel, tsfile.isSeq())); + } + } catch (Exception e) { + logger.error( + "An error occurred when checking migration of TsFileResource {}", tsfile, e); + } + } + } + } + + private void submitMigrationTask( + int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) { + MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, targetDir); + workers.submit(task); + tierDiskUsableSpace[tierLevel] -= sourceTsFile.getTsFileSize(); + if (needMigrationTiers.contains(tierLevel)) { + double usage = tierDiskUsableSpace[tierLevel] * 1.0 / tierDiskTotalSpace[tierLevel]; + if (usage > TIER_DISK_SPACE_SAFE_THRESHOLD) { + needMigrationTiers.remove(tierLevel); + } + } + } + + private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) { + // old time partitions first + int res = Long.compare(f1.getTimePartition(), f2.getTimePartition()); + // sequence files in one partition + if (res == 0) { + if (f1.isSeq() && !f2.isSeq()) { + res = -1; + } else if (!f1.isSeq() && f2.isSeq()) { + res = 1; + } + } + // old version files in one partition + if (res == 0) { + res = Long.compare(f1.getVersion(), f2.getVersion()); + } + return res; + } + } + + @Override + public void stop() { + if (scheduler != null) { + scheduler.shutdownNow(); + } + if (workers != null) { + workers.shutdownNow(); + } + } + + @Override + public ServiceType getID() { + return ServiceType.MIGRATION_SERVICE; + } + + public static MigrationTaskManager getInstance() { + return InstanceHolder.INSTANCE; + } + + private static class InstanceHolder { + private 
InstanceHolder() {} + + private static final MigrationTaskManager INSTANCE = new MigrationTaskManager(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java new file mode 100644 index 00000000000..18136da0c46 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.db.engine.migration; + +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class RemoteMigrationTask extends MigrationTask { + private static final Logger logger = LoggerFactory.getLogger(RemoteMigrationTask.class); + + RemoteMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) { + super(cause, tsFile, targetDir); + } + + @Override + public void migrate() { + // copy TsFile and resource file + tsFile.readLock(); + try { + fsFactory.copyFile(srcTsFile, destTsFile); + fsFactory.copyFile(srcResourceFile, destResourceFile); + } catch (IOException e) { + logger.error("Fail to copy TsFile {}", srcTsFile); + destTsFile.delete(); + destResourceFile.delete(); + return; + } finally { + tsFile.readUnlock(); + } + // replace TsFile path + tsFile.writeLock(); + try { + tsFile.setFile(destTsFile); + } finally { + tsFile.writeUnlock(); + } + // clear src files + srcTsFile.delete(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index b72150a93d5..850fda3ba78 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -2676,7 +2676,7 @@ public class DataRegion implements IDataRegionForQuery { case LOAD_UNSEQUENCE: targetFile = fsFactory.getFile( - TierManager.getInstance().getNextFolderForUnSequenceFile(0), + TierManager.getInstance().getNextFolderForTsFile(0, false), databaseName + File.separatorChar + dataRegionId @@ -2698,7 +2698,7 @@ public class DataRegion implements IDataRegionForQuery { case LOAD_SEQUENCE: targetFile = fsFactory.getFile( - TierManager.getInstance().getNextFolderForSequenceFile(0), + TierManager.getInstance().getNextFolderForTsFile(0, true), 
databaseName + File.separatorChar + dataRegionId diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java index c69f9bfe023..b4e936a6cfc 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java @@ -79,10 +79,7 @@ public class TsFileNameGenerator { long timePartitionId) throws DiskSpaceInsufficientException { TierManager tierManager = TierManager.getInstance(); - String baseDir = - sequence - ? tierManager.getNextFolderForSequenceFile(0) - : tierManager.getNextFolderForUnSequenceFile(0); + String baseDir = tierManager.getNextFolderForTsFile(0, sequence); return baseDir + File.separator + logicalStorageGroup diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 5ab10f08b59..9f2bca44cfb 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.directories.TierManager; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.storagegroup.DataRegion.SettleTsFileCallBack; @@ -130,6 +131,10 @@ public class TsFileResource { private long ramSize; + private volatile int tierLevel = 0; + + private volatile boolean isMigrating = false; + private volatile long tsFileSize = -1L; private TsFileProcessor processor; @@ -170,6 +175,7 
@@ public class TsFileResource { this.minPlanIndex = other.minPlanIndex; this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName()); this.tsFileSize = other.tsFileSize; + this.tierLevel = other.tierLevel; } /** for sealed TsFile, call setClosed to close TsFileResource */ @@ -177,6 +183,7 @@ public class TsFileResource { this.file = file; this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName()); this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex(); + this.tierLevel = TierManager.getInstance().getFileTierLevel(file); } /** Used for compaction to create target files. */ @@ -191,6 +198,7 @@ public class TsFileResource { this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName()); this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex(); this.processor = processor; + this.tierLevel = TierManager.getInstance().getFileTierLevel(file); } /** unsealed TsFile, for query */ @@ -206,6 +214,7 @@ public class TsFileResource { this.pathToChunkMetadataListMap.put(path, chunkMetadataList); this.originTsFileResource = originTsFileResource; this.version = originTsFileResource.version; + this.tierLevel = originTsFileResource.tierLevel; } /** unsealed TsFile, for query */ @@ -221,6 +230,7 @@ public class TsFileResource { generatePathToTimeSeriesMetadataMap(); this.originTsFileResource = originTsFileResource; this.version = originTsFileResource.version; + this.tierLevel = originTsFileResource.tierLevel; } @TestOnly @@ -355,9 +365,10 @@ public class TsFileResource { return compactionModFile; } - public void resetModFile() { + public void resetModFile() throws IOException { if (modFile != null) { synchronized (this) { + modFile.close(); modFile = null; } } @@ -375,6 +386,10 @@ public class TsFileResource { return file.getPath(); } + public int getTierLevel() { + return tierLevel; + } + public long getTsFileSize() { if (isClosed()) { if (tsFileSize == -1) { @@ -419,8 +434,7 @@ public class TsFileResource { public DeviceTimeIndex 
buildDeviceTimeIndex() throws IOException { readLock(); try (InputStream inputStream = - FSFactoryProducer.getFSFactory() - .getBufferedInputStream(file.getPath() + TsFileResource.RESOURCE_SUFFIX)) { + FSFactoryProducer.getFSFactory().getBufferedInputStream(file.getPath() + RESOURCE_SUFFIX)) { ReadWriteIOUtils.readByte(inputStream); ITimeIndex timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream); if (!(timeIndexFromResourceFile instanceof DeviceTimeIndex)) { @@ -429,7 +443,7 @@ public class TsFileResource { return (DeviceTimeIndex) timeIndexFromResourceFile; } catch (Exception e) { throw new IOException( - "Can't read file " + file.getPath() + TsFileResource.RESOURCE_SUFFIX + " from disk", e); + "Can't read file " + file.getPath() + RESOURCE_SUFFIX + " from disk", e); } finally { readUnlock(); } @@ -618,6 +632,14 @@ public class TsFileResource { return this.status == TsFileResourceStatus.COMPACTION_CANDIDATE; } + public boolean isMigrating() { + return isMigrating; + } + + public void setIsMigrating(boolean isMigrating) { + this.isMigrating = isMigrating; + } + public void setStatus(TsFileResourceStatus status) { switch (status) { case CLOSED: diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java index 3e0f39ac4ff..1e95ffe09f9 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -66,6 +66,7 @@ import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.flush.FlushManager; +import org.apache.iotdb.db.engine.migration.MigrationTaskManager; import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine; import org.apache.iotdb.db.metadata.template.ClusterTemplateManager; import 
org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService; @@ -543,6 +544,7 @@ public class DataNode implements DataNodeMBean { registerManager.register(RegionMigrateService.getInstance()); registerManager.register(CompactionTaskManager.getInstance()); + registerManager.register(MigrationTaskManager.getInstance()); } /** set up RPC and protocols after DataNode is available */ diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java index 8029bcca1d5..386cc210baa 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java @@ -105,13 +105,21 @@ public interface FSFactory { BufferedOutputStream getBufferedOutputStream(String filePath); /** - * move file + * TODO(zhm) move file * * @param srcFile src file * @param destFile dest file */ void moveFile(File srcFile, File destFile); + /** + * TODO(zhm) copy file + * + * @param srcFile src file + * @param destFile dest file + */ + void copyFile(File srcFile, File destFile) throws IOException; + /** * list file by suffix * diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java index ddf971b29cd..328c4f9e39e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java @@ -199,6 +199,9 @@ public class HDFSFactory implements FSFactory { } } + @Override + public void copyFile(File srcFile, File destFile) throws IOException {} + @Override public File[] listFilesBySuffix(String fileFolder, String suffix) { try { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java index b2d58149753..325b0ba5e52 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java @@ -103,6 +103,11 @@ public class HybridFSFactory implements FSFactory { // TODO } + @Override + public void copyFile(File srcFile, File destFile) throws IOException { + // TODO + } + @Override public File[] listFilesBySuffix(String fileFolder, String suffix) { FSPath folder = FSUtils.parse(fileFolder); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java index 53c1dcb31a6..b6d46989e88 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java @@ -123,6 +123,11 @@ public class LocalFSFactory implements FSFactory { } } + @Override + public void copyFile(File srcFile, File destFile) throws IOException { + FileUtils.copyFile(srcFile, destFile); + } + @Override public File[] listFilesBySuffix(String fileFolder, String suffix) { return new File(fileFolder).listFiles(file -> file.getName().endsWith(suffix)); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java index eb2b4c837ec..1f58fc6ffd6 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java @@ -198,6 +198,9 @@ public class OSFSFactory implements FSFactory { } } + @Override + public void copyFile(File srcFile, File destFile) throws IOException {} + @Override public File[] listFilesBySuffix(String 
/**
 * Builds an object-storage {@code FSPath} from {@code fsPrefix[0]}, a {@code "/"} separator and
 * the data node id, and returns its path string.
 *
 * <p>NOTE(review): {@code bucket} is not used when building the path; confirm whether it should
 * be part of the result.
 *
 * @param bucket bucket name (currently unused)
 * @param dataNodeId id of the data node, appended after the separator
 * @return the path string of the constructed {@code FSPath}
 */
public static String getOSDefaultPath(String bucket, int dataNodeId) {
  return new FSPath(FSType.OBJECT_STORAGE, fsPrefix[0] + "/" + dataNodeId).getPath();
}
