This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch 1.3.2-release
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.2-release by this push:
new 88bc30b [Improvement-3327][api]support re-upload the resource file
(#3395)
88bc30b is described below
commit 88bc30b32b9af8dc1e8b306fb7808175e50f8fab
Author: lgcareer <[email protected]>
AuthorDate: Tue Aug 4 14:55:49 2020 +0800
[Improvement-3327][api]support re-upload the resource file (#3395)
---
.../api/controller/ResourcesController.java | 35 ++++-----
.../api/service/ResourcesService.java | 91 ++++++++++++++++++----
.../api/service/ResourcesServiceTest.java | 19 +++--
3 files changed, 102 insertions(+), 43 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
index cc09b2d..2af8ccd 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
@@ -16,6 +16,11 @@
*/
package org.apache.dolphinscheduler.api.controller;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ResourcesService;
@@ -26,11 +31,6 @@ import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
-import io.swagger.annotations.ApiOperation;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -63,23 +63,15 @@ public class ResourcesController extends BaseController {
private UdfFuncService udfFuncService;
/**
- * create resource
+ * create directory
*
- * @param loginUser login user
- * @param alias alias
- * @param description description
- * @param type type
- * @return create result code
- */
-
- /**
* @param loginUser login user
* @param type type
* @param alias alias
* @param description description
* @param pid parent id
* @param currentDir current directory
- * @return
+ * @return create result code
*/
@ApiOperation(value = "createDirctory", notes = "CREATE_RESOURCE_NOTES")
@ApiImplicitParams({
@@ -140,6 +132,7 @@ public class ResourcesController extends BaseController {
* @param resourceId resource id
* @param type resource type
* @param description description
+ * @param file resource file
* @return update result code
*/
@ApiOperation(value = "updateResource", notes = "UPDATE_RESOURCE_NOTES")
@@ -147,7 +140,8 @@ public class ResourcesController extends BaseController {
@ApiImplicitParam(name = "id", value = "RESOURCE_ID", required =
true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required
= true, dataType = "ResourceType"),
@ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required
= true, dataType = "String"),
- @ApiImplicitParam(name = "description", value = "RESOURCE_DESC",
dataType = "String")
+ @ApiImplicitParam(name = "description", value = "RESOURCE_DESC",
dataType = "String"),
+ @ApiImplicitParam(name = "file", value = "RESOURCE_FILE", required
= true, dataType = "MultipartFile")
})
@PostMapping(value = "/update")
@ApiException(UPDATE_RESOURCE_ERROR)
@@ -155,10 +149,11 @@ public class ResourcesController extends BaseController {
@RequestParam(value = "id") int resourceId,
@RequestParam(value = "type") ResourceType
type,
@RequestParam(value = "name") String alias,
- @RequestParam(value = "description", required
= false) String description) {
- logger.info("login user {}, update resource, type: {}, resource alias:
{}, desc: {}",
- loginUser.getUserName(), type, alias, description);
- return resourceService.updateResource(loginUser, resourceId, alias,
description, type);
+ @RequestParam(value = "description", required
= false) String description,
+ @RequestParam(value = "file" ,required =
false) MultipartFile file) {
+ logger.info("login user {}, update resource, type: {}, resource alias:
{}, desc: {}, file: {}",
+ loginUser.getUserName(), type, alias, description, file);
+ return resourceService.updateResource(loginUser, resourceId, alias,
description, type, file);
}
/**
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index 15b106f..e0ee711 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -287,6 +287,7 @@ public class ResourcesService extends BaseService {
* @param name name
* @param desc description
* @param type resource type
+ * @param file resource file
* @return update result code
*/
@Transactional(rollbackFor = Exception.class)
@@ -294,7 +295,8 @@ public class ResourcesService extends BaseService {
int resourceId,
String name,
String desc,
- ResourceType type) {
+ ResourceType type,
+ MultipartFile file) {
Result result = new Result();
// if resource upload startup
@@ -330,6 +332,42 @@ public class ResourcesService extends BaseService {
return result;
}
+ if (file != null) {
+
+ // file is empty
+ if (file.isEmpty()) {
+ logger.error("file is empty: {}", file.getOriginalFilename());
+ putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
+ return result;
+ }
+
+ // file suffix
+ String fileSuffix = FileUtils.suffix(file.getOriginalFilename());
+ String nameSuffix = FileUtils.suffix(name);
+
+ // determine file suffix
+ if (!(StringUtils.isNotEmpty(fileSuffix) &&
fileSuffix.equalsIgnoreCase(nameSuffix))) {
+ /**
+ * rename file suffix and original suffix must be consistent
+ */
+ logger.error("rename file suffix and original suffix must be
consistent: {}", file.getOriginalFilename());
+ putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
+ return result;
+ }
+
+ //If resource type is UDF, only jar packages are allowed to be
uploaded, and the suffix must be .jar
+ if (Constants.UDF.equals(type.name()) &&
!JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) {
+ logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
+ putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
+ return result;
+ }
+ if (file.getSize() > Constants.MAX_FILE_SIZE) {
+ logger.error("file size is too large: {}",
file.getOriginalFilename());
+ putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
+ return result;
+ }
+ }
+
// query tenant by user id
String tenantCode = getTenantCode(resource.getUserId(),result);
if (StringUtils.isEmpty(tenantCode)){
@@ -379,26 +417,33 @@ public class ResourcesService extends BaseService {
}
// updateResource data
- List<Integer> childrenResource = listAllChildren(resource,false);
+
Date now = new Date();
resource.setAlias(name);
resource.setFullName(fullName);
resource.setDescription(desc);
resource.setUpdateTime(now);
+ if (file != null) {
+ resource.setFileName(file.getOriginalFilename());
+ resource.setSize(file.getSize());
+ }
try {
resourcesMapper.updateById(resource);
- if (resource.isDirectory() &&
CollectionUtils.isNotEmpty(childrenResource)) {
- String matcherFullName = Matcher.quoteReplacement(fullName);
- List<Resource> childResourceList = new ArrayList<>();
- List<Resource> resourceList =
resourcesMapper.listResourceByIds(childrenResource.toArray(new
Integer[childrenResource.size()]));
- childResourceList = resourceList.stream().map(t -> {
- t.setFullName(t.getFullName().replaceFirst(originFullName,
matcherFullName));
- t.setUpdateTime(now);
- return t;
- }).collect(Collectors.toList());
- resourcesMapper.batchUpdateResource(childResourceList);
+ if (resource.isDirectory()) {
+ List<Integer> childrenResource =
listAllChildren(resource,false);
+ if (CollectionUtils.isNotEmpty(childrenResource)) {
+ String matcherFullName =
Matcher.quoteReplacement(fullName);
+ List<Resource> childResourceList = new ArrayList<>();
+ List<Resource> resourceList =
resourcesMapper.listResourceByIds(childrenResource.toArray(new
Integer[childrenResource.size()]));
+ childResourceList = resourceList.stream().map(t -> {
+
t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName));
+ t.setUpdateTime(now);
+ return t;
+ }).collect(Collectors.toList());
+ resourcesMapper.batchUpdateResource(childResourceList);
+ }
}
putMsg(result, Status.SUCCESS);
@@ -414,11 +459,31 @@ public class ResourcesService extends BaseService {
logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e);
throw new ServiceException(Status.UPDATE_RESOURCE_ERROR);
}
+
// if name unchanged, return directly without moving on HDFS
- if (originResourceName.equals(name)) {
+ if (originResourceName.equals(name) && file == null) {
return result;
}
+ if (file != null) {
+ // fail upload
+ if (!upload(loginUser, fullName, file, type)) {
+ logger.error("upload resource: {} file: {} failed.", name,
file.getOriginalFilename());
+ putMsg(result, Status.HDFS_OPERATION_ERROR);
+ throw new RuntimeException(String.format("upload resource: %s
file: %s failed.", name, file.getOriginalFilename()));
+ }
+ if (!fullName.equals(originFullName)) {
+ try {
+ HadoopUtils.getInstance().delete(originHdfsFileName,false);
+ } catch (IOException e) {
+ logger.error(e.getMessage(),e);
+ throw new RuntimeException(String.format("delete resource:
%s failed.", originFullName));
+ }
+ }
+ return result;
+ }
+
+
// get the path of dest file in hdfs
String destHdfsFileName =
HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
index 407f6b5..1e3c95d 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
@@ -159,19 +158,19 @@ public class ResourcesServiceTest {
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
User user = new User();
//HDFS_NOT_STARTUP
- Result result =
resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE);
+ Result result =
resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg());
//RESOURCE_NOT_EXIST
Mockito.when(resourcesMapper.selectById(1)).thenReturn(getResource());
PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
- result =
resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE);
+ result =
resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
//USER_NO_OPERATION_PERM
- result =
resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE);
+ result =
resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(),result.getMsg());
@@ -186,7 +185,7 @@ public class ResourcesServiceTest {
} catch (IOException e) {
logger.error(e.getMessage(),e);
}
- result = resourcesService.updateResource(user, 1,
"ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF);
+ result = resourcesService.updateResource(user, 1,
"ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF,null);
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg());
//SUCCESS
@@ -199,25 +198,25 @@ public class ResourcesServiceTest {
logger.error(e.getMessage(),e);
}
- result =
resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE);
+ result =
resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
//RESOURCE_EXIST
Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest1.jar", 0,
0)).thenReturn(getResourceList());
- result =
resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE);
+ result =
resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE,null);
logger.info(result.toString());
Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg());
//USER_NOT_EXIST
Mockito.when(userMapper.selectById(Mockito.anyInt())).thenReturn(null);
- result =
resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
+ result =
resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null);
logger.info(result.toString());
Assert.assertTrue(Status.USER_NOT_EXIST.getCode() == result.getCode());
//TENANT_NOT_EXIST
Mockito.when(userMapper.selectById(1)).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null);
- result =
resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF);
+ result =
resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null);
logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(),result.getMsg());
@@ -231,7 +230,7 @@ public class ResourcesServiceTest {
logger.error(e.getMessage(),e);
}
- result =
resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF);
+ result =
resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF,null);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());