This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 0db916473e [Improvement-13437][UT] Add UT for S3StorageOperator
(#13530)
0db916473e is described below
commit 0db916473e93d5e71f2475a487f08b6946952b2a
Author: Rick Cheng <[email protected]>
AuthorDate: Wed Feb 22 19:57:18 2023 +0800
[Improvement-13437][UT] Add UT for S3StorageOperator (#13530)
---
.../plugin/storage/s3/S3StorageOperator.java | 117 +++++---
.../storage/s3/S3StorageOperatorFactory.java | 4 +-
.../plugin/storage/s3/S3StorageOperatorTest.java | 298 +++++++++++++++++++++
3 files changed, 377 insertions(+), 42 deletions(-)
diff --git
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java
index e9600a0737..ca54143c9b 100644
---
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java
+++
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java
@@ -53,6 +53,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.AmazonServiceException;
@@ -78,38 +79,72 @@ import
com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.google.common.base.Joiner;
@Slf4j
+@Data
public class S3StorageOperator implements Closeable, StorageOperate {
- // todo: move to s3
- private static final String ACCESS_KEY_ID =
PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+ private String accessKeyId;
- private static final String SECRET_KEY_ID =
PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+ private String accessKeySecret;
- private static final String REGION =
PropertyUtils.getString(TaskConstants.AWS_REGION);
+ private String region;
- private static final String BUCKET_NAME =
PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);
+ private String bucketName;
- private final AmazonS3 s3Client;
+ private String endPoint;
+
+ private AmazonS3 s3Client;
public S3StorageOperator() {
- if (!StringUtils.isEmpty(PropertyUtils.getString(AWS_END_POINT))) {
- s3Client = AmazonS3ClientBuilder
+ }
+
+ public void init() {
+ accessKeyId = readAccessKeyID();
+ accessKeySecret = readAccessKeySecret();
+ region = readRegion();
+ bucketName = readBucketName();
+ endPoint = readEndPoint();
+ s3Client = buildS3Client();
+ checkBucketNameExists(bucketName);
+ }
+
+ protected AmazonS3 buildS3Client() {
+ if (!StringUtils.isEmpty(endPoint)) {
+ return AmazonS3ClientBuilder
.standard()
.withPathStyleAccessEnabled(true)
.withEndpointConfiguration(new
AwsClientBuilder.EndpointConfiguration(
- PropertyUtils.getString(AWS_END_POINT),
Regions.fromName(REGION).getName()))
+ endPoint, Regions.fromName(region).getName()))
.withCredentials(
- new AWSStaticCredentialsProvider(new
BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
+ new AWSStaticCredentialsProvider(new
BasicAWSCredentials(accessKeyId, accessKeySecret)))
.build();
} else {
- s3Client = AmazonS3ClientBuilder
+ return AmazonS3ClientBuilder
.standard()
.withCredentials(
- new AWSStaticCredentialsProvider(new
BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
- .withRegion(Regions.fromName(REGION))
+ new AWSStaticCredentialsProvider(new
BasicAWSCredentials(accessKeyId, accessKeySecret)))
+ .withRegion(Regions.fromName(region))
.build();
}
- checkBucketNameExists(BUCKET_NAME);
+ }
+
+ protected String readAccessKeyID() {
+ return PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
+ }
+
+ protected String readAccessKeySecret() {
+ return PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
+ }
+
+ protected String readRegion() {
+ return PropertyUtils.getString(TaskConstants.AWS_REGION);
+ }
+
+ protected String readBucketName() {
+ return PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);
+ }
+
+ protected String readEndPoint() {
+ return PropertyUtils.getString(AWS_END_POINT);
}
@Override
@@ -136,11 +171,11 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
@Override
public boolean mkdir(String tenantCode, String path) throws IOException {
String objectName = path + FOLDER_SEPARATOR;
- if (!s3Client.doesObjectExist(BUCKET_NAME, objectName)) {
+ if (!s3Client.doesObjectExist(bucketName, objectName)) {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
- PutObjectRequest putObjectRequest = new
PutObjectRequest(BUCKET_NAME, objectName, emptyContent, metadata);
+ PutObjectRequest putObjectRequest = new
PutObjectRequest(bucketName, objectName, emptyContent, metadata);
s3Client.putObject(putObjectRequest);
}
return true;
@@ -191,7 +226,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
} else {
Files.createDirectories(dstFile.getParentFile().toPath());
}
- S3Object o = s3Client.getObject(BUCKET_NAME, srcFilePath);
+ S3Object o = s3Client.getObject(bucketName, srcFilePath);
try (
S3ObjectInputStream s3is = o.getObjectContent();
FileOutputStream fos = new FileOutputStream(dstFilePath)) {
@@ -210,13 +245,13 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
@Override
public boolean exists(String fullName) throws IOException {
- return s3Client.doesObjectExist(BUCKET_NAME, fullName);
+ return s3Client.doesObjectExist(bucketName, fullName);
}
@Override
public boolean delete(String fullName, boolean recursive) throws
IOException {
try {
- s3Client.deleteObject(BUCKET_NAME, fullName);
+ s3Client.deleteObject(bucketName, fullName);
return true;
} catch (AmazonServiceException e) {
log.error("delete the object error,the resource path is {}",
fullName);
@@ -229,7 +264,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
// append the resource fullName to the list for deletion.
childrenPathList.add(fullName);
- DeleteObjectsRequest deleteObjectsRequest = new
DeleteObjectsRequest(BUCKET_NAME)
+ DeleteObjectsRequest deleteObjectsRequest = new
DeleteObjectsRequest(bucketName)
.withKeys(childrenPathList.stream().toArray(String[]::new));
try {
s3Client.deleteObjects(deleteObjectsRequest);
@@ -243,8 +278,8 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
@Override
public boolean copy(String srcPath, String dstPath, boolean deleteSource,
boolean overwrite) throws IOException {
- s3Client.copyObject(BUCKET_NAME, srcPath, BUCKET_NAME, dstPath);
- s3Client.deleteObject(BUCKET_NAME, srcPath);
+ s3Client.copyObject(bucketName, srcPath, bucketName, dstPath);
+ s3Client.deleteObject(bucketName, srcPath);
return true;
}
@@ -265,10 +300,10 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
public boolean upload(String tenantCode, String srcFile, String dstPath,
boolean deleteSource,
boolean overwrite) throws IOException {
try {
- s3Client.putObject(BUCKET_NAME, dstPath, new File(srcFile));
+ s3Client.putObject(bucketName, dstPath, new File(srcFile));
return true;
} catch (AmazonServiceException e) {
- log.error("upload failed,the bucketName is {},the filePath is {}",
BUCKET_NAME, dstPath);
+ log.error("upload failed,the bucketName is {},the filePath is {}",
bucketName, dstPath);
return false;
}
}
@@ -279,7 +314,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
log.error("file path:{} is blank", filePath);
return Collections.emptyList();
}
- S3Object s3Object = s3Client.getObject(BUCKET_NAME, filePath);
+ S3Object s3Object = s3Client.getObject(bucketName, filePath);
try (BufferedReader bufferedReader = new BufferedReader(new
InputStreamReader(s3Object.getObjectContent()))) {
Stream<String> stream =
bufferedReader.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
@@ -297,7 +332,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
* @param tenantCode tenant code
* @return S3 resource dir
*/
- public static String getS3ResDir(String tenantCode) {
+ public String getS3ResDir(String tenantCode) {
return String.format("%s/" + RESOURCE_TYPE_FILE,
getS3TenantDir(tenantCode));
}
@@ -307,7 +342,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
* @param tenantCode tenant code
* @return get udf dir on S3
*/
- public static String getS3UdfDir(String tenantCode) {
+ public String getS3UdfDir(String tenantCode) {
return String.format("%s/" + RESOURCE_TYPE_UDF,
getS3TenantDir(tenantCode));
}
@@ -315,7 +350,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
* @param tenantCode tenant code
* @return file directory of tenants on S3
*/
- public static String getS3TenantDir(String tenantCode) {
+ public String getS3TenantDir(String tenantCode) {
return String.format(FORMAT_S_S, getS3DataBasePath(), tenantCode);
}
@@ -324,7 +359,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
*
* @return data S3 path
*/
- public static String getS3DataBasePath() {
+ public String getS3DataBasePath() {
if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) {
return "";
} else {
@@ -332,9 +367,9 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
}
}
- private void deleteTenantCode(String tenantCode) {
- deleteDirectory(getResDir(tenantCode));
- deleteDirectory(getUdfDir(tenantCode));
+ protected void deleteTenantCode(String tenantCode) {
+ deleteDir(getResDir(tenantCode));
+ deleteDir(getUdfDir(tenantCode));
}
/**
@@ -346,7 +381,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
* @param strPath
*/
private void uploadDirectory(String tenantCode, String keyPrefix, String
strPath) {
- s3Client.putObject(BUCKET_NAME, tenantCode + FOLDER_SEPARATOR +
keyPrefix, new File(strPath));
+ s3Client.putObject(bucketName, tenantCode + FOLDER_SEPARATOR +
keyPrefix, new File(strPath));
}
/**
@@ -361,10 +396,10 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
TransferManager tm =
TransferManagerBuilder.standard().withS3Client(s3Client).build();
try {
MultipleFileDownload download =
- tm.downloadDirectory(BUCKET_NAME, tenantCode +
FOLDER_SEPARATOR + keyPrefix, new File(srcPath));
+ tm.downloadDirectory(bucketName, tenantCode +
FOLDER_SEPARATOR + keyPrefix, new File(srcPath));
download.waitForCompletion();
} catch (AmazonS3Exception | InterruptedException e) {
- log.error("download the directory failed with the bucketName is {}
and the keyPrefix is {}", BUCKET_NAME,
+ log.error("download the directory failed with the bucketName is {}
and the keyPrefix is {}", bucketName,
tenantCode + FOLDER_SEPARATOR + keyPrefix);
Thread.currentThread().interrupt();
} finally {
@@ -394,9 +429,9 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
/**
* only delete the object of directory ,it`s better to delete the files in
it -r
*/
- private void deleteDirectory(String directoryName) {
- if (s3Client.doesObjectExist(BUCKET_NAME, directoryName)) {
- s3Client.deleteObject(BUCKET_NAME, directoryName);
+ protected void deleteDir(String directoryName) {
+ if (s3Client.doesObjectExist(bucketName, directoryName)) {
+ s3Client.deleteObject(bucketName, directoryName);
}
}
@@ -446,7 +481,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
// TODO: optimize pagination
ListObjectsV2Request request = new ListObjectsV2Request();
- request.setBucketName(BUCKET_NAME);
+ request.setBucketName(bucketName);
request.setPrefix(path);
request.setDelimiter("/");
@@ -520,7 +555,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
// Since we still want to access it on frontend, this is a workaround
using listObjects.
ListObjectsV2Request request = new ListObjectsV2Request();
- request.setBucketName(BUCKET_NAME);
+ request.setBucketName(bucketName);
request.setPrefix(path);
request.setDelimiter("/");
@@ -574,7 +609,7 @@ public class S3StorageOperator implements Closeable,
StorageOperate {
}
}
- throw new FileNotFoundException("Object is not found in S3 Bucket: " +
BUCKET_NAME);
+ throw new FileNotFoundException("Object is not found in S3 Bucket: " +
bucketName);
}
/**
diff --git
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorFactory.java
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorFactory.java
index c3b0821cdd..e1c3a41743 100644
---
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorFactory.java
+++
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorFactory.java
@@ -28,7 +28,9 @@ public class S3StorageOperatorFactory implements
StorageOperateFactory {
@Override
public StorageOperate createStorageOperate() {
- return new S3StorageOperator();
+ S3StorageOperator s3StorageOperator = new S3StorageOperator();
+ s3StorageOperator.init();
+ return s3StorageOperator;
}
@Override
diff --git
a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java
new file mode 100644
index 0000000000..c94dfd44ec
--- /dev/null
+++
b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.storage.s3;
+
+import static
org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
+import static
org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+
+@ExtendWith(MockitoExtension.class)
+public class S3StorageOperatorTest {
+
+ private static final String ACCESS_KEY_ID_MOCK = "ACCESS_KEY_ID_MOCK";
+
+ private static final String ACCESS_KEY_SECRET_MOCK =
"ACCESS_KEY_SECRET_MOCK";
+
+ private static final String REGION_MOCK = "REGION_MOCK";
+
+ private static final String END_POINT_MOCK = "END_POINT_MOCK";
+
+ private static final String BUCKET_NAME_MOCK = "BUCKET_NAME_MOCK";
+
+ private static final String TENANT_CODE_MOCK = "TENANT_CODE_MOCK";
+
+ private static final String DIR_MOCK = "DIR_MOCK";
+
+ private static final String FILE_NAME_MOCK = "FILE_NAME_MOCK";
+
+ private static final String FILE_PATH_MOCK = "FILE_PATH_MOCK";
+
+ private static final String FULL_NAME = "/tmp/dir1/";
+
+ private static final String DEFAULT_PATH = "/tmp/";
+
+ @Mock
+ private AmazonS3 s3Client;
+
+ private S3StorageOperator s3StorageOperator;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ s3StorageOperator = Mockito.spy(new S3StorageOperator());
+
+ doReturn(ACCESS_KEY_ID_MOCK).when(s3StorageOperator)
+ .readAccessKeyID();
+ doReturn(ACCESS_KEY_SECRET_MOCK).when(s3StorageOperator)
+ .readAccessKeySecret();
+ doReturn(REGION_MOCK).when(s3StorageOperator).readRegion();
+ doReturn(BUCKET_NAME_MOCK).when(s3StorageOperator).readBucketName();
+ doReturn(END_POINT_MOCK).when(s3StorageOperator).readEndPoint();
+ Mockito.doReturn(s3Client)
+ .when(s3StorageOperator).buildS3Client();
+ Mockito.doNothing()
+ .when(s3StorageOperator).checkBucketNameExists(Mockito.any());
+
+ s3StorageOperator.init();
+ }
+
+ @Test
+ public void testInit() {
+ verify(s3StorageOperator, times(1)).buildS3Client();
+ Assertions.assertEquals(ACCESS_KEY_ID_MOCK,
s3StorageOperator.getAccessKeyId());
+ Assertions.assertEquals(ACCESS_KEY_SECRET_MOCK,
s3StorageOperator.getAccessKeySecret());
+ Assertions.assertEquals(REGION_MOCK, s3StorageOperator.getRegion());
+ Assertions.assertEquals(BUCKET_NAME_MOCK,
s3StorageOperator.getBucketName());
+ }
+
+ @Test
+ public void testTearDown() throws IOException {
+ doNothing().when(s3Client).shutdown();
+ s3StorageOperator.close();
+ verify(s3Client, times(1)).shutdown();
+ }
+
+ @Test
+ public void testCreateTenantResAndUdfDir() throws Exception {
+
doReturn(DIR_MOCK).when(s3StorageOperator).getS3ResDir(TENANT_CODE_MOCK);
+
doReturn(DIR_MOCK).when(s3StorageOperator).getS3UdfDir(TENANT_CODE_MOCK);
+ doReturn(true).when(s3StorageOperator).mkdir(TENANT_CODE_MOCK,
DIR_MOCK);
+ s3StorageOperator.createTenantDirIfNotExists(TENANT_CODE_MOCK);
+ verify(s3StorageOperator, times(2)).mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+ }
+
+ @Test
+ public void testGetResDir() {
+ final String expectedResourceDir =
String.format("dolphinscheduler/%s/resources/", TENANT_CODE_MOCK);
+ final String dir = s3StorageOperator.getResDir(TENANT_CODE_MOCK);
+ Assertions.assertEquals(expectedResourceDir, dir);
+ }
+
+ @Test
+ public void testGetUdfDir() {
+ final String expectedUdfDir =
String.format("dolphinscheduler/%s/udfs/", TENANT_CODE_MOCK);
+ final String dir = s3StorageOperator.getUdfDir(TENANT_CODE_MOCK);
+ Assertions.assertEquals(expectedUdfDir, dir);
+ }
+
+ @Test
+ public void mkdirWhenDirExists() {
+ boolean isSuccess = false;
+ try {
+ final String key = DIR_MOCK + FOLDER_SEPARATOR;
+ doReturn(true).when(s3Client).doesObjectExist(BUCKET_NAME_MOCK,
key);
+ isSuccess = s3StorageOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+ verify(s3Client, times(1)).doesObjectExist(BUCKET_NAME_MOCK, key);
+
+ } catch (IOException e) {
+ Assertions.fail("test failed due to unexpected IO exception");
+ }
+
+ Assertions.assertTrue(isSuccess);
+ }
+
+ @Test
+ public void mkdirWhenDirNotExists() {
+ boolean isSuccess = true;
+ try {
+ final String key = DIR_MOCK + FOLDER_SEPARATOR;
+ doReturn(false).when(s3Client).doesObjectExist(BUCKET_NAME_MOCK,
key);
+ isSuccess = s3StorageOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK);
+ verify(s3Client, times(1)).doesObjectExist(BUCKET_NAME_MOCK, key);
+
+ } catch (IOException e) {
+ Assertions.fail("test failed due to unexpected IO exception");
+ }
+
+ Assertions.assertTrue(isSuccess);
+ }
+
+ @Test
+ public void getResourceFileName() {
+ final String expectedResourceFileName =
+ String.format("dolphinscheduler/%s/resources/%s",
TENANT_CODE_MOCK, FILE_NAME_MOCK);
+ final String resourceFileName =
s3StorageOperator.getResourceFileName(TENANT_CODE_MOCK, FILE_NAME_MOCK);
+ Assertions.assertEquals(expectedResourceFileName, resourceFileName);
+ }
+
+ @Test
+ public void getFileName() {
+ final String expectedFileName =
+ String.format("dolphinscheduler/%s/resources/%s",
TENANT_CODE_MOCK, FILE_NAME_MOCK);
+ final String fileName =
s3StorageOperator.getFileName(ResourceType.FILE, TENANT_CODE_MOCK,
FILE_NAME_MOCK);
+ Assertions.assertEquals(expectedFileName, fileName);
+ }
+
+ @Test
+ public void exists() {
+ boolean doesExist = false;
+ doReturn(true).when(s3Client).doesObjectExist(BUCKET_NAME_MOCK,
FILE_NAME_MOCK);
+ try {
+ doesExist = s3StorageOperator.exists(FILE_NAME_MOCK);
+ } catch (IOException e) {
+ Assertions.fail("unexpected IO exception in unit test");
+ }
+
+ Assertions.assertTrue(doesExist);
+ verify(s3Client, times(1)).doesObjectExist(BUCKET_NAME_MOCK,
FILE_NAME_MOCK);
+ }
+
+ @Test
+ public void delete() {
+ doNothing().when(s3Client).deleteObject(anyString(), anyString());
+ try {
+ s3StorageOperator.delete(FILE_NAME_MOCK, true);
+ } catch (IOException e) {
+ Assertions.fail("unexpected IO exception in unit test");
+ }
+
+ verify(s3Client, times(1)).deleteObject(anyString(), anyString());
+ }
+
+ @Test
+ public void copy() {
+ boolean isSuccess = false;
+ doReturn(null).when(s3Client).copyObject(anyString(), anyString(),
anyString(), anyString());
+ doNothing().when(s3Client).deleteObject(anyString(), anyString());
+ try {
+ isSuccess = s3StorageOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK,
false, false);
+ } catch (IOException e) {
+ Assertions.fail("unexpected IO exception in unit test");
+ }
+
+ Assertions.assertTrue(isSuccess);
+ verify(s3Client, times(1)).copyObject(anyString(), anyString(),
anyString(), anyString());
+ verify(s3Client, times(1)).deleteObject(anyString(), anyString());
+ }
+
+ @Test
+ public void deleteTenant() {
+ doNothing().when(s3StorageOperator).deleteTenantCode(anyString());
+ try {
+ s3StorageOperator.deleteTenant(TENANT_CODE_MOCK);
+ } catch (Exception e) {
+ Assertions.fail("unexpected exception caught in unit test");
+ }
+
+ verify(s3StorageOperator, times(1)).deleteTenantCode(anyString());
+ }
+
+ @Test
+ public void testGetS3ResDir() {
+ final String expectedS3ResDir =
String.format("dolphinscheduler/%s/resources", TENANT_CODE_MOCK);
+ final String s3ResDir =
s3StorageOperator.getS3ResDir(TENANT_CODE_MOCK);
+ Assertions.assertEquals(expectedS3ResDir, s3ResDir);
+ }
+
+ @Test
+ public void testGetS3UdfDir() {
+ final String expectedS3UdfDir =
String.format("dolphinscheduler/%s/udfs", TENANT_CODE_MOCK);
+ final String s3UdfDir =
s3StorageOperator.getS3UdfDir(TENANT_CODE_MOCK);
+ Assertions.assertEquals(expectedS3UdfDir, s3UdfDir);
+ }
+
+ @Test
+ public void testGetS3TenantDir() {
+ final String expectedS3TenantDir = String.format(FORMAT_S_S, DIR_MOCK,
TENANT_CODE_MOCK);
+ doReturn(DIR_MOCK).when(s3StorageOperator).getS3DataBasePath();
+ final String s3TenantDir =
s3StorageOperator.getS3TenantDir(TENANT_CODE_MOCK);
+ Assertions.assertEquals(expectedS3TenantDir, s3TenantDir);
+ }
+
+ @Test
+ public void deleteDir() {
+ doReturn(true).when(s3Client).doesObjectExist(anyString(),
anyString());
+ s3StorageOperator.deleteDir(DIR_MOCK);
+ verify(s3Client, times(1)).deleteObject(anyString(), anyString());
+ }
+
+ @Test
+ public void testGetFileStatus() throws Exception {
+ doReturn(new
ListObjectsV2Result()).when(s3Client).listObjectsV2(Mockito.any(ListObjectsV2Request.class));
+ StorageEntity entity =
+ s3StorageOperator.getFileStatus(FULL_NAME, DEFAULT_PATH,
TENANT_CODE_MOCK, ResourceType.FILE);
+ Assertions.assertEquals(FULL_NAME, entity.getFullName());
+ Assertions.assertEquals("dir1/", entity.getFileName());
+ }
+
+ @Test
+ public void testListFilesStatus() throws Exception {
+ doReturn(new
ListObjectsV2Result()).when(s3Client).listObjectsV2(Mockito.any(ListObjectsV2Request.class));
+ List<StorageEntity> result =
+ s3StorageOperator.listFilesStatus(FULL_NAME, DEFAULT_PATH,
TENANT_CODE_MOCK, ResourceType.FILE);
+ Assertions.assertEquals(0, result.size());
+ }
+
+ @Test
+ public void testListFilesStatusRecursively() throws Exception {
+ StorageEntity entity = new StorageEntity();
+ entity.setFullName(FULL_NAME);
+
+ doReturn(entity).when(s3StorageOperator).getFileStatus(FULL_NAME,
DEFAULT_PATH, TENANT_CODE_MOCK,
+ ResourceType.FILE);
+
doReturn(Collections.EMPTY_LIST).when(s3StorageOperator).listFilesStatus(anyString(),
anyString(), anyString(),
+ Mockito.any(ResourceType.class));
+
+ List<StorageEntity> result =
+ s3StorageOperator.listFilesStatusRecursively(FULL_NAME,
DEFAULT_PATH, TENANT_CODE_MOCK,
+ ResourceType.FILE);
+ Assertions.assertEquals(0, result.size());
+ }
+}