This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b8da024cde [INLONG-10484][Manager] Refactor code in manager-service
module (#10485)
b8da024cde is described below
commit b8da024cde85b0d9a0aa74a2d6e8c4d8efbbc796
Author: AloysZhang <[email protected]>
AuthorDate: Sat Jun 22 10:57:58 2024 +0800
[INLONG-10484][Manager] Refactor code in manager-service module (#10485)
---
.../service/cluster/InlongClusterService.java | 120 --------
.../service/cluster/InlongClusterServiceImpl.java | 331 ---------------------
.../manager/service/group/InlongGroupService.java | 9 -
.../service/group/InlongGroupServiceImpl.java | 20 --
.../manager/service/sink/StreamSinkService.java | 37 ---
.../service/sink/StreamSinkServiceImpl.java | 174 +----------
.../service/source/StreamSourceService.java | 36 ---
.../service/source/StreamSourceServiceImpl.java | 130 +-------
.../service/stream/InlongStreamService.java | 41 ---
.../service/stream/InlongStreamServiceImpl.java | 151 ----------
.../service/transform/StreamTransformService.java | 38 ---
.../transform/StreamTransformServiceImpl.java | 122 --------
.../web/controller/InlongStreamController.java | 2 +-
.../web/controller/StreamSinkController.java | 2 +-
.../web/controller/StreamSourceController.java | 2 +-
.../openapi/OpenInLongClusterController.java | 27 +-
.../openapi/OpenInLongGroupController.java | 2 +-
.../openapi/OpenInLongStreamController.java | 8 +-
.../openapi/OpenStreamSinkController.java | 8 +-
.../openapi/OpenStreamSourceController.java | 8 +-
.../openapi/OpenStreamTransformController.java | 9 +-
21 files changed, 44 insertions(+), 1233 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 9b7dac0ea2..22282ba67e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -52,15 +52,6 @@ public interface InlongClusterService {
*/
Integer saveTag(ClusterTagRequest request, String operator);
- /**
- * Save cluster tag.
- *
- * @param request cluster tag
- * @param opInfo userinfo of operator
- * @return cluster tag id after saving
- */
- Integer saveTag(ClusterTagRequest request, UserInfo opInfo);
-
/**
* Get cluster tag by id.
*
@@ -70,15 +61,6 @@ public interface InlongClusterService {
*/
ClusterTagResponse getTag(Integer id, String currentUser);
- /**
- * Get cluster tag by id.
- *
- * @param id cluster tag id
- * @param opInfo userinfo of operator
- * @return cluster tag info
- */
- ClusterTagResponse getTag(Integer id, UserInfo opInfo);
-
/**
* Paging query cluster tags according to conditions.
*
@@ -105,15 +87,6 @@ public interface InlongClusterService {
*/
Boolean updateTag(ClusterTagRequest request, String operator);
- /**
- * Update cluster tag.
- *
- * @param request cluster tag to be modified
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean updateTag(ClusterTagRequest request, UserInfo opInfo);
-
/**
* Delete cluster tag.
*
@@ -123,15 +96,6 @@ public interface InlongClusterService {
*/
Boolean deleteTag(Integer id, String operator);
- /**
- * Delete cluster tag.
- *
- * @param id cluster tag id to be deleted
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean deleteTag(Integer id, UserInfo opInfo);
-
/**
* Save cluster info.
*
@@ -141,15 +105,6 @@ public interface InlongClusterService {
*/
Integer save(ClusterRequest request, String operator);
- /**
- * Save cluster info.
- *
- * @param request inlong cluster info
- * @param opInfo userinfo of operator
- * @return cluster id after saving
- */
- Integer save(ClusterRequest request, UserInfo opInfo);
-
/**
* Get cluster info by id.
*
@@ -159,15 +114,6 @@ public interface InlongClusterService {
*/
ClusterInfo get(Integer id, String currentUser);
- /**
- * Get cluster info by id.
- *
- * @param id cluster id
- * @param opInfo userinfo of operator
- * @return cluster info
- */
- ClusterInfo get(Integer id, UserInfo opInfo);
-
/**
* Get one cluster by the cluster tag, cluster name and cluster type.
*
@@ -214,15 +160,6 @@ public interface InlongClusterService {
*/
Boolean update(ClusterRequest request, String operator);
- /**
- * Update cluster information
- *
- * @param request cluster info to be modified
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean update(ClusterRequest request, UserInfo opInfo);
-
/**
* Update cluster information by unique key
*
@@ -241,15 +178,6 @@ public interface InlongClusterService {
*/
Boolean bindTag(BindTagRequest request, String operator);
- /**
- * Bind or unbind cluster tag for clusters.
- *
- * @param request cluster info to be modified
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean bindTag(BindTagRequest request, UserInfo opInfo);
-
/**
* Delete cluster information.
*
@@ -287,15 +215,6 @@ public interface InlongClusterService {
*/
Integer saveNode(ClusterNodeRequest request, String operator);
- /**
- * Save cluster node info.
- *
- * @param request inlong cluster info
- * @param opInfo userinfo of operator
- * @return cluster id after saving
- */
- Integer saveNode(ClusterNodeRequest request, UserInfo opInfo);
-
/**
* Get cluster node info by id.
*
@@ -305,15 +224,6 @@ public interface InlongClusterService {
*/
ClusterNodeResponse getNode(Integer id, String currentUser);
- /**
- * Get cluster node info by id.
- *
- * @param id cluster id
- * @param opInfo userinfo of operator
- * @return cluster info
- */
- ClusterNodeResponse getNode(Integer id, UserInfo opInfo);
-
/**
* Paging query cluster nodes according to conditions.
*
@@ -342,18 +252,6 @@ public interface InlongClusterService {
*/
List<ClusterNodeResponse> listNodeByGroupId(String inlongGroupId, String
clusterType, String protocolType);
- /**
- * List cluster nodes
- *
- * @param inlongGroupId inlong group id
- * @param clusterType cluster type
- * @param protocolType protocol type, such as: TCP, HTTP
- * @param opInfo userinfo of operator
- * @return cluster node list
- */
- List<ClusterNodeResponse> listNodeByGroupId(
- String inlongGroupId, String clusterType, String protocolType,
UserInfo opInfo);
-
/**
* Query node IP list by cluster type
*
@@ -371,15 +269,6 @@ public interface InlongClusterService {
*/
Boolean updateNode(ClusterNodeRequest request, String operator);
- /**
- * Update cluster node.
- *
- * @param request cluster node to be modified
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean updateNode(ClusterNodeRequest request, UserInfo opInfo);
-
/**
* Delete cluster node.
*
@@ -391,15 +280,6 @@ public interface InlongClusterService {
Boolean unloadNode(Integer id, String operator);
- /**
- * Delete cluster node.
- *
- * @param id cluster node id to be deleted
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean deleteNode(Integer id, UserInfo opInfo);
-
/**
* Retrieves the SSH public key from the manager node for agent
installation.
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index a85695d6fd..5be7ffd3b1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -193,24 +193,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return entity.getId();
}
- @Override
- public Integer saveTag(ClusterTagRequest request, UserInfo opInfo) {
- // check if the cluster tag already exist
- InlongClusterTagEntity exist =
clusterTagMapper.selectByTag(request.getClusterTag());
- if (exist != null) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("inlong cluster tag [%s] already exist",
request.getClusterTag()));
- }
- InlongClusterTagEntity entity =
CommonBeanUtils.copyProperties(request, InlongClusterTagEntity::new);
- request.setExtParams(entity.getExtParams());
- String extParam = packExtParams(request);
- entity.setExtParams(extParam);
- entity.setCreator(opInfo.getName());
- entity.setModifier(opInfo.getName());
- clusterTagMapper.insert(entity);
- return entity.getId();
- }
-
@Override
public ClusterTagResponse getTag(Integer id, String currentUser) {
Preconditions.expectNotNull(id, "inlong cluster tag id cannot be
empty");
@@ -233,26 +215,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return response;
}
- @Override
- public ClusterTagResponse getTag(Integer id, UserInfo opInfo) {
- InlongClusterTagEntity entity = clusterTagMapper.selectById(id);
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
- String.format("inlong cluster tag not found by id=%s",
id));
- }
- ClusterTagResponse response = CommonBeanUtils.copyProperties(entity,
ClusterTagResponse::new);
- unpackExtParams(response);
-
- List<String> tenantList = tenantClusterTagMapper
- .selectByTag(entity.getClusterTag()).stream()
- .map(TenantClusterTagEntity::getTenant)
- .collect(Collectors.toList());
- response.setTenantList(tenantList);
-
- LOGGER.debug("success to get cluster tag info by id={}", id);
- return response;
- }
-
@Override
public PageResult<ClusterTagResponse> listTag(ClusterTagPageRequest
request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
@@ -395,65 +357,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return true;
}
- @Override
- @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
- public Boolean updateTag(ClusterTagRequest request, UserInfo opInfo) {
- InlongClusterTagEntity exist =
clusterTagMapper.selectById(request.getId());
- if (exist == null) {
- throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
- String.format("inlong cluster tag was not exist for
id=%s", request.getId()));
- }
- // check record version
- Preconditions.expectEquals(exist.getVersion(), request.getVersion(),
- ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("record has expired with record version=%d,
request version=%d",
- exist.getVersion(), request.getVersion()));
- // check if the cluster tag was changed, need to check whether the new
tag already exists
- if (StringUtils.isNotEmpty(request.getClusterTag())) {
- String newClusterTag = request.getClusterTag();
- String oldClusterTag = exist.getClusterTag();
- if (!newClusterTag.equals(oldClusterTag)) {
- InlongClusterTagEntity tagConflict =
clusterTagMapper.selectByTag(newClusterTag);
- if (tagConflict != null) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("inlong cluster tag [%s] to changed
already exist", newClusterTag));
- }
- // check if there are some InlongGroups that uses this tag
- List<InlongGroupEntity> usedGroupEntity =
groupMapper.selectByClusterTag(oldClusterTag);
- if (CollectionUtils.isNotEmpty(usedGroupEntity)) {
- throw new BusinessException(ErrorCodeEnum.RECORD_IN_USED,
- String.format("inlong cluster tag [%s] was used by
inlong group", oldClusterTag));
- }
- // update the associated cluster tag in inlong_cluster
- List<InlongClusterEntity> clusterEntities =
clusterMapper.selectByKey(oldClusterTag, null, null);
- if (CollectionUtils.isNotEmpty(clusterEntities)) {
- clusterEntities.forEach(entity -> {
- Set<String> tagSet =
Sets.newHashSet(entity.getClusterTags().split(InlongConstants.COMMA));
- tagSet.remove(oldClusterTag);
- tagSet.add(newClusterTag);
- String updateTags = Joiner.on(",").join(tagSet);
- entity.setClusterTags(updateTags);
- entity.setModifier(opInfo.getName());
- if (InlongConstants.AFFECTED_ONE_ROW !=
clusterMapper.updateById(entity)) {
- throw new
BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("cluster has already updated
with name=%s, type=%s, curVersion=%s",
- entity.getName(),
entity.getType(), entity.getVersion()));
- }
- });
- }
- }
- }
- CommonBeanUtils.copyProperties(request, exist, true);
- request.setExtParams(exist.getExtParams());
- String extParams = packExtParams(request);
- exist.setExtParams(extParams);
- exist.setModifier(opInfo.getName());
- if (InlongConstants.AFFECTED_ONE_ROW !=
clusterTagMapper.updateByIdSelective(exist)) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
- return true;
- }
-
@Override
public Boolean deleteTag(Integer id, String operator) {
Preconditions.expectNotNull(id, "cluster tag id cannot be empty");
@@ -496,37 +399,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return true;
}
- @Override
- public Boolean deleteTag(Integer id, UserInfo opInfo) {
- InlongClusterTagEntity exist = clusterTagMapper.selectById(id);
- if (exist == null || exist.getIsDeleted() >
InlongConstants.UN_DELETED) {
- return true;
- }
- // check if there are some InlongGroups that uses this tag
- String clusterTag = exist.getClusterTag();
- // check if there are some InlongGroups that uses this tag
- List<InlongGroupEntity> usedGroupEntity =
groupMapper.selectByClusterTag(clusterTag);
- if (CollectionUtils.isNotEmpty(usedGroupEntity)) {
- throw new BusinessException(ErrorCodeEnum.RECORD_IN_USED,
- String.format("inlong cluster tag [%s] was used by inlong
group", clusterTag));
- }
- // update the associated cluster tag in inlong_cluster
- List<InlongClusterEntity> clusterEntities =
clusterMapper.selectByKey(clusterTag, null, null);
- if (CollectionUtils.isNotEmpty(clusterEntities)) {
- clusterEntities.forEach(entity -> {
- this.removeClusterTag(entity, clusterTag, opInfo.getName());
- });
- }
- exist.setIsDeleted(exist.getId());
- exist.setModifier(opInfo.getName());
- if (InlongConstants.AFFECTED_ONE_ROW !=
clusterTagMapper.updateByIdSelective(exist)) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("cluster tag has already updated with
name=%s, curVersion=%s",
- exist.getClusterTag(), exist.getVersion()));
- }
- return true;
- }
-
@Override
public Integer save(ClusterRequest request, String operator) {
LOGGER.debug("begin to save inlong cluster={}", request);
@@ -556,26 +428,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return id;
}
- @Override
- public Integer save(ClusterRequest request, UserInfo opInfo) {
- // The name cannot be modified and is automatically generated by the
system
- String name = request.getName();
- if (StringUtils.isBlank(name)) {
- name = UUID.randomUUID().toString();
- request.setName(name);
- }
- // check if the cluster already exist
- List<InlongClusterEntity> exist = clusterMapper.selectByKey(
- request.getClusterTags(), request.getName(),
request.getType());
- if (CollectionUtils.isNotEmpty(exist)) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("inlong cluster already exist for cluster
tag=%s name=%s type=%s",
- request.getClusterTags(), request.getName(),
request.getType()));
- }
- InlongClusterOperator instance =
clusterOperatorFactory.getInstance(request.getType());
- return instance.saveOpt(request, opInfo.getName());
- }
-
@Override
public ClusterInfo get(Integer id, String currentUser) {
Preconditions.expectNotNull(id, "inlong cluster id cannot be empty");
@@ -591,18 +443,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return clusterInfo;
}
- @Override
- public ClusterInfo get(Integer id, UserInfo opInfo) {
- InlongClusterEntity entity = clusterMapper.selectById(id);
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
- String.format("inlong cluster not found by id=%s", id));
- }
-
- InlongClusterOperator instance =
clusterOperatorFactory.getInstance(entity.getType());
- return instance.getFromEntity(entity);
- }
-
@Override
public PageResult<ClusterInfo> list(ClusterPageRequest request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
@@ -701,29 +541,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return true;
}
- @Override
- public Boolean update(ClusterRequest request, UserInfo opInfo) {
- InlongClusterEntity entity = clusterMapper.selectById(request.getId());
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
- String.format("inlong cluster not found by id=%s",
request.getId()));
- }
- // check parameters
- chkUnmodifiableParams(entity, request);
- // check whether the cluster already exists
- List<InlongClusterEntity> exist = clusterMapper.selectByKey(
- request.getClusterTags(), request.getName(),
request.getType());
- if (CollectionUtils.isNotEmpty(exist) &&
!Objects.equals(request.getId(), exist.get(0).getId())) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("inlong cluster already exist for cluster
tag=%s name=%s type=%s",
- request.getClusterTags(), request.getName(),
request.getType()));
- }
- // update record
- InlongClusterOperator instance =
clusterOperatorFactory.getInstance(request.getType());
- instance.updateOpt(request, opInfo.getName());
- return true;
- }
-
@Override
public UpdateResult updateByKey(ClusterRequest request, String operator) {
LOGGER.debug("begin to update inlong cluster: {}", request);
@@ -779,42 +596,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return true;
}
- @Override
- public Boolean bindTag(BindTagRequest request, UserInfo opInfo) {
- if (CollectionUtils.isNotEmpty(request.getBindClusters())) {
- request.getBindClusters().forEach(id -> {
- InlongClusterEntity entity = clusterMapper.selectById(id);
- Set<String> tagSet =
Sets.newHashSet(entity.getClusterTags().split(InlongConstants.COMMA));
- tagSet.add(request.getClusterTag());
- String updateTags = Joiner.on(",").join(tagSet);
- InlongClusterEntity updateEntity =
clusterMapper.selectById(id);
- updateEntity.setClusterTags(updateTags);
- updateEntity.setModifier(opInfo.getName());
- if (InlongConstants.AFFECTED_ONE_ROW !=
clusterMapper.updateById(updateEntity)) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("cluster has already updated with
name=%s, type=%s, curVersion=%s",
- updateEntity.getName(),
updateEntity.getType(), updateEntity.getVersion()));
- }
- });
- }
- if (CollectionUtils.isNotEmpty(request.getUnbindClusters())) {
- request.getUnbindClusters().forEach(id -> {
- InlongClusterEntity entity = clusterMapper.selectById(id);
- Set<String> tagSet =
Sets.newHashSet(entity.getClusterTags().split(InlongConstants.COMMA));
- tagSet.remove(request.getClusterTag());
- String updateTags = Joiner.on(",").join(tagSet);
- entity.setClusterTags(updateTags);
- entity.setModifier(opInfo.getName());
- if (InlongConstants.AFFECTED_ONE_ROW !=
clusterMapper.updateById(entity)) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("cluster has already updated with
name=%s, type=%s, curVersion=%s",
- entity.getName(), entity.getType(),
entity.getVersion()));
- }
- });
- }
- return true;
- }
-
@Override
public Boolean deleteByKey(String name, String type, String operator) {
Preconditions.expectNotBlank(name, ErrorCodeEnum.INVALID_PARAMETER,
"cluster name should not be empty or null");
@@ -915,24 +696,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return id;
}
- @Override
- public Integer saveNode(ClusterNodeRequest request, UserInfo opInfo) {
- // check cluster info
- InlongClusterEntity entity =
clusterMapper.selectById(request.getParentId());
- Preconditions.expectNotNull(entity, ErrorCodeEnum.CLUSTER_NOT_FOUND,
- String.format("inlong cluster not found by id=%s, or was
already deleted", request.getParentId()));
- // check cluster node if exist
- InlongClusterNodeEntity exist =
clusterNodeMapper.selectByUniqueKey(request);
- if (exist != null) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("inlong cluster node already exist for
type=%s ip=%s port=%s",
- request.getType(), request.getIp(),
request.getPort()));
- }
- // add record
- InlongClusterNodeOperator instance =
clusterNodeOperatorFactory.getInstance(request.getType());
- return instance.saveOpt(request, opInfo.getName());
- }
-
@Override
public ClusterNodeResponse getNode(Integer id, String currentUser) {
Preconditions.expectNotNull(id, "cluster node id cannot be empty");
@@ -945,16 +708,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return instance.getFromEntity(entity);
}
- @Override
- public ClusterNodeResponse getNode(Integer id, UserInfo opInfo) {
- InlongClusterNodeEntity entity = clusterNodeMapper.selectById(id);
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
- }
- InlongClusterEntity cluster =
clusterMapper.selectById(entity.getParentId());
- return CommonBeanUtils.copyProperties(entity,
ClusterNodeResponse::new);
- }
-
@Override
public PageResult<ClusterNodeResponse> listNode(ClusterPageRequest
request, String currentUser) {
if (StringUtils.isNotBlank(request.getClusterTag())) {
@@ -1024,34 +777,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return result;
}
- @Override
- public List<ClusterNodeResponse> listNodeByGroupId(
- String groupId, String clusterType, String protocolType, UserInfo
opInfo) {
- InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
- if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("inlong group not exists for groupId=%s",
groupId));
- }
- String clusterTag = groupEntity.getInlongClusterTag();
- if (StringUtils.isBlank(clusterTag)) {
- throw new BusinessException(ErrorCodeEnum.CLUSTER_TAG_NOT_FOUND,
- String.format("not found any cluster tag for groupId=%s",
groupId));
- }
- List<InlongClusterEntity> clusterList =
clusterMapper.selectByKey(clusterTag, null, clusterType);
- if (CollectionUtils.isEmpty(clusterList)) {
- throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
- String.format("not found any data proxy cluster for
groupId=%s and clusterTag=%s",
- groupId, clusterTag));
- }
- // TODO if more than one data proxy cluster, currently takes first
- List<InlongClusterNodeEntity> nodeEntities =
- clusterNodeMapper.selectByParentId(clusterList.get(0).getId(),
protocolType);
- if (CollectionUtils.isEmpty(nodeEntities)) {
- return Collections.emptyList();
- }
- return CommonBeanUtils.copyListProperties(nodeEntities,
ClusterNodeResponse::new);
- }
-
public List<ClusterNodeResponse> listNodeByClusterTag(ClusterPageRequest
request) {
List<InlongClusterEntity> clusterList =
clusterMapper.selectByKey(request.getClusterTag(), request.getName(),
request.getType());
@@ -1122,45 +847,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return true;
}
- @Override
- @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
- public Boolean updateNode(ClusterNodeRequest request, UserInfo opInfo) {
- InlongClusterNodeEntity entity =
clusterNodeMapper.selectById(request.getId());
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
- String.format("cluster node not found by id=%s",
request.getId()));
- }
- // check type
- Preconditions.expectEquals(entity.getType(), request.getType(),
- ErrorCodeEnum.INVALID_PARAMETER, "type not allowed modify");
- // check record version
- Preconditions.expectEquals(entity.getVersion(), request.getVersion(),
- ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("record has expired with record version=%d,
request version=%d",
- entity.getVersion(), request.getVersion()));
- // check protocol type
- if (StringUtils.isBlank(request.getProtocolType())) {
- request.setProtocolType(entity.getProtocolType());
- }
- // check wanted cluster node if exist
- InlongClusterNodeEntity exist =
clusterNodeMapper.selectByUniqueKey(request);
- if (exist != null && !Objects.equals(request.getId(), exist.getId())) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- "inlong cluster node already exist for " + request);
- }
- // check parent id
- InlongClusterEntity cluster =
clusterMapper.selectById(entity.getParentId());
- if (cluster == null) {
- throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
- String.format("The cluster to which the node belongs not
found by clusterId=%s",
- request.getParentId()));
- }
- // update record
- InlongClusterNodeOperator instance =
clusterNodeOperatorFactory.getInstance(request.getType());
- instance.updateOpt(request, opInfo.getName());
- return true;
- }
-
@Override
public Boolean deleteNode(Integer id, String operator) {
Preconditions.expectNotNull(id, "cluster node id cannot be empty");
@@ -1189,23 +875,6 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return isSuccess;
}
- @Override
- public Boolean deleteNode(Integer id, UserInfo opInfo) {
- InlongClusterNodeEntity entity = clusterNodeMapper.selectById(id);
- Preconditions.expectNotNull(entity, ErrorCodeEnum.CLUSTER_NOT_FOUND);
- // delete record
- entity.setIsDeleted(entity.getId());
- entity.setModifier(opInfo.getName());
- if (InlongConstants.AFFECTED_ONE_ROW !=
clusterNodeMapper.updateById(entity)) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format(
- "cluster node has already updated with
parentId=%s, type=%s, ip=%s, port=%s, protocolType=%s",
- entity.getParentId(), entity.getType(),
entity.getIp(), entity.getPort(),
- entity.getProtocolType()));
- }
- return true;
- }
-
@Override
public String getManagerSSHPublicKey() {
String homeDirectory = System.getProperty("user.home");
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index 82844da825..4906e91aa7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -86,15 +86,6 @@ public interface InlongGroupService {
*/
String getTenant(String groupId, String operator);
- /**
- * Get inlong group info based on inlong group id
- *
- * @param groupId inlong group id
- * @param opInfo userinfo of operator
- * @return detail of inlong group
- */
- InlongGroupInfo get(String groupId, UserInfo opInfo);
-
/**
* Query the group information of each status of the current user
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index e0e7c486c7..19e546938f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -266,26 +266,6 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
return groupInfo;
}
- @Override
- public InlongGroupInfo get(String groupId, UserInfo opInfo) {
- InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
-
- // query mq information
- InlongGroupOperator instance =
groupOperatorFactory.getInstance(entity.getMqType());
- InlongGroupInfo groupInfo = instance.getFromEntity(entity);
- // get all ext info
- List<InlongGroupExtEntity> extEntityList =
groupExtMapper.selectByGroupId(groupId);
- List<InlongGroupExtInfo> extList =
CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new);
- groupInfo.setExtList(extList);
- List<InlongStreamExtEntity> streamExtEntities =
streamExtMapper.selectByRelatedId(groupId, null);
- BaseSortConf sortConf = buildSortConfig(streamExtEntities);
- groupInfo.setSortConf(sortConf);
- return groupInfo;
- }
-
@Override
public String getTenant(String groupId, String operator) {
InlongGroupEntity groupEntity =
groupMapper.selectByGroupIdWithoutTenant(groupId);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 79da89d512..ea46dc3432 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -51,15 +51,6 @@ public interface StreamSinkService {
*/
Integer save(SinkRequest request, String operator);
- /**
- * Save the sink info.
- *
- * @param request sink request need to save
- * @param opInfo userinfo of operator
- * @return sink id after saving
- */
- Integer save(SinkRequest request, UserInfo opInfo);
-
/**
* Batch save the sink info.
*
@@ -77,15 +68,6 @@ public interface StreamSinkService {
*/
StreamSink get(Integer id);
- /**
- * Get stream sink info based on id.
- *
- * @param id sink id
- * @param opInfo userinfo of operator
- * @return detail of stream sink info
- */
- StreamSink get(Integer id, UserInfo opInfo);
-
/**
* List the stream sinks based on inlong group id and inlong stream id.
*
@@ -151,15 +133,6 @@ public interface StreamSinkService {
*/
Boolean update(SinkRequest sinkRequest, String operator);
- /**
- * Modify stream sink info by id.
- *
- * @param sinkRequest stream sink request that needs to be modified
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean update(SinkRequest sinkRequest, UserInfo opInfo);
-
/**
* Modify stream sink info by key.
*
@@ -188,16 +161,6 @@ public interface StreamSinkService {
*/
Boolean delete(Integer id, Boolean startProcess, String operator);
- /**
- * Delete the stream sink by the given id and sink type.
- *
- * @param id stream sink id
- * @param startProcess whether to start the process after saving or
updating
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean delete(Integer id, Boolean startProcess, UserInfo opInfo);
-
/**
* Delete the stream sink by given group id, stream id, and sink name.
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 4d7fd556ac..2180400305 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.sink;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
@@ -180,62 +179,6 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
return id;
}
- @Override
- @Transactional(rollbackFor = Throwable.class)
- public Integer save(SinkRequest request, UserInfo opInfo) {
- // check request parameter
- checkSinkRequestParams(request);
- InlongGroupEntity entity =
groupMapper.selectByGroupId(request.getInlongGroupId());
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", request.getInlongGroupId()));
- }
- // check group status
- GroupStatus curState = GroupStatus.forCode(entity.getStatus());
- if (GroupStatus.notAllowedUpdate(curState)) {
- throw new
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
curState));
- }
- // Check whether the stream exist or not
- InlongStreamEntity streamEntity =
- streamMapper.selectByIdentifier(request.getInlongGroupId(),
request.getInlongStreamId());
- if (streamEntity == null) {
- throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
- }
- // Check whether the sink name exists with the same groupId and
streamId
- StreamSinkEntity exists = sinkMapper.selectByUniqueKey(
- request.getInlongGroupId(), request.getInlongStreamId(),
request.getSinkName());
- if (exists != null &&
exists.getSinkName().equals(request.getSinkName())) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("sink name=%s already exists with the
groupId=%s streamId=%s",
- request.getSinkName(), request.getInlongGroupId(),
request.getInlongStreamId()));
- }
- // According to the sink type, save sink information
- StreamSinkOperator sinkOperator =
operatorFactory.getInstance(request.getSinkType());
- List<SinkField> fields = request.getSinkFieldList();
- // Remove id in sinkField when save
- if (CollectionUtils.isNotEmpty(fields)) {
- fields.forEach(sinkField -> sinkField.setId(null));
- }
- int id = sinkOperator.saveOpt(request, opInfo.getName());
- boolean streamSuccess =
StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
- if (streamSuccess ||
StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
- boolean enableCreateResource =
InlongConstants.ENABLE_CREATE_RESOURCE.equals(
- request.getEnableCreateResource());
- SinkStatus nextStatus = request.getStartProcess() ?
SinkStatus.CONFIG_ING : SinkStatus.NEW;
- if (!enableCreateResource) {
- nextStatus = SinkStatus.CONFIG_SUCCESSFUL;
- }
- StreamSinkEntity sinkEntity = sinkMapper.selectByPrimaryKey(id);
- sinkEntity.setStatus(nextStatus.getCode());
- sinkMapper.updateStatus(sinkEntity);
- }
- // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the
[CREATE_STREAM_RESOURCE] process
- if (streamSuccess && request.getStartProcess()) {
- this.startProcessForSink(request.getInlongGroupId(),
request.getInlongStreamId(), opInfo.getName());
- }
- return id;
- }
-
@Override
public List<BatchResult> batchSave(List<SinkRequest> requestList, String
operator) {
List<BatchResult> resultList = new ArrayList<>();
@@ -269,16 +212,6 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND,
String.format("sink not found by id=%s", id));
}
- StreamSinkOperator sinkOperator =
operatorFactory.getInstance(entity.getSinkType());
- return sinkOperator.getFromEntity(entity);
- }
-
- @Override
- public StreamSink get(Integer id, UserInfo opInfo) {
- StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
- }
InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(entity.getInlongGroupId());
if (groupEntity == null) {
@@ -453,63 +386,6 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
return true;
}
- @Override
- @Transactional(rollbackFor = Throwable.class)
- public Boolean update(SinkRequest request, UserInfo opInfo) {
- if (request.getId() == null) {
- throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
- }
- StreamSinkEntity curEntity =
sinkMapper.selectByPrimaryKey(request.getId());
- if (curEntity == null) {
- throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
- }
- chkUnmodifiableParams(curEntity, request);
- // check group record
- InlongGroupEntity curGroupEntity =
groupMapper.selectByGroupId(curEntity.getInlongGroupId());
- if (curGroupEntity == null) {
- throw new
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", curEntity.getInlongGroupId()));
- }
- // Check if group status can be modified
- GroupStatus curState = GroupStatus.forCode(curEntity.getStatus());
- if (GroupStatus.notAllowedUpdate(curState)) {
- throw new
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
curState));
- }
- // Check whether the stream exist or not
- InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
- request.getInlongGroupId(), request.getInlongStreamId());
- if (streamEntity == null) {
- throw new
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
- String.format("stream record not found with the groupId=%s
streamId=%s",
- curEntity.getInlongGroupId(),
curEntity.getInlongStreamId()));
- }
- // Check whether the sink name exists with the same groupId and
streamId
- StreamSinkEntity existEntity = sinkMapper.selectByUniqueKey(
- request.getInlongGroupId(), request.getInlongStreamId(),
request.getSinkName());
- if (existEntity != null &&
!existEntity.getId().equals(request.getId())) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("sink name=%s already exists with the
groupId=%s streamId=%s",
- request.getSinkName(), request.getInlongGroupId(),
request.getInlongStreamId()));
- }
- // update record
- SinkStatus nextStatus = null;
- boolean enableConfig =
StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus())
- ||
StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus());
- if (enableConfig) {
- boolean enableCreateResource =
InlongConstants.ENABLE_CREATE_RESOURCE.equals(
- request.getEnableCreateResource());
- nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING :
SinkStatus.CONFIG_SUCCESSFUL;
- }
- StreamSinkOperator sinkOperator =
operatorFactory.getInstance(request.getSinkType());
- sinkOperator.updateOpt(request, nextStatus, opInfo.getName());
- // If the stream is [CONFIG_SUCCESSFUL] or [CONFIG_FAILED], then
asynchronously start the
- // [CREATE_STREAM_RESOURCE] process
- if (enableConfig && request.getStartProcess()) {
- this.startProcessForSink(request.getInlongGroupId(),
request.getInlongStreamId(), opInfo.getName());
- }
- return true;
- }
-
@Override
@Transactional(rollbackFor = Throwable.class)
public UpdateResult updateByKey(SinkRequest request, String operator) {
@@ -564,34 +440,6 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
return true;
}
- @Override
- @Transactional(rollbackFor = Throwable.class)
- public Boolean delete(Integer id, Boolean startProcess, UserInfo opInfo) {
- // check stream sink record
- StreamSinkEntity sinkEntity = sinkMapper.selectByPrimaryKey(id);
- if (sinkEntity == null) {
- throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
- }
- // check group record
- InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(sinkEntity.getInlongGroupId());
- if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", sinkEntity.getInlongGroupId()));
- }
- // Check if group status can be modified
- GroupStatus curState = GroupStatus.forCode(groupEntity.getStatus());
- if (GroupStatus.notAllowedUpdate(curState)) {
- throw new
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
curState));
- }
- // delete record
- StreamSinkOperator sinkOperator =
operatorFactory.getInstance(sinkEntity.getSinkType());
- sinkOperator.deleteOpt(sinkEntity, opInfo.getName());
- if (startProcess) {
- this.deleteProcessForSink(sinkEntity.getInlongGroupId(),
sinkEntity.getInlongStreamId(), opInfo.getName());
- }
- return true;
- }
-
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean deleteByKey(String groupId, String streamId, String
sinkName,
@@ -638,12 +486,7 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
entity.setIsDeleted(id);
entity.setModifier(operator);
int rowCount = sinkMapper.updateByIdSelective(entity);
- if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.error("sink has already updated with groupId={},
streamId={}, name={}, curVersion={}",
- entity.getInlongGroupId(),
entity.getInlongStreamId(), entity.getSinkName(),
- entity.getVersion());
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
+ checkAffectRowCount(rowCount, entity);
sinkFieldMapper.logicDeleteAll(id);
});
}
@@ -652,6 +495,14 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
return true;
}
+ private void checkAffectRowCount(int affectRowCount, StreamSinkEntity
entity) {
+ if (affectRowCount != InlongConstants.AFFECTED_ONE_ROW) {
+ LOGGER.error("sink has already updated with groupId={},
streamId={}, name={}, curVersion={}",
+ entity.getInlongGroupId(), entity.getInlongStreamId(),
entity.getSinkName(), entity.getVersion());
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+ }
+
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean deleteAll(String groupId, String streamId, String operator)
{
@@ -716,12 +567,7 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
entity.setStatus(status);
entity.setModifier(operator);
int rowCount = sinkMapper.updateByIdSelective(entity);
- if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.error("sink has already updated with groupId={},
streamId={}, name={}, curVersion={}",
- entity.getInlongGroupId(), entity.getInlongStreamId(),
entity.getSinkName(),
- entity.getVersion());
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
+ checkAffectRowCount(rowCount, entity);
}
LOGGER.info("success to update sink after approve: {}", approveList);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 0a2cfa9239..b622d7a303 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -44,15 +44,6 @@ public interface StreamSourceService {
*/
Integer save(SourceRequest request, String operator);
- /**
- * Save the source information
- *
- * @param request Source request.
- * @param opInfo userinfo of operator
- * @return source id after saving.
- */
- Integer save(SourceRequest request, UserInfo opInfo);
-
/**
* Batch save the source information
*
@@ -70,15 +61,6 @@ public interface StreamSourceService {
*/
StreamSource get(Integer id);
- /**
- * Query source information based on id
- *
- * @param id source id.
- * @param opInfo userinfo of operator
- * @return Source info
- */
- StreamSource get(Integer id, UserInfo opInfo);
-
/**
* Query source information based on inlong group id and inlong stream id.
*
@@ -135,15 +117,6 @@ public interface StreamSourceService {
*/
Boolean update(SourceRequest sourceRequest, String operator);
- /**
- * Modify data source information
- *
- * @param sourceRequest Information that needs to be modified
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean update(SourceRequest sourceRequest, UserInfo opInfo);
-
/**
* Update source status by the given groupId and streamId
*
@@ -164,15 +137,6 @@ public interface StreamSourceService {
*/
Boolean delete(Integer id, String operator);
- /**
- * Delete the stream source by the given id and source type.
- *
- * @param id The primary key of the source.
- * @param opInfo userinfo of operator
- * @return Whether succeed
- */
- Boolean delete(Integer id, UserInfo opInfo);
-
/**
* Force deletes the stream source by groupId and streamId
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index ec4d70f076..f2a263ba82 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.service.source;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
@@ -28,7 +27,6 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
@@ -123,47 +121,6 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
return id;
}
- @Override
- @Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW)
- public Integer save(SourceRequest request, UserInfo opInfo) {
- // Check if it can be added
- InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(request.getInlongGroupId());
- if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", request.getInlongGroupId()));
- }
- // get stream information
- InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
- request.getInlongGroupId(), request.getInlongStreamId());
- if (streamEntity == null) {
- throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND,
- String.format("InlongStream does not exist with
InlongGroupId=%s, InLongStreamId=%s",
- request.getInlongGroupId(),
request.getInlongStreamId()));
- }
- // Check if the record to be added exists
- List<StreamSourceEntity> existList = sourceMapper.selectByRelatedId(
- request.getInlongGroupId(), request.getInlongStreamId(),
request.getSourceName());
- if (CollectionUtils.isNotEmpty(existList)) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("source name=%s already exists with
groupId=%s streamId=%s",
- request.getSourceName(),
request.getInlongGroupId(), request.getInlongStreamId()));
- }
- // check inlong group status
- GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
- if (GroupStatus.notAllowedUpdate(status)) {
- throw new
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
-
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
- }
- // According to the source type, save source information
- StreamSourceOperator sourceOperator =
operatorFactory.getInstance(request.getSourceType());
- // Remove id in sourceField when save
- List<StreamField> streamFields = request.getFieldList();
- if (CollectionUtils.isNotEmpty(streamFields)) {
- streamFields.forEach(streamField -> streamField.setId(null));
- }
- return sourceOperator.saveOpt(request, groupEntity.getStatus(),
opInfo.getName());
- }
-
@Override
public List<BatchResult> batchSave(List<SourceRequest> requestList, String
operator) {
List<BatchResult> resultList = new ArrayList<>();
@@ -203,21 +160,6 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
return streamSource;
}
- @Override
- public StreamSource get(Integer id, UserInfo opInfo) {
- StreamSourceEntity entity = sourceMapper.selectById(id);
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
- String.format("source not found by id=%s", id));
- }
- InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(entity.getInlongGroupId());
- if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
- StreamSourceOperator sourceOperator =
operatorFactory.getInstance(entity.getSourceType());
- return sourceOperator.getFromEntity(entity);
- }
-
@Override
public Integer getCount(String groupId, String streamId) {
Integer count = sourceMapper.selectCount(groupId, streamId);
@@ -365,33 +307,6 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
return true;
}
- @Override
- @Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
- public Boolean update(SourceRequest request, UserInfo opInfo) {
- // check request parameter
- chkUnmodifiableParams(request);
- // Check if it can be update
- InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(request.getInlongGroupId());
- if (groupEntity == null) {
- throw new
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", request.getInlongGroupId()));
- }
- // check inlong group status
- GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
- if (GroupStatus.notAllowedUpdate(status)) {
- throw new
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
-
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
- }
- StreamSourceOperator sourceOperator =
operatorFactory.getInstance(request.getSourceType());
- // Remove id in sourceField when save
- List<StreamField> streamFields = request.getFieldList();
- if (CollectionUtils.isNotEmpty(streamFields)) {
- streamFields.forEach(streamField -> streamField.setId(null));
- }
- sourceOperator.updateOpt(request, groupEntity.getStatus(),
groupEntity.getInlongGroupMode(), opInfo.getName());
- return true;
- }
-
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
public Boolean updateStatus(String groupId, String streamId, Integer
targetStatus, String operator) {
@@ -436,6 +351,7 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
entity.setPreviousStatus(curStatus.getCode());
entity.setStatus(nextStatus.getCode());
entity.setIsDeleted(id);
+ entity.setModifier(operator);
int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
LOGGER.error("source has already updated with groupId={},
streamId={}, name={}, curVersion={}",
@@ -448,50 +364,6 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
return true;
}
- @Override
- @Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
- public Boolean delete(Integer id, UserInfo opInfo) {
- StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
- Preconditions.expectNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
- ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-
- // Check if it can be delete
- InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(entity.getInlongGroupId());
- if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", entity.getInlongGroupId()));
- }
- // check record status
- boolean isTemplateSource =
CollectionUtils.isNotEmpty(sourceMapper.selectByTaskMapId(id));
- SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
- SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
- // if source is frozen|failed|new, or if it is a template source or
auto push source, delete directly
- if (curStatus == SourceStatus.SOURCE_STOP || curStatus ==
SourceStatus.SOURCE_FAILED
- || curStatus == SourceStatus.SOURCE_NEW || isTemplateSource
- || SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
- nextStatus = SourceStatus.SOURCE_DISABLE;
- }
- if (!SourceStatus.isAllowedTransition(curStatus, nextStatus)) {
- throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
- String.format("current source status=%s for id=%s is not
allowed to delete", entity.getStatus(),
- entity.getId()));
- }
- // delete record
- entity.setPreviousStatus(curStatus.getCode());
- entity.setStatus(nextStatus.getCode());
- entity.setIsDeleted(id);
- entity.setModifier(opInfo.getName());
- int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
- if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("source has already updated with groupId=%s,
streamId=%s, name=%s, curVersion=%s",
- entity.getInlongGroupId(),
entity.getInlongStreamId(), entity.getSourceName(),
- entity.getVersion()));
- }
- sourceFieldMapper.deleteAll(id);
- return true;
- }
-
@Override
public Boolean forceDelete(String groupId, String streamId, String
operator) {
LOGGER.info("begin to force delete source for groupId={} and
streamId={} by user={}",
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 689ebe710f..96002d0ebd 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -60,15 +60,6 @@ public interface InlongStreamService {
*/
List<BatchResult> batchSave(List<InlongStreamRequest> requestList, String
operator);
- /**
- * Save inlong stream information.
- *
- * @param request Inlong stream information.
- * @param opInfo userinfo of operator
- * @return Id after successful save.
- */
- Integer save(InlongStreamRequest request, UserInfo opInfo);
-
/**
* Query whether the inlong stream ID exists
*
@@ -87,16 +78,6 @@ public interface InlongStreamService {
*/
InlongStreamInfo get(String groupId, String streamId);
- /**
- * Query the details of the specified inlong stream
- *
- * @param groupId Inlong group id
- * @param streamId Inlong stream id
- * @param opInfo userinfo of operator
- * @return inlong stream details
- */
- InlongStreamInfo get(String groupId, String streamId, UserInfo opInfo);
-
/**
* Query the brief of the specified inlong stream
*
@@ -159,15 +140,6 @@ public interface InlongStreamService {
*/
Boolean update(InlongStreamRequest request, String operator);
- /**
- * Update the InlongStream info
- *
- * @param request inlong stream info that needs to be modified
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean update(InlongStreamRequest request, UserInfo opInfo);
-
/**
* Update the InlongStream - not check the InlongGroup status to which the
stream belongs.
*
@@ -190,19 +162,6 @@ public interface InlongStreamService {
*/
Boolean delete(String groupId, String streamId, String operator);
- /**
- * Delete the specified inlong stream.
- * <p/>
- * When deleting an inlong stream, you need to check whether there are
some related
- * stream_sources or stream_sinks
- *
- * @param groupId Inlong group id
- * @param streamId Inlong stream id
- * @param opInfo userinfo of operator
- * @return whether succeed
- */
- Boolean delete(String groupId, String streamId, UserInfo opInfo);
-
/**
* Logically delete all inlong streams under the specified groupId
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 9167ab02c7..46db20a04f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -214,46 +214,6 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
return resultList;
}
- @Override
- public Integer save(InlongStreamRequest request, UserInfo opInfo) {
- InlongGroupEntity entity =
groupMapper.selectByGroupId(request.getInlongGroupId());
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
- // Add/modify/delete is not allowed under temporary inlong group status
- GroupStatus curState = GroupStatus.forCode(entity.getStatus());
- if (GroupStatus.isTempStatus(curState)) {
- throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED,
- String.format("inlong groupId=%s status=%s was not allowed
to add/update/delete stream",
- request.getInlongGroupId(), curState));
- }
- // The streamId under the same groupId cannot be repeated
- Integer count = streamMapper.selectExistByIdentifier(
- request.getInlongGroupId(), request.getInlongStreamId());
- if (count >= 1) {
- throw new BusinessException(ErrorCodeEnum.STREAM_ID_DUPLICATE);
- }
- if (StringUtils.isEmpty(request.getMqResource())) {
- request.setMqResource(request.getInlongStreamId());
- }
- // Processing extended attributes
- String extParams = packExtParams(request);
- request.setExtParams(extParams);
- // Processing inlong stream
- InlongStreamEntity streamEntity =
CommonBeanUtils.copyProperties(request, InlongStreamEntity::new);
- streamEntity.setStatus(StreamStatus.NEW.getCode());
- streamEntity.setCreator(opInfo.getName());
- streamEntity.setModifier(opInfo.getName());
- // add record
- streamMapper.insertSelective(streamEntity);
- saveField(request.getInlongGroupId(), request.getInlongStreamId(),
request.getFieldList());
- List<InlongStreamExtInfo> extList = request.getExtList();
- if (CollectionUtils.isNotEmpty(extList)) {
- saveOrUpdateExt(request.getInlongGroupId(),
request.getInlongStreamId(), extList);
- }
- return streamEntity.getId();
- }
-
@Override
public Boolean exist(String groupId, String streamId) {
Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
@@ -291,33 +251,6 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
return streamInfo;
}
- @Override
- public InlongStreamInfo get(String groupId, String streamId, UserInfo
opInfo) {
- InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
- // get stream information
- InlongStreamEntity streamEntity =
streamMapper.selectByIdentifier(groupId, streamId);
- if (streamEntity == null) {
- throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
- }
- InlongStreamInfo streamInfo =
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
- // Processing extParams
- unpackExtParams(streamEntity.getExtParams(), streamInfo);
- // Load fields
- List<StreamField> streamFields = getStreamFields(groupId, streamId);
- streamInfo.setFieldList(streamFields);
- List<InlongStreamExtEntity> extEntities =
streamExtMapper.selectByRelatedId(groupId, streamId);
- List<InlongStreamExtInfo> extInfos =
CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
- streamInfo.setExtList(extInfos);
- List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
- streamInfo.setSinkList(sinkList);
- List<StreamSource> sourceList = sourceService.listSource(groupId,
streamId);
- streamInfo.setSourceList(sourceList);
- return streamInfo;
- }
-
@Override
public InlongStreamBriefInfo getBrief(String groupId, String streamId,
String operator) {
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
@@ -526,51 +459,6 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
return this.updateWithoutCheck(request, operator);
}
- @Override
- @Transactional(rollbackFor = Throwable.class)
- public Boolean update(InlongStreamRequest request, UserInfo opInfo) {
- InlongGroupEntity entity =
groupMapper.selectByGroupId(request.getInlongGroupId());
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
-
- // Add/modify/delete is not allowed under temporary inlong group status
- GroupStatus curState = GroupStatus.forCode(entity.getStatus());
- if (GroupStatus.isTempStatus(curState)) {
- throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED,
- String.format("inlong groupId=%s status=%s was not allowed
to add/update/delete stream",
- request.getInlongGroupId(), curState));
- }
- // check stream status
- InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
- request.getInlongGroupId(), request.getInlongStreamId());
- if (streamEntity == null) {
- throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND,
- String.format("inlong stream not found by groupId=%s,
streamId=%s",
- request.getInlongGroupId(),
request.getInlongStreamId()));
- }
- if (!Objects.equals(streamEntity.getVersion(), request.getVersion())) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("stream has already updated with groupId=%s,
streamId=%s, curVersion=%s",
- streamEntity.getInlongGroupId(),
streamEntity.getInlongStreamId(), request.getVersion()));
- }
- // Processing extended attributes
- String extParams = packExtParams(request);
- request.setExtParams(extParams);
- // update record
- CommonBeanUtils.copyProperties(request, streamEntity, true);
- streamEntity.setModifier(opInfo.getName());
- if (InlongConstants.AFFECTED_ONE_ROW !=
streamMapper.updateByIdentifierSelective(streamEntity)) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
- // update stream fields
- updateField(request.getInlongGroupId(), request.getInlongStreamId(),
request.getFieldList());
- // update stream extension infos
- List<InlongStreamExtInfo> extList = request.getExtList();
- saveOrUpdateExt(request.getInlongGroupId(),
request.getInlongStreamId(), extList);
- return true;
- }
-
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean updateWithoutCheck(InlongStreamRequest request, String
operator) {
@@ -663,45 +551,6 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
return true;
}
- @Override
- @Transactional(rollbackFor = Throwable.class)
- public Boolean delete(String groupId, String streamId, UserInfo opInfo) {
- InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
- if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
- // Add/modify/delete is not allowed under temporary inlong group status
- GroupStatus curState = GroupStatus.forCode(groupEntity.getStatus());
- if (GroupStatus.isTempStatus(curState)) {
- throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED,
- String.format("inlong groupId=%s status=%s was not allowed
to add/update/delete stream", groupId,
- curState));
- }
- // Check if steam record exists
- InlongStreamEntity streamEntity =
streamMapper.selectByIdentifier(groupId, streamId);
- if (streamEntity == null) {
- throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
- }
- // If there is undeleted stream source, the deletion fails
- if (sourceService.getCount(groupId, streamId) > 0) {
- throw new
BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SOURCE);
- }
- // If there is undeleted stream sink, the deletion fails
- if (sinkService.getCount(groupId, streamId) > 0) {
- throw new BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SINK);
- }
- // delete record
- streamEntity.setIsDeleted(streamEntity.getId());
- streamEntity.setModifier(opInfo.getName());
- if (streamMapper.updateByPrimaryKey(streamEntity) !=
InlongConstants.AFFECTED_ONE_ROW) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
- // Logically delete the associated field table
- streamFieldMapper.logicDeleteAllByIdentifier(groupId, streamId);
- streamExtMapper.logicDeleteAllByRelatedId(groupId, streamId);
- return true;
- }
-
@Transactional(rollbackFor = Throwable.class)
@Override
public Boolean logicDeleteAll(String groupId, String operator) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
index f172219c77..b8d10ebb92 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
@@ -40,15 +40,6 @@ public interface StreamTransformService {
*/
Integer save(TransformRequest request, String operator);
- /**
- * Save the transform information.
- *
- * @param request the transform request
- * @param opInfo userinfo of operator
- * @return transform id after saving
- */
- Integer save(TransformRequest request, UserInfo opInfo);
-
/**
* Query transform information based on inlong group id and inlong stream
id.
*
@@ -75,16 +66,6 @@ public interface StreamTransformService {
*/
List<TransformResponse> listTransform(String groupId, String streamId);
- /**
- * Query transform information based on inlong group id and inlong stream
id.
- *
- * @param groupId the inlong group id
- * @param streamId the inlong stream id
- * @param opInfo userinfo of operator
- * @return the transform response
- */
- List<TransformResponse> listTransform(String groupId, String streamId,
UserInfo opInfo);
-
/**
* Modify data transform information.
*
@@ -94,15 +75,6 @@ public interface StreamTransformService {
*/
Boolean update(TransformRequest request, String operator);
- /**
- * Modify data transform information.
- *
- * @param request the transform request
- * @param opInfo userinfo of operator
- * @return Whether succeed
- */
- Boolean update(TransformRequest request, UserInfo opInfo);
-
/**
* Delete the stream transform by the given id.
*
@@ -111,14 +83,4 @@ public interface StreamTransformService {
* @return Whether succeed
*/
Boolean delete(DeleteTransformRequest request, String operator);
-
- /**
- * Delete the stream transform by the given id.
- *
- * @param request delete request
- * @param opInfo userinfo of operator
- * @return Whether succeed
- */
- Boolean delete(DeleteTransformRequest request, UserInfo opInfo);
-
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 512580c9a4..40269450a9 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.transform;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -102,40 +101,6 @@ public class StreamTransformServiceImpl implements
StreamTransformService {
return transformEntity.getId();
}
- @Override
- @Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW)
- public Integer save(TransformRequest request, UserInfo opInfo) {
- // Check if it can be added
- InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(request.getInlongGroupId());
- if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", request.getInlongGroupId()));
- }
- // check inlong group status
- GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
- if (GroupStatus.notAllowedUpdate(status)) {
- throw new
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
-
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
- }
- // Check if the record to be added exists
- List<StreamTransformEntity> transformEntities =
- transformMapper.selectByRelatedId(request.getInlongGroupId(),
- request.getInlongStreamId(),
request.getTransformName());
- if (CollectionUtils.isNotEmpty(transformEntities)) {
- throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
- String.format("stream transform already exists with
groupId=%s, streamId=%s, transformName=%s",
- request.getInlongGroupId(),
request.getInlongStreamId(), request.getTransformName()));
- }
- // add record
- StreamTransformEntity transformEntity =
- CommonBeanUtils.copyProperties(request,
StreamTransformEntity::new);
- transformEntity.setCreator(opInfo.getName());
- transformEntity.setModifier(opInfo.getName());
- transformMapper.insert(transformEntity);
- saveFieldOpt(transformEntity, request.getFieldList());
- return transformEntity.getId();
- }
-
@Override
public PageResult<TransformResponse> listByCondition(TransformPageRequest
request, UserInfo opInfo) {
String groupId = request.getInlongGroupId();
@@ -195,23 +160,6 @@ public class StreamTransformServiceImpl implements
StreamTransformService {
return getTransformResponse(entityList);
}
- @Override
- public List<TransformResponse> listTransform(String groupId, String
streamId, UserInfo opInfo) {
- // Check if it can be added
- InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
- if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", groupId));
- }
- // query result
- List<StreamTransformEntity> entityList =
transformMapper.selectByRelatedId(groupId, streamId, null);
- if (CollectionUtils.isEmpty(entityList)) {
- return Collections.emptyList();
- }
- // get transform data
- return getTransformResponse(entityList);
- }
-
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW)
public Boolean update(TransformRequest request, String operator) {
@@ -238,38 +186,6 @@ public class StreamTransformServiceImpl implements
StreamTransformService {
return true;
}
- @Override
- @Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW)
- public Boolean update(TransformRequest request, UserInfo opInfo) {
- // check request and parameters
- this.chkUnmodifiableParams(request);
- // Check if it can be added
- InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(request.getInlongGroupId());
- if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", request.getInlongGroupId()));
- }
- // check inlong group status
- GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
- if (GroupStatus.notAllowedUpdate(status)) {
- throw new
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
-
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
- }
- // update record
- StreamTransformEntity transformEntity =
- CommonBeanUtils.copyProperties(request,
StreamTransformEntity::new);
- transformEntity.setModifier(opInfo.getName());
- int rowCount = transformMapper.updateByIdSelective(transformEntity);
- if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("transform has already updated with
groupId=%s, streamId=%s, name=%s, curVersion=%s",
- request.getInlongGroupId(),
request.getInlongStreamId(),
- request.getTransformName(), request.getVersion()));
- }
- updateFieldOpt(transformEntity, request.getFieldList());
- return true;
- }
-
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW)
public Boolean delete(DeleteTransformRequest request, String operator) {
@@ -303,44 +219,6 @@ public class StreamTransformServiceImpl implements
StreamTransformService {
return true;
}
- @Override
- @Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW)
- public Boolean delete(DeleteTransformRequest request, UserInfo opInfo) {
- // Check if it can be added
- InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(request.getInlongGroupId());
- if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", request.getInlongGroupId()));
- }
- // check inlong group status
- GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
- if (GroupStatus.notAllowedUpdate(status)) {
- throw new
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
-
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
- }
- // query records
- List<StreamTransformEntity> entityList =
- transformMapper.selectByRelatedId(request.getInlongGroupId(),
- request.getInlongStreamId(),
request.getTransformName());
- if (CollectionUtils.isNotEmpty(entityList)) {
- for (StreamTransformEntity entity : entityList) {
- Integer id = entity.getId();
- entity.setIsDeleted(id);
- entity.setModifier(opInfo.getName());
- int rowCount = transformMapper.updateByIdSelective(entity);
- if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format(
- "transform has already updated with
groupId=%s, streamId=%s, name=%s, curVersion=%s",
- entity.getInlongGroupId(),
entity.getInlongStreamId(),
- entity.getTransformName(),
entity.getVersion()));
- }
- transformFieldMapper.deleteAll(id);
- }
- }
- return true;
- }
-
private List<TransformResponse>
getTransformResponse(List<StreamTransformEntity> entityList) {
List<Integer> transformIds =
entityList.stream().map(StreamTransformEntity::getId).collect(Collectors.toList());
List<StreamTransformFieldEntity> fieldEntities =
transformFieldMapper.selectByTransformIds(transformIds);
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index c8c6c51186..ce2d9175c8 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -112,7 +112,7 @@ public class InlongStreamController {
@ApiImplicitParam(name = "streamId", dataTypeClass = String.class,
required = true)
})
public Response<InlongStreamInfo> get(@RequestParam String groupId,
@RequestParam String streamId) {
- return Response.success(streamService.get(groupId, streamId,
LoginUserUtils.getLoginUser()));
+ return Response.success(streamService.get(groupId, streamId));
}
@RequestMapping(value = "/stream/getBrief", method = RequestMethod.GET)
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 7869758cdb..4fd4eaabb6 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -79,7 +79,7 @@ public class StreamSinkController {
@OperationLog(operation = OperationType.GET, operationTarget =
OperationTarget.SINK)
@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required =
true)
public Response<StreamSink> get(@PathVariable Integer id) {
- return Response.success(sinkService.get(id,
LoginUserUtils.getLoginUser()));
+ return Response.success(sinkService.get(id));
}
@RequestMapping(value = "/sink/list", method = RequestMethod.POST)
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index 1f62edd358..3c751a0c6a 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -77,7 +77,7 @@ public class StreamSourceController {
@ApiOperation(value = "Get stream source")
@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required =
true)
public Response<StreamSource> get(@PathVariable Integer id) {
- return Response.success(sourceService.get(id,
LoginUserUtils.getLoginUser()));
+ return Response.success(sourceService.get(id));
}
@RequestMapping(value = "/source/list", method = RequestMethod.POST)
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongClusterController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongClusterController.java
index ebbfd442ef..0e6224eaee 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongClusterController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongClusterController.java
@@ -80,7 +80,7 @@ public class OpenInLongClusterController {
public Response<ClusterTagResponse> getTag(@PathVariable Integer id) {
Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, "tag
id cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.getTag(id,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.getTag(id,
LoginUserUtils.getLoginUser().getName()));
}
@PostMapping(value = "/cluster/tag/list")
@@ -99,7 +99,7 @@ public class OpenInLongClusterController {
public Response<Integer> saveTag(@Validated(SaveValidation.class)
@RequestBody ClusterTagRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.saveTag(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.saveTag(request,
LoginUserUtils.getLoginUser().getName()));
}
@PostMapping(value = "/cluster/tag/update")
@@ -108,7 +108,7 @@ public class OpenInLongClusterController {
public Response<Boolean> updateTag(@Validated(UpdateValidation.class)
@RequestBody ClusterTagRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.updateTag(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.updateTag(request,
LoginUserUtils.getLoginUser().getName()));
}
@DeleteMapping(value = "/cluster/tag/delete/{id}")
@@ -118,7 +118,7 @@ public class OpenInLongClusterController {
public Response<Boolean> deleteTag(@PathVariable Integer id) {
Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, "tag
id cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.deleteTag(id,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.deleteTag(id,
LoginUserUtils.getLoginUser().getName()));
}
@GetMapping(value = "/cluster/get/{id}")
@@ -127,7 +127,7 @@ public class OpenInLongClusterController {
public Response<ClusterInfo> get(@PathVariable Integer id) {
Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER,
"cluster id cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.get(id,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.get(id,
LoginUserUtils.getLoginUser().getName()));
}
@PostMapping(value = "/cluster/list")
@@ -146,7 +146,7 @@ public class OpenInLongClusterController {
public Response<Integer> save(@Validated(SaveValidation.class)
@RequestBody ClusterRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.save(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.save(request,
LoginUserUtils.getLoginUser().getName()));
}
@PostMapping(value = "/cluster/update")
@@ -155,7 +155,7 @@ public class OpenInLongClusterController {
public Response<Boolean> update(@Validated(UpdateByIdValidation.class)
@RequestBody ClusterRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.update(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.update(request,
LoginUserUtils.getLoginUser().getName()));
}
@PostMapping(value = "/cluster/bindTag")
@@ -164,7 +164,7 @@ public class OpenInLongClusterController {
public Response<Boolean> bindTag(@Validated @RequestBody BindTagRequest
request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.bindTag(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.bindTag(request,
LoginUserUtils.getLoginUser().getName()));
}
@DeleteMapping(value = "/cluster/delete/{id}")
@@ -183,7 +183,7 @@ public class OpenInLongClusterController {
public Response<ClusterNodeResponse> getNode(@PathVariable Integer id) {
Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER,
"Cluster node id cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.getNode(id,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.getNode(id,
LoginUserUtils.getLoginUser().getName()));
}
@PostMapping(value = "/cluster/node/list")
@@ -207,8 +207,7 @@ public class OpenInLongClusterController {
Preconditions.expectNotBlank(inlongGroupId,
ErrorCodeEnum.INVALID_PARAMETER, "inlongGroupId cannot be blank");
Preconditions.expectNotBlank(clusterType,
ErrorCodeEnum.INVALID_PARAMETER, "clusterType cannot be blank");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.listNodeByGroupId(inlongGroupId,
- clusterType, protocolType, LoginUserUtils.getLoginUser()));
+ return
Response.success(clusterService.listNodeByGroupId(inlongGroupId, clusterType,
protocolType));
}
@PostMapping(value = "/cluster/node/save")
@@ -217,7 +216,7 @@ public class OpenInLongClusterController {
public Response<Integer> saveNode(@Validated @RequestBody
ClusterNodeRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.saveNode(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.saveNode(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/cluster/node/update", method =
RequestMethod.POST)
@@ -226,7 +225,7 @@ public class OpenInLongClusterController {
public Response<Boolean> updateNode(@Validated(UpdateValidation.class)
@RequestBody ClusterNodeRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.updateNode(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.updateNode(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/cluster/node/delete/{id}", method =
RequestMethod.DELETE)
@@ -236,7 +235,7 @@ public class OpenInLongClusterController {
public Response<Boolean> deleteNode(@PathVariable Integer id) {
Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER,
"cluster id cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(clusterService.deleteNode(id,
LoginUserUtils.getLoginUser()));
+ return Response.success(clusterService.deleteNode(id,
LoginUserUtils.getLoginUser().getName()));
}
@PostMapping(value = "/cluster/tenant/tag/save")
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
index e36c48bca0..5dd7f3dab0 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
@@ -69,7 +69,7 @@ public class OpenInLongGroupController {
public Response<InlongGroupInfo> get(@PathVariable String groupId) {
Preconditions.expectNotBlank(groupId, ErrorCodeEnum.INVALID_PARAMETER,
"groupId cannot be blank");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(groupService.get(groupId,
LoginUserUtils.getLoginUser()));
+ return Response.success(groupService.get(groupId));
}
@PostMapping(value = "/group/list")
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
index 903e730280..41d33d5699 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
@@ -72,7 +72,7 @@ public class OpenInLongStreamController {
Preconditions.expectNotBlank(groupId, ErrorCodeEnum.INVALID_PARAMETER,
"groupId cannot be blank");
Preconditions.expectNotBlank(streamId,
ErrorCodeEnum.INVALID_PARAMETER, "streamId cannot be blank");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(streamService.get(groupId, streamId,
LoginUserUtils.getLoginUser()));
+ return Response.success(streamService.get(groupId, streamId));
}
@RequestMapping(value = "/stream/getBrief", method = RequestMethod.GET)
@@ -103,7 +103,7 @@ public class OpenInLongStreamController {
public Response<Integer> save(@RequestBody InlongStreamRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(streamService.save(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(streamService.save(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST)
@@ -120,7 +120,7 @@ public class OpenInLongStreamController {
public Response<Boolean> update(@Validated(UpdateValidation.class)
@RequestBody InlongStreamRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(streamService.update(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(streamService.update(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/stream/delete", method = RequestMethod.DELETE)
@@ -134,7 +134,7 @@ public class OpenInLongStreamController {
Preconditions.expectNotBlank(groupId, ErrorCodeEnum.INVALID_PARAMETER,
"groupId cannot be blank");
Preconditions.expectNotBlank(streamId,
ErrorCodeEnum.INVALID_PARAMETER, "streamId cannot be blank");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(streamService.delete(groupId, streamId,
LoginUserUtils.getLoginUser()));
+ return Response.success(streamService.delete(groupId, streamId,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/stream/startProcess/{groupId}/{streamId}",
method = RequestMethod.POST)
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
index 1d570033df..e999e1ba28 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
@@ -64,7 +64,7 @@ public class OpenStreamSinkController {
public Response<StreamSink> get(@PathVariable Integer id) {
Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER,
"sinkId cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(sinkService.get(id,
LoginUserUtils.getLoginUser()));
+ return Response.success(sinkService.get(id));
}
@RequestMapping(value = "/sink/list", method = RequestMethod.POST)
@@ -81,7 +81,7 @@ public class OpenStreamSinkController {
public Response<Integer> save(@Validated @RequestBody SinkRequest request)
{
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(sinkService.save(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(sinkService.save(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST)
@@ -97,7 +97,7 @@ public class OpenStreamSinkController {
public Response<Boolean> update(@Validated(UpdateByIdValidation.class)
@RequestBody SinkRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(sinkService.update(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(sinkService.update(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/sink/delete/{id}", method = RequestMethod.DELETE)
@@ -111,6 +111,6 @@ public class OpenStreamSinkController {
@RequestParam(required = false, defaultValue = "false") boolean
startProcess) {
Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER,
"sinkId cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(sinkService.delete(id, startProcess,
LoginUserUtils.getLoginUser()));
+ return Response.success(sinkService.delete(id, startProcess,
LoginUserUtils.getLoginUser().getName()));
}
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
index 64e98a58bc..1a7ad699f3 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
@@ -63,7 +63,7 @@ public class OpenStreamSourceController {
public Response<StreamSource> get(@PathVariable Integer id) {
Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER,
"sourceId cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(sourceService.get(id,
LoginUserUtils.getLoginUser()));
+ return Response.success(sourceService.get(id));
}
@RequestMapping(value = "/source/list", method = RequestMethod.POST)
@@ -80,7 +80,7 @@ public class OpenStreamSourceController {
public Response<Integer> save(@Validated(SaveValidation.class)
@RequestBody SourceRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(sourceService.save(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(sourceService.save(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/source/batchSave", method = RequestMethod.POST)
@@ -97,7 +97,7 @@ public class OpenStreamSourceController {
public Response<Boolean> update(@Validated(UpdateValidation.class)
@RequestBody SourceRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(sourceService.update(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(sourceService.update(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/source/delete/{id}", method =
RequestMethod.DELETE)
@@ -107,7 +107,7 @@ public class OpenStreamSourceController {
public Response<Boolean> delete(@PathVariable Integer id) {
Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER,
"sourceId cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(sourceService.delete(id,
LoginUserUtils.getLoginUser()));
+ return Response.success(sourceService.delete(id,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/source/stop/{id}", method = RequestMethod.POST)
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
index 80d939d5ff..0d890486b3 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
@@ -59,8 +59,7 @@ public class OpenStreamTransformController {
@RequestParam("inlongStreamId") String streamId) {
Preconditions.expectNotBlank(groupId, ErrorCodeEnum.INVALID_PARAMETER,
"groupId cannot be blank");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(streamTransformService.listTransform(
- groupId, streamId, LoginUserUtils.getLoginUser()));
+ return Response.success(streamTransformService.listTransform(groupId,
streamId));
}
@RequestMapping(value = "/transform/save", method = RequestMethod.POST)
@@ -70,7 +69,7 @@ public class OpenStreamTransformController {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(
- streamTransformService.save(request,
LoginUserUtils.getLoginUser()));
+ streamTransformService.save(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/transform/update", method = RequestMethod.POST)
@@ -79,7 +78,7 @@ public class OpenStreamTransformController {
public Response<Boolean> update(@Validated(UpdateValidation.class)
@RequestBody TransformRequest request) {
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(streamTransformService.update(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(streamTransformService.update(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/transform/delete", method = RequestMethod.DELETE)
@@ -88,6 +87,6 @@ public class OpenStreamTransformController {
public Response<Boolean> delete(@Validated DeleteTransformRequest request)
{
Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER,
"request cannot be null");
Preconditions.expectNotNull(LoginUserUtils.getLoginUser(),
ErrorCodeEnum.LOGIN_USER_EMPTY);
- return Response.success(streamTransformService.delete(request,
LoginUserUtils.getLoginUser()));
+ return Response.success(streamTransformService.delete(request,
LoginUserUtils.getLoginUser().getName()));
}
}