This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch tiered_storage in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d263283d704bf5340f92d6ba3ec6bbc6c97fa60b Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue May 23 21:11:44 2023 +0800 test migration --- .../apache/iotdb/commons/conf/CommonConfig.java | 2 +- .../org/apache/iotdb/os/cache/OSFileCache.java | 8 ++- .../org/apache/iotdb/os/cache/OSFileCacheKey.java | 4 ++ .../apache/iotdb/os/conf/ObjectStorageConfig.java | 8 +-- .../iotdb/os/conf/provider/OSProviderConfig.java | 2 +- .../apache/iotdb/os/conf/provider/TestConfig.java | 2 +- .../apache/iotdb/os/fileSystem/OSTsFileInput.java | 4 ++ .../os/io/test/TestObjectStorageConnector.java | 68 +++++++++++++++++++--- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 7 ++- .../iotdb/db/conf/directories/TierManager.java | 5 ++ .../iotdb/db/engine/migration/MigrationTask.java | 11 ---- .../db/engine/migration/MigrationTaskManager.java | 39 ++++++++++--- .../java/org/apache/iotdb/db/service/DataNode.java | 1 + .../fileInputFactory/HybridFileInputFactory.java | 5 +- .../tsfile/fileSystem/fsFactory/OSFSFactory.java | 8 +-- .../org/apache/iotdb/tsfile/utils/FSUtils.java | 42 ++++++------- 16 files changed, 149 insertions(+), 67 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index b48e18c400f..79072d8681a 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -92,7 +92,7 @@ public class CommonConfig { * Notice: if this property is changed, previous created database which are not set TTL will also * be affected. Unit: millisecond */ - private long[] tierTTLInMs = {Long.MAX_VALUE}; + private long[] tierTTLInMs = {2 * 24 * 60 * 60 * 1000L, Long.MAX_VALUE}; /** Thrift socket and connection timeout between data node and config node. */ private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20); diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java index a11c140d848..b302d6983ad 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java @@ -63,7 +63,13 @@ public class OSFileCache { } public OSFileCacheValue get(OSFileCacheKey key) { - return remotePos2LocalCacheFile.get(key); + OSFileCacheValue value = remotePos2LocalCacheFile.get(key); + // TODO try to simplify the logic here + if (!value.getCacheFile().exists()) { + logger.info("want {} but file deleted", key); + remotePos2LocalCacheFile.invalidate(key); + } + return value; } /** This method is used by the recover procedure */ diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java index c71724a34c4..4bd6f47f085 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java @@ -79,4 +79,8 @@ public class OSFileCacheKey implements Serializable { OSFileCacheKey that = (OSFileCacheKey) obj; return file.equals(that.file) && startPosition == that.startPosition; } + + public String toString() { + return file.getName() + "," + startPosition; + } } diff --git a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java index 9126fb21cf6..b66b4871768 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java @@ -26,17 +26,17 @@ import org.apache.iotdb.os.utils.ObjectStorageType; import java.io.File; public class ObjectStorageConfig { - private ObjectStorageType osType = ObjectStorageType.AWS_S3; + private ObjectStorageType osType = ObjectStorageType.TEST; - private OSProviderConfig providerConfig = new AWSS3Config(); + private OSProviderConfig providerConfig = new TestConfig(); private String[] cacheDirs = { "data" + File.separator + "datanode" + File.separator + "data" + File.separator + "cache" }; - private long cacheMaxDiskUsage = 50 * 1024 * 1024 * 1024L; + private long cacheMaxDiskUsage = 50 * 1024 * 1024L; - private int cachePageSize = 20 * 1024 * 1024; + private int cachePageSize = 10 * 1024 * 1024; ObjectStorageConfig() {} diff --git a/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/OSProviderConfig.java b/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/OSProviderConfig.java index 9c42c673a99..ca0fff2ec18 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/OSProviderConfig.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/OSProviderConfig.java @@ -20,7 +20,7 @@ package org.apache.iotdb.os.conf.provider; public abstract class OSProviderConfig { protected String endpoint; - protected String bucketName; + protected String bucketName = "iotdb_data"; protected String accessKeyId; protected String accessKeySecret; diff --git a/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/TestConfig.java b/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/TestConfig.java index ac32757b0c6..558d01fbe8d 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/TestConfig.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/TestConfig.java @@ -21,7 +21,7 @@ package org.apache.iotdb.os.conf.provider; import java.io.File; public class TestConfig extends OSProviderConfig { - private String testDir = "target" + File.separator + "test"; + private String testDir = "data" + File.separator + "test_s3"; public String getTestDir() { return testDir; diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java index 41d893f9291..3e83e56f726 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java @@ -29,6 +29,10 @@ public class OSTsFileInput implements TsFileInput { private OSFile file; private OSFileChannel channel; + public OSTsFileInput(String fileURI) throws IOException { + this(new OSFile(fileURI)); + } + public OSTsFileInput(OSFile file) throws IOException { this.file = file; this.channel = new OSFileChannel(file); diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java b/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java index 70ee1a0d817..cfeb74e6908 100644 --- a/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java +++ b/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java @@ -24,9 +24,18 @@ import org.apache.iotdb.os.exception.ObjectStorageException; import org.apache.iotdb.os.fileSystem.OSURI; import org.apache.iotdb.os.io.IMetaData; import org.apache.iotdb.os.io.ObjectStorageConnector; +import org.apache.iotdb.os.io.aws.S3MetaData; +import org.apache.iotdb.os.utils.ObjectStorageConstant; + +import org.apache.commons.io.FileUtils; import java.io.File; +import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; public class TestObjectStorageConnector implements ObjectStorageConnector { private final TestConfig testConfig = @@ -34,45 +43,88 @@ public class TestObjectStorageConnector implements ObjectStorageConnector { @Override public boolean doesObjectExist(OSURI osUri) throws ObjectStorageException { - return false; + File file = new File(getDstFilePath(osUri)); + return file.exists(); } @Override public IMetaData getMetaData(OSURI osUri) throws ObjectStorageException { - return null; + File file = new File(getDstFilePath(osUri)); + return new S3MetaData(file.length(), System.currentTimeMillis()); } @Override public boolean createNewEmptyObject(OSURI osUri) throws ObjectStorageException { + File file = new File(getDstFilePath(osUri)); + if (!file.exists()) { + try { + return file.createNewFile(); + } catch (IOException e) { + throw new ObjectStorageException(e); + } + } return false; } @Override public boolean delete(OSURI osUri) throws ObjectStorageException { - return false; + File file = new File(getDstFilePath(osUri)); + return file.delete(); } @Override public boolean renameTo(OSURI fromOSUri, OSURI toOSUri) throws ObjectStorageException { - return false; + File file = new File(getDstFilePath(fromOSUri)); + return file.renameTo(new File(getDstFilePath(toOSUri))); } @Override public InputStream getInputStream(OSURI osUri) throws ObjectStorageException { - return null; + File file = new File(getDstFilePath(osUri)); + try { + return Channels.newInputStream(FileChannel.open(file.toPath(), StandardOpenOption.READ)); + } catch (IOException e) { + throw new ObjectStorageException(e); + } } @Override public OSURI[] list(OSURI osUri) throws ObjectStorageException { - return new OSURI[0]; + return null; } @Override - public void putLocalFile(OSURI osUri, File lcoalFile) throws ObjectStorageException {} + public void putLocalFile(OSURI osUri, File localFile) throws ObjectStorageException { + try { + File targetFile = new File(getDstFilePath(osUri)); + if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) { + throw new ObjectStorageException( + String.format( + "[TieredMigration] cannot mkdir for path %s", + targetFile.getParentFile().getAbsolutePath())); + } + FileUtils.copyFile(localFile, targetFile); + } catch (IOException e) { + throw new ObjectStorageException(e); + } + } @Override public byte[] getRemoteFile(OSURI osUri, long position, int len) throws ObjectStorageException { - return new byte[0]; + File file = new File(getDstFilePath(osUri)); + ByteBuffer dst = ByteBuffer.allocate(len); + try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) { + channel.read(dst, position); + } catch (IOException e) { + throw new ObjectStorageException(e); + } + return dst.array(); + } + + private String getDstFilePath(OSURI osuri) { + return testConfig.getTestDir() + + File.separator + + osuri.getKey().replace(ObjectStorageConstant.FILE_SEPARATOR, File.separator); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index e48757b398d..c59bd9c558e 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -300,7 +300,8 @@ public class IoTDBConfig { /** Tiered data directories. It can be settled as dataDirs = {{"data1"}, {"data2", "data3"}}; */ private String[][] tierDataDirs = { - {IoTDBConstant.DEFAULT_BASE_DIR + File.separator + IoTDBConstant.DATA_FOLDER_NAME} + {IoTDBConstant.DEFAULT_BASE_DIR + File.separator + IoTDBConstant.DATA_FOLDER_NAME}, + {IoTDBConstant.OBJECT_STORAGE_DIR} }; private String loadTsFileDir = @@ -528,7 +529,7 @@ public class IoTDBConfig { private int candidateCompactionTaskQueueSize = 50; /** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */ - private boolean metaDataCacheEnable = true; + private boolean metaDataCacheEnable = false; /** Memory allocated for bloomFilter cache in read process */ private long allocateMemoryForBloomFilterCache = allocateMemoryForRead / 1001; @@ -1136,7 +1137,7 @@ public class IoTDBConfig { private String RateLimiterType = "FixedIntervalRateLimiter"; /** Threads for migration tasks */ - private int migrateThreadCount = 3; + private int migrateThreadCount = 1; /** Enable hdfs or not */ private boolean enableObjectStorage = false; diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java index e0483be0558..32b8e8aea94 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java @@ -92,6 +92,11 @@ public class TierManager { return; } + seqTiers.clear(); + unSeqTiers.clear(); + seqDir2TierLevel.clear(); + unSeqDir2TierLevel.clear(); + String[][] tierDirs = config.getTierDataDirs(); for (int i = 0; i < tierDirs.length; ++i) { for (int j = 0; j < tierDirs[i].length; ++j) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java index 998e903925c..36766750832 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java @@ -70,18 +70,7 @@ public abstract class MigrationTask implements Runnable { @Override public void run() { - if (canMigrate()) { - tsFile.setIsMigrating(true); - if (!canMigrate()) { - tsFile.setIsMigrating(false); - return; - } - } else { - return; - } - migrate(); - tsFile.setIsMigrating(false); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java index 4bd9712c263..0ebaa49d7f2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java @@ -50,7 +50,7 @@ public class MigrationTaskManager implements IService { private static final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig(); private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private static final TierManager tierManager = TierManager.getInstance(); - private static final long CHECK_INTERVAL_IN_SECONDS = 10 * 60; + private static final long CHECK_INTERVAL_IN_SECONDS = 10; private static final double TIER_DISK_SPACE_WARN_THRESHOLD = commonConfig.getDiskSpaceWarningThreshold() + 0.1; private static final double TIER_DISK_SPACE_SAFE_THRESHOLD = @@ -107,28 +107,30 @@ public class MigrationTaskManager implements IService { tsfiles.sort(this::compareMigrationPriority); for (TsFileResource tsfile : tsfiles) { try { - int tierLevel = tsfile.getTierLevel(); + int currentTier = tsfile.getTierLevel(); + int nextTier = currentTier + 1; // only migrate closed TsFiles not in the last tier if (tsfile.getStatus() != TsFileResourceStatus.NORMAL - || tierLevel == iotdbConfig.getTierDataDirs().length - 1) { + || nextTier == iotdbConfig.getTierDataDirs().length) { continue; } // check tier ttl and disk space long tierTTL = DateTimeUtils.convertMilliTimeWithPrecision( - commonConfig.getTierTTLInMs()[tierLevel], iotdbConfig.getTimestampPrecision()); + commonConfig.getTierTTLInMs()[currentTier], + iotdbConfig.getTimestampPrecision()); if (tsfile.stillLives(tierTTL)) { submitMigrationTask( - tierLevel, + currentTier, MigrationCause.TTL, tsfile, - tierManager.getNextFolderForTsFile(tierLevel, tsfile.isSeq())); - } else if (needMigrationTiers.contains(tierLevel)) { + tierManager.getNextFolderForTsFile(nextTier, tsfile.isSeq())); + } else if (needMigrationTiers.contains(currentTier)) { submitMigrationTask( - tierLevel, + currentTier, MigrationCause.DISK_SPACE, tsfile, - tierManager.getNextFolderForTsFile(tierLevel, tsfile.isSeq())); + tierManager.getNextFolderForTsFile(nextTier, tsfile.isSeq())); } } catch (Exception e) { logger.error( @@ -140,6 +142,9 @@ public class MigrationTaskManager implements IService { private void submitMigrationTask( int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) { + if (!checkAndMarkMigrate(sourceTsFile)) { + return; + } MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, targetDir); workers.submit(task); tierDiskUsableSpace[tierLevel] -= sourceTsFile.getTsFileSize(); @@ -151,6 +156,22 @@ public class MigrationTaskManager implements IService { } } + private boolean checkAndMarkMigrate(TsFileResource tsFile) { + if (canMigrate(tsFile)) { + tsFile.setIsMigrating(true); + if (!canMigrate(tsFile)) { + tsFile.setIsMigrating(false); + return false; + } + return true; + } + return false; + } + + private boolean canMigrate(TsFileResource tsFile) { + return tsFile.getStatus() == TsFileResourceStatus.NORMAL; + } + private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) { // old time partitions first int res = Long.compare(f1.getTimePartition(), f2.getTimePartition()); diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java index 829494c6375..4010f2ed6b1 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -459,6 +459,7 @@ public class DataNode implements DataNodeMBean { /* Store runtime configurations when restart request is accepted */ storeRuntimeConfigurations( dataNodeRestartResp.getConfigNodeList(), dataNodeRestartResp.getRuntimeConfiguration()); + configOSStorage(config.getDataNodeId()); logger.info("Restart request to cluster: {} is accepted.", config.getClusterName()); } else { /* Throw exception when restart is rejected */ diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java index 3a7cc7d099e..f9140af71e9 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java @@ -42,8 +42,7 @@ public class HybridFileInputFactory implements FileInputFactory { @Override public TsFileInput getTsFileInput(String filePath) throws IOException { - return inputFactories.get(FSType.OBJECT_STORAGE).getTsFileInput(filePath); -// FSPath path = FSUtils.parse(filePath); -// return inputFactories.get(path.getFsType()).getTsFileInput(path.getPath()); + FSPath path = FSUtils.parse(filePath); + return inputFactories.get(path.getFsType()).getTsFileInput(path.getPath()); } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java index 59b175d4926..130ee579d7b 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java @@ -67,8 +67,8 @@ public class OSFSFactory implements FSFactory { listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class); listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class); renameTo = clazz.getMethod("renameTo", File.class); - renameTo = clazz.getMethod("putFile", File.class); - renameTo = clazz.getMethod("copyTo", File.class); + putFile = clazz.getMethod("putFile", File.class); + copyTo = clazz.getMethod("copyTo", File.class); } catch (ClassNotFoundException | NoSuchMethodException e) { logger.error( "Failed to get object storage. Please check your dependency of object storage module.", @@ -211,13 +211,13 @@ public class OSFSFactory implements FSFactory { FSType srcType = FSUtils.getFSType(srcFile); try { if (srcType == FSType.LOCAL) { - putFile.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile); + putFile.invoke(destFile, srcFile); } else if (srcType == FSType.OBJECT_STORAGE) { copyTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile); } else { throw new IOException( String.format( - "Doesn't support move file from %s to %s.", srcType, FSType.OBJECT_STORAGE)); + "Doesn't support copy file from %s to %s.", srcType, FSType.OBJECT_STORAGE)); } } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) { logger.error( diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java index 2a8169655cd..5b12fa6e69f 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java @@ -102,7 +102,6 @@ public class FSUtils { for (int i = 0; i < fsTypes.length; ++i) { if (fsPath.startsWith(fsPrefix[i])) { type = fsTypes[i]; - path = fsPath.substring(fsPrefix[i].length()); break; } } @@ -110,32 +109,33 @@ public class FSUtils { } public static String getOSDefaultPath(String bucket, int dataNodeId) { - return new FSPath(FSType.OBJECT_STORAGE, fsPrefix[0] + OS_FILE_SEPARATOR + dataNodeId) + return new FSPath(FSType.OBJECT_STORAGE, fsPrefix[0] + bucket + OS_FILE_SEPARATOR + dataNodeId) .getPath(); } -// public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, int dataNodeId) -// throws IOException { -// String[] filePathSplits = FilePathUtils.splitTsFilePath(lcoalFile.getCanonicalPath()); -// return new FSPath( -// FSType.OBJECT_STORAGE, -// fsPrefix[0] -// + bucket -// + OS_FILE_SEPARATOR -// + dataNodeId -// + OS_FILE_SEPARATOR -// + String.join( -// OS_FILE_SEPARATOR, -// Arrays.copyOfRange( -// filePathSplits, filePathSplits.length - 5, filePathSplits.length))); -// } - - public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, int dataNodeId) + public static FSPath parseLocalTsFile2OSFile(File localFile, String bucket, int dataNodeId) throws IOException { - String fileName = lcoalFile.getName(); - return new FSPath(FSType.LOCAL, "/Users/jinruizhangjinrui/Documents/work/iotdb/data/datanode/s3/" + fileName); + String[] filePathSplits = FilePathUtils.splitTsFilePath(localFile.getCanonicalPath()); + return new FSPath( + FSType.OBJECT_STORAGE, + fsPrefix[0] + + bucket + + OS_FILE_SEPARATOR + + dataNodeId + + OS_FILE_SEPARATOR + + String.join( + OS_FILE_SEPARATOR, + Arrays.copyOfRange( + filePathSplits, filePathSplits.length - 5, filePathSplits.length))); } + // public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, int dataNodeId) + // throws IOException { + // String fileName = lcoalFile.getName(); + // return new FSPath(FSType.OBJECT_STORAGE, + // "os://bucket/Users/jinruizhangjinrui/Documents/work/iotdb/data/datanode/s3/" + fileName); + // } + public static boolean isLocal(String fsPath) { return getFSType(fsPath) == FSType.LOCAL; }
