This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b8da024cde [INLONG-10484][Manager] Refactor code in manager-service module (#10485)
b8da024cde is described below

commit b8da024cde85b0d9a0aa74a2d6e8c4d8efbbc796
Author: AloysZhang <[email protected]>
AuthorDate: Sat Jun 22 10:57:58 2024 +0800

    [INLONG-10484][Manager] Refactor code in manager-service module (#10485)
---
 .../service/cluster/InlongClusterService.java      | 120 --------
 .../service/cluster/InlongClusterServiceImpl.java  | 331 ---------------------
 .../manager/service/group/InlongGroupService.java  |   9 -
 .../service/group/InlongGroupServiceImpl.java      |  20 --
 .../manager/service/sink/StreamSinkService.java    |  37 ---
 .../service/sink/StreamSinkServiceImpl.java        | 174 +----------
 .../service/source/StreamSourceService.java        |  36 ---
 .../service/source/StreamSourceServiceImpl.java    | 130 +-------
 .../service/stream/InlongStreamService.java        |  41 ---
 .../service/stream/InlongStreamServiceImpl.java    | 151 ----------
 .../service/transform/StreamTransformService.java  |  38 ---
 .../transform/StreamTransformServiceImpl.java      | 122 --------
 .../web/controller/InlongStreamController.java     |   2 +-
 .../web/controller/StreamSinkController.java       |   2 +-
 .../web/controller/StreamSourceController.java     |   2 +-
 .../openapi/OpenInLongClusterController.java       |  27 +-
 .../openapi/OpenInLongGroupController.java         |   2 +-
 .../openapi/OpenInLongStreamController.java        |   8 +-
 .../openapi/OpenStreamSinkController.java          |   8 +-
 .../openapi/OpenStreamSourceController.java        |   8 +-
 .../openapi/OpenStreamTransformController.java     |   9 +-
 21 files changed, 44 insertions(+), 1233 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 9b7dac0ea2..22282ba67e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -52,15 +52,6 @@ public interface InlongClusterService {
      */
     Integer saveTag(ClusterTagRequest request, String operator);
 
-    /**
-     * Save cluster tag.
-     *
-     * @param request cluster tag
-     * @param opInfo userinfo of operator
-     * @return cluster tag id after saving
-     */
-    Integer saveTag(ClusterTagRequest request, UserInfo opInfo);
-
     /**
      * Get cluster tag by id.
      *
@@ -70,15 +61,6 @@ public interface InlongClusterService {
      */
     ClusterTagResponse getTag(Integer id, String currentUser);
 
-    /**
-     * Get cluster tag by id.
-     *
-     * @param id cluster tag id
-     * @param opInfo userinfo of operator
-     * @return cluster tag info
-     */
-    ClusterTagResponse getTag(Integer id, UserInfo opInfo);
-
     /**
      * Paging query cluster tags according to conditions.
      *
@@ -105,15 +87,6 @@ public interface InlongClusterService {
      */
     Boolean updateTag(ClusterTagRequest request, String operator);
 
-    /**
-     * Update cluster tag.
-     *
-     * @param request cluster tag to be modified
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean updateTag(ClusterTagRequest request, UserInfo opInfo);
-
     /**
      * Delete cluster tag.
      *
@@ -123,15 +96,6 @@ public interface InlongClusterService {
      */
     Boolean deleteTag(Integer id, String operator);
 
-    /**
-     * Delete cluster tag.
-     *
-     * @param id cluster tag id to be deleted
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean deleteTag(Integer id, UserInfo opInfo);
-
     /**
      * Save cluster info.
      *
@@ -141,15 +105,6 @@ public interface InlongClusterService {
      */
     Integer save(ClusterRequest request, String operator);
 
-    /**
-     * Save cluster info.
-     *
-     * @param request inlong cluster info
-     * @param opInfo userinfo of operator
-     * @return cluster id after saving
-     */
-    Integer save(ClusterRequest request, UserInfo opInfo);
-
     /**
      * Get cluster info by id.
      *
@@ -159,15 +114,6 @@ public interface InlongClusterService {
      */
     ClusterInfo get(Integer id, String currentUser);
 
-    /**
-     * Get cluster info by id.
-     *
-     * @param id cluster id
-     * @param opInfo userinfo of operator
-     * @return cluster info
-     */
-    ClusterInfo get(Integer id, UserInfo opInfo);
-
     /**
      * Get one cluster by the cluster tag, cluster name and cluster type.
      *
@@ -214,15 +160,6 @@ public interface InlongClusterService {
      */
     Boolean update(ClusterRequest request, String operator);
 
-    /**
-     * Update cluster information
-     *
-     * @param request cluster info to be modified
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean update(ClusterRequest request, UserInfo opInfo);
-
     /**
      * Update cluster information by unique key
      *
@@ -241,15 +178,6 @@ public interface InlongClusterService {
      */
     Boolean bindTag(BindTagRequest request, String operator);
 
-    /**
-     * Bind or unbind cluster tag for clusters.
-     *
-     * @param request cluster info to be modified
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean bindTag(BindTagRequest request, UserInfo opInfo);
-
     /**
      * Delete cluster information.
      *
@@ -287,15 +215,6 @@ public interface InlongClusterService {
      */
     Integer saveNode(ClusterNodeRequest request, String operator);
 
-    /**
-     * Save cluster node info.
-     *
-     * @param request inlong cluster info
-     * @param opInfo userinfo of operator
-     * @return cluster id after saving
-     */
-    Integer saveNode(ClusterNodeRequest request, UserInfo opInfo);
-
     /**
      * Get cluster node info by id.
      *
@@ -305,15 +224,6 @@ public interface InlongClusterService {
      */
     ClusterNodeResponse getNode(Integer id, String currentUser);
 
-    /**
-     * Get cluster node info by id.
-     *
-     * @param id cluster id
-     * @param opInfo userinfo of operator
-     * @return cluster info
-     */
-    ClusterNodeResponse getNode(Integer id, UserInfo opInfo);
-
     /**
      * Paging query cluster nodes according to conditions.
      *
@@ -342,18 +252,6 @@ public interface InlongClusterService {
      */
     List<ClusterNodeResponse> listNodeByGroupId(String inlongGroupId, String clusterType, String protocolType);
 
-    /**
-     * List cluster nodes
-     *
-     * @param inlongGroupId inlong group id
-     * @param clusterType cluster type
-     * @param protocolType protocol type, such as: TCP, HTTP
-     * @param opInfo userinfo of operator
-     * @return cluster node list
-     */
-    List<ClusterNodeResponse> listNodeByGroupId(
-            String inlongGroupId, String clusterType, String protocolType, 
UserInfo opInfo);
-
     /**
      * Query node IP list by cluster type
      *
@@ -371,15 +269,6 @@ public interface InlongClusterService {
      */
     Boolean updateNode(ClusterNodeRequest request, String operator);
 
-    /**
-     * Update cluster node.
-     *
-     * @param request cluster node to be modified
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean updateNode(ClusterNodeRequest request, UserInfo opInfo);
-
     /**
      * Delete cluster node.
      *
@@ -391,15 +280,6 @@ public interface InlongClusterService {
 
     Boolean unloadNode(Integer id, String operator);
 
-    /**
-     * Delete cluster node.
-     *
-     * @param id cluster node id to be deleted
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean deleteNode(Integer id, UserInfo opInfo);
-
     /**
      * Retrieves the SSH public key from the manager node for agent installation.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index a85695d6fd..5be7ffd3b1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -193,24 +193,6 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         return entity.getId();
     }
 
-    @Override
-    public Integer saveTag(ClusterTagRequest request, UserInfo opInfo) {
-        // check if the cluster tag already exist
-        InlongClusterTagEntity exist = 
clusterTagMapper.selectByTag(request.getClusterTag());
-        if (exist != null) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
-                    String.format("inlong cluster tag [%s] already exist", 
request.getClusterTag()));
-        }
-        InlongClusterTagEntity entity = 
CommonBeanUtils.copyProperties(request, InlongClusterTagEntity::new);
-        request.setExtParams(entity.getExtParams());
-        String extParam = packExtParams(request);
-        entity.setExtParams(extParam);
-        entity.setCreator(opInfo.getName());
-        entity.setModifier(opInfo.getName());
-        clusterTagMapper.insert(entity);
-        return entity.getId();
-    }
-
     @Override
     public ClusterTagResponse getTag(Integer id, String currentUser) {
         Preconditions.expectNotNull(id, "inlong cluster tag id cannot be 
empty");
@@ -233,26 +215,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return response;
     }
 
-    @Override
-    public ClusterTagResponse getTag(Integer id, UserInfo opInfo) {
-        InlongClusterTagEntity entity = clusterTagMapper.selectById(id);
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
-                    String.format("inlong cluster tag not found by id=%s", 
id));
-        }
-        ClusterTagResponse response = CommonBeanUtils.copyProperties(entity, 
ClusterTagResponse::new);
-        unpackExtParams(response);
-
-        List<String> tenantList = tenantClusterTagMapper
-                .selectByTag(entity.getClusterTag()).stream()
-                .map(TenantClusterTagEntity::getTenant)
-                .collect(Collectors.toList());
-        response.setTenantList(tenantList);
-
-        LOGGER.debug("success to get cluster tag info by id={}", id);
-        return response;
-    }
-
     @Override
     public PageResult<ClusterTagResponse> listTag(ClusterTagPageRequest 
request) {
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
@@ -395,65 +357,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return true;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ)
-    public Boolean updateTag(ClusterTagRequest request, UserInfo opInfo) {
-        InlongClusterTagEntity exist = 
clusterTagMapper.selectById(request.getId());
-        if (exist == null) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
-                    String.format("inlong cluster tag was not exist for 
id=%s", request.getId()));
-        }
-        // check record version
-        Preconditions.expectEquals(exist.getVersion(), request.getVersion(),
-                ErrorCodeEnum.CONFIG_EXPIRED,
-                String.format("record has expired with record version=%d, 
request version=%d",
-                        exist.getVersion(), request.getVersion()));
-        // check if the cluster tag was changed, need to check whether the new 
tag already exists
-        if (StringUtils.isNotEmpty(request.getClusterTag())) {
-            String newClusterTag = request.getClusterTag();
-            String oldClusterTag = exist.getClusterTag();
-            if (!newClusterTag.equals(oldClusterTag)) {
-                InlongClusterTagEntity tagConflict = 
clusterTagMapper.selectByTag(newClusterTag);
-                if (tagConflict != null) {
-                    throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
-                            String.format("inlong cluster tag [%s] to changed 
already exist", newClusterTag));
-                }
-                // check if there are some InlongGroups that uses this tag
-                List<InlongGroupEntity> usedGroupEntity = 
groupMapper.selectByClusterTag(oldClusterTag);
-                if (CollectionUtils.isNotEmpty(usedGroupEntity)) {
-                    throw new BusinessException(ErrorCodeEnum.RECORD_IN_USED,
-                            String.format("inlong cluster tag [%s] was used by 
inlong group", oldClusterTag));
-                }
-                // update the associated cluster tag in inlong_cluster
-                List<InlongClusterEntity> clusterEntities = 
clusterMapper.selectByKey(oldClusterTag, null, null);
-                if (CollectionUtils.isNotEmpty(clusterEntities)) {
-                    clusterEntities.forEach(entity -> {
-                        Set<String> tagSet = 
Sets.newHashSet(entity.getClusterTags().split(InlongConstants.COMMA));
-                        tagSet.remove(oldClusterTag);
-                        tagSet.add(newClusterTag);
-                        String updateTags = Joiner.on(",").join(tagSet);
-                        entity.setClusterTags(updateTags);
-                        entity.setModifier(opInfo.getName());
-                        if (InlongConstants.AFFECTED_ONE_ROW != 
clusterMapper.updateById(entity)) {
-                            throw new 
BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                                    String.format("cluster has already updated 
with name=%s, type=%s, curVersion=%s",
-                                            entity.getName(), 
entity.getType(), entity.getVersion()));
-                        }
-                    });
-                }
-            }
-        }
-        CommonBeanUtils.copyProperties(request, exist, true);
-        request.setExtParams(exist.getExtParams());
-        String extParams = packExtParams(request);
-        exist.setExtParams(extParams);
-        exist.setModifier(opInfo.getName());
-        if (InlongConstants.AFFECTED_ONE_ROW != 
clusterTagMapper.updateByIdSelective(exist)) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
-        return true;
-    }
-
     @Override
     public Boolean deleteTag(Integer id, String operator) {
         Preconditions.expectNotNull(id, "cluster tag id cannot be empty");
@@ -496,37 +399,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return true;
     }
 
-    @Override
-    public Boolean deleteTag(Integer id, UserInfo opInfo) {
-        InlongClusterTagEntity exist = clusterTagMapper.selectById(id);
-        if (exist == null || exist.getIsDeleted() > 
InlongConstants.UN_DELETED) {
-            return true;
-        }
-        // check if there are some InlongGroups that uses this tag
-        String clusterTag = exist.getClusterTag();
-        // check if there are some InlongGroups that uses this tag
-        List<InlongGroupEntity> usedGroupEntity = 
groupMapper.selectByClusterTag(clusterTag);
-        if (CollectionUtils.isNotEmpty(usedGroupEntity)) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_IN_USED,
-                    String.format("inlong cluster tag [%s] was used by inlong 
group", clusterTag));
-        }
-        // update the associated cluster tag in inlong_cluster
-        List<InlongClusterEntity> clusterEntities = 
clusterMapper.selectByKey(clusterTag, null, null);
-        if (CollectionUtils.isNotEmpty(clusterEntities)) {
-            clusterEntities.forEach(entity -> {
-                this.removeClusterTag(entity, clusterTag, opInfo.getName());
-            });
-        }
-        exist.setIsDeleted(exist.getId());
-        exist.setModifier(opInfo.getName());
-        if (InlongConstants.AFFECTED_ONE_ROW != 
clusterTagMapper.updateByIdSelective(exist)) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                    String.format("cluster tag has already updated with 
name=%s, curVersion=%s",
-                            exist.getClusterTag(), exist.getVersion()));
-        }
-        return true;
-    }
-
     @Override
     public Integer save(ClusterRequest request, String operator) {
         LOGGER.debug("begin to save inlong cluster={}", request);
@@ -556,26 +428,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return id;
     }
 
-    @Override
-    public Integer save(ClusterRequest request, UserInfo opInfo) {
-        // The name cannot be modified and is automatically generated by the 
system
-        String name = request.getName();
-        if (StringUtils.isBlank(name)) {
-            name = UUID.randomUUID().toString();
-            request.setName(name);
-        }
-        // check if the cluster already exist
-        List<InlongClusterEntity> exist = clusterMapper.selectByKey(
-                request.getClusterTags(), request.getName(), 
request.getType());
-        if (CollectionUtils.isNotEmpty(exist)) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
-                    String.format("inlong cluster already exist for cluster 
tag=%s name=%s type=%s",
-                            request.getClusterTags(), request.getName(), 
request.getType()));
-        }
-        InlongClusterOperator instance = 
clusterOperatorFactory.getInstance(request.getType());
-        return instance.saveOpt(request, opInfo.getName());
-    }
-
     @Override
     public ClusterInfo get(Integer id, String currentUser) {
         Preconditions.expectNotNull(id, "inlong cluster id cannot be empty");
@@ -591,18 +443,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return clusterInfo;
     }
 
-    @Override
-    public ClusterInfo get(Integer id, UserInfo opInfo) {
-        InlongClusterEntity entity = clusterMapper.selectById(id);
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
-                    String.format("inlong cluster not found by id=%s", id));
-        }
-
-        InlongClusterOperator instance = 
clusterOperatorFactory.getInstance(entity.getType());
-        return instance.getFromEntity(entity);
-    }
-
     @Override
     public PageResult<ClusterInfo> list(ClusterPageRequest request) {
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
@@ -701,29 +541,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return true;
     }
 
-    @Override
-    public Boolean update(ClusterRequest request, UserInfo opInfo) {
-        InlongClusterEntity entity = clusterMapper.selectById(request.getId());
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
-                    String.format("inlong cluster not found by id=%s", 
request.getId()));
-        }
-        // check parameters
-        chkUnmodifiableParams(entity, request);
-        // check whether the cluster already exists
-        List<InlongClusterEntity> exist = clusterMapper.selectByKey(
-                request.getClusterTags(), request.getName(), 
request.getType());
-        if (CollectionUtils.isNotEmpty(exist) && 
!Objects.equals(request.getId(), exist.get(0).getId())) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
-                    String.format("inlong cluster already exist for cluster 
tag=%s name=%s type=%s",
-                            request.getClusterTags(), request.getName(), 
request.getType()));
-        }
-        // update record
-        InlongClusterOperator instance = 
clusterOperatorFactory.getInstance(request.getType());
-        instance.updateOpt(request, opInfo.getName());
-        return true;
-    }
-
     @Override
     public UpdateResult updateByKey(ClusterRequest request, String operator) {
         LOGGER.debug("begin to update inlong cluster: {}", request);
@@ -779,42 +596,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return true;
     }
 
-    @Override
-    public Boolean bindTag(BindTagRequest request, UserInfo opInfo) {
-        if (CollectionUtils.isNotEmpty(request.getBindClusters())) {
-            request.getBindClusters().forEach(id -> {
-                InlongClusterEntity entity = clusterMapper.selectById(id);
-                Set<String> tagSet = 
Sets.newHashSet(entity.getClusterTags().split(InlongConstants.COMMA));
-                tagSet.add(request.getClusterTag());
-                String updateTags = Joiner.on(",").join(tagSet);
-                InlongClusterEntity updateEntity = 
clusterMapper.selectById(id);
-                updateEntity.setClusterTags(updateTags);
-                updateEntity.setModifier(opInfo.getName());
-                if (InlongConstants.AFFECTED_ONE_ROW != 
clusterMapper.updateById(updateEntity)) {
-                    throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                            String.format("cluster has already updated with 
name=%s, type=%s, curVersion=%s",
-                                    updateEntity.getName(), 
updateEntity.getType(), updateEntity.getVersion()));
-                }
-            });
-        }
-        if (CollectionUtils.isNotEmpty(request.getUnbindClusters())) {
-            request.getUnbindClusters().forEach(id -> {
-                InlongClusterEntity entity = clusterMapper.selectById(id);
-                Set<String> tagSet = 
Sets.newHashSet(entity.getClusterTags().split(InlongConstants.COMMA));
-                tagSet.remove(request.getClusterTag());
-                String updateTags = Joiner.on(",").join(tagSet);
-                entity.setClusterTags(updateTags);
-                entity.setModifier(opInfo.getName());
-                if (InlongConstants.AFFECTED_ONE_ROW != 
clusterMapper.updateById(entity)) {
-                    throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                            String.format("cluster has already updated with 
name=%s, type=%s, curVersion=%s",
-                                    entity.getName(), entity.getType(), 
entity.getVersion()));
-                }
-            });
-        }
-        return true;
-    }
-
     @Override
     public Boolean deleteByKey(String name, String type, String operator) {
         Preconditions.expectNotBlank(name, ErrorCodeEnum.INVALID_PARAMETER, 
"cluster name should not be empty or null");
@@ -915,24 +696,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return id;
     }
 
-    @Override
-    public Integer saveNode(ClusterNodeRequest request, UserInfo opInfo) {
-        // check cluster info
-        InlongClusterEntity entity = 
clusterMapper.selectById(request.getParentId());
-        Preconditions.expectNotNull(entity, ErrorCodeEnum.CLUSTER_NOT_FOUND,
-                String.format("inlong cluster not found by id=%s, or was 
already deleted", request.getParentId()));
-        // check cluster node if exist
-        InlongClusterNodeEntity exist = 
clusterNodeMapper.selectByUniqueKey(request);
-        if (exist != null) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
-                    String.format("inlong cluster node already exist for 
type=%s ip=%s port=%s",
-                            request.getType(), request.getIp(), 
request.getPort()));
-        }
-        // add record
-        InlongClusterNodeOperator instance = 
clusterNodeOperatorFactory.getInstance(request.getType());
-        return instance.saveOpt(request, opInfo.getName());
-    }
-
     @Override
     public ClusterNodeResponse getNode(Integer id, String currentUser) {
         Preconditions.expectNotNull(id, "cluster node id cannot be empty");
@@ -945,16 +708,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return instance.getFromEntity(entity);
     }
 
-    @Override
-    public ClusterNodeResponse getNode(Integer id, UserInfo opInfo) {
-        InlongClusterNodeEntity entity = clusterNodeMapper.selectById(id);
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
-        }
-        InlongClusterEntity cluster = 
clusterMapper.selectById(entity.getParentId());
-        return CommonBeanUtils.copyProperties(entity, 
ClusterNodeResponse::new);
-    }
-
     @Override
     public PageResult<ClusterNodeResponse> listNode(ClusterPageRequest 
request, String currentUser) {
         if (StringUtils.isNotBlank(request.getClusterTag())) {
@@ -1024,34 +777,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return result;
     }
 
-    @Override
-    public List<ClusterNodeResponse> listNodeByGroupId(
-            String groupId, String clusterType, String protocolType, UserInfo 
opInfo) {
-        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
-        if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("inlong group not exists for groupId=%s", 
groupId));
-        }
-        String clusterTag = groupEntity.getInlongClusterTag();
-        if (StringUtils.isBlank(clusterTag)) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_TAG_NOT_FOUND,
-                    String.format("not found any cluster tag for groupId=%s", 
groupId));
-        }
-        List<InlongClusterEntity> clusterList = 
clusterMapper.selectByKey(clusterTag, null, clusterType);
-        if (CollectionUtils.isEmpty(clusterList)) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
-                    String.format("not found any data proxy cluster for 
groupId=%s and clusterTag=%s",
-                            groupId, clusterTag));
-        }
-        // TODO if more than one data proxy cluster, currently takes first
-        List<InlongClusterNodeEntity> nodeEntities =
-                clusterNodeMapper.selectByParentId(clusterList.get(0).getId(), 
protocolType);
-        if (CollectionUtils.isEmpty(nodeEntities)) {
-            return Collections.emptyList();
-        }
-        return CommonBeanUtils.copyListProperties(nodeEntities, 
ClusterNodeResponse::new);
-    }
-
     public List<ClusterNodeResponse> listNodeByClusterTag(ClusterPageRequest 
request) {
         List<InlongClusterEntity> clusterList = 
clusterMapper.selectByKey(request.getClusterTag(), request.getName(),
                 request.getType());
@@ -1122,45 +847,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return true;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ)
-    public Boolean updateNode(ClusterNodeRequest request, UserInfo opInfo) {
-        InlongClusterNodeEntity entity = 
clusterNodeMapper.selectById(request.getId());
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
-                    String.format("cluster node not found by id=%s", 
request.getId()));
-        }
-        // check type
-        Preconditions.expectEquals(entity.getType(), request.getType(),
-                ErrorCodeEnum.INVALID_PARAMETER, "type not allowed modify");
-        // check record version
-        Preconditions.expectEquals(entity.getVersion(), request.getVersion(),
-                ErrorCodeEnum.CONFIG_EXPIRED,
-                String.format("record has expired with record version=%d, 
request version=%d",
-                        entity.getVersion(), request.getVersion()));
-        // check protocol type
-        if (StringUtils.isBlank(request.getProtocolType())) {
-            request.setProtocolType(entity.getProtocolType());
-        }
-        // check wanted cluster node if exist
-        InlongClusterNodeEntity exist = 
clusterNodeMapper.selectByUniqueKey(request);
-        if (exist != null && !Objects.equals(request.getId(), exist.getId())) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
-                    "inlong cluster node already exist for " + request);
-        }
-        // check parent id
-        InlongClusterEntity cluster = 
clusterMapper.selectById(entity.getParentId());
-        if (cluster == null) {
-            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND,
-                    String.format("The cluster to which the node belongs not 
found by clusterId=%s",
-                            request.getParentId()));
-        }
-        // update record
-        InlongClusterNodeOperator instance = 
clusterNodeOperatorFactory.getInstance(request.getType());
-        instance.updateOpt(request, opInfo.getName());
-        return true;
-    }
-
     @Override
     public Boolean deleteNode(Integer id, String operator) {
         Preconditions.expectNotNull(id, "cluster node id cannot be empty");
@@ -1189,23 +875,6 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return isSuccess;
     }
 
-    @Override
-    public Boolean deleteNode(Integer id, UserInfo opInfo) {
-        InlongClusterNodeEntity entity = clusterNodeMapper.selectById(id);
-        Preconditions.expectNotNull(entity, ErrorCodeEnum.CLUSTER_NOT_FOUND);
-        // delete record
-        entity.setIsDeleted(entity.getId());
-        entity.setModifier(opInfo.getName());
-        if (InlongConstants.AFFECTED_ONE_ROW != 
clusterNodeMapper.updateById(entity)) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                    String.format(
-                            "cluster node has already updated with 
parentId=%s, type=%s, ip=%s, port=%s, protocolType=%s",
-                            entity.getParentId(), entity.getType(), 
entity.getIp(), entity.getPort(),
-                            entity.getProtocolType()));
-        }
-        return true;
-    }
-
     @Override
     public String getManagerSSHPublicKey() {
         String homeDirectory = System.getProperty("user.home");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index 82844da825..4906e91aa7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -86,15 +86,6 @@ public interface InlongGroupService {
      */
     String getTenant(String groupId, String operator);
 
-    /**
-     * Get inlong group info based on inlong group id
-     *
-     * @param groupId inlong group id
-     * @param opInfo userinfo of operator
-     * @return detail of inlong group
-     */
-    InlongGroupInfo get(String groupId, UserInfo opInfo);
-
     /**
      * Query the group information of each status of the current user
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index e0e7c486c7..19e546938f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -266,26 +266,6 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         return groupInfo;
     }
 
-    @Override
-    public InlongGroupInfo get(String groupId, UserInfo opInfo) {
-        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
-
-        // query mq information
-        InlongGroupOperator instance = 
groupOperatorFactory.getInstance(entity.getMqType());
-        InlongGroupInfo groupInfo = instance.getFromEntity(entity);
-        // get all ext info
-        List<InlongGroupExtEntity> extEntityList = 
groupExtMapper.selectByGroupId(groupId);
-        List<InlongGroupExtInfo> extList = 
CommonBeanUtils.copyListProperties(extEntityList, InlongGroupExtInfo::new);
-        groupInfo.setExtList(extList);
-        List<InlongStreamExtEntity> streamExtEntities = 
streamExtMapper.selectByRelatedId(groupId, null);
-        BaseSortConf sortConf = buildSortConfig(streamExtEntities);
-        groupInfo.setSortConf(sortConf);
-        return groupInfo;
-    }
-
     @Override
     public String getTenant(String groupId, String operator) {
         InlongGroupEntity groupEntity = 
groupMapper.selectByGroupIdWithoutTenant(groupId);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 79da89d512..ea46dc3432 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -51,15 +51,6 @@ public interface StreamSinkService {
      */
     Integer save(SinkRequest request, String operator);
 
-    /**
-     * Save the sink info.
-     *
-     * @param request sink request need to save
-     * @param opInfo userinfo of operator
-     * @return sink id after saving
-     */
-    Integer save(SinkRequest request, UserInfo opInfo);
-
     /**
      * Batch save the sink info.
      *
@@ -77,15 +68,6 @@ public interface StreamSinkService {
      */
     StreamSink get(Integer id);
 
-    /**
-     * Get stream sink info based on id.
-     *
-     * @param id sink id
-     * @param opInfo userinfo of operator
-     * @return detail of stream sink info
-     */
-    StreamSink get(Integer id, UserInfo opInfo);
-
     /**
      * List the stream sinks based on inlong group id and inlong stream id.
      *
@@ -151,15 +133,6 @@ public interface StreamSinkService {
      */
     Boolean update(SinkRequest sinkRequest, String operator);
 
-    /**
-     * Modify stream sink info by id.
-     *
-     * @param sinkRequest stream sink request that needs to be modified
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean update(SinkRequest sinkRequest, UserInfo opInfo);
-
     /**
      * Modify stream sink info by key.
      *
@@ -188,16 +161,6 @@ public interface StreamSinkService {
      */
     Boolean delete(Integer id, Boolean startProcess, String operator);
 
-    /**
-     * Delete the stream sink by the given id and sink type.
-     *
-     * @param id stream sink id
-     * @param startProcess whether to start the process after saving or 
updating
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean delete(Integer id, Boolean startProcess, UserInfo opInfo);
-
     /**
      * Delete the stream sink by given group id, stream id, and sink name.
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 4d7fd556ac..2180400305 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.sink;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.OperationTarget;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
@@ -180,62 +179,6 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         return id;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class)
-    public Integer save(SinkRequest request, UserInfo opInfo) {
-        // check request parameter
-        checkSinkRequestParams(request);
-        InlongGroupEntity entity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", request.getInlongGroupId()));
-        }
-        // check group status
-        GroupStatus curState = GroupStatus.forCode(entity.getStatus());
-        if (GroupStatus.notAllowedUpdate(curState)) {
-            throw new 
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
 curState));
-        }
-        // Check whether the stream exist or not
-        InlongStreamEntity streamEntity =
-                streamMapper.selectByIdentifier(request.getInlongGroupId(), 
request.getInlongStreamId());
-        if (streamEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
-        }
-        // Check whether the sink name exists with the same groupId and 
streamId
-        StreamSinkEntity exists = sinkMapper.selectByUniqueKey(
-                request.getInlongGroupId(), request.getInlongStreamId(), 
request.getSinkName());
-        if (exists != null && 
exists.getSinkName().equals(request.getSinkName())) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
-                    String.format("sink name=%s already exists with the 
groupId=%s streamId=%s",
-                            request.getSinkName(), request.getInlongGroupId(), 
request.getInlongStreamId()));
-        }
-        // According to the sink type, save sink information
-        StreamSinkOperator sinkOperator = 
operatorFactory.getInstance(request.getSinkType());
-        List<SinkField> fields = request.getSinkFieldList();
-        // Remove id in sinkField when save
-        if (CollectionUtils.isNotEmpty(fields)) {
-            fields.forEach(sinkField -> sinkField.setId(null));
-        }
-        int id = sinkOperator.saveOpt(request, opInfo.getName());
-        boolean streamSuccess = 
StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
-        if (streamSuccess || 
StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
-            boolean enableCreateResource = 
InlongConstants.ENABLE_CREATE_RESOURCE.equals(
-                    request.getEnableCreateResource());
-            SinkStatus nextStatus = request.getStartProcess() ? 
SinkStatus.CONFIG_ING : SinkStatus.NEW;
-            if (!enableCreateResource) {
-                nextStatus = SinkStatus.CONFIG_SUCCESSFUL;
-            }
-            StreamSinkEntity sinkEntity = sinkMapper.selectByPrimaryKey(id);
-            sinkEntity.setStatus(nextStatus.getCode());
-            sinkMapper.updateStatus(sinkEntity);
-        }
-        // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the 
[CREATE_STREAM_RESOURCE] process
-        if (streamSuccess && request.getStartProcess()) {
-            this.startProcessForSink(request.getInlongGroupId(), 
request.getInlongStreamId(), opInfo.getName());
-        }
-        return id;
-    }
-
     @Override
     public List<BatchResult> batchSave(List<SinkRequest> requestList, String 
operator) {
         List<BatchResult> resultList = new ArrayList<>();
@@ -269,16 +212,6 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND,
                     String.format("sink not found by id=%s", id));
         }
-        StreamSinkOperator sinkOperator = 
operatorFactory.getInstance(entity.getSinkType());
-        return sinkOperator.getFromEntity(entity);
-    }
-
-    @Override
-    public StreamSink get(Integer id, UserInfo opInfo) {
-        StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
-        }
         InlongGroupEntity groupEntity =
                 groupMapper.selectByGroupId(entity.getInlongGroupId());
         if (groupEntity == null) {
@@ -453,63 +386,6 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         return true;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class)
-    public Boolean update(SinkRequest request, UserInfo opInfo) {
-        if (request.getId() == null) {
-            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
-        }
-        StreamSinkEntity curEntity = 
sinkMapper.selectByPrimaryKey(request.getId());
-        if (curEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
-        }
-        chkUnmodifiableParams(curEntity, request);
-        // check group record
-        InlongGroupEntity curGroupEntity = 
groupMapper.selectByGroupId(curEntity.getInlongGroupId());
-        if (curGroupEntity == null) {
-            throw new 
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", curEntity.getInlongGroupId()));
-        }
-        // Check if group status can be modified
-        GroupStatus curState = GroupStatus.forCode(curEntity.getStatus());
-        if (GroupStatus.notAllowedUpdate(curState)) {
-            throw new 
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
 curState));
-        }
-        // Check whether the stream exist or not
-        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
-                request.getInlongGroupId(), request.getInlongStreamId());
-        if (streamEntity == null) {
-            throw new 
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
-                    String.format("stream record not found with the groupId=%s 
streamId=%s",
-                            curEntity.getInlongGroupId(), 
curEntity.getInlongStreamId()));
-        }
-        // Check whether the sink name exists with the same groupId and 
streamId
-        StreamSinkEntity existEntity = sinkMapper.selectByUniqueKey(
-                request.getInlongGroupId(), request.getInlongStreamId(), 
request.getSinkName());
-        if (existEntity != null && 
!existEntity.getId().equals(request.getId())) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
-                    String.format("sink name=%s already exists with the 
groupId=%s streamId=%s",
-                            request.getSinkName(), request.getInlongGroupId(), 
request.getInlongStreamId()));
-        }
-        // update record
-        SinkStatus nextStatus = null;
-        boolean enableConfig = 
StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus())
-                || 
StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus());
-        if (enableConfig) {
-            boolean enableCreateResource = 
InlongConstants.ENABLE_CREATE_RESOURCE.equals(
-                    request.getEnableCreateResource());
-            nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING : 
SinkStatus.CONFIG_SUCCESSFUL;
-        }
-        StreamSinkOperator sinkOperator = 
operatorFactory.getInstance(request.getSinkType());
-        sinkOperator.updateOpt(request, nextStatus, opInfo.getName());
-        // If the stream is [CONFIG_SUCCESSFUL] or [CONFIG_FAILED], then 
asynchronously start the
-        // [CREATE_STREAM_RESOURCE] process
-        if (enableConfig && request.getStartProcess()) {
-            this.startProcessForSink(request.getInlongGroupId(), 
request.getInlongStreamId(), opInfo.getName());
-        }
-        return true;
-    }
-
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public UpdateResult updateByKey(SinkRequest request, String operator) {
@@ -564,34 +440,6 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         return true;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class)
-    public Boolean delete(Integer id, Boolean startProcess, UserInfo opInfo) {
-        // check stream sink record
-        StreamSinkEntity sinkEntity = sinkMapper.selectByPrimaryKey(id);
-        if (sinkEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
-        }
-        // check group record
-        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(sinkEntity.getInlongGroupId());
-        if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", sinkEntity.getInlongGroupId()));
-        }
-        // Check if group status can be modified
-        GroupStatus curState = GroupStatus.forCode(groupEntity.getStatus());
-        if (GroupStatus.notAllowedUpdate(curState)) {
-            throw new 
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
 curState));
-        }
-        // delete record
-        StreamSinkOperator sinkOperator = 
operatorFactory.getInstance(sinkEntity.getSinkType());
-        sinkOperator.deleteOpt(sinkEntity, opInfo.getName());
-        if (startProcess) {
-            this.deleteProcessForSink(sinkEntity.getInlongGroupId(), 
sinkEntity.getInlongStreamId(), opInfo.getName());
-        }
-        return true;
-    }
-
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean deleteByKey(String groupId, String streamId, String 
sinkName,
@@ -638,12 +486,7 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
                 entity.setIsDeleted(id);
                 entity.setModifier(operator);
                 int rowCount = sinkMapper.updateByIdSelective(entity);
-                if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-                    LOGGER.error("sink has already updated with groupId={}, 
streamId={}, name={}, curVersion={}",
-                            entity.getInlongGroupId(), 
entity.getInlongStreamId(), entity.getSinkName(),
-                            entity.getVersion());
-                    throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-                }
+                checkAffectRowCount(rowCount, entity);
                 sinkFieldMapper.logicDeleteAll(id);
             });
         }
@@ -652,6 +495,14 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         return true;
     }
 
+    private void checkAffectRowCount(int affectRowCount, StreamSinkEntity 
entity) {
+        if (affectRowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error("sink has already updated with groupId={}, 
streamId={}, name={}, curVersion={}",
+                    entity.getInlongGroupId(), entity.getInlongStreamId(), 
entity.getSinkName(), entity.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+    }
+
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean deleteAll(String groupId, String streamId, String operator) 
{
@@ -716,12 +567,7 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
             entity.setStatus(status);
             entity.setModifier(operator);
             int rowCount = sinkMapper.updateByIdSelective(entity);
-            if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-                LOGGER.error("sink has already updated with groupId={}, 
streamId={}, name={}, curVersion={}",
-                        entity.getInlongGroupId(), entity.getInlongStreamId(), 
entity.getSinkName(),
-                        entity.getVersion());
-                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-            }
+            checkAffectRowCount(rowCount, entity);
         }
 
         LOGGER.info("success to update sink after approve: {}", approveList);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 0a2cfa9239..b622d7a303 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -44,15 +44,6 @@ public interface StreamSourceService {
      */
     Integer save(SourceRequest request, String operator);
 
-    /**
-     * Save the source information
-     *
-     * @param request Source request.
-     * @param opInfo userinfo of operator
-     * @return source id after saving.
-     */
-    Integer save(SourceRequest request, UserInfo opInfo);
-
     /**
      * Batch save the source information
      *
@@ -70,15 +61,6 @@ public interface StreamSourceService {
      */
     StreamSource get(Integer id);
 
-    /**
-     * Query source information based on id
-     *
-     * @param id source id.
-     * @param opInfo userinfo of operator
-     * @return Source info
-     */
-    StreamSource get(Integer id, UserInfo opInfo);
-
     /**
      * Query source information based on inlong group id and inlong stream id.
      *
@@ -135,15 +117,6 @@ public interface StreamSourceService {
      */
     Boolean update(SourceRequest sourceRequest, String operator);
 
-    /**
-     * Modify data source information
-     *
-     * @param sourceRequest Information that needs to be modified
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean update(SourceRequest sourceRequest, UserInfo opInfo);
-
     /**
      * Update source status by the given groupId and streamId
      *
@@ -164,15 +137,6 @@ public interface StreamSourceService {
      */
     Boolean delete(Integer id, String operator);
 
-    /**
-     * Delete the stream source by the given id and source type.
-     *
-     * @param id The primary key of the source.
-     * @param opInfo userinfo of operator
-     * @return Whether succeed
-     */
-    Boolean delete(Integer id, UserInfo opInfo);
-
     /**
      * Force deletes the stream source by groupId and streamId
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index ec4d70f076..f2a263ba82 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.service.source;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.OperationTarget;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
@@ -28,7 +27,6 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
@@ -123,47 +121,6 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         return id;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW)
-    public Integer save(SourceRequest request, UserInfo opInfo) {
-        // Check if it can be added
-        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
-        if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", request.getInlongGroupId()));
-        }
-        // get stream information
-        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
-                request.getInlongGroupId(), request.getInlongStreamId());
-        if (streamEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND,
-                    String.format("InlongStream does not exist with 
InlongGroupId=%s, InLongStreamId=%s",
-                            request.getInlongGroupId(), 
request.getInlongStreamId()));
-        }
-        // Check if the record to be added exists
-        List<StreamSourceEntity> existList = sourceMapper.selectByRelatedId(
-                request.getInlongGroupId(), request.getInlongStreamId(), 
request.getSourceName());
-        if (CollectionUtils.isNotEmpty(existList)) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
-                    String.format("source name=%s already exists with 
groupId=%s streamId=%s",
-                            request.getSourceName(), 
request.getInlongGroupId(), request.getInlongStreamId()));
-        }
-        // check inlong group status
-        GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
-        if (GroupStatus.notAllowedUpdate(status)) {
-            throw new 
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
-                    
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
-        }
-        // According to the source type, save source information
-        StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(request.getSourceType());
-        // Remove id in sourceField when save
-        List<StreamField> streamFields = request.getFieldList();
-        if (CollectionUtils.isNotEmpty(streamFields)) {
-            streamFields.forEach(streamField -> streamField.setId(null));
-        }
-        return sourceOperator.saveOpt(request, groupEntity.getStatus(), 
opInfo.getName());
-    }
-
     @Override
     public List<BatchResult> batchSave(List<SourceRequest> requestList, String 
operator) {
         List<BatchResult> resultList = new ArrayList<>();
@@ -203,21 +160,6 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         return streamSource;
     }
 
-    @Override
-    public StreamSource get(Integer id, UserInfo opInfo) {
-        StreamSourceEntity entity = sourceMapper.selectById(id);
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
-                    String.format("source not found by id=%s", id));
-        }
-        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(entity.getInlongGroupId());
-        if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
-        StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(entity.getSourceType());
-        return sourceOperator.getFromEntity(entity);
-    }
-
     @Override
     public Integer getCount(String groupId, String streamId) {
         Integer count = sourceMapper.selectCount(groupId, streamId);
@@ -365,33 +307,6 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         return true;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
-    public Boolean update(SourceRequest request, UserInfo opInfo) {
-        // check request parameter
-        chkUnmodifiableParams(request);
-        // Check if it can be update
-        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
-        if (groupEntity == null) {
-            throw new 
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", request.getInlongGroupId()));
-        }
-        // check inlong group status
-        GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
-        if (GroupStatus.notAllowedUpdate(status)) {
-            throw new 
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
-                    
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
-        }
-        StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(request.getSourceType());
-        // Remove id in sourceField when save
-        List<StreamField> streamFields = request.getFieldList();
-        if (CollectionUtils.isNotEmpty(streamFields)) {
-            streamFields.forEach(streamField -> streamField.setId(null));
-        }
-        sourceOperator.updateOpt(request, groupEntity.getStatus(), 
groupEntity.getInlongGroupMode(), opInfo.getName());
-        return true;
-    }
-
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
     public Boolean updateStatus(String groupId, String streamId, Integer 
targetStatus, String operator) {
@@ -436,6 +351,7 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         entity.setPreviousStatus(curStatus.getCode());
         entity.setStatus(nextStatus.getCode());
         entity.setIsDeleted(id);
+        entity.setModifier(operator);
         int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
             LOGGER.error("source has already updated with groupId={}, 
streamId={}, name={}, curVersion={}",
@@ -448,50 +364,6 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         return true;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
-    public Boolean delete(Integer id, UserInfo opInfo) {
-        StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
-        Preconditions.expectNotNull(entity, 
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
-                ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-
-        // Check if it can be delete
-        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(entity.getInlongGroupId());
-        if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", entity.getInlongGroupId()));
-        }
-        // check record status
-        boolean isTemplateSource = 
CollectionUtils.isNotEmpty(sourceMapper.selectByTaskMapId(id));
-        SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
-        SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
-        // if source is frozen|failed|new, or if it is a template source or 
auto push source, delete directly
-        if (curStatus == SourceStatus.SOURCE_STOP || curStatus == 
SourceStatus.SOURCE_FAILED
-                || curStatus == SourceStatus.SOURCE_NEW || isTemplateSource
-                || SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
-            nextStatus = SourceStatus.SOURCE_DISABLE;
-        }
-        if (!SourceStatus.isAllowedTransition(curStatus, nextStatus)) {
-            throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
-                    String.format("current source status=%s for id=%s is not 
allowed to delete", entity.getStatus(),
-                            entity.getId()));
-        }
-        // delete record
-        entity.setPreviousStatus(curStatus.getCode());
-        entity.setStatus(nextStatus.getCode());
-        entity.setIsDeleted(id);
-        entity.setModifier(opInfo.getName());
-        int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
-        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                    String.format("source has already updated with groupId=%s, 
streamId=%s, name=%s, curVersion=%s",
-                            entity.getInlongGroupId(), 
entity.getInlongStreamId(), entity.getSourceName(),
-                            entity.getVersion()));
-        }
-        sourceFieldMapper.deleteAll(id);
-        return true;
-    }
-
     @Override
     public Boolean forceDelete(String groupId, String streamId, String 
operator) {
         LOGGER.info("begin to force delete source for groupId={} and 
streamId={} by user={}",
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 689ebe710f..96002d0ebd 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -60,15 +60,6 @@ public interface InlongStreamService {
      */
     List<BatchResult> batchSave(List<InlongStreamRequest> requestList, String 
operator);
 
-    /**
-     * Save inlong stream information.
-     *
-     * @param request Inlong stream information.
-     * @param opInfo userinfo of operator
-     * @return Id after successful save.
-     */
-    Integer save(InlongStreamRequest request, UserInfo opInfo);
-
     /**
      * Query whether the inlong stream ID exists
      *
@@ -87,16 +78,6 @@ public interface InlongStreamService {
      */
     InlongStreamInfo get(String groupId, String streamId);
 
-    /**
-     * Query the details of the specified inlong stream
-     *
-     * @param groupId Inlong group id
-     * @param streamId Inlong stream id
-     * @param opInfo userinfo of operator
-     * @return inlong stream details
-     */
-    InlongStreamInfo get(String groupId, String streamId, UserInfo opInfo);
-
     /**
      * Query the brief of the specified inlong stream
      *
@@ -159,15 +140,6 @@ public interface InlongStreamService {
      */
     Boolean update(InlongStreamRequest request, String operator);
 
-    /**
-     * Update the InlongStream info
-     *
-     * @param request inlong stream info that needs to be modified
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean update(InlongStreamRequest request, UserInfo opInfo);
-
     /**
      * Update the InlongStream - not check the InlongGroup status to which the 
stream belongs.
      *
@@ -190,19 +162,6 @@ public interface InlongStreamService {
      */
     Boolean delete(String groupId, String streamId, String operator);
 
-    /**
-     * Delete the specified inlong stream.
-     * <p/>
-     * When deleting an inlong stream, you need to check whether there are 
some related
-     * stream_sources or stream_sinks
-     *
-     * @param groupId Inlong group id
-     * @param streamId Inlong stream id
-     * @param opInfo userinfo of operator
-     * @return whether succeed
-     */
-    Boolean delete(String groupId, String streamId, UserInfo opInfo);
-
     /**
      * Logically delete all inlong streams under the specified groupId
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 9167ab02c7..46db20a04f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -214,46 +214,6 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         return resultList;
     }
 
-    @Override
-    public Integer save(InlongStreamRequest request, UserInfo opInfo) {
-        InlongGroupEntity entity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
-        // Add/modify/delete is not allowed under temporary inlong group status
-        GroupStatus curState = GroupStatus.forCode(entity.getStatus());
-        if (GroupStatus.isTempStatus(curState)) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED,
-                    String.format("inlong groupId=%s status=%s was not allowed 
to add/update/delete stream",
-                            request.getInlongGroupId(), curState));
-        }
-        // The streamId under the same groupId cannot be repeated
-        Integer count = streamMapper.selectExistByIdentifier(
-                request.getInlongGroupId(), request.getInlongStreamId());
-        if (count >= 1) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_ID_DUPLICATE);
-        }
-        if (StringUtils.isEmpty(request.getMqResource())) {
-            request.setMqResource(request.getInlongStreamId());
-        }
-        // Processing extended attributes
-        String extParams = packExtParams(request);
-        request.setExtParams(extParams);
-        // Processing inlong stream
-        InlongStreamEntity streamEntity = 
CommonBeanUtils.copyProperties(request, InlongStreamEntity::new);
-        streamEntity.setStatus(StreamStatus.NEW.getCode());
-        streamEntity.setCreator(opInfo.getName());
-        streamEntity.setModifier(opInfo.getName());
-        // add record
-        streamMapper.insertSelective(streamEntity);
-        saveField(request.getInlongGroupId(), request.getInlongStreamId(), 
request.getFieldList());
-        List<InlongStreamExtInfo> extList = request.getExtList();
-        if (CollectionUtils.isNotEmpty(extList)) {
-            saveOrUpdateExt(request.getInlongGroupId(), 
request.getInlongStreamId(), extList);
-        }
-        return streamEntity.getId();
-    }
-
     @Override
     public Boolean exist(String groupId, String streamId) {
         Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
@@ -291,33 +251,6 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         return streamInfo;
     }
 
-    @Override
-    public InlongStreamInfo get(String groupId, String streamId, UserInfo 
opInfo) {
-        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
-        // get stream information
-        InlongStreamEntity streamEntity = 
streamMapper.selectByIdentifier(groupId, streamId);
-        if (streamEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
-        }
-        InlongStreamInfo streamInfo = 
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
-        // Processing extParams
-        unpackExtParams(streamEntity.getExtParams(), streamInfo);
-        // Load fields
-        List<StreamField> streamFields = getStreamFields(groupId, streamId);
-        streamInfo.setFieldList(streamFields);
-        List<InlongStreamExtEntity> extEntities = 
streamExtMapper.selectByRelatedId(groupId, streamId);
-        List<InlongStreamExtInfo> extInfos = 
CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
-        streamInfo.setExtList(extInfos);
-        List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
-        streamInfo.setSinkList(sinkList);
-        List<StreamSource> sourceList = sourceService.listSource(groupId, 
streamId);
-        streamInfo.setSourceList(sourceList);
-        return streamInfo;
-    }
-
     @Override
     public InlongStreamBriefInfo getBrief(String groupId, String streamId, 
String operator) {
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
@@ -526,51 +459,6 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         return this.updateWithoutCheck(request, operator);
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class)
-    public Boolean update(InlongStreamRequest request, UserInfo opInfo) {
-        InlongGroupEntity entity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
-
-        // Add/modify/delete is not allowed under temporary inlong group status
-        GroupStatus curState = GroupStatus.forCode(entity.getStatus());
-        if (GroupStatus.isTempStatus(curState)) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED,
-                    String.format("inlong groupId=%s status=%s was not allowed 
to add/update/delete stream",
-                            request.getInlongGroupId(), curState));
-        }
-        // check stream status
-        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
-                request.getInlongGroupId(), request.getInlongStreamId());
-        if (streamEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND,
-                    String.format("inlong stream not found by groupId=%s, 
streamId=%s",
-                            request.getInlongGroupId(), 
request.getInlongStreamId()));
-        }
-        if (!Objects.equals(streamEntity.getVersion(), request.getVersion())) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                    String.format("stream has already updated with groupId=%s, 
streamId=%s, curVersion=%s",
-                            streamEntity.getInlongGroupId(), 
streamEntity.getInlongStreamId(), request.getVersion()));
-        }
-        // Processing extended attributes
-        String extParams = packExtParams(request);
-        request.setExtParams(extParams);
-        // update record
-        CommonBeanUtils.copyProperties(request, streamEntity, true);
-        streamEntity.setModifier(opInfo.getName());
-        if (InlongConstants.AFFECTED_ONE_ROW != 
streamMapper.updateByIdentifierSelective(streamEntity)) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
-        // update stream fields
-        updateField(request.getInlongGroupId(), request.getInlongStreamId(), 
request.getFieldList());
-        // update stream extension infos
-        List<InlongStreamExtInfo> extList = request.getExtList();
-        saveOrUpdateExt(request.getInlongGroupId(), 
request.getInlongStreamId(), extList);
-        return true;
-    }
-
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean updateWithoutCheck(InlongStreamRequest request, String 
operator) {
@@ -663,45 +551,6 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         return true;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class)
-    public Boolean delete(String groupId, String streamId, UserInfo opInfo) {
-        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
-        if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
-        // Add/modify/delete is not allowed under temporary inlong group status
-        GroupStatus curState = GroupStatus.forCode(groupEntity.getStatus());
-        if (GroupStatus.isTempStatus(curState)) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED,
-                    String.format("inlong groupId=%s status=%s was not allowed 
to add/update/delete stream", groupId,
-                            curState));
-        }
-        // Check if steam record exists
-        InlongStreamEntity streamEntity = 
streamMapper.selectByIdentifier(groupId, streamId);
-        if (streamEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
-        }
-        // If there is undeleted stream source, the deletion fails
-        if (sourceService.getCount(groupId, streamId) > 0) {
-            throw new 
BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SOURCE);
-        }
-        // If there is undeleted stream sink, the deletion fails
-        if (sinkService.getCount(groupId, streamId) > 0) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_DELETE_HAS_SINK);
-        }
-        // delete record
-        streamEntity.setIsDeleted(streamEntity.getId());
-        streamEntity.setModifier(opInfo.getName());
-        if (streamMapper.updateByPrimaryKey(streamEntity) != 
InlongConstants.AFFECTED_ONE_ROW) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
-        // Logically delete the associated field table
-        streamFieldMapper.logicDeleteAllByIdentifier(groupId, streamId);
-        streamExtMapper.logicDeleteAllByRelatedId(groupId, streamId);
-        return true;
-    }
-
     @Transactional(rollbackFor = Throwable.class)
     @Override
     public Boolean logicDeleteAll(String groupId, String operator) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
index f172219c77..b8d10ebb92 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
@@ -40,15 +40,6 @@ public interface StreamTransformService {
      */
     Integer save(TransformRequest request, String operator);
 
-    /**
-     * Save the transform information.
-     *
-     * @param request the transform request
-     * @param opInfo userinfo of operator
-     * @return transform id after saving
-     */
-    Integer save(TransformRequest request, UserInfo opInfo);
-
     /**
      * Query transform information based on inlong group id and inlong stream 
id.
      *
@@ -75,16 +66,6 @@ public interface StreamTransformService {
      */
     List<TransformResponse> listTransform(String groupId, String streamId);
 
-    /**
-     * Query transform information based on inlong group id and inlong stream 
id.
-     *
-     * @param groupId the inlong group id
-     * @param streamId the inlong stream id
-     * @param opInfo userinfo of operator
-     * @return the transform response
-     */
-    List<TransformResponse> listTransform(String groupId, String streamId, 
UserInfo opInfo);
-
     /**
      * Modify data transform information.
      *
@@ -94,15 +75,6 @@ public interface StreamTransformService {
      */
     Boolean update(TransformRequest request, String operator);
 
-    /**
-     * Modify data transform information.
-     *
-     * @param request the transform request
-     * @param opInfo userinfo of operator
-     * @return Whether succeed
-     */
-    Boolean update(TransformRequest request, UserInfo opInfo);
-
     /**
      * Delete the stream transform by the given id.
      *
@@ -111,14 +83,4 @@ public interface StreamTransformService {
      * @return Whether succeed
      */
     Boolean delete(DeleteTransformRequest request, String operator);
-
-    /**
-     * Delete the stream transform by the given id.
-     *
-     * @param request delete request
-     * @param opInfo userinfo of operator
-     * @return Whether succeed
-     */
-    Boolean delete(DeleteTransformRequest request, UserInfo opInfo);
-
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 512580c9a4..40269450a9 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.transform;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -102,40 +101,6 @@ public class StreamTransformServiceImpl implements 
StreamTransformService {
         return transformEntity.getId();
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW)
-    public Integer save(TransformRequest request, UserInfo opInfo) {
-        // Check if it can be added
-        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
-        if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", request.getInlongGroupId()));
-        }
-        // check inlong group status
-        GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
-        if (GroupStatus.notAllowedUpdate(status)) {
-            throw new 
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
-                    
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
-        }
-        // Check if the record to be added exists
-        List<StreamTransformEntity> transformEntities =
-                transformMapper.selectByRelatedId(request.getInlongGroupId(),
-                        request.getInlongStreamId(), 
request.getTransformName());
-        if (CollectionUtils.isNotEmpty(transformEntities)) {
-            throw new BusinessException(ErrorCodeEnum.RECORD_DUPLICATE,
-                    String.format("stream transform already exists with 
groupId=%s, streamId=%s, transformName=%s",
-                            request.getInlongGroupId(), 
request.getInlongStreamId(), request.getTransformName()));
-        }
-        // add record
-        StreamTransformEntity transformEntity =
-                CommonBeanUtils.copyProperties(request, 
StreamTransformEntity::new);
-        transformEntity.setCreator(opInfo.getName());
-        transformEntity.setModifier(opInfo.getName());
-        transformMapper.insert(transformEntity);
-        saveFieldOpt(transformEntity, request.getFieldList());
-        return transformEntity.getId();
-    }
-
     @Override
     public PageResult<TransformResponse> listByCondition(TransformPageRequest 
request, UserInfo opInfo) {
         String groupId = request.getInlongGroupId();
@@ -195,23 +160,6 @@ public class StreamTransformServiceImpl implements 
StreamTransformService {
         return getTransformResponse(entityList);
     }
 
-    @Override
-    public List<TransformResponse> listTransform(String groupId, String 
streamId, UserInfo opInfo) {
-        // Check if it can be added
-        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
-        if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", groupId));
-        }
-        // query result
-        List<StreamTransformEntity> entityList = 
transformMapper.selectByRelatedId(groupId, streamId, null);
-        if (CollectionUtils.isEmpty(entityList)) {
-            return Collections.emptyList();
-        }
-        // get transform data
-        return getTransformResponse(entityList);
-    }
-
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW)
     public Boolean update(TransformRequest request, String operator) {
@@ -238,38 +186,6 @@ public class StreamTransformServiceImpl implements 
StreamTransformService {
         return true;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW)
-    public Boolean update(TransformRequest request, UserInfo opInfo) {
-        // check request and parameters
-        this.chkUnmodifiableParams(request);
-        // Check if it can be added
-        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
-        if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", request.getInlongGroupId()));
-        }
-        // check inlong group status
-        GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
-        if (GroupStatus.notAllowedUpdate(status)) {
-            throw new 
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
-                    
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
-        }
-        // update record
-        StreamTransformEntity transformEntity =
-                CommonBeanUtils.copyProperties(request, 
StreamTransformEntity::new);
-        transformEntity.setModifier(opInfo.getName());
-        int rowCount = transformMapper.updateByIdSelective(transformEntity);
-        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                    String.format("transform has already updated with 
groupId=%s, streamId=%s, name=%s, curVersion=%s",
-                            request.getInlongGroupId(), 
request.getInlongStreamId(),
-                            request.getTransformName(), request.getVersion()));
-        }
-        updateFieldOpt(transformEntity, request.getFieldList());
-        return true;
-    }
-
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW)
     public Boolean delete(DeleteTransformRequest request, String operator) {
@@ -303,44 +219,6 @@ public class StreamTransformServiceImpl implements 
StreamTransformService {
         return true;
     }
 
-    @Override
-    @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW)
-    public Boolean delete(DeleteTransformRequest request, UserInfo opInfo) {
-        // Check if it can be added
-        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
-        if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", request.getInlongGroupId()));
-        }
-        // check inlong group status
-        GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
-        if (GroupStatus.notAllowedUpdate(status)) {
-            throw new 
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
-                    
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
-        }
-        // query records
-        List<StreamTransformEntity> entityList =
-                transformMapper.selectByRelatedId(request.getInlongGroupId(),
-                        request.getInlongStreamId(), 
request.getTransformName());
-        if (CollectionUtils.isNotEmpty(entityList)) {
-            for (StreamTransformEntity entity : entityList) {
-                Integer id = entity.getId();
-                entity.setIsDeleted(id);
-                entity.setModifier(opInfo.getName());
-                int rowCount = transformMapper.updateByIdSelective(entity);
-                if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-                    throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                            String.format(
-                                    "transform has already updated with 
groupId=%s, streamId=%s, name=%s, curVersion=%s",
-                                    entity.getInlongGroupId(), 
entity.getInlongStreamId(),
-                                    entity.getTransformName(), 
entity.getVersion()));
-                }
-                transformFieldMapper.deleteAll(id);
-            }
-        }
-        return true;
-    }
-
     private List<TransformResponse> 
getTransformResponse(List<StreamTransformEntity> entityList) {
         List<Integer> transformIds = 
entityList.stream().map(StreamTransformEntity::getId).collect(Collectors.toList());
         List<StreamTransformFieldEntity> fieldEntities = 
transformFieldMapper.selectByTransformIds(transformIds);
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index c8c6c51186..ce2d9175c8 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -112,7 +112,7 @@ public class InlongStreamController {
             @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, 
required = true)
     })
     public Response<InlongStreamInfo> get(@RequestParam String groupId, 
@RequestParam String streamId) {
-        return Response.success(streamService.get(groupId, streamId, 
LoginUserUtils.getLoginUser()));
+        return Response.success(streamService.get(groupId, streamId));
     }
 
     @RequestMapping(value = "/stream/getBrief", method = RequestMethod.GET)
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 7869758cdb..4fd4eaabb6 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -79,7 +79,7 @@ public class StreamSinkController {
     @OperationLog(operation = OperationType.GET, operationTarget = 
OperationTarget.SINK)
     @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = 
true)
     public Response<StreamSink> get(@PathVariable Integer id) {
-        return Response.success(sinkService.get(id, 
LoginUserUtils.getLoginUser()));
+        return Response.success(sinkService.get(id));
     }
 
     @RequestMapping(value = "/sink/list", method = RequestMethod.POST)
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index 1f62edd358..3c751a0c6a 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -77,7 +77,7 @@ public class StreamSourceController {
     @ApiOperation(value = "Get stream source")
     @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = 
true)
     public Response<StreamSource> get(@PathVariable Integer id) {
-        return Response.success(sourceService.get(id, 
LoginUserUtils.getLoginUser()));
+        return Response.success(sourceService.get(id));
     }
 
     @RequestMapping(value = "/source/list", method = RequestMethod.POST)
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongClusterController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongClusterController.java
index ebbfd442ef..0e6224eaee 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongClusterController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongClusterController.java
@@ -80,7 +80,7 @@ public class OpenInLongClusterController {
     public Response<ClusterTagResponse> getTag(@PathVariable Integer id) {
         Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, "tag 
id cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.getTag(id, 
LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.getTag(id, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @PostMapping(value = "/cluster/tag/list")
@@ -99,7 +99,7 @@ public class OpenInLongClusterController {
     public Response<Integer> saveTag(@Validated(SaveValidation.class) 
@RequestBody ClusterTagRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, 
"request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.saveTag(request, 
LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.saveTag(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @PostMapping(value = "/cluster/tag/update")
@@ -108,7 +108,7 @@ public class OpenInLongClusterController {
     public Response<Boolean> updateTag(@Validated(UpdateValidation.class) 
@RequestBody ClusterTagRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, 
"request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.updateTag(request, 
LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.updateTag(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @DeleteMapping(value = "/cluster/tag/delete/{id}")
@@ -118,7 +118,7 @@ public class OpenInLongClusterController {
     public Response<Boolean> deleteTag(@PathVariable Integer id) {
         Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, "tag 
id cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.deleteTag(id, 
LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.deleteTag(id, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @GetMapping(value = "/cluster/get/{id}")
@@ -127,7 +127,7 @@ public class OpenInLongClusterController {
     public Response<ClusterInfo> get(@PathVariable Integer id) {
         Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, 
"cluster id cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.get(id, 
LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.get(id, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @PostMapping(value = "/cluster/list")
@@ -146,7 +146,7 @@ public class OpenInLongClusterController {
     public Response<Integer> save(@Validated(SaveValidation.class) 
@RequestBody ClusterRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, 
"request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.save(request, 
LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.save(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @PostMapping(value = "/cluster/update")
@@ -155,7 +155,7 @@ public class OpenInLongClusterController {
     public Response<Boolean> update(@Validated(UpdateByIdValidation.class) 
@RequestBody ClusterRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, 
"request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.update(request, 
LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.update(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @PostMapping(value = "/cluster/bindTag")
@@ -164,7 +164,7 @@ public class OpenInLongClusterController {
     public Response<Boolean> bindTag(@Validated @RequestBody BindTagRequest 
request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, 
"request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.bindTag(request, 
LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.bindTag(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @DeleteMapping(value = "/cluster/delete/{id}")
@@ -183,7 +183,7 @@ public class OpenInLongClusterController {
     public Response<ClusterNodeResponse> getNode(@PathVariable Integer id) {
         Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, 
"Cluster node id cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.getNode(id, 
LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.getNode(id, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @PostMapping(value = "/cluster/node/list")
@@ -207,8 +207,7 @@ public class OpenInLongClusterController {
         Preconditions.expectNotBlank(inlongGroupId, 
ErrorCodeEnum.INVALID_PARAMETER, "inlongGroupId cannot be blank");
         Preconditions.expectNotBlank(clusterType, 
ErrorCodeEnum.INVALID_PARAMETER, "clusterType cannot be blank");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), 
ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.listNodeByGroupId(inlongGroupId,
-                clusterType, protocolType, LoginUserUtils.getLoginUser()));
+        return 
Response.success(clusterService.listNodeByGroupId(inlongGroupId, clusterType, 
protocolType));
     }
 
     @PostMapping(value = "/cluster/node/save")
@@ -217,7 +216,7 @@ public class OpenInLongClusterController {
     public Response<Integer> saveNode(@Validated @RequestBody ClusterNodeRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.saveNode(request, LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.saveNode(request, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/cluster/node/update", method = RequestMethod.POST)
@@ -226,7 +225,7 @@ public class OpenInLongClusterController {
     public Response<Boolean> updateNode(@Validated(UpdateValidation.class) @RequestBody ClusterNodeRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.updateNode(request, LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.updateNode(request, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/cluster/node/delete/{id}", method = RequestMethod.DELETE)
@@ -236,7 +235,7 @@ public class OpenInLongClusterController {
     public Response<Boolean> deleteNode(@PathVariable Integer id) {
         Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, "cluster id cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(clusterService.deleteNode(id, LoginUserUtils.getLoginUser()));
+        return Response.success(clusterService.deleteNode(id, LoginUserUtils.getLoginUser().getName()));
     }
 
     @PostMapping(value = "/cluster/tenant/tag/save")
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
index e36c48bca0..5dd7f3dab0 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
@@ -69,7 +69,7 @@ public class OpenInLongGroupController {
     public Response<InlongGroupInfo> get(@PathVariable String groupId) {
         Preconditions.expectNotBlank(groupId, ErrorCodeEnum.INVALID_PARAMETER, "groupId cannot be blank");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(groupService.get(groupId, LoginUserUtils.getLoginUser()));
+        return Response.success(groupService.get(groupId));
     }
 
     @PostMapping(value = "/group/list")
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
index 903e730280..41d33d5699 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
@@ -72,7 +72,7 @@ public class OpenInLongStreamController {
         Preconditions.expectNotBlank(groupId, ErrorCodeEnum.INVALID_PARAMETER, "groupId cannot be blank");
         Preconditions.expectNotBlank(streamId, ErrorCodeEnum.INVALID_PARAMETER, "streamId cannot be blank");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(streamService.get(groupId, streamId, LoginUserUtils.getLoginUser()));
+        return Response.success(streamService.get(groupId, streamId));
     }
 
     @RequestMapping(value = "/stream/getBrief", method = RequestMethod.GET)
@@ -103,7 +103,7 @@ public class OpenInLongStreamController {
     public Response<Integer> save(@RequestBody InlongStreamRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(streamService.save(request, LoginUserUtils.getLoginUser()));
+        return Response.success(streamService.save(request, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST)
@@ -120,7 +120,7 @@ public class OpenInLongStreamController {
     public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody InlongStreamRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(streamService.update(request, LoginUserUtils.getLoginUser()));
+        return Response.success(streamService.update(request, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/stream/delete", method = RequestMethod.DELETE)
@@ -134,7 +134,7 @@ public class OpenInLongStreamController {
         Preconditions.expectNotBlank(groupId, ErrorCodeEnum.INVALID_PARAMETER, "groupId cannot be blank");
         Preconditions.expectNotBlank(streamId, ErrorCodeEnum.INVALID_PARAMETER, "streamId cannot be blank");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(streamService.delete(groupId, streamId, LoginUserUtils.getLoginUser()));
+        return Response.success(streamService.delete(groupId, streamId, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/stream/startProcess/{groupId}/{streamId}", method = RequestMethod.POST)
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
index 1d570033df..e999e1ba28 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
@@ -64,7 +64,7 @@ public class OpenStreamSinkController {
     public Response<StreamSink> get(@PathVariable Integer id) {
         Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, "sinkId cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(sinkService.get(id, LoginUserUtils.getLoginUser()));
+        return Response.success(sinkService.get(id));
     }
 
     @RequestMapping(value = "/sink/list", method = RequestMethod.POST)
@@ -81,7 +81,7 @@ public class OpenStreamSinkController {
     public Response<Integer> save(@Validated @RequestBody SinkRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(sinkService.save(request, LoginUserUtils.getLoginUser()));
+        return Response.success(sinkService.save(request, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST)
@@ -97,7 +97,7 @@ public class OpenStreamSinkController {
     public Response<Boolean> update(@Validated(UpdateByIdValidation.class) @RequestBody SinkRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(sinkService.update(request, LoginUserUtils.getLoginUser()));
+        return Response.success(sinkService.update(request, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/sink/delete/{id}", method = RequestMethod.DELETE)
@@ -111,6 +111,6 @@ public class OpenStreamSinkController {
             @RequestParam(required = false, defaultValue = "false") boolean startProcess) {
         Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, "sinkId cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(sinkService.delete(id, startProcess, LoginUserUtils.getLoginUser()));
+        return Response.success(sinkService.delete(id, startProcess, LoginUserUtils.getLoginUser().getName()));
     }
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
index 64e98a58bc..1a7ad699f3 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
@@ -63,7 +63,7 @@ public class OpenStreamSourceController {
     public Response<StreamSource> get(@PathVariable Integer id) {
         Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, "sourceId cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(sourceService.get(id, LoginUserUtils.getLoginUser()));
+        return Response.success(sourceService.get(id));
     }
 
     @RequestMapping(value = "/source/list", method = RequestMethod.POST)
@@ -80,7 +80,7 @@ public class OpenStreamSourceController {
     public Response<Integer> save(@Validated(SaveValidation.class) @RequestBody SourceRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(sourceService.save(request, LoginUserUtils.getLoginUser()));
+        return Response.success(sourceService.save(request, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST)
@@ -97,7 +97,7 @@ public class OpenStreamSourceController {
     public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody SourceRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(sourceService.update(request, LoginUserUtils.getLoginUser()));
+        return Response.success(sourceService.update(request, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/source/delete/{id}", method = RequestMethod.DELETE)
@@ -107,7 +107,7 @@ public class OpenStreamSourceController {
     public Response<Boolean> delete(@PathVariable Integer id) {
         Preconditions.expectNotNull(id, ErrorCodeEnum.INVALID_PARAMETER, "sourceId cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(sourceService.delete(id, LoginUserUtils.getLoginUser()));
+        return Response.success(sourceService.delete(id, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/source/stop/{id}", method = RequestMethod.POST)
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
index 80d939d5ff..0d890486b3 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
@@ -59,8 +59,7 @@ public class OpenStreamTransformController {
             @RequestParam("inlongStreamId") String streamId) {
         Preconditions.expectNotBlank(groupId, ErrorCodeEnum.INVALID_PARAMETER, "groupId cannot be blank");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(streamTransformService.listTransform(
-                groupId, streamId, LoginUserUtils.getLoginUser()));
+        return Response.success(streamTransformService.listTransform(groupId, streamId));
     }
 
     @RequestMapping(value = "/transform/save", method = RequestMethod.POST)
@@ -70,7 +69,7 @@ public class OpenStreamTransformController {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(
-                streamTransformService.save(request, LoginUserUtils.getLoginUser()));
+                streamTransformService.save(request, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/transform/update", method = RequestMethod.POST)
@@ -79,7 +78,7 @@ public class OpenStreamTransformController {
     public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody TransformRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(streamTransformService.update(request, LoginUserUtils.getLoginUser()));
+        return Response.success(streamTransformService.update(request, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/transform/delete", method = RequestMethod.DELETE)
@@ -88,6 +87,6 @@ public class OpenStreamTransformController {
     public Response<Boolean> delete(@Validated DeleteTransformRequest request) {
         Preconditions.expectNotNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be null");
         Preconditions.expectNotNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
-        return Response.success(streamTransformService.delete(request, LoginUserUtils.getLoginUser()));
+        return Response.success(streamTransformService.delete(request, LoginUserUtils.getLoginUser().getName()));
     }
 }

Reply via email to