This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch tiered_storage in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 92e7cfcf277e870d11dd3dbfb9aa4ce986296a98 Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed May 24 10:59:18 2023 +0800 refine code --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../iotdb/db/conf/directories/TierManager.java | 1 + .../db/engine/migration/LocalMigrationTask.java | 22 +++++++------- .../iotdb/db/engine/migration/MigrationTask.java | 34 +++++++++++++--------- .../db/engine/migration/MigrationTaskManager.java | 14 ++++++--- .../db/engine/migration/RemoteMigrationTask.java | 17 ++++++----- .../db/engine/storagegroup/TsFileProcessor.java | 4 ++- .../db/engine/storagegroup/TsFileResource.java | 6 +++- .../iotdb/tsfile/common/conf/TSFileConfig.java | 3 +- .../org/apache/iotdb/tsfile/utils/FSUtils.java | 20 ++++++------- 10 files changed, 72 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c59bd9c558e..40a8c3387db 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1140,7 +1140,7 @@ public class IoTDBConfig { private int migrateThreadCount = 1; /** Enable hdfs or not */ - private boolean enableObjectStorage = false; + private boolean enableObjectStorage = true; /** Config for object storage */ private ObjectStorageConfig osConfig = ObjectStorageDescriptor.getInstance().getConfig(); diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java index 7c3335910b8..734d7a5decf 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java @@ -206,6 +206,7 @@ public class TierManager { } public int getFileTierLevel(File file) { + // If the file does not exist on Local disk, it is assumed be on remote Object Storage if (!file.exists()) { return getTiersNum() - 1; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java index 7746007fb0c..889466733a5 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java @@ -28,33 +28,35 @@ import java.io.IOException; public class LocalMigrationTask extends MigrationTask { private static final Logger logger = LoggerFactory.getLogger(LocalMigrationTask.class); - LocalMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) { + protected LocalMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) + throws IOException { super(cause, tsFile, targetDir); } @Override public void migrate() { // copy TsFile and resource file - tsFile.readLock(); + tsFileResource.readLock(); try { - fsFactory.copyFile(srcTsFile, destTsFile); + fsFactory.copyFile(srcFile, destTsFile); fsFactory.copyFile(srcResourceFile, destResourceFile); } catch (IOException e) { - logger.error("Fail to copy TsFile {}", srcTsFile); + logger.error("Fail to copy TsFile {}", srcFile); destTsFile.delete(); destResourceFile.delete(); return; } finally { - tsFile.readUnlock(); + tsFileResource.readUnlock(); } // close mods file and replace TsFile path - tsFile.writeLock(); + tsFileResource.writeLock(); try { - tsFile.resetModFile(); + tsFileResource.resetModFile(); + // migrate MOD file only when it exists if (srcModsFile.exists()) { fsFactory.copyFile(srcModsFile, destModsFile); } - tsFile.setFile(destTsFile); + tsFileResource.setFile(destTsFile); } catch (IOException e) { logger.error("Fail to copy mods file {}", srcModsFile); destTsFile.delete(); @@ -62,10 +64,10 @@ public class LocalMigrationTask extends MigrationTask { destModsFile.delete(); return; } finally { - tsFile.writeUnlock(); + tsFileResource.writeUnlock(); } // clear src files - srcTsFile.delete(); + srcFile.delete(); srcResourceFile.delete(); srcModsFile.delete(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java index 8bdfc751256..a894334a84c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java @@ -26,41 +26,47 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; import org.apache.iotdb.tsfile.utils.FSUtils; import java.io.File; +import java.io.IOException; public abstract class MigrationTask implements Runnable { protected static final FSFactory fsFactory = FSFactoryProducer.getFSFactory(); protected final MigrationCause cause; - protected final TsFileResource tsFile; + protected final TsFileResource tsFileResource; protected final String targetDir; - protected final File srcTsFile; + protected final File srcFile; protected final File destTsFile; protected final File srcResourceFile; protected final File destResourceFile; protected final File srcModsFile; protected final File destModsFile; - MigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) { + protected MigrationTask(MigrationCause cause, TsFileResource tsFileResource, String targetDir) + throws IOException { this.cause = cause; - this.tsFile = tsFile; + this.tsFileResource = tsFileResource; this.targetDir = targetDir; - this.srcTsFile = tsFile.getTsFile(); - this.destTsFile = fsFactory.getFile(targetDir, tsFile.getTsFile().getName()); + this.srcFile = tsFileResource.getTsFile(); + this.destTsFile = fsFactory.getFile(targetDir, getDestTsFilePath(srcFile)); this.srcResourceFile = fsFactory.getFile( - srcTsFile.getParentFile(), srcTsFile.getName() + TsFileResource.RESOURCE_SUFFIX); + srcFile.getParentFile(), srcFile.getName() + TsFileResource.RESOURCE_SUFFIX); this.destResourceFile = - fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + TsFileResource.RESOURCE_SUFFIX); + fsFactory.getFile(targetDir, getDestTsFilePath(srcFile) + TsFileResource.RESOURCE_SUFFIX); this.srcModsFile = fsFactory.getFile( - srcTsFile.getParentFile(), srcTsFile.getName() + ModificationFile.FILE_SUFFIX); + srcFile.getParentFile(), srcFile.getName() + ModificationFile.FILE_SUFFIX); this.destModsFile = - fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + ModificationFile.FILE_SUFFIX); + fsFactory.getFile(targetDir, getDestTsFilePath(srcFile) + ModificationFile.FILE_SUFFIX); + } + + private String getDestTsFilePath(File src) throws IOException { + return FSUtils.getLocalTsFileShortPath(src, FSUtils.PATH_FROM_DATABASE_LEVEL); } public static MigrationTask newTask( - MigrationCause cause, TsFileResource sourceTsFile, String targetDir) { + MigrationCause cause, TsFileResource sourceTsFile, String targetDir) throws IOException { if (FSUtils.isLocal(targetDir)) { return new LocalMigrationTask(cause, sourceTsFile, targetDir); } else { @@ -71,12 +77,12 @@ public abstract class MigrationTask implements Runnable { @Override public void run() { migrate(); - tsFile.increaseTierLevel(); - tsFile.setIsMigrating(false); + tsFileResource.increaseTierLevel(); + tsFileResource.setIsMigrating(false); } protected boolean canMigrate() { - return tsFile.getStatus() == TsFileResourceStatus.NORMAL; + return tsFileResource.getStatus() == TsFileResourceStatus.NORMAL; } public abstract void migrate(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java index 0ebaa49d7f2..ede3ecce410 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.utils.DateTimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -134,14 +135,15 @@ public class MigrationTaskManager implements IService { } } catch (Exception e) { logger.error( - "An error occurred when checking migration of TsFileResource {}", tsfile, e); + "An error occurred when check and try to migrate TsFileResource {}", tsfile, e); } } } } private void submitMigrationTask( - int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) { + int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) + throws IOException { if (!checkAndMarkMigrate(sourceTsFile)) { return; } @@ -159,7 +161,7 @@ public class MigrationTaskManager implements IService { private boolean checkAndMarkMigrate(TsFileResource tsFile) { if (canMigrate(tsFile)) { tsFile.setIsMigrating(true); - if (!canMigrate(tsFile)) { + if (occupiedByCompaction(tsFile)) { tsFile.setIsMigrating(false); return false; } @@ -169,7 +171,11 @@ public class MigrationTaskManager implements IService { } private boolean canMigrate(TsFileResource tsFile) { - return tsFile.getStatus() == TsFileResourceStatus.NORMAL; + return tsFile.getStatus() == TsFileResourceStatus.NORMAL && !tsFile.isMigrating(); + } + + private boolean occupiedByCompaction(TsFileResource tsFile) { + return tsFile.getStatus() != TsFileResourceStatus.NORMAL; } private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java index 65aeb4d9181..428778ecea8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java @@ -28,31 +28,32 @@ import java.io.IOException; public class RemoteMigrationTask extends MigrationTask { private static final Logger logger = LoggerFactory.getLogger(RemoteMigrationTask.class); - RemoteMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) { + protected RemoteMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) + throws IOException { super(cause, tsFile, targetDir); } @Override public void migrate() { // copy TsFile and resource file - tsFile.readLock(); + tsFileResource.readLock(); try { - fsFactory.copyFile(srcTsFile, destTsFile); + fsFactory.copyFile(srcFile, destTsFile); fsFactory.copyFile(srcResourceFile, destResourceFile); } catch (IOException e) { - logger.error("Fail to copy TsFile {}", srcTsFile); + logger.error("Fail to copy TsFile {}", srcFile); destTsFile.delete(); destResourceFile.delete(); return; } finally { - tsFile.readUnlock(); + tsFileResource.readUnlock(); } // clear src files - tsFile.writeLock(); + tsFileResource.writeLock(); try { - srcTsFile.delete(); + srcFile.delete(); } finally { - tsFile.writeUnlock(); + tsFileResource.writeUnlock(); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index f4f33c10473..513a0a25458 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -185,11 +185,13 @@ public class TsFileProcessor { boolean sequence) throws IOException { this.storageGroupName = storageGroupName; + // this.sequence should be assigned at first because `this` will be passed as parameter to other + // val later + this.sequence = sequence; this.tsFileResource = new TsFileResource(tsfile, this); this.dataRegionInfo = dataRegionInfo; this.writer = new RestorableTsFileIOWriter(tsfile); this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback; - this.sequence = sequence; this.walNode = WALManager.getInstance() .applyForWALNode(WALManager.getApplicantUniqueId(storageGroupName, sequence)); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index b7536902ccc..fe1b9c102d8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -185,6 +185,8 @@ public class TsFileResource { this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName()); this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex(); this.isSeq = FilePathUtils.isSequence(this.file.getAbsolutePath()); + // This method is invoked when DataNode recovers, so the tierLevel should be calculated when + // restarting this.tierLevel = TierManager.getInstance().getFileTierLevel(file); } @@ -201,7 +203,9 @@ public class TsFileResource { this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex(); this.processor = processor; this.isSeq = processor.isSequence(); - this.tierLevel = TierManager.getInstance().getFileTierLevel(file); + // this method is invoked when a new TsFile is created and a newly created TsFile's the + // tierLevel is 0 by default + this.tierLevel = 0; } /** unsealed TsFile, for query */ diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java index 8712ba0aba5..9d7813784a9 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java @@ -120,7 +120,8 @@ public class TSFileConfig implements Serializable { /** Default endian value is BIG_ENDIAN. */ private String endian = "BIG_ENDIAN"; /** Default storage is in local file system */ - private FSType[] TSFileStorageFs = new FSType[] {FSType.LOCAL}; + // TODO: (haiming) fix the bug that the config is not loaded + private FSType[] TSFileStorageFs = new FSType[] {FSType.LOCAL, FSType.OBJECT_STORAGE}; /** Default core-site.xml file path is /etc/hadoop/conf/core-site.xml */ private String coreSitePath = "/etc/hadoop/conf/core-site.xml"; /** Default hdfs-site.xml file path is /etc/hadoop/conf/hdfs-site.xml */ diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java index 5291985b678..c5f80343d70 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java @@ -32,6 +32,8 @@ import java.util.Arrays; public class FSUtils { private static final Logger logger = LoggerFactory.getLogger(FSUtils.class); private static final FSType[] fsTypes = {FSType.OBJECT_STORAGE, FSType.HDFS}; + public static final int PATH_FROM_SEQUENCE_LEVEL = 5; + public static final int PATH_FROM_DATABASE_LEVEL = 4; public static final String[] fsPrefix = {"os://", "hdfs://"}; public static final String OS_FILE_SEPARATOR = "/"; private static final String[] fsFileClassName = { @@ -119,7 +121,6 @@ public class FSUtils { public static FSPath parseLocalTsFile2OSFile(File localFile, String bucket, int dataNodeId) throws IOException { - String[] filePathSplits = FilePathUtils.splitTsFilePath(localFile.getCanonicalPath()); return new FSPath( FSType.OBJECT_STORAGE, fsPrefix[0] @@ -127,18 +128,15 @@ public class FSUtils { + OS_FILE_SEPARATOR + dataNodeId + OS_FILE_SEPARATOR - + String.join( - OS_FILE_SEPARATOR, - Arrays.copyOfRange( - filePathSplits, filePathSplits.length - 5, filePathSplits.length))); + + getLocalTsFileShortPath(localFile, PATH_FROM_SEQUENCE_LEVEL)); } - // public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, int dataNodeId) - // throws IOException { - // String fileName = lcoalFile.getName(); - // return new FSPath(FSType.OBJECT_STORAGE, - // "os://bucket/Users/jinruizhangjinrui/Documents/work/iotdb/data/datanode/s3/" + fileName); - // } + public static String getLocalTsFileShortPath(File localTsFile, int level) throws IOException { + String[] filePathSplits = FilePathUtils.splitTsFilePath(localTsFile.getCanonicalPath()); + return String.join( + OS_FILE_SEPARATOR, + Arrays.copyOfRange(filePathSplits, filePathSplits.length - level, filePathSplits.length)); + } public static boolean isLocal(String fsPath) { return getFSType(fsPath) == FSType.LOCAL;
