This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new 024602157c0 impl delete directory
024602157c0 is described below
commit 024602157c0f62f16560d3d1d09c8ec375297456
Author: HeimingZ <[email protected]>
AuthorDate: Thu May 25 14:38:33 2023 +0800
impl delete directory
---
.../org/apache/iotdb/os/cache/OSFileCache.java | 2 +-
.../org/apache/iotdb/os/fileSystem/OSFile.java | 12 +++++-
.../apache/iotdb/os/io/ObjectStorageConnector.java | 6 ++-
.../iotdb/os/io/aws/S3ObjectStorageConnector.java | 45 ++++++++++++++++++++-
.../os/io/test/TestObjectStorageConnector.java | 14 ++++++-
.../org/apache/iotdb/os/cache/OSFileCacheTest.java | 4 +-
.../apache/iotdb/os/cache/OSFileChannelTest.java | 3 +-
.../apache/iotdb/os/cache/OSInputStreamTest.java | 3 +-
.../iotdb/db/engine/storagegroup/DataRegion.java | 47 ++++++++++++----------
.../engine/storagegroup/FakedTsFileResource.java | 4 +-
.../tsfile/fileSystem/fsFactory/HDFSFactory.java | 2 +-
.../tsfile/fileSystem/fsFactory/OSFSFactory.java | 19 ++++++---
12 files changed, 119 insertions(+), 42 deletions(-)
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
index b302d6983ad..a43c28d8016 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
@@ -86,7 +86,7 @@ public class OSFileCache {
@Override
public @Nullable OSFileCacheValue load(@NonNull OSFileCacheKey key) throws
Exception {
byte[] data =
- connector.getRemoteFile(
+ connector.getRemoteObject(
key.getFile().toOSURI(), key.getStartPosition(),
config.getCachePageSize());
return cacheFileManager.persist(key, data);
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
index a21e2b6251c..96c9a4e7eb6 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
@@ -245,7 +245,7 @@ public class OSFile extends File {
@Override
public File[] listFiles() {
- return super.listFiles();
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
}
@Override
@@ -440,12 +440,20 @@ public class OSFile extends File {
public void copyTo(File destFile) {
try {
- connector.copyRemoteFile(osUri, ((OSFile) destFile).toOSURI());
+ connector.copyObject(osUri, ((OSFile) destFile).toOSURI());
} catch (ObjectStorageException e) {
logger.error("Fail to copy file {} to {}.", osUri, destFile, e);
}
}
+ public void deleteObjectsByPrefix() {
+ try {
+ connector.deleteObjectsByPrefix(osUri);
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to delete objects by prefix {}.", osUri, e);
+ }
+ }
+
// test only
public void setConnector(ObjectStorageConnector connector) {
this.connector = connector;
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java b/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
index 64322fe90fe..a8bb812011c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
@@ -42,7 +42,9 @@ public interface ObjectStorageConnector {
void putLocalFile(OSURI osUri, File lcoalFile) throws ObjectStorageException;
- byte[] getRemoteFile(OSURI osUri, long position, int len) throws
ObjectStorageException;
+ byte[] getRemoteObject(OSURI osUri, long position, int len) throws
ObjectStorageException;
- void copyRemoteFile(OSURI srcUri, OSURI destUri) throws
ObjectStorageException;
+ void copyObject(OSURI srcUri, OSURI destUri) throws ObjectStorageException;
+
+ void deleteObjectsByPrefix(OSURI prefixUri) throws ObjectStorageException;
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
index 964dd77fcd4..7f4b78ccc85 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
@@ -33,19 +33,25 @@ import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
+import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import java.io.File;
import java.io.InputStream;
+import java.util.stream.Collectors;
public class S3ObjectStorageConnector implements ObjectStorageConnector {
private static final String RANGE_FORMAT = "%d-%d";
@@ -169,7 +175,7 @@ public class S3ObjectStorageConnector implements ObjectStorageConnector {
}
@Override
- public byte[] getRemoteFile(OSURI osUri, long position, int len) throws
ObjectStorageException {
+ public byte[] getRemoteObject(OSURI osUri, long position, int len) throws
ObjectStorageException {
String rangeStr = String.format(RANGE_FORMAT, position, position + len -
1);
try {
GetObjectRequest req =
@@ -186,7 +192,7 @@ public class S3ObjectStorageConnector implements ObjectStorageConnector {
}
@Override
- public void copyRemoteFile(OSURI srcUri, OSURI destUri) throws
ObjectStorageException {
+ public void copyObject(OSURI srcUri, OSURI destUri) throws
ObjectStorageException {
try {
CopyObjectRequest req =
CopyObjectRequest.builder()
@@ -201,6 +207,41 @@ public class S3ObjectStorageConnector implements ObjectStorageConnector {
}
}
+ @Override
+ public void deleteObjectsByPrefix(OSURI prefixUri) throws
ObjectStorageException {
+ try {
+ ListObjectsV2Request listReq =
+ ListObjectsV2Request.builder()
+ .bucket(prefixUri.getBucket())
+ .prefix(prefixUri.getKey())
+ .build();
+ ListObjectsV2Response listRes;
+
+ do {
+ listRes = s3Client.listObjectsV2(listReq);
+ Delete del =
+ Delete.builder()
+ .objects(
+ listRes.contents().stream()
+ .map(s3Object ->
ObjectIdentifier.builder().key(s3Object.key()).build())
+ .collect(Collectors.toList()))
+ .build();
+ DeleteObjectsRequest deleteObjectsRequest =
+ DeleteObjectsRequest.builder().delete(del).build();
+ s3Client.deleteObjects(deleteObjectsRequest);
+
+ listReq =
+ ListObjectsV2Request.builder()
+ .bucket(prefixUri.getBucket())
+ .prefix(prefixUri.getKey())
+ .continuationToken(listRes.nextContinuationToken())
+ .build();
+ } while (listRes.isTruncated());
+ } catch (S3Exception e) {
+ throw new ObjectStorageException(e);
+ }
+ }
+
public void close() {
s3Client.close();
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java b/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java
index cfeb74e6908..125e5f9f79e 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java
@@ -110,7 +110,7 @@ public class TestObjectStorageConnector implements ObjectStorageConnector {
}
@Override
- public byte[] getRemoteFile(OSURI osUri, long position, int len) throws
ObjectStorageException {
+ public byte[] getRemoteObject(OSURI osUri, long position, int len) throws
ObjectStorageException {
File file = new File(getDstFilePath(osUri));
ByteBuffer dst = ByteBuffer.allocate(len);
try (FileChannel channel = FileChannel.open(file.toPath(),
StandardOpenOption.READ)) {
@@ -121,6 +121,16 @@ public class TestObjectStorageConnector implements ObjectStorageConnector {
return dst.array();
}
+ @Override
+ public void deleteObjectsByPrefix(OSURI prefixUri) throws
ObjectStorageException {
+ File file = new File(getDstFilePath(prefixUri));
+ try {
+ FileUtils.deleteDirectory(file);
+ } catch (IOException e) {
+ throw new ObjectStorageException(e);
+ }
+ }
+
private String getDstFilePath(OSURI osuri) {
return testConfig.getTestDir()
+ File.separator
@@ -128,5 +138,5 @@ public class TestObjectStorageConnector implements ObjectStorageConnector {
}
@Override
- public void copyRemoteFile(OSURI srcUri, OSURI destUri) throws
ObjectStorageException {}
+ public void copyObject(OSURI srcUri, OSURI destUri) throws
ObjectStorageException {}
}
diff --git a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
index 2e76d3f84e1..3b3b2f0a1c8 100644
--- a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
+++ b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileCacheTest.java
@@ -97,7 +97,7 @@ public class OSFileCacheTest {
for (int i = 0; i < 100; ++i) {
bytes0to100[i] = (byte) i;
}
- when(connector.getRemoteFile(testFile.toOSURI(), 0,
100)).thenReturn(bytes0to100);
+ when(connector.getRemoteObject(testFile.toOSURI(), 0,
100)).thenReturn(bytes0to100);
OSFileCacheKey key0to100 = new OSFileCacheKey(testFile, 0);
OSFileCacheValue value0to100 = cache.get(key0to100);
assertTrue(value0to100.getCacheFile().exists());
@@ -118,7 +118,7 @@ public class OSFileCacheTest {
for (int i = 0; i < 150 - 100; ++i) {
bytes100to150[i] = (byte) i;
}
- when(connector.getRemoteFile(testFile.toOSURI(), 100,
100)).thenReturn(bytes100to150);
+ when(connector.getRemoteObject(testFile.toOSURI(), 100,
100)).thenReturn(bytes100to150);
OSFileCacheKey key100to150 = new OSFileCacheKey(testFile, 100);
OSFileCacheValue value100to150 = cache.get(key100to150);
assertTrue(value100to150.getCacheFile().exists());
diff --git a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
index eb6bcfa0f7d..614affeeede 100644
--- a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
+++ b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSFileChannelTest.java
@@ -104,7 +104,8 @@ public class OSFileChannelTest {
for (int i = 0; i < bytes.length; ++i) {
bytes[i] = (byte) (startPos + i);
}
- when(connector.getRemoteFile(testFile.toOSURI(), startPos,
cachePageSize)).thenReturn(bytes);
+ when(connector.getRemoteObject(testFile.toOSURI(), startPos,
cachePageSize))
+ .thenReturn(bytes);
startPos += cachePageSize;
}
}
diff --git a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
index 37574ec70df..3835923af38 100644
--- a/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
+++ b/object-storage/src/test/java/org/apache/iotdb/os/cache/OSInputStreamTest.java
@@ -98,7 +98,8 @@ public class OSInputStreamTest {
for (int i = 0; i < bytes.length; ++i) {
bytes[i] = (byte) (startPos + i);
}
- when(connector.getRemoteFile(testFile.toOSURI(), startPos,
cachePageSize)).thenReturn(bytes);
+ when(connector.getRemoteObject(testFile.toOSURI(), startPos,
cachePageSize))
+ .thenReturn(bytes);
startPos += cachePageSize;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 42a74b27217..54f97a29609 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -103,8 +103,10 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.FSUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -325,25 +327,20 @@ public class DataRegion implements IDataRegionForQuery {
"Skip recovering data region {}[{}] when consensus protocol is ratis
and storage engine is not ready.",
databaseName,
dataRegionId);
- for (String fileFolder :
TierManager.getInstance().getAllLocalFilesFolders()) {
+ for (String fileFolder : TierManager.getInstance().getAllFilesFolders())
{
File dataRegionFolder =
fsFactory.getFile(fileFolder, databaseName + File.separator +
dataRegionId);
- if (dataRegionFolder.exists()) {
- File[] timePartitions = dataRegionFolder.listFiles();
- if (timePartitions != null) {
- for (File timePartition : timePartitions) {
- try {
- FileUtils.forceDelete(timePartition);
- } catch (IOException e) {
- logger.error(
- "Exception occurs when deleting time partition directory
{} for {}-{}",
- timePartitions,
- databaseName,
- dataRegionId,
- e);
- }
- }
- }
+ try {
+ fsFactory.deleteDirectory(dataRegionFolder.getPath());
+ } catch (IOException e) {
+ logger.error(
+ "Exception occurs when deleting data region folder for {}-{}",
+ databaseName,
+ dataRegionId,
+ e);
+ }
+ if (FSUtils.getFSType(dataRegionFolder) == FSType.LOCAL) {
+ dataRegionFolder.mkdirs();
}
}
} else {
@@ -1606,7 +1603,7 @@ public class DataRegion implements IDataRegionForQuery {
TsFileMetricManager.getInstance().decreaseModFileSize(x.getModFile().getSize());
}
});
- deleteAllSGFolders(TierManager.getInstance().getAllLocalFilesFolders());
+ deleteAllSGFolders(TierManager.getInstance().getAllFilesFolders());
this.workSequenceTsFileProcessors.clear();
this.workUnsequenceTsFileProcessors.clear();
@@ -1622,9 +1619,17 @@ public class DataRegion implements IDataRegionForQuery {
for (String tsfilePath : folder) {
File dataRegionDataFolder =
fsFactory.getFile(tsfilePath, databaseName + File.separator +
dataRegionId);
- if (dataRegionDataFolder.exists()) {
- org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent(
- dataRegionDataFolder);
+ if (FSUtils.getFSType(dataRegionDataFolder) != FSType.LOCAL) {
+ try {
+ fsFactory.deleteDirectory(dataRegionDataFolder.getPath());
+ } catch (IOException e) {
+ logger.error("Fail to delete data region folder {}",
dataRegionDataFolder);
+ }
+ } else {
+ if (dataRegionDataFolder.exists()) {
+
org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent(
+ dataRegionDataFolder);
+ }
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FakedTsFileResource.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FakedTsFileResource.java
index 085c1279374..d8535833372 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FakedTsFileResource.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FakedTsFileResource.java
@@ -35,8 +35,8 @@ public class FakedTsFileResource extends TsFileResource {
public FakedTsFileResource(long tsFileSize, String name) {
this.timeIndex = new FileTimeIndex();
this.tsFileSize = tsFileSize;
- super.status = TsFileResourceStatus.NORMAL;
fakeTsfileName = name;
+ setStatusForTest(TsFileResourceStatus.NORMAL);
}
public void setTsFileSize(long tsFileSize) {
@@ -52,7 +52,7 @@ public class FakedTsFileResource extends TsFileResource {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(tsFileSize).append(",");
- builder.append(status);
+ builder.append(getStatus());
return builder.toString();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
index fc13c99e7eb..057ccf2cf29 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
@@ -271,6 +271,6 @@ public class HDFSFactory implements FSFactory {
@Override
public void deleteDirectory(String dir) throws IOException {
- throw new UnsupportedOperationException();
+ getFile(dir).delete();
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
index 130ee579d7b..6f9fe4e49ab 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
@@ -52,6 +52,7 @@ public class OSFSFactory implements FSFactory {
private static Method renameTo;
private static Method putFile;
private static Method copyTo;
+ private static Method deleteObjectsByPrefix;
static {
try {
@@ -69,6 +70,7 @@ public class OSFSFactory implements FSFactory {
renameTo = clazz.getMethod("renameTo", File.class);
putFile = clazz.getMethod("putFile", File.class);
copyTo = clazz.getMethod("copyTo", File.class);
+ deleteObjectsByPrefix = clazz.getMethod("deleteObjectsByPrefix");
} catch (ClassNotFoundException | NoSuchMethodException e) {
logger.error(
"Failed to get object storage. Please check your dependency of
object storage module.",
@@ -196,8 +198,8 @@ public class OSFSFactory implements FSFactory {
@Override
public void moveFile(File srcFile, File destFile) throws IOException {
try {
-
renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()),
destFile);
- } catch (InstantiationException | InvocationTargetException |
IllegalAccessException e) {
+ renameTo.invoke(srcFile, destFile);
+ } catch (InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to rename file from {} to {}. Please check your dependency
of object storage module.",
srcFile.getName(),
@@ -213,13 +215,13 @@ public class OSFSFactory implements FSFactory {
if (srcType == FSType.LOCAL) {
putFile.invoke(destFile, srcFile);
} else if (srcType == FSType.OBJECT_STORAGE) {
-
copyTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()),
destFile);
+ copyTo.invoke(srcFile, destFile);
} else {
throw new IOException(
String.format(
"Doesn't support copy file from %s to %s.", srcType,
FSType.OBJECT_STORAGE));
}
- } catch (InstantiationException | InvocationTargetException |
IllegalAccessException e) {
+ } catch (InvocationTargetException | IllegalAccessException e) {
logger.error(
"Failed to copy file from {} to {}. Please check your dependency of
object storage module.",
srcFile.getName(),
@@ -267,6 +269,13 @@ public class OSFSFactory implements FSFactory {
@Override
public void deleteDirectory(String dir) throws IOException {
- throw new UnsupportedOperationException();
+ try {
+ deleteObjectsByPrefix.invoke(constructorWithPathname.newInstance(dir));
+ } catch (InstantiationException | InvocationTargetException |
IllegalAccessException e) {
+ logger.error(
+ "Failed to delete directory {}. Please check your dependency of
object storage module.",
+ dir,
+ e);
+ }
}
}