This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7214e0be0 [INLONG-7948][Manager] Add user authentication when operate 
inlong consume (#7949)
7214e0be0 is described below

commit 7214e0be078c20ef53b27bcc8f4666d205fbf38e
Author: fuweng11 <[email protected]>
AuthorDate: Mon May 1 15:13:58 2023 +0800

    [INLONG-7948][Manager] Add user authentication when operate inlong consume 
(#7949)
---
 .../consume/InlongConsumeProcessService.java       |  7 +++--
 .../service/consume/InlongConsumeService.java      |  3 +-
 .../service/consume/InlongConsumeServiceImpl.java  | 13 ++++++--
 .../manager/service/sink/StreamSinkService.java    |  3 +-
 .../service/sink/StreamSinkServiceImpl.java        | 35 +++++++++++++++-------
 .../service/consume/InlongConsumeServiceTest.java  |  6 ++--
 .../web/controller/InlongConsumeController.java    |  2 +-
 .../web/controller/StreamSinkController.java       |  2 +-
 8 files changed, 47 insertions(+), 24 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
index ddd942dc4..b1dd5595c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
@@ -47,12 +47,13 @@ public class InlongConsumeProcessService {
      */
     public WorkflowResult startProcess(Integer id, String operator) {
         consumeService.updateStatus(id, 
ConsumeStatus.TO_BE_APPROVAL.getCode(), operator);
-        return workflowService.start(ProcessName.APPLY_CONSUME_PROCESS, 
operator, genApplyConsumeProcessForm(id));
+        return workflowService.start(ProcessName.APPLY_CONSUME_PROCESS, 
operator,
+                genApplyConsumeProcessForm(id, operator));
     }
 
-    private ApplyConsumeProcessForm genApplyConsumeProcessForm(Integer id) {
+    private ApplyConsumeProcessForm genApplyConsumeProcessForm(Integer id, 
String operator) {
         ApplyConsumeProcessForm form = new ApplyConsumeProcessForm();
-        form.setConsumeInfo(consumeService.get(id));
+        form.setConsumeInfo(consumeService.get(id, operator));
         return form;
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
index cb295cb22..72764878f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
@@ -53,9 +53,10 @@ public interface InlongConsumeService {
      * Get inlong consume info based on ID
      *
      * @param id inlong consume id
+     * @param currentUser currentUser
      * @return detail of inlong group
      */
-    InlongConsumeInfo get(Integer id);
+    InlongConsumeInfo get(Integer id, String currentUser);
 
     /**
      * Check whether the consumer group exists or not
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
index f22cb1744..bb0b7608b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -37,6 +37,7 @@ import 
org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.service.user.UserService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -64,6 +65,8 @@ public class InlongConsumeServiceImpl implements 
InlongConsumeService {
     private InlongConsumeEntityMapper consumeMapper;
     @Autowired
     private InlongConsumeOperatorFactory consumeOperatorFactory;
+    @Autowired
+    private UserService userService;
 
     @Override
     public Integer save(InlongConsumeRequest request, String operator) {
@@ -131,13 +134,15 @@ public class InlongConsumeServiceImpl implements 
InlongConsumeService {
     }
 
     @Override
-    public InlongConsumeInfo get(Integer id) {
+    public InlongConsumeInfo get(Integer id, String currentUser) {
         Preconditions.expectNotNull(id, "inlong consume id cannot be null");
         InlongConsumeEntity entity = consumeMapper.selectById(id);
         if (entity == null) {
             LOGGER.error("inlong consume not found with id={}", id);
             throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND);
         }
+        userService.checkUser(entity.getInCharges(), currentUser,
+                "Current user does not have permission to get inlong consume");
 
         InlongConsumeOperator consumeOperator = 
consumeOperatorFactory.getInstance(entity.getMqType());
         InlongConsumeInfo consumeInfo = consumeOperator.getFromEntity(entity);
@@ -194,8 +199,8 @@ public class InlongConsumeServiceImpl implements 
InlongConsumeService {
         Integer consumeId = request.getId();
         InlongConsumeEntity existEntity = consumeMapper.selectById(consumeId);
         Preconditions.expectNotNull(existEntity, "inlong consume not exist 
with id " + consumeId);
-        Preconditions.expectTrue(existEntity.getInCharges().contains(operator),
-                "operator" + operator + " has no privilege for the inlong 
consume");
+        userService.checkUser(existEntity.getInCharges(), operator,
+                "Current user does not have permission to update inlong 
consume");
 
         if (!Objects.equals(existEntity.getVersion(), request.getVersion())) {
             LOGGER.error(String.format("inlong consume has already updated, 
id=%s, curVersion=%s",
@@ -244,6 +249,8 @@ public class InlongConsumeServiceImpl implements 
InlongConsumeService {
         Preconditions.expectNotNull(id, "inlong consume id cannot be null");
         InlongConsumeEntity entity = consumeMapper.selectById(id);
         Preconditions.expectNotNull(entity, "inlong consume not exist with id 
" + id);
+        userService.checkUser(entity.getInCharges(), operator,
+                "Current user does not have permission to delete inlong 
consume");
 
         entity.setIsDeleted(id);
         entity.setStatus(ConsumeStatus.DELETED.getCode());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index bfaebf883..17cd18f2f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -116,9 +116,10 @@ public interface StreamSinkService {
      * Paging query stream sink info based on conditions.
      *
      * @param request paging request
+     * @param operator operator
      * @return sink page list
      */
-    PageResult<? extends StreamSink> listByCondition(SinkPageRequest request);
+    PageResult<? extends StreamSink> listByCondition(SinkPageRequest request, 
String operator);
 
     /**
      * Paging query stream sink info based on conditions.
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index f369d2214..7a4180dff 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -24,6 +24,12 @@ import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
@@ -65,12 +71,6 @@ import 
org.springframework.beans.factory.config.AutowireCapableBeanFactory;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import net.sf.jsqlparser.JSQLParserException;
-import net.sf.jsqlparser.parser.CCJSqlParserManager;
-import net.sf.jsqlparser.statement.Statement;
-import net.sf.jsqlparser.statement.create.table.ColDataType;
-import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
-import net.sf.jsqlparser.statement.create.table.CreateTable;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -81,14 +81,14 @@ import java.util.Map;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP;
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
-import static 
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
-import static 
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP;
-import static 
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP;
 
 /**
  * Implementation of sink service interface
@@ -315,15 +315,28 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     }
 
     @Override
-    public PageResult<? extends StreamSink> listByCondition(SinkPageRequest 
request) {
+    public PageResult<? extends StreamSink> listByCondition(SinkPageRequest 
request, String operator) {
         Preconditions.expectNotBlank(request.getInlongGroupId(), 
ErrorCodeEnum.GROUP_ID_IS_EMPTY);
-
+        UserInfo userInfo = userService.getByName(operator);
+        boolean isAdmin = 
UserTypeEnum.ADMIN.getCode().equals(userInfo.getAccountType());
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         OrderFieldEnum.checkOrderField(request);
         OrderTypeEnum.checkOrderType(request);
         List<StreamSinkEntity> entityPage = 
sinkMapper.selectByCondition(request);
         Map<String, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
         for (StreamSinkEntity streamSink : entityPage) {
+            InlongGroupEntity groupEntity =
+                    groupMapper.selectByGroupId(streamSink.getInlongGroupId());
+            if (groupEntity == null) {
+                continue;
+            }
+            // only the person in charges can query
+            if (!isAdmin) {
+                List<String> inCharges = 
Arrays.asList(groupEntity.getInCharges().split(InlongConstants.COMMA));
+                if (!inCharges.contains(operator)) {
+                    continue;
+                }
+            }
             sinkMap.computeIfAbsent(streamSink.getSinkType(), k -> new 
Page<>()).add(streamSink);
         }
         List<StreamSink> responseList = Lists.newArrayList();
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
index 3742aaaac..f6f847ca8 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
@@ -73,7 +73,7 @@ public class InlongConsumeServiceTest extends ServiceBaseTest 
{
         Assertions.assertNotNull(consumeId);
 
         // test get operation
-        InlongConsumeInfo consumeInfo = this.testGet(consumeId);
+        InlongConsumeInfo consumeInfo = this.testGet(consumeId, 
GLOBAL_OPERATOR);
         Assertions.assertEquals(consumeInfo.getId(), consumeId);
 
         // test list operation
@@ -113,8 +113,8 @@ public class InlongConsumeServiceTest extends 
ServiceBaseTest {
         return consumeService.save(request, GLOBAL_OPERATOR);
     }
 
-    private InlongConsumeInfo testGet(Integer id) {
-        return consumeService.get(id);
+    private InlongConsumeInfo testGet(Integer id, String operator) {
+        return consumeService.get(id, operator);
     }
 
     private PageResult<InlongConsumeBriefInfo> testList() {
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
index 0dafa4765..98e1686b9 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
@@ -71,7 +71,7 @@ public class InlongConsumeController {
     @ApiOperation(value = "Get inlong consume")
     @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass 
= Integer.class, required = true)
     public Response<InlongConsumeInfo> get(@PathVariable(name = "id") Integer 
id) {
-        return Response.success(consumeService.get(id));
+        return Response.success(consumeService.get(id, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @GetMapping(value = "/consume/countStatus")
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 6faf55010..b761a9221 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -74,7 +74,7 @@ public class StreamSinkController {
     @RequestMapping(value = "/sink/list", method = RequestMethod.POST)
     @ApiOperation(value = "List stream sinks by paginating")
     public Response<PageResult<? extends StreamSink>> 
listByCondition(@RequestBody SinkPageRequest request) {
-        return Response.success(sinkService.listByCondition(request));
+        return Response.success(sinkService.listByCondition(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/sink/update", method = RequestMethod.POST)

Reply via email to