This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch branch-1.8 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 17bf66b4a4c2ffa9a7e593a314f06beaaf211833 Author: fuweng11 <[email protected]> AuthorDate: Tue Jul 11 20:09:04 2023 +0800 [INLONG-8498][Manager] Add permission verification for InlongStreamProcessService (#8499) --- .../manager/service/core/impl/AuditServiceImpl.java | 2 +- .../service/stream/InlongStreamProcessService.java | 15 +++++++++++++++ .../controller/openapi/OpenInLongStreamController.java | 16 ++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java index 1552b13aa8..d8c85413ce 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java @@ -265,7 +265,7 @@ public class AuditServiceImpl implements AuditService { AuditInfo vo = new AuditInfo(); vo.setLogTs((String) s.get("logTs")); vo.setCount(((BigDecimal) s.get("total")).longValue()); - vo.setCount(((BigDecimal) s.get("total_delay")).longValue()); + vo.setDelay(((BigDecimal) s.get("total_delay")).longValue()); return vo; }).collect(Collectors.toList()); result.add(new AuditVO(auditId, auditSet, diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java index 8f87a8e957..ffb1dbbf56 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java @@ -32,6 +32,7 @@ import org.apache.inlong.manager.pojo.user.UserInfo; import org.apache.inlong.manager.pojo.workflow.WorkflowResult; import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; import org.apache.inlong.manager.service.group.InlongGroupService; +import org.apache.inlong.manager.service.user.UserService; import org.apache.inlong.manager.service.workflow.WorkflowService; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -72,6 +73,8 @@ public class InlongStreamProcessService { private InlongStreamService streamService; @Autowired private WorkflowService workflowService; + @Autowired + private UserService userService; /** * Create stream in synchronous/asynchronous way. @@ -87,6 +90,9 @@ public class InlongStreamProcessService { + " for groupId=%s", groupStatus, groupId)); } + // only the person in charges can start process + userService.checkUser(groupInfo.getInCharges(), operator, ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage()); + InlongStreamInfo streamInfo = streamService.get(groupId, streamId); Preconditions.expectNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage()); StreamStatus status = StreamStatus.forCode(streamInfo.getStatus()); @@ -131,6 +137,9 @@ public class InlongStreamProcessService { + " for groupId=%s", groupStatus, groupId)); } + // only the person in charges can suspend process + userService.checkUser(groupInfo.getInCharges(), operator, ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage()); + InlongStreamInfo streamInfo = streamService.get(groupId, streamId); Preconditions.expectNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage()); StreamStatus status = StreamStatus.forCode(streamInfo.getStatus()); @@ -172,6 +181,8 @@ public class InlongStreamProcessService { throw new BusinessException( String.format("group status=%s not support restart stream for groupId=%s", groupStatus, groupId)); } + // only the person in charges can restart process + userService.checkUser(groupInfo.getInCharges(), operator, ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage()); InlongStreamInfo streamInfo = streamService.get(groupId, streamId); Preconditions.expectNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage()); @@ -212,6 +223,10 @@ public class InlongStreamProcessService { throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage() + " : " + groupId); } + + // only the person in charges can delete process + userService.checkUser(groupInfo.getInCharges(), operator, ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage()); + GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus()); if (GroupStatus.notAllowedTransition(groupStatus, GroupStatus.DELETING)) { throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java index ba929549d1..5e8f87fa3b 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java @@ -28,6 +28,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamRequest; import org.apache.inlong.manager.pojo.user.LoginUserUtils; import org.apache.inlong.manager.service.operationlog.OperationLog; +import org.apache.inlong.manager.service.stream.InlongStreamProcessService; import org.apache.inlong.manager.service.stream.InlongStreamService; import io.swagger.annotations.Api; @@ -36,6 +37,7 @@ import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -54,6 +56,8 @@ public class OpenInLongStreamController { @Autowired private InlongStreamService streamService; + @Autowired + private InlongStreamProcessService streamProcessOperation; @RequestMapping(value = "/stream/get", method = RequestMethod.GET) @ApiOperation(value = "Get inlong stream") @@ -121,4 +125,16 @@ public class OpenInLongStreamController { Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY); return Response.success(streamService.delete(groupId, streamId, LoginUserUtils.getLoginUser())); } + + @RequestMapping(value = "/stream/startProcess/{groupId}/{streamId}", method = RequestMethod.POST) + @ApiOperation(value = "Start inlong stream process") + @ApiImplicitParams({ + @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true), + @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true) + }) + public Response<Boolean> startProcess(@PathVariable String groupId, @PathVariable String streamId, + @RequestParam boolean sync) { + String operator = LoginUserUtils.getLoginUser().getName(); + return Response.success(streamProcessOperation.startProcess(groupId, streamId, operator, sync)); + } }
