This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7cb5f1517e [INLONG-11368][Manager] Determine whether to issue a streamSource based on the stream status (#11371)
7cb5f1517e is described below
commit 7cb5f1517ee55d2bf1483cca800f81af6444f0f2
Author: fuweng11 <[email protected]>
AuthorDate: Fri Oct 18 14:15:22 2024 +0800
[INLONG-11368][Manager] Determine whether to issue a streamSource based on the stream status (#11371)
---
.../manager/service/group/GroupCheckService.java | 20 ++++++++++++++++++--
.../service/source/AbstractSourceOperator.java | 16 ++++++++--------
.../manager/service/source/StreamSourceOperator.java | 4 ++--
.../service/source/StreamSourceServiceImpl.java | 10 +++++++---
4 files changed, 35 insertions(+), 15 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
index fe1c7a4451..6824cba63b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.service.group;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.apache.inlong.manager.dao.mapper.UserEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -36,7 +38,7 @@ public class GroupCheckService {
@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
- private UserEntityMapper userMapper;
+ private InlongStreamEntityMapper streamMapper;
/**
* Check whether the inlong group status is temporary
@@ -58,4 +60,18 @@ public class GroupCheckService {
return inlongGroupEntity;
}
+ public InlongStreamEntity checkStreamStatus(String groupId, String
streamId, String operator) {
+ InlongStreamEntity inlongStreamEntity =
streamMapper.selectByIdentifier(groupId, streamId);
+ if (inlongStreamEntity == null) {
+ throw new BusinessException(
+ String.format("InlongStream does not exist with
groupId=%s, streamId=%s", groupId, streamId));
+ }
+
+ StreamStatus status =
StreamStatus.forCode(inlongStreamEntity.getStatus());
+ if (StreamStatus.notAllowedUpdate(status)) {
+ throw new
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
status));
+ }
+
+ return inlongStreamEntity;
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 885fd2a99e..cdf6290281 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -32,8 +32,8 @@ import
org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -146,12 +146,12 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
@Override
@Transactional(rollbackFor = Throwable.class)
- public Integer saveOpt(SourceRequest request, Integer groupStatus, String
operator) {
+ public Integer saveOpt(SourceRequest request, Integer streamStatus, String
operator) {
StreamSourceEntity entity = CommonBeanUtils.copyProperties(request,
StreamSourceEntity::new);
if (SourceType.AUTO_PUSH.equals(request.getSourceType())) {
// auto push task needs not be issued to agent
entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode());
- } else if
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+ } else if
(StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
} else {
entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
@@ -166,7 +166,7 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
if (request.getEnableSyncSchema()) {
syncSourceFieldInfo(request, operator);
}
- if
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+ if
(StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
updateAgentTaskConfig(request, operator);
}
return entity.getId();
@@ -188,7 +188,7 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
@Override
@Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
- public void updateOpt(SourceRequest request, Integer groupStatus, Integer
groupMode, String operator) {
+ public void updateOpt(SourceRequest request, Integer streamStatus, Integer
groupMode, String operator) {
StreamSourceEntity entity =
sourceMapper.selectByIdForUpdate(request.getId());
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
@@ -242,7 +242,7 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
if (InlongConstants.STANDARD_MODE.equals(groupMode)) {
SourceStatus sourceStatus =
SourceStatus.forCode(entity.getStatus());
Integer nextStatus = entity.getStatus();
- if
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+ if
(StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
nextStatus = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
} else {
switch (SourceStatus.forCode(entity.getStatus())) {
@@ -267,7 +267,7 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
}
updateFieldOpt(entity, request.getFieldList());
LOGGER.debug("success to update source of type={}",
request.getSourceType());
- if
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+ if
(StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
updateAgentTaskConfig(request, operator);
}
}
@@ -479,7 +479,7 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
return cmdConfig;
}).collect(Collectors.toList());
if (CollectionUtils.isEmpty(taskLists)) {
- agentTaskConfigEntity.setTaskParams(null);
+ agentTaskConfigEntity.setTaskParams("");
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
return;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index e820140ae6..07b0879330 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -50,11 +50,11 @@ public interface StreamSourceOperator {
* Save the source info.
*
* @param request request of source
- * @param groupStatus the belongs group status
+ * @param streamStatus the belongs stream status
* @param operator name of operator
* @return source id after saving
*/
- Integer saveOpt(SourceRequest request, Integer groupStatus, String
operator);
+ Integer saveOpt(SourceRequest request, Integer streamStatus, String
operator);
/**
* Get source info by the given entity.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index de96386b25..e82d9d2aeb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -27,6 +27,7 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
@@ -103,8 +104,8 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
// Check if it can be added
String groupId = request.getInlongGroupId();
- InlongGroupEntity groupEntity =
groupCheckService.checkGroupStatus(groupId, operator);
String streamId = request.getInlongStreamId();
+ InlongStreamEntity streamEntity =
groupCheckService.checkStreamStatus(groupId, streamId, operator);
String sourceName = request.getSourceName();
List<StreamSourceEntity> existList =
sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
if (CollectionUtils.isNotEmpty(existList)) {
@@ -119,7 +120,7 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
if (CollectionUtils.isNotEmpty(streamFields)) {
streamFields.forEach(streamField -> streamField.setId(null));
}
- int id = sourceOperator.saveOpt(request, groupEntity.getStatus(),
operator);
+ int id = sourceOperator.saveOpt(request, streamEntity.getStatus(),
operator);
LOGGER.info("success to save source info: {}", request);
return id;
@@ -300,6 +301,9 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
String.format("InlongGroup does not exist with
InlongGroupId=%s", groupId));
}
+ String streamId = request.getInlongStreamId();
+ InlongStreamEntity streamEntity =
groupCheckService.checkStreamStatus(groupId, streamId, operator);
+
userService.checkUser(groupEntity.getInCharges(), operator,
"Current user does not have permission to update source info");
StreamSourceOperator sourceOperator =
operatorFactory.getInstance(request.getSourceType());
@@ -308,7 +312,7 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
if (CollectionUtils.isNotEmpty(streamFields)) {
streamFields.forEach(streamField -> streamField.setId(null));
}
- sourceOperator.updateOpt(request, groupEntity.getStatus(),
groupEntity.getInlongGroupMode(), operator);
+ sourceOperator.updateOpt(request, streamEntity.getStatus(),
groupEntity.getInlongGroupMode(), operator);
LOGGER.info("success to update source info: {}", request);
return true;