This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 17bf66b4a4c2ffa9a7e593a314f06beaaf211833
Author: fuweng11 <[email protected]>
AuthorDate: Tue Jul 11 20:09:04 2023 +0800

    [INLONG-8498][Manager] Add permission verification for 
InlongStreamProcessService (#8499)
---
 .../manager/service/core/impl/AuditServiceImpl.java      |  2 +-
 .../service/stream/InlongStreamProcessService.java       | 15 +++++++++++++++
 .../controller/openapi/OpenInLongStreamController.java   | 16 ++++++++++++++++
 3 files changed, 32 insertions(+), 1 deletion(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 1552b13aa8..d8c85413ce 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -265,7 +265,7 @@ public class AuditServiceImpl implements AuditService {
                     AuditInfo vo = new AuditInfo();
                     vo.setLogTs((String) s.get("logTs"));
                     vo.setCount(((BigDecimal) s.get("total")).longValue());
-                    vo.setCount(((BigDecimal) 
s.get("total_delay")).longValue());
+                    vo.setDelay(((BigDecimal) 
s.get("total_delay")).longValue());
                     return vo;
                 }).collect(Collectors.toList());
                 result.add(new AuditVO(auditId, auditSet,
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index 8f87a8e957..ffb1dbbf56 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -32,6 +32,7 @@ import org.apache.inlong.manager.pojo.user.UserInfo;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.service.user.UserService;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -72,6 +73,8 @@ public class InlongStreamProcessService {
     private InlongStreamService streamService;
     @Autowired
     private WorkflowService workflowService;
+    @Autowired
+    private UserService userService;
 
     /**
      * Create stream in synchronous/asynchronous way.
@@ -87,6 +90,9 @@ public class InlongStreamProcessService {
                     + " for groupId=%s", groupStatus, groupId));
         }
 
+        // only the person in charges can start process
+        userService.checkUser(groupInfo.getInCharges(), operator, 
ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());
+
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
         Preconditions.expectNotNull(streamInfo, 
ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
         StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
@@ -131,6 +137,9 @@ public class InlongStreamProcessService {
                     + " for groupId=%s", groupStatus, groupId));
         }
 
+        // only the person in charges can suspend process
+        userService.checkUser(groupInfo.getInCharges(), operator, 
ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());
+
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
         Preconditions.expectNotNull(streamInfo, 
ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
         StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
@@ -172,6 +181,8 @@ public class InlongStreamProcessService {
             throw new BusinessException(
                     String.format("group status=%s not support restart stream 
for groupId=%s", groupStatus, groupId));
         }
+        // only the person in charges can restart process
+        userService.checkUser(groupInfo.getInCharges(), operator, 
ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());
 
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
         Preconditions.expectNotNull(streamInfo, 
ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
@@ -212,6 +223,10 @@ public class InlongStreamProcessService {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                     ErrorCodeEnum.GROUP_NOT_FOUND.getMessage() + " : " + 
groupId);
         }
+
+        // only the person in charges can delete process
+        userService.checkUser(groupInfo.getInCharges(), operator, 
ErrorCodeEnum.GROUP_PERMISSION_DENIED.getMessage());
+
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
         if (GroupStatus.notAllowedTransition(groupStatus, 
GroupStatus.DELETING)) {
             throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED,
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
index ba929549d1..5e8f87fa3b 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
@@ -28,6 +28,7 @@ import 
org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 
 import io.swagger.annotations.Api;
@@ -36,6 +37,7 @@ import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
@@ -54,6 +56,8 @@ public class OpenInLongStreamController {
 
     @Autowired
     private InlongStreamService streamService;
+    @Autowired
+    private InlongStreamProcessService streamProcessOperation;
 
     @RequestMapping(value = "/stream/get", method = RequestMethod.GET)
     @ApiOperation(value = "Get inlong stream")
@@ -121,4 +125,16 @@ public class OpenInLongStreamController {
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(streamService.delete(groupId, streamId, 
LoginUserUtils.getLoginUser()));
     }
+
+    @RequestMapping(value = "/stream/startProcess/{groupId}/{streamId}", 
method = RequestMethod.POST)
+    @ApiOperation(value = "Start inlong stream process")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, 
required = true),
+            @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, 
required = true)
+    })
+    public Response<Boolean> startProcess(@PathVariable String groupId, 
@PathVariable String streamId,
+            @RequestParam boolean sync) {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        return Response.success(streamProcessOperation.startProcess(groupId, 
streamId, operator, sync));
+    }
 }

Reply via email to