This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7214e0be0 [INLONG-7948][Manager] Add user authentication when operate
inlong consume (#7949)
7214e0be0 is described below
commit 7214e0be078c20ef53b27bcc8f4666d205fbf38e
Author: fuweng11 <[email protected]>
AuthorDate: Mon May 1 15:13:58 2023 +0800
[INLONG-7948][Manager] Add user authentication when operate inlong consume
(#7949)
---
.../consume/InlongConsumeProcessService.java | 7 +++--
.../service/consume/InlongConsumeService.java | 3 +-
.../service/consume/InlongConsumeServiceImpl.java | 13 ++++++--
.../manager/service/sink/StreamSinkService.java | 3 +-
.../service/sink/StreamSinkServiceImpl.java | 35 +++++++++++++++-------
.../service/consume/InlongConsumeServiceTest.java | 6 ++--
.../web/controller/InlongConsumeController.java | 2 +-
.../web/controller/StreamSinkController.java | 2 +-
8 files changed, 47 insertions(+), 24 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
index ddd942dc4..b1dd5595c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
@@ -47,12 +47,13 @@ public class InlongConsumeProcessService {
*/
public WorkflowResult startProcess(Integer id, String operator) {
consumeService.updateStatus(id,
ConsumeStatus.TO_BE_APPROVAL.getCode(), operator);
- return workflowService.start(ProcessName.APPLY_CONSUME_PROCESS,
operator, genApplyConsumeProcessForm(id));
+ return workflowService.start(ProcessName.APPLY_CONSUME_PROCESS,
operator,
+ genApplyConsumeProcessForm(id, operator));
}
- private ApplyConsumeProcessForm genApplyConsumeProcessForm(Integer id) {
+ private ApplyConsumeProcessForm genApplyConsumeProcessForm(Integer id,
String operator) {
ApplyConsumeProcessForm form = new ApplyConsumeProcessForm();
- form.setConsumeInfo(consumeService.get(id));
+ form.setConsumeInfo(consumeService.get(id, operator));
return form;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
index cb295cb22..72764878f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
@@ -53,9 +53,10 @@ public interface InlongConsumeService {
* Get inlong consume info based on ID
*
* @param id inlong consume id
+ * @param currentUser currentUser
* @return detail of inlong group
*/
- InlongConsumeInfo get(Integer id);
+ InlongConsumeInfo get(Integer id, String currentUser);
/**
* Check whether the consumer group exists or not
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
index f22cb1744..bb0b7608b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -37,6 +37,7 @@ import
org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.service.user.UserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -64,6 +65,8 @@ public class InlongConsumeServiceImpl implements
InlongConsumeService {
private InlongConsumeEntityMapper consumeMapper;
@Autowired
private InlongConsumeOperatorFactory consumeOperatorFactory;
+ @Autowired
+ private UserService userService;
@Override
public Integer save(InlongConsumeRequest request, String operator) {
@@ -131,13 +134,15 @@ public class InlongConsumeServiceImpl implements
InlongConsumeService {
}
@Override
- public InlongConsumeInfo get(Integer id) {
+ public InlongConsumeInfo get(Integer id, String currentUser) {
Preconditions.expectNotNull(id, "inlong consume id cannot be null");
InlongConsumeEntity entity = consumeMapper.selectById(id);
if (entity == null) {
LOGGER.error("inlong consume not found with id={}", id);
throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND);
}
+ userService.checkUser(entity.getInCharges(), currentUser,
+ "Current user does not have permission to get inlong consume");
InlongConsumeOperator consumeOperator =
consumeOperatorFactory.getInstance(entity.getMqType());
InlongConsumeInfo consumeInfo = consumeOperator.getFromEntity(entity);
@@ -194,8 +199,8 @@ public class InlongConsumeServiceImpl implements
InlongConsumeService {
Integer consumeId = request.getId();
InlongConsumeEntity existEntity = consumeMapper.selectById(consumeId);
Preconditions.expectNotNull(existEntity, "inlong consume not exist
with id " + consumeId);
- Preconditions.expectTrue(existEntity.getInCharges().contains(operator),
- "operator" + operator + " has no privilege for the inlong
consume");
+ userService.checkUser(existEntity.getInCharges(), operator,
+ "Current user does not have permission to update inlong
consume");
if (!Objects.equals(existEntity.getVersion(), request.getVersion())) {
LOGGER.error(String.format("inlong consume has already updated,
id=%s, curVersion=%s",
@@ -244,6 +249,8 @@ public class InlongConsumeServiceImpl implements
InlongConsumeService {
Preconditions.expectNotNull(id, "inlong consume id cannot be null");
InlongConsumeEntity entity = consumeMapper.selectById(id);
Preconditions.expectNotNull(entity, "inlong consume not exist with id
" + id);
+ userService.checkUser(entity.getInCharges(), operator,
+ "Current user does not have permission to delete inlong
consume");
entity.setIsDeleted(id);
entity.setStatus(ConsumeStatus.DELETED.getCode());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index bfaebf883..17cd18f2f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -116,9 +116,10 @@ public interface StreamSinkService {
* Paging query stream sink info based on conditions.
*
* @param request paging request
+ * @param operator operator
* @return sink page list
*/
- PageResult<? extends StreamSink> listByCondition(SinkPageRequest request);
+ PageResult<? extends StreamSink> listByCondition(SinkPageRequest request,
String operator);
/**
* Paging query stream sink info based on conditions.
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index f369d2214..7a4180dff 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -24,6 +24,12 @@ import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
@@ -65,12 +71,6 @@ import
org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
-import net.sf.jsqlparser.JSQLParserException;
-import net.sf.jsqlparser.parser.CCJSqlParserManager;
-import net.sf.jsqlparser.statement.Statement;
-import net.sf.jsqlparser.statement.create.table.ColDataType;
-import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
-import net.sf.jsqlparser.statement.create.table.CreateTable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
@@ -81,14 +81,14 @@ import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP;
import static
org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
import static
org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
-import static
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_COMMENT_PROP;
-import static
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_NAME_PROP;
-import static
org.apache.inlong.manager.common.consts.InlongConstants.BATCH_PARSING_FILED_JSON_TYPE_PROP;
/**
* Implementation of sink service interface
@@ -315,15 +315,28 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
}
@Override
- public PageResult<? extends StreamSink> listByCondition(SinkPageRequest
request) {
+ public PageResult<? extends StreamSink> listByCondition(SinkPageRequest
request, String operator) {
Preconditions.expectNotBlank(request.getInlongGroupId(),
ErrorCodeEnum.GROUP_ID_IS_EMPTY);
-
+ UserInfo userInfo = userService.getByName(operator);
+ boolean isAdmin =
UserTypeEnum.ADMIN.getCode().equals(userInfo.getAccountType());
PageHelper.startPage(request.getPageNum(), request.getPageSize());
OrderFieldEnum.checkOrderField(request);
OrderTypeEnum.checkOrderType(request);
List<StreamSinkEntity> entityPage =
sinkMapper.selectByCondition(request);
Map<String, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
for (StreamSinkEntity streamSink : entityPage) {
+ InlongGroupEntity groupEntity =
+ groupMapper.selectByGroupId(streamSink.getInlongGroupId());
+ if (groupEntity == null) {
+ continue;
+ }
+ // only the person in charges can query
+ if (!isAdmin) {
+ List<String> inCharges =
Arrays.asList(groupEntity.getInCharges().split(InlongConstants.COMMA));
+ if (!inCharges.contains(operator)) {
+ continue;
+ }
+ }
sinkMap.computeIfAbsent(streamSink.getSinkType(), k -> new
Page<>()).add(streamSink);
}
List<StreamSink> responseList = Lists.newArrayList();
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
index 3742aaaac..f6f847ca8 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
@@ -73,7 +73,7 @@ public class InlongConsumeServiceTest extends ServiceBaseTest
{
Assertions.assertNotNull(consumeId);
// test get operation
- InlongConsumeInfo consumeInfo = this.testGet(consumeId);
+ InlongConsumeInfo consumeInfo = this.testGet(consumeId,
GLOBAL_OPERATOR);
Assertions.assertEquals(consumeInfo.getId(), consumeId);
// test list operation
@@ -113,8 +113,8 @@ public class InlongConsumeServiceTest extends
ServiceBaseTest {
return consumeService.save(request, GLOBAL_OPERATOR);
}
- private InlongConsumeInfo testGet(Integer id) {
- return consumeService.get(id);
+ private InlongConsumeInfo testGet(Integer id, String operator) {
+ return consumeService.get(id, operator);
}
private PageResult<InlongConsumeBriefInfo> testList() {
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
index 0dafa4765..98e1686b9 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
@@ -71,7 +71,7 @@ public class InlongConsumeController {
@ApiOperation(value = "Get inlong consume")
@ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass
= Integer.class, required = true)
public Response<InlongConsumeInfo> get(@PathVariable(name = "id") Integer
id) {
- return Response.success(consumeService.get(id));
+ return Response.success(consumeService.get(id,
LoginUserUtils.getLoginUser().getName()));
}
@GetMapping(value = "/consume/countStatus")
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 6faf55010..b761a9221 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -74,7 +74,7 @@ public class StreamSinkController {
@RequestMapping(value = "/sink/list", method = RequestMethod.POST)
@ApiOperation(value = "List stream sinks by paginating")
public Response<PageResult<? extends StreamSink>>
listByCondition(@RequestBody SinkPageRequest request) {
- return Response.success(sinkService.listByCondition(request));
+ return Response.success(sinkService.listByCondition(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/sink/update", method = RequestMethod.POST)