This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new 0649dc76876 implement copy file、move file
0649dc76876 is described below
commit 0649dc76876150f7e8d52e190feb24158a12b9ec
Author: HeimingZ <[email protected]>
AuthorDate: Tue May 23 12:09:34 2023 +0800
implement copy file、move file
---
.../apache/iotdb/hadoop/fileSystem/HDFSFile.java | 18 ++++++++++
.../resources/conf/iotdb-common.properties | 9 +++++
.../org/apache/iotdb/os/fileSystem/OSFile.java | 16 +++++++++
.../apache/iotdb/os/io/ObjectStorageConnector.java | 2 ++
.../iotdb/os/io/aws/S3ObjectStorageConnector.java | 16 +++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++++
.../db/engine/migration/MigrationTaskManager.java | 2 +-
.../tsfile/fileSystem/fsFactory/FSFactory.java | 8 ++---
.../tsfile/fileSystem/fsFactory/HDFSFactory.java | 38 ++++++++++++++++++++--
.../fileSystem/fsFactory/HybridFSFactory.java | 23 +++++++++++--
.../fileSystem/fsFactory/LocalFSFactory.java | 12 ++-----
.../tsfile/fileSystem/fsFactory/OSFSFactory.java | 35 ++++++++++++++++++--
.../org/apache/iotdb/tsfile/utils/FSUtils.java | 24 ++++++++------
13 files changed, 179 insertions(+), 34 deletions(-)
diff --git
a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
index afa0ecc0607..3fb545e9318 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +36,9 @@ import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URL;
@@ -283,6 +286,21 @@ public class HDFSFile extends File {
return files;
}
+ private void copyToLocal(File destFile) throws IOException {
+ fs.copyToLocalFile(hdfsPath, new Path(destFile.getPath()));
+ }
+
+ private void copyFromLocal(File srcFile) throws IOException {
+ fs.copyFromLocalFile(new Path(srcFile.getPath()), hdfsPath);
+ }
+
+ private void copyTo(File destFile) throws IOException {
+ try (InputStream in = fs.open(hdfsPath);
+ OutputStream out = fs.create(((HDFSFile) destFile).hdfsPath, true)) {
+ IOUtils.copyBytes(in, out, 4096);
+ }
+ }
+
@Override
public File getAbsoluteFile() {
return new HDFSFile(getAbsolutePath());
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index a3c906e4cf0..85daae311ea 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1138,8 +1138,17 @@ cluster_name=defaultCluster
### Object storage management
####################
+# Datatype: string
# object_storage_name=aws_s3
+
+# Datatype: string
# object_storage_bucket=iotdb
+
+# Datatype: string
# object_storage_endpoiont=yourEndpoint
+
+# Datatype: string
# object_storage_access_key=yourAccessKey
+
+# Datatype: string
# object_storage_access_secret=yourAccessSecret
\ No newline at end of file
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
index d04eaa397ea..a21e2b6251c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
@@ -430,6 +430,22 @@ public class OSFile extends File {
}
}
+ public void putFile(File localFile) {
+ try {
+ connector.putLocalFile(osUri, localFile);
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to put local file {} to the object storage {}.",
localFile, osUri, e);
+ }
+ }
+
+ /**
+  * Copies the remote object denoted by this file to another object-storage file.
+  *
+  * @param destFile destination; it is cast to {@code OSFile}, so any other File type fails with
+  *     a ClassCastException
+  * @throws java.io.UncheckedIOException if the connector reports that the copy failed
+  */
+ public void copyTo(File destFile) {
+   try {
+     connector.copyRemoteFile(osUri, ((OSFile) destFile).toOSURI());
+   } catch (ObjectStorageException e) {
+     logger.error("Fail to copy file {} to {}.", osUri, destFile, e);
+     // Rethrow: this method returns nothing, so a swallowed failure would look like a
+     // successful copy to the caller.
+     throw new java.io.UncheckedIOException(new java.io.IOException(e));
+   }
+ }
+
// test only
public void setConnector(ObjectStorageConnector connector) {
this.connector = connector;
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
b/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
index c396eaeb816..64322fe90fe 100644
---
a/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
+++
b/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
@@ -43,4 +43,6 @@ public interface ObjectStorageConnector {
void putLocalFile(OSURI osUri, File lcoalFile) throws ObjectStorageException;
byte[] getRemoteFile(OSURI osUri, long position, int len) throws
ObjectStorageException;
+
+ void copyRemoteFile(OSURI srcUri, OSURI destUri) throws
ObjectStorageException;
}
diff --git
a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
index 2c5d3060860..24416857382 100644
---
a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
+++
b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
@@ -176,6 +176,22 @@ public class S3ObjectStorageConnector implements
ObjectStorageConnector {
}
}
+ @Override
+ public void copyRemoteFile(OSURI srcUri, OSURI destUri) throws ObjectStorageException {
+   // Issue one CopyObject request built from the bucket and key of each URI; any S3Exception
+   // is rethrown as the connector's own exception type.
+   try {
+     s3Client.copyObject(
+         CopyObjectRequest.builder()
+             .sourceBucket(srcUri.getBucket())
+             .sourceKey(srcUri.getKey())
+             .destinationBucket(destUri.getBucket())
+             .destinationKey(destUri.getKey())
+             .build());
+   } catch (S3Exception s3Failure) {
+     throw new ObjectStorageException(s3Failure);
+   }
+ }
+
public void close() {
s3Client.close();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e249ff7b2de..677f7c3d0ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1106,6 +1106,8 @@ public class IoTDBConfig {
*/
private String RateLimiterType = "FixedIntervalRateLimiter";
+ private int migrateThreadCount = 3;
+
private String objectStorageName = "aws_s3";
private String objectStorageBucket = "iotdb";
private String objectStorageEndpoiont = "yourEndpoint";
@@ -3817,6 +3819,14 @@ public class IoTDBConfig {
return sortTmpDir;
}
+ /** Returns the configured number of migration worker threads. */
+ public int getMigrateThreadCount() {
+   return this.migrateThreadCount;
+ }
+
+ /** Replaces the configured number of migration worker threads. */
+ public void setMigrateThreadCount(int migrateThreadCount) {
+   this.migrateThreadCount = migrateThreadCount;
+ }
+
public String getObjectStorageName() {
return objectStorageName;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
index ddf3f66159f..8f3b2872a34 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
@@ -71,7 +71,7 @@ public class MigrationTaskManager implements IService {
ThreadName.MIGRATION_SCHEDULER.getName());
workers =
IoTDBThreadPoolFactory.newFixedThreadPool(
- iotdbConfig.getCompactionThreadCount(),
ThreadName.MIGRATION.getName());
+ iotdbConfig.getMigrateThreadCount(),
ThreadName.MIGRATION.getName());
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
scheduler,
() -> new MigrationScheduleTask().run(),
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
index 386cc210baa..0e53690144d 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
@@ -105,15 +105,15 @@ public interface FSFactory {
BufferedOutputStream getBufferedOutputStream(String filePath);
/**
- * TODO(zhm) move file
+ * Moves the source file to the destination file.
*
* @param srcFile src file
* @param destFile dest file
*/
- void moveFile(File srcFile, File destFile);
+ void moveFile(File srcFile, File destFile) throws IOException;
/**
- * TODO(zhm) copy file
+ * Copies the source file to the destination file.
*
* @param srcFile src file
* @param destFile dest file
@@ -145,6 +145,6 @@ public interface FSFactory {
*/
boolean deleteIfExists(File file) throws IOException;
- /** TODO(zhm) Force delete the directory */
+ /** Force delete the directory */
void deleteDirectory(String dir) throws IOException;
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
index 328c4f9e39e..fc13c99e7eb 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.tsfile.fileSystem.fsFactory;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.utils.FSUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +50,9 @@ public class HDFSFactory implements FSFactory {
private static Method listFilesBySuffix;
private static Method listFilesByPrefix;
private static Method renameTo;
+ private static Method copyToLocal;
+ private static Method copyFromLocal;
+ private static Method copyTo;
static {
try {
@@ -62,6 +68,9 @@ public class HDFSFactory implements FSFactory {
listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class,
String.class);
listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class,
String.class);
renameTo = clazz.getMethod("renameTo", File.class);
+ copyToLocal = clazz.getMethod("copyToLocal", File.class);
+ copyFromLocal = clazz.getMethod("copyFromLocal", File.class);
+ copyTo = clazz.getMethod("copyTo", File.class);
} catch (ClassNotFoundException | NoSuchMethodException e) {
logger.error(
"Failed to get Hadoop file system. Please check your dependency of
Hadoop module.", e);
@@ -188,7 +197,7 @@ public class HDFSFactory implements FSFactory {
}
@Override
- public void moveFile(File srcFile, File destFile) {
+ public void moveFile(File srcFile, File destFile) throws IOException {
try {
renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()),
destFile);
} catch (InstantiationException | InvocationTargetException |
IllegalAccessException e) {
@@ -196,11 +205,32 @@ public class HDFSFactory implements FSFactory {
"Failed to rename file from {} to {}. Please check your dependency
of Hadoop module.",
srcFile.getName(),
destFile.getName());
+ throw new IOException(e);
}
}
@Override
- public void copyFile(File srcFile, File destFile) throws IOException {}
+ public void copyFile(File srcFile, File destFile) throws IOException {
+   // Three cases: HDFS -> HDFS, local -> HDFS, and otherwise the source is treated as an HDFS
+   // file that is copied to the local destination.
+   FSType srcType = FSUtils.getFSType(srcFile);
+   FSType destType = FSUtils.getFSType(destFile);
+   try {
+     if (srcType == FSType.HDFS && destType == FSType.HDFS) {
+       copyTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+     } else if (srcType == FSType.LOCAL) {
+       copyFromLocal.invoke(
+           constructorWithPathname.newInstance(destFile.getAbsolutePath()), srcFile);
+     } else {
+       copyToLocal.invoke(
+           constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+     }
+   } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+     // This factory reflects into the Hadoop module, so the hint must name that module.
+     logger.error(
+         "Failed to copy file from {} to {}. Please check your dependency of Hadoop module.",
+         srcFile.getName(),
+         destFile.getName());
+     throw new IOException(e);
+   }
+ }
@Override
public File[] listFilesBySuffix(String fileFolder, String suffix) {
@@ -240,5 +270,7 @@ public class HDFSFactory implements FSFactory {
}
@Override
- public void deleteDirectory(String dir) throws IOException {}
+ public void deleteDirectory(String dir) throws IOException {
+   // Not implemented for HDFS; name the operation and target so the failure is diagnosable.
+   throw new UnsupportedOperationException(
+       "deleteDirectory is not supported by HDFSFactory: " + dir);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
index 325b0ba5e52..5e9f76914fa 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
@@ -99,13 +99,30 @@ public class HybridFSFactory implements FSFactory {
}
@Override
- public void moveFile(File srcFile, File destFile) {
- // TODO
+ public void moveFile(File srcFile, File destFile) throws IOException {
+   // A move is delegated only when both files resolve to the same file system type.
+   FSType sourceFs = FSUtils.getFSType(srcFile);
+   FSType targetFs = FSUtils.getFSType(destFile);
+   if (sourceFs != targetFs) {
+     throw new IOException(
+         String.format("Doesn't support move file from %s to %s.", sourceFs, targetFs));
+   }
+   fsFactories.get(targetFs).moveFile(srcFile, destFile);
+ }
@Override
public void copyFile(File srcFile, File destFile) throws IOException {
- // TODO
+   FSType srcType = FSUtils.getFSType(srcFile);
+   FSType destType = FSUtils.getFSType(destFile);
+   if (srcType == destType || (srcType == FSType.LOCAL && destType == FSType.OBJECT_STORAGE)) {
+     // Same file system on both sides, or a local -> object storage upload: the destination's
+     // factory performs the copy.
+     fsFactories.get(destType).copyFile(srcFile, destFile);
+   } else if ((srcType == FSType.LOCAL || srcType == FSType.HDFS)
+       && (destType == FSType.LOCAL || destType == FSType.HDFS)) {
+     // Mixed local/HDFS copies are handled by the HDFS factory in either direction.
+     fsFactories.get(FSType.HDFS).copyFile(srcFile, destFile);
+   } else {
+     // This is the copy path, so the message must say "copy", not "move".
+     throw new IOException(
+         String.format("Doesn't support copy file from %s to %s.", srcType, destType));
+   }
}
@Override
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
index b6d46989e88..d110aa9b164 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
@@ -111,16 +111,8 @@ public class LocalFSFactory implements FSFactory {
}
@Override
- public void moveFile(File srcFile, File destFile) {
- try {
- FileUtils.moveFile(srcFile, destFile);
- } catch (IOException e) {
- logger.error(
- "Failed to move file from {} to {}. ",
- srcFile.getAbsolutePath(),
- destFile.getAbsolutePath(),
- e);
- }
+ public void moveFile(File srcFile, File destFile) throws IOException {
+ // Delegates to FileUtils.moveFile; an IOException raised by the move reaches the caller.
+ FileUtils.moveFile(srcFile, destFile);
}
@Override
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
index 1f58fc6ffd6..59b175d4926 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.tsfile.fileSystem.fsFactory;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.utils.FSUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +50,8 @@ public class OSFSFactory implements FSFactory {
private static Method listFilesBySuffix;
private static Method listFilesByPrefix;
private static Method renameTo;
+ private static Method putFile;
+ private static Method copyTo;
static {
try {
@@ -62,6 +67,8 @@ public class OSFSFactory implements FSFactory {
listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class,
String.class);
listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class,
String.class);
renameTo = clazz.getMethod("renameTo", File.class);
+ // Each reflective handle gets its own field; assigning these to renameTo left putFile and
+ // copyTo null and made moveFile invoke copyTo.
+ putFile = clazz.getMethod("putFile", File.class);
+ copyTo = clazz.getMethod("copyTo", File.class);
} catch (ClassNotFoundException | NoSuchMethodException e) {
logger.error(
"Failed to get object storage. Please check your dependency of
object storage module.",
@@ -187,7 +194,7 @@ public class OSFSFactory implements FSFactory {
}
@Override
- public void moveFile(File srcFile, File destFile) {
+ public void moveFile(File srcFile, File destFile) throws IOException {
try {
renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()),
destFile);
} catch (InstantiationException | InvocationTargetException |
IllegalAccessException e) {
@@ -195,11 +202,31 @@ public class OSFSFactory implements FSFactory {
"Failed to rename file from {} to {}. Please check your dependency
of object storage module.",
srcFile.getName(),
destFile.getName());
+ throw new IOException(e);
}
}
@Override
- public void copyFile(File srcFile, File destFile) throws IOException {}
+ public void copyFile(File srcFile, File destFile) throws IOException {
+   FSType srcType = FSUtils.getFSType(srcFile);
+   try {
+     if (srcType == FSType.LOCAL) {
+       // putFile uploads its argument into the object denoted by the receiver, so the
+       // receiver is built from the destination and the local source is the argument.
+       putFile.invoke(constructorWithPathname.newInstance(destFile.getAbsolutePath()), srcFile);
+     } else if (srcType == FSType.OBJECT_STORAGE) {
+       copyTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+     } else {
+       // This is the copy path, so the message must say "copy", not "move".
+       throw new IOException(
+           String.format(
+               "Doesn't support copy file from %s to %s.", srcType, FSType.OBJECT_STORAGE));
+     }
+   } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+     logger.error(
+         "Failed to copy file from {} to {}. Please check your dependency of object storage module.",
+         srcFile.getName(),
+         destFile.getName());
+     throw new IOException(e);
+   }
+ }
@Override
public File[] listFilesBySuffix(String fileFolder, String suffix) {
@@ -239,5 +266,7 @@ public class OSFSFactory implements FSFactory {
}
@Override
- public void deleteDirectory(String dir) throws IOException {}
+ public void deleteDirectory(String dir) throws IOException {
+   // Not implemented for object storage; name the operation and target so the failure is
+   // diagnosable.
+   throw new UnsupportedOperationException(
+       "deleteDirectory is not supported by OSFSFactory: " + dir);
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
index 6ab36b56634..7133841052c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
@@ -26,11 +26,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
public class FSUtils {
private static final Logger logger = LoggerFactory.getLogger(FSUtils.class);
private static final FSType[] fsTypes = {FSType.OBJECT_STORAGE, FSType.HDFS};
public static final String[] fsPrefix = {"os://", "hdfs://"};
+ public static final String OS_FILE_SEPARATOR = "/";
private static final String[] fsFileClassName = {
"org.apache.iotdb.os.fileSystem.OSFile",
"org.apache.iotdb.hadoop.fileSystem.HDFSFile"
};
@@ -96,22 +98,24 @@ public class FSUtils {
}
public static String getOSDefaultPath(String bucket, int dataNodeId) {
- return new FSPath(FSType.OBJECT_STORAGE, fsPrefix[0] + "/" +
dataNodeId).getPath();
+   // Builds os://<bucket>/<dataNodeId>. The bucket was previously dropped, which produced a
+   // URI with an empty bucket (os:///<dataNodeId>), unlike parseLocalTsFile2OSFile.
+   return new FSPath(
+           FSType.OBJECT_STORAGE, fsPrefix[0] + bucket + OS_FILE_SEPARATOR + dataNodeId)
+       .getPath();
}
public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket,
int dataNodeId)
throws IOException {
- String canonicalPath = lcoalFile.getCanonicalPath();
- int startIdx = canonicalPath.lastIndexOf("unsequence");
- if (startIdx < 0) {
- startIdx = canonicalPath.lastIndexOf("sequence");
- }
- if (startIdx < 0) {
- throw new IllegalArgumentException(canonicalPath + "isn't a TsFile
path.");
- }
+   String canonicalPath = lcoalFile.getCanonicalPath();
+   String[] filePathSplits = FilePathUtils.splitTsFilePath(canonicalPath);
+   // The object key reuses the last five path components. Reject shorter paths here instead
+   // of letting Arrays.copyOfRange fail on a negative start index.
+   // NOTE(review): the meaning of the five components is not visible here; confirm against
+   // FilePathUtils.splitTsFilePath.
+   if (filePathSplits.length < 5) {
+     throw new IllegalArgumentException(canonicalPath + " isn't a TsFile path.");
+   }
return new FSPath(
FSType.OBJECT_STORAGE,
- fsPrefix[0] + bucket + "/" + dataNodeId + "/" +
canonicalPath.substring(startIdx));
+       fsPrefix[0]
+           + bucket
+           + OS_FILE_SEPARATOR
+           + dataNodeId
+           + OS_FILE_SEPARATOR
+           + String.join(
+               OS_FILE_SEPARATOR,
+               Arrays.copyOfRange(
+                   filePathSplits, filePathSplits.length - 5, filePathSplits.length)));
}
public static boolean isLocal(String fsPath) {