This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 266148ccd [INLONG-4133][Manager] Not pass the type field when querying 
sources and sinks (#4135)
266148ccd is described below

commit 266148ccd336d2de64dbf0e8391a2d030a1c92a9
Author: fuweng11 <[email protected]>
AuthorDate: Tue May 10 22:37:32 2022 +0800

    [INLONG-4133][Manager] Not pass the type field when querying sources and 
sinks (#4135)
    
    * Add the judgment of equal ID in the update
    
    * Delete the requirement that type must be used in paging query
    
    * Delete the type parameter of the delete function
    
    * Change the exception log
    
    * Change the check of the source name
    
    Co-authored-by: healchow <[email protected]>
---
 .../api/impl/DefaultInlongStreamBuilder.java       |  6 +--
 .../manager/client/api/impl/InlongStreamImpl.java  |  6 +--
 .../client/api/inner/InnerInlongManagerClient.java |  8 +---
 .../manager/dao/mapper/StreamSinkEntityMapper.java |  6 ++-
 .../resources/mappers/StreamSinkEntityMapper.xml   | 11 +++--
 .../manager/service/sink/StreamSinkOperation.java  |  7 ++-
 .../manager/service/sink/StreamSinkService.java    |  8 ++--
 .../service/sink/StreamSinkServiceImpl.java        | 52 ++++++++++++++++------
 .../service/sink/ck/ClickHouseSinkOperation.java   |  5 +--
 .../service/sink/hive/HiveSinkOperation.java       |  5 +--
 .../service/sink/iceberg/IcebergSinkOperation.java |  5 +--
 .../service/sink/kafka/KafkaSinkOperation.java     |  5 +--
 .../service/source/AbstractSourceOperation.java    | 33 +++++++++++---
 .../service/source/StreamSourceOperation.java      |  6 +--
 .../service/source/StreamSourceService.java        | 14 +++---
 .../service/source/StreamSourceServiceImpl.java    | 34 ++++++++------
 .../listener/AbstractSourceOperateListener.java    |  2 +-
 .../source/listener/SourceDeleteListener.java      |  2 +-
 .../source/listener/SourceRestartListener.java     |  2 +-
 .../source/listener/SourceStopListener.java        |  2 +-
 .../service/core/impl/AgentServiceTest.java        |  2 +-
 .../core/sink/ClickHouseStreamSinkServiceTest.java |  6 +--
 .../core/sink/HiveStreamSinkServiceTest.java       |  8 ++--
 .../core/sink/IcebergStreamSinkServiceTest.java    |  8 ++--
 .../core/sink/KafkaStreamSinkServiceTest.java      |  6 +--
 .../core/source/StreamSourceServiceTest.java       | 10 ++---
 .../source/listener/DataSourceListenerTest.java    |  5 +--
 .../web/controller/StreamSinkController.java       | 20 +++------
 .../web/controller/StreamSourceController.java     | 20 +++------
 29 files changed, 163 insertions(+), 141 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index c9b0a8fd0..c744ac407 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -212,9 +212,8 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
         for (SourceListResponse sourceListResponse : sourceListResponses) {
             final String sourceName = sourceListResponse.getSourceName();
             final int id = sourceListResponse.getId();
-            final String type = sourceListResponse.getSourceType();
             if (sourceRequests.get(sourceName) == null) {
-                boolean isDelete = managerClient.deleteSource(id, type);
+                boolean isDelete = managerClient.deleteSource(id);
                 if (!isDelete) {
                     throw new RuntimeException(String.format("Delete source=%s 
failed", sourceListResponse));
                 }
@@ -251,9 +250,8 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
         for (SinkListResponse sinkListResponse : sinkListResponses) {
             final String sinkName = sinkListResponse.getSinkName();
             final int id = sinkListResponse.getId();
-            final String type = sinkListResponse.getSinkType();
             if (sinkRequests.get(sinkName) == null) {
-                boolean isDelete = managerClient.deleteSink(id, type);
+                boolean isDelete = managerClient.deleteSink(id);
                 if (!isDelete) {
                     throw new RuntimeException(String.format("Delete sink=%s 
failed", sinkListResponse));
                 }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 11da4a2ef..ebfba5909 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -363,9 +363,8 @@ public class InlongStreamImpl extends InlongStream {
         for (SourceListResponse sourceListResponse : sourceListResponses) {
             final String sourceName = sourceListResponse.getSourceName();
             final int id = sourceListResponse.getId();
-            final String type = sourceListResponse.getSourceType();
             if (this.streamSources.get(sourceName) == null) {
-                boolean isDelete = managerClient.deleteSource(id, type);
+                boolean isDelete = managerClient.deleteSource(id);
                 if (!isDelete) {
                     throw new RuntimeException(String.format("Delete source=%s 
failed", sourceListResponse));
                 }
@@ -400,9 +399,8 @@ public class InlongStreamImpl extends InlongStream {
         for (SinkListResponse sinkListResponse : sinkListResponses) {
             final String sinkName = sinkListResponse.getSinkName();
             final int id = sinkListResponse.getId();
-            final String type = sinkListResponse.getSinkType();
             if (this.streamSinks.get(sinkName) == null) {
-                boolean isDelete = managerClient.deleteSink(id, type);
+                boolean isDelete = managerClient.deleteSink(id);
                 if (!isDelete) {
                     throw new RuntimeException(String.format("Delete sink=%s 
failed", sinkListResponse));
                 }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index fa6def6cd..daffd1e4b 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -480,12 +480,10 @@ public class InnerInlongManagerClient {
         }
     }
 
-    public boolean deleteSource(int id, String type) {
+    public boolean deleteSource(int id) {
         AssertUtil.isTrue(id > 0, "sourceId is illegal");
-        AssertUtil.notEmpty(type, "sourceType should not be null");
         final String path = HTTP_PATH + "/source/delete/" + id;
         String url = formatUrl(path);
-        url = String.format("%s&sourceType=%s", url, type);
         RequestBody requestBody = 
RequestBody.create(MediaType.parse("application/json"), "");
         Request request = new Request.Builder()
                 .url(url)
@@ -638,12 +636,10 @@ public class InnerInlongManagerClient {
         }
     }
 
-    public boolean deleteSink(int id, String type) {
+    public boolean deleteSink(int id) {
         AssertUtil.isTrue(id > 0, "sinkId is illegal");
-        AssertUtil.notEmpty(type, "sinkType should not be null");
         final String path = HTTP_PATH + "/sink/delete/" + id;
         String url = formatUrl(path);
-        url = String.format("%s&sinkType=%s", url, type);
         RequestBody requestBody = 
RequestBody.create(MediaType.parse("application/json"), "");
         Request request = new Request.Builder()
                 .url(url)
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 8ec752811..b6f88b833 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.manager.dao.mapper;
 
 import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.springframework.stereotype.Repository;
@@ -63,9 +63,11 @@ public interface StreamSinkEntityMapper {
      *
      * @param groupId Inlong group id.
      * @param streamId Inlong stream id.
+     * @param sinkName Stream sink name.
      * @return Sink entity list.
      */
-    List<StreamSinkEntity> selectByRelatedId(@Param("groupId") String groupId, 
@Param("streamId") String streamId);
+    List<StreamSinkEntity> selectByRelatedId(@Param("groupId") String groupId, 
@Param("streamId") String streamId,
+            @Param("sinkName") String sinkName);
 
     /**
      * According to the group id, stream id and sink type, query valid sink 
entity list.
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index d7b337a3e..bddd60860 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -94,16 +94,16 @@
             <if test="request.status != null and request.status != ''">
                 and status = #{request.status, jdbcType=INTEGER}
             </if>
-            <if test="request.inlongClusterName != null and request.status != 
''">
+            <if test="request.inlongClusterName != null and 
request.inlongClusterName != ''">
                 and inlong_cluster_name = #{request.inlongClusterName, 
jdbcType=VARCHAR}
             </if>
-            <if test="request.dataNodeName != null and request.status != ''">
+            <if test="request.dataNodeName != null and request.dataNodeName != 
''">
                 and data_node_name = #{request.dataNodeName, jdbcType=VARCHAR}
             </if>
-            <if test="request.sortTaskName != null and request.status != ''">
+            <if test="request.sortTaskName != null and request.sortTaskName != 
''">
                 and sort_task_name = #{request.sortTaskName, jdbcType=VARCHAR}
             </if>
-            <if test="request.sortConsumerGroup != null and request.status != 
''">
+            <if test="request.sortConsumerGroup != null and 
request.sortConsumerGroup != ''">
                 and sort_consumer_group = #{request.sortConsumerGroup, 
jdbcType=VARCHAR}
             </if>
             order by modify_time desc
@@ -131,6 +131,9 @@
             <if test="streamId != null and streamId != ''">
                 and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
             </if>
+            <if test="sinkName != null and sinkName != ''">
+                and sink_name = #{sinkName, jdbcType=VARCHAR}
+            </if>
         </where>
     </select>
     <select id="selectByIdAndType" 
resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperation.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperation.java
index d704c8075..43845b282 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperation.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperation.java
@@ -57,13 +57,12 @@ public interface StreamSinkOperation {
     }
 
     /**
-     * Get sink info by sink type and sink id.
+     * Get sink info by the given entity.
      *
-     * @param sinkType Sink type.
-     * @param id Sink id.
+     * @param entity the given entity.
      * @return Sink info.
      */
-    SinkResponse getById(String sinkType, Integer id);
+    SinkResponse getByEntity(StreamSinkEntity entity);
 
     /**
      * Get the target from the given entity.
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index b04784b9c..90fdf07de 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -42,13 +42,12 @@ public interface StreamSinkService {
     Integer save(SinkRequest request, String operator);
 
     /**
-     * Query sink information based on id and type.
+     * Query sink information based on id.
      *
      * @param id Sink id.
-     * @param sinkType Sink type.
      * @return Sink info.
      */
-    SinkResponse get(Integer id, String sinkType);
+    SinkResponse get(Integer id);
 
     /**
      * Query sink information based on inlong group id and inlong stream id.
@@ -107,11 +106,10 @@ public interface StreamSinkService {
      * Delete the stream sink by the given id and sink type.
      *
      * @param id The primary key of the sink.
-     * @param sinkType Sink type.
      * @param operator Operator's name.
      * @return Whether succeed
      */
-    Boolean delete(Integer id, String sinkType, String operator);
+    Boolean delete(Integer id, String operator);
 
     /**
      * Logically delete stream sink with the given conditions.
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 0ef33faef..000e7cda6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GlobalConstants;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
@@ -52,6 +53,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -96,8 +98,14 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         // Make sure that there is no sink info with the current groupId and 
streamId
         String streamId = request.getInlongStreamId();
         String sinkType = request.getSinkType();
-        List<StreamSinkEntity> sinkExist = 
sinkMapper.selectByIdAndType(groupId, streamId, sinkType);
-        Preconditions.checkEmpty(sinkExist, 
ErrorCodeEnum.SINK_ALREADY_EXISTS.getMessage());
+        String sinkName = request.getSinkName();
+        List<StreamSinkEntity> sinkList = 
sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
+        for (StreamSinkEntity sinkEntity : sinkList) {
+            if (sinkEntity != null && Objects.equals(sinkEntity.getSinkName(), 
sinkName)) {
+                String err = "sink name=%s already exists with the groupId=%s 
streamId=%s";
+                throw new BusinessException(String.format(err, sinkName, 
groupId, streamId));
+            }
+        }
 
         // According to the sink type, save sink information
         StreamSinkOperation operation = 
operationFactory.getInstance(SinkType.forType(sinkType));
@@ -114,10 +122,17 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     }
 
     @Override
-    public SinkResponse get(Integer id, String sinkType) {
+    public SinkResponse get(Integer id) {
+        Preconditions.checkNotNull(id, "sink id is empty");
+        StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
+        if (entity == null) {
+            LOGGER.error("sink not found by id={}", id);
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
+        }
+        String sinkType = entity.getSinkType();
         StreamSinkOperation operation = 
operationFactory.getInstance(SinkType.forType(sinkType));
-        SinkResponse sinkResponse = operation.getById(sinkType, id);
-        LOGGER.debug("success to get sink by id={}, sinkType={}", id, 
sinkType);
+        SinkResponse sinkResponse = operation.getByEntity(entity);
+        LOGGER.debug("success to get sink info by id={}", id);
         return sinkResponse;
     }
 
@@ -131,12 +146,12 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     @Override
     public List<SinkResponse> listSink(String groupId, String streamId) {
         Preconditions.checkNotNull(groupId, 
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
-        List<StreamSinkEntity> entityList = 
sinkMapper.selectByRelatedId(groupId, streamId);
+        List<StreamSinkEntity> entityList = 
sinkMapper.selectByRelatedId(groupId, streamId, null);
         if (CollectionUtils.isEmpty(entityList)) {
             return Collections.emptyList();
         }
         List<SinkResponse> responseList = new ArrayList<>();
-        entityList.forEach(entity -> responseList.add(this.get(entity.getId(), 
entity.getSinkType())));
+        entityList.forEach(entity -> 
responseList.add(this.get(entity.getId())));
 
         LOGGER.debug("success to list sink by groupId={}, streamId={}", 
groupId, streamId);
         return responseList;
@@ -157,6 +172,7 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     @Override
     public PageInfo<? extends SinkListResponse> 
listByCondition(SinkPageRequest request) {
         Preconditions.checkNotNull(request.getInlongGroupId(), 
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         List<StreamSinkEntity> entityPage = 
sinkMapper.selectByCondition(request);
         Map<SinkType, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
@@ -187,10 +203,20 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
 
         // Check if it can be modified
         String groupId = request.getInlongGroupId();
-        InlongGroupEntity groupEntity = 
commonOperateService.checkGroupStatus(groupId, operator);
-
         String streamId = request.getInlongStreamId();
+        String sinkName = request.getSinkName();
         String sinkType = request.getSinkType();
+        InlongGroupEntity groupEntity = 
commonOperateService.checkGroupStatus(groupId, operator);
+
+        // Check whether the sink name exists with the same groupId and 
streamId
+        List<StreamSinkEntity> sinkList = 
sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
+        for (StreamSinkEntity entity : sinkList) {
+            Integer sinkId = entity.getId();
+            if (!Objects.equals(request.getId(), sinkId) && 
Objects.equals(entity.getSinkName(), sinkName)) {
+                String err = "sink name=%s already exists with the groupId=%s 
streamId=%s";
+                throw new BusinessException(String.format(err, sinkName, 
groupId, streamId));
+            }
+        }
 
         StreamSinkOperation operation = 
operationFactory.getInstance(SinkType.forType(sinkType));
         operation.updateOpt(request, operator);
@@ -217,8 +243,8 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public Boolean delete(Integer id, String sinkType, String operator) {
-        LOGGER.info("begin to delete sink by id={}, sinkType={}", id, 
sinkType);
+    public Boolean delete(Integer id, String operator) {
+        LOGGER.info("begin to delete sink by id={}", id);
         Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
         // Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
 
@@ -249,7 +275,7 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         commonOperateService.checkGroupStatus(groupId, operator);
 
         Date now = new Date();
-        List<StreamSinkEntity> entityList = 
sinkMapper.selectByRelatedId(groupId, streamId);
+        List<StreamSinkEntity> entityList = 
sinkMapper.selectByRelatedId(groupId, streamId, null);
         if (CollectionUtils.isNotEmpty(entityList)) {
             entityList.forEach(entity -> {
                 Integer id = entity.getId();
@@ -278,7 +304,7 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         // Check if it can be deleted
         commonOperateService.checkGroupStatus(groupId, operator);
 
-        List<StreamSinkEntity> entityList = 
sinkMapper.selectByRelatedId(groupId, streamId);
+        List<StreamSinkEntity> entityList = 
sinkMapper.selectByRelatedId(groupId, streamId, null);
         if (CollectionUtils.isNotEmpty(entityList)) {
             entityList.forEach(entity -> {
                 sinkMapper.deleteByPrimaryKey(entity.getId());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
index f5e9f907b..a29d34da4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
@@ -138,15 +138,14 @@ public class ClickHouseSinkOperation implements 
StreamSinkOperation {
     }
 
     @Override
-    public SinkResponse getById(@NotNull String sinkType, @NotNull Integer id) 
{
-        StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
+    public SinkResponse getByEntity(@NotNull StreamSinkEntity entity) {
         Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(existType),
                 String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), 
SinkType.SINK_CLICKHOUSE, existType));
 
         SinkResponse response = this.getFromEntity(entity, 
ClickHouseSinkResponse::new);
-        List<StreamSinkFieldEntity> entities = 
sinkFieldMapper.selectBySinkId(id);
+        List<StreamSinkFieldEntity> entities = 
sinkFieldMapper.selectBySinkId(entity.getId());
         List<SinkFieldResponse> infos = 
CommonBeanUtils.copyListProperties(entities,
                 SinkFieldResponse::new);
         response.setFieldList(infos);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
index 01eeec022..5da3a1322 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
@@ -138,15 +138,14 @@ public class HiveSinkOperation implements 
StreamSinkOperation {
     }
 
     @Override
-    public SinkResponse getById(@NotNull String sinkType, @NotNull Integer id) 
{
-        StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
+    public SinkResponse getByEntity(@NotNull StreamSinkEntity entity) {
         Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_HIVE.equals(existType),
                 String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), 
SinkType.SINK_HIVE, existType));
 
         SinkResponse response = this.getFromEntity(entity, 
HiveSinkResponse::new);
-        List<StreamSinkFieldEntity> entities = 
sinkFieldMapper.selectBySinkId(id);
+        List<StreamSinkFieldEntity> entities = 
sinkFieldMapper.selectBySinkId(entity.getId());
         List<SinkFieldResponse> infos = 
CommonBeanUtils.copyListProperties(entities,
                 SinkFieldResponse::new);
         response.setFieldList(infos);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
index 67be53419..f98798b48 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
@@ -132,15 +132,14 @@ public class IcebergSinkOperation implements 
StreamSinkOperation {
     }
 
     @Override
-    public SinkResponse getById(String sinkType, Integer id) {
-        StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
+    public SinkResponse getByEntity(StreamSinkEntity entity) {
         Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(existType),
                 String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), 
SinkType.SINK_ICEBERG, existType));
 
         SinkResponse response = this.getFromEntity(entity, 
IcebergSinkResponse::new);
-        List<StreamSinkFieldEntity> entities = 
sinkFieldMapper.selectBySinkId(id);
+        List<StreamSinkFieldEntity> entities = 
sinkFieldMapper.selectBySinkId(entity.getId());
         List<SinkFieldResponse> infos = 
CommonBeanUtils.copyListProperties(entities,
                 SinkFieldResponse::new);
         response.setFieldList(infos);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
index 23e8a4b2e..0e9e041ff 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
@@ -136,14 +136,13 @@ public class KafkaSinkOperation implements 
StreamSinkOperation {
     }
 
     @Override
-    public SinkResponse getById(@NotNull String sinkType, @NotNull Integer id) 
{
-        StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
+    public SinkResponse getByEntity(@NotNull StreamSinkEntity entity) {
         Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
         Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(existType),
                 String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), 
SinkType.SINK_KAFKA, existType));
         SinkResponse response = this.getFromEntity(entity, 
KafkaSinkResponse::new);
-        List<StreamSinkFieldEntity> entities = 
sinkFieldMapper.selectBySinkId(id);
+        List<StreamSinkFieldEntity> entities = 
sinkFieldMapper.selectBySinkId(entity.getId());
         List<SinkFieldResponse> infos = 
CommonBeanUtils.copyListProperties(entities, SinkFieldResponse::new);
         response.setFieldList(infos);
         return response;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
index 6382e6c7d..bcbce83f4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
@@ -44,6 +44,7 @@ import javax.validation.constraints.NotNull;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Default operation of stream source.
@@ -86,8 +87,8 @@ public abstract class AbstractSourceOperation implements 
StreamSourceOperation {
         String sourceName = request.getSourceName();
         List<StreamSourceEntity> existList = 
sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
         if (CollectionUtils.isNotEmpty(existList)) {
-            String err = "stream source already exists with groupId=%s, 
streamId=%s, sourceName=%s";
-            throw new BusinessException(String.format(err, groupId, streamId, 
sourceName));
+            String err = "source name=%s already exists with groupId=%s 
streamId=%s";
+            throw new BusinessException(String.format(err, sourceName, 
groupId, streamId));
         }
 
         StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, 
StreamSourceEntity::new);
@@ -112,15 +113,14 @@ public abstract class AbstractSourceOperation implements 
StreamSourceOperation {
 
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.NOT_SUPPORTED)
-    public SourceResponse getById(@NotNull Integer id) {
-        StreamSourceEntity entity = sourceMapper.selectById(id);
+    public SourceResponse getByEntity(@NotNull StreamSourceEntity entity) {
         Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
                 String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), 
getSourceType(), existType));
 
         SourceResponse sourceResponse = this.getFromEntity(entity, 
this::getResponse);
-        List<StreamSourceFieldEntity> sourceFieldEntities = 
sourceFieldMapper.selectBySourceId(id);
+        List<StreamSourceFieldEntity> sourceFieldEntities = 
sourceFieldMapper.selectBySourceId(entity.getId());
         List<InlongStreamFieldInfo> fieldInfos = 
CommonBeanUtils.copyListProperties(sourceFieldEntities,
                 InlongStreamFieldInfo::new);
         sourceResponse.setFieldList(fieldInfos);
@@ -133,9 +133,28 @@ public abstract class AbstractSourceOperation implements 
StreamSourceOperation {
         StreamSourceEntity entity = 
sourceMapper.selectByIdForUpdate(request.getId());
         Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         if (!SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus())) {
-            throw new BusinessException(String.format("Source=%s is not 
allowed to update, "
+            throw new BusinessException(String.format("source=%s is not 
allowed to update, "
                     + "please wait until its changed to final status or stop / 
frozen / delete it firstly", entity));
         }
+
+        // Source type cannot be changed
+        if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
+            throw new BusinessException(String.format("source type=%s cannot 
change to %s",
+                    entity.getSourceType(), request.getSourceType()));
+        }
+
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sourceName = request.getSourceName();
+        List<StreamSourceEntity> sourceList = 
sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
+        for (StreamSourceEntity sourceEntity : sourceList) {
+            Integer sourceId = sourceEntity.getId();
+            if (!Objects.equals(sourceId, request.getId())) {
+                String err = "source name=%s already exists with the 
groupId=%s streamId=%s";
+                throw new BusinessException(String.format(err, sourceName, 
groupId, streamId));
+            }
+        }
+
         // Setting updated parameters of stream source entity.
         setTargetEntity(request, entity);
         entity.setVersion(entity.getVersion() + 1);
@@ -153,7 +172,7 @@ public abstract class AbstractSourceOperation implements 
StreamSourceOperation {
         SourceStatus curState = SourceStatus.forCode(existEntity.getStatus());
         SourceStatus nextState = SourceStatus.TO_BE_ISSUED_FROZEN;
         if (!SourceStatus.isAllowedTransition(curState, nextState)) {
-            throw new BusinessException(String.format("Source=%s is not 
allowed to stop", existEntity));
+            throw new BusinessException(String.format("source=%s is not 
allowed to stop", existEntity));
         }
         StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, 
StreamSourceEntity::new);
         curEntity.setVersion(existEntity.getVersion() + 1);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperation.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperation.java
index c47e9e3e9..c17de31ce 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperation.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperation.java
@@ -48,12 +48,12 @@ public interface StreamSourceOperation {
     Integer saveOpt(SourceRequest request, Integer groupStatus, String 
operator);
 
     /**
-     * Get source info by source id.
+     * Get source info by the given entity.
      *
-     * @param id Source id.
+     * @param entity Get field value from the entity.
      * @return Source info.
      */
-    SourceResponse getById(Integer id);
+    SourceResponse getByEntity(StreamSourceEntity entity);
 
     /**
      * Get the target from the given entity.
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 2cd3de8ac..ade75d69a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -40,13 +40,12 @@ public interface StreamSourceService {
     Integer save(SourceRequest request, String operator);
 
     /**
-     * Query source information based on id and type.
+     * Query source information based on id
      *
      * @param id source id.
-     * @param sourceType Source type.
      * @return Source info
      */
-    SourceResponse get(Integer id, String sourceType);
+    SourceResponse get(Integer id);
 
     /**
      * Query source information based on inlong group id and inlong stream id.
@@ -98,31 +97,28 @@ public interface StreamSourceService {
      * Delete the stream source by the given id and source type.
      *
      * @param id The primary key of the source.
-     * @param sourceType Source type.
      * @param operator Operator's name
      * @return Whether succeed
      */
-    boolean delete(Integer id, String sourceType, String operator);
+    boolean delete(Integer id, String operator);
 
     /**
      * Delete the stream source by the given id and source type.
      *
      * @param id The primary key of the source.
-     * @param sourceType Source type.
      * @param operator Operator's name
      * @return Whether succeed
      */
-    boolean restart(Integer id, String sourceType, String operator);
+    boolean restart(Integer id, String operator);
 
     /**
      * Delete the stream source by the given id and source type.
      *
      * @param id The primary key of the source.
-     * @param sourceType Source type.
      * @param operator Operator's name
      * @return Whether succeed
      */
-    boolean stop(Integer id, String sourceType, String operator);
+    boolean stop(Integer id, String operator);
 
     /**
      * Logically delete stream source with the given conditions.
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 2fe1a96b4..b2472233b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -87,9 +88,15 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     }
 
     @Override
-    public SourceResponse get(Integer id, String sourceType) {
-        StreamSourceOperation operation = 
operationFactory.getInstance(SourceType.forType(sourceType));
-        SourceResponse sourceResponse = operation.getById(id);
+    public SourceResponse get(Integer id) {
+        Preconditions.checkNotNull(id, "source id is empty");
+        StreamSourceEntity entity = sourceMapper.selectById(id);
+        if (entity == null) {
+            LOGGER.error("source not found by id={}", id);
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND);
+        }
+        StreamSourceOperation operation = 
operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
+        SourceResponse sourceResponse = operation.getByEntity(entity);
         LOGGER.debug("success to get source by id={}", id);
         return sourceResponse;
     }
@@ -109,7 +116,7 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
             return Collections.emptyList();
         }
         List<SourceResponse> responseList = new ArrayList<>();
-        entityList.forEach(entity -> responseList.add(this.get(entity.getId(), 
entity.getSourceType())));
+        entityList.forEach(entity -> 
responseList.add(this.get(entity.getId())));
 
         LOGGER.debug("success to list source by groupId={}, streamId={}", 
groupId, streamId);
         return responseList;
@@ -118,6 +125,7 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     @Override
     public PageInfo<? extends SourceListResponse> 
listByCondition(SourcePageRequest request) {
         Preconditions.checkNotNull(request.getInlongGroupId(), 
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         List<StreamSourceEntity> entityList = 
sourceMapper.selectByCondition(request);
 
@@ -175,15 +183,15 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW,
             isolation = Isolation.READ_COMMITTED)
-    public boolean delete(Integer id, String sourceType, String operator) {
-        LOGGER.info("begin to delete source by id={}, sourceType={}", id, 
sourceType);
+    public boolean delete(Integer id, String operator) {
+        LOGGER.info("begin to delete source by id={}", id);
         Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
 
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         commonOperateService.checkGroupStatus(entity.getInlongGroupId(), 
operator);
 
-        StreamSourceOperation operation = 
operationFactory.getInstance(SourceType.forType(sourceType));
+        StreamSourceOperation operation = 
operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
         SourceRequest sourceRequest = new SourceRequest();
         CommonBeanUtils.copyProperties(entity, sourceRequest, true);
         operation.deleteOpt(sourceRequest, operator);
@@ -195,13 +203,13 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW,
             isolation = Isolation.READ_COMMITTED)
-    public boolean restart(Integer id, String sourceType, String operator) {
-        LOGGER.info("begin to restart source by id={}, sourceType={}", id, 
sourceType);
+    public boolean restart(Integer id, String operator) {
+        LOGGER.info("begin to restart source by id={}", id);
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         commonOperateService.checkGroupStatus(entity.getInlongGroupId(), 
operator);
 
-        StreamSourceOperation operation = 
operationFactory.getInstance(SourceType.forType(sourceType));
+        StreamSourceOperation operation = 
operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
         SourceRequest sourceRequest = new SourceRequest();
         CommonBeanUtils.copyProperties(entity, sourceRequest, true);
         operation.restartOpt(sourceRequest, operator);
@@ -213,13 +221,13 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW,
             isolation = Isolation.READ_COMMITTED)
-    public boolean stop(Integer id, String sourceType, String operator) {
-        LOGGER.info("begin to stop source by id={}, sourceType={}", id, 
sourceType);
+    public boolean stop(Integer id, String operator) {
+        LOGGER.info("begin to stop source by id={}", id);
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         commonOperateService.checkGroupStatus(entity.getInlongGroupId(), 
operator);
 
-        StreamSourceOperation operation = 
operationFactory.getInstance(SourceType.forType(sourceType));
+        StreamSourceOperation operation = 
operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
         SourceRequest sourceRequest = new SourceRequest();
         CommonBeanUtils.copyProperties(entity, sourceRequest, true);
         operation.stopOpt(sourceRequest, operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
index 9674fcfe4..2e77e0e23 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
@@ -110,7 +110,7 @@ public abstract class AbstractSourceOperateListener 
implements DataSourceOperate
             } else {
                 log.warn("StreamSource={} cannot be operated for state={}", 
sourceResponse, sourceStatus);
                 TimeUnit.SECONDS.sleep(5);
-                sourceResponse = 
streamSourceService.get(sourceResponse.getId(), sourceResponse.getSourceType());
+                sourceResponse = 
streamSourceService.get(sourceResponse.getId());
             }
         }
         SourceStatus sourceStatus = 
SourceStatus.forCode(sourceResponse.getStatus());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
index 80b7b2420..651545d07 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceDeleteListener.java
@@ -32,6 +32,6 @@ public class SourceDeleteListener extends 
AbstractSourceOperateListener {
 
     @Override
     public void operateStreamSource(SourceRequest sourceRequest, String 
operator) {
-        streamSourceService.delete(sourceRequest.getId(), 
sourceRequest.getSourceType(), operator);
+        streamSourceService.delete(sourceRequest.getId(), operator);
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
index baa04f246..dab7b43c5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceRestartListener.java
@@ -30,6 +30,6 @@ public class SourceRestartListener extends 
AbstractSourceOperateListener {
 
     @Override
     public void operateStreamSource(SourceRequest sourceRequest, String 
operator) {
-        streamSourceService.restart(sourceRequest.getId(), 
sourceRequest.getSourceType(), operator);
+        streamSourceService.restart(sourceRequest.getId(), operator);
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
index c675c8f89..369f0a236 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/SourceStopListener.java
@@ -30,6 +30,6 @@ public class SourceStopListener extends 
AbstractSourceOperateListener {
 
     @Override
     public void operateStreamSource(SourceRequest sourceRequest, String 
operator) {
-        streamSourceService.stop(sourceRequest.getId(), 
sourceRequest.getSourceType(), operator);
+        streamSourceService.stop(sourceRequest.getId(), operator);
     }
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index e5cb74238..3723ce21e 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -70,7 +70,7 @@ public class AgentServiceTest extends ServiceBaseTest {
         Boolean result = agentService.reportSnapshot(request);
         Assert.assertTrue(result);
 
-        sourceService.delete(id, SourceType.BINLOG.getType(), GLOBAL_OPERATOR);
+        sourceService.delete(id, GLOBAL_OPERATOR);
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
index 6203d4634..d53c95b77 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
@@ -67,14 +67,14 @@ public class ClickHouseStreamSinkServiceTest extends 
ServiceBaseTest {
     }
 
     public void deleteKafkaSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, SinkType.SINK_CLICKHOUSE, 
globalOperator);
+        boolean result = sinkService.delete(sinkId, globalOperator);
         Assert.assertTrue(result);
     }
 
     @Test
     public void testListByIdentifier() {
         Integer sinkId = this.saveSink("default1");
-        SinkResponse sink = sinkService.get(sinkId, SinkType.SINK_CLICKHOUSE);
+        SinkResponse sink = sinkService.get(sinkId);
         Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
         deleteKafkaSink(sinkId);
     }
@@ -82,7 +82,7 @@ public class ClickHouseStreamSinkServiceTest extends 
ServiceBaseTest {
     @Test
     public void testGetAndUpdate() {
         Integer sinkId = this.saveSink("default2");
-        SinkResponse response = sinkService.get(sinkId, 
SinkType.SINK_CLICKHOUSE);
+        SinkResponse response = sinkService.get(sinkId);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         ClickHouseSinkResponse kafkaSinkResponse = (ClickHouseSinkResponse) 
response;
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
index 049ddf425..e8dae7372 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
@@ -62,7 +62,7 @@ public class HiveStreamSinkServiceTest extends 
ServiceBaseTest {
         Integer id = this.saveSink();
         Assert.assertNotNull(id);
 
-        boolean result = sinkService.delete(id, SinkType.SINK_HIVE, 
globalOperator);
+        boolean result = sinkService.delete(id, globalOperator);
         Assert.assertTrue(result);
     }
 
@@ -70,16 +70,16 @@ public class HiveStreamSinkServiceTest extends 
ServiceBaseTest {
     public void testListByIdentifier() {
         Integer id = this.saveSink();
 
-        SinkResponse sink = sinkService.get(id, SinkType.SINK_HIVE);
+        SinkResponse sink = sinkService.get(id);
         Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
 
-        sinkService.delete(id, SinkType.SINK_HIVE, globalOperator);
+        sinkService.delete(id, globalOperator);
     }
 
     @Test
     public void testGetAndUpdate() {
         Integer id = this.saveSink();
-        SinkResponse response = sinkService.get(id, SinkType.SINK_HIVE);
+        SinkResponse response = sinkService.get(id);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         HiveSinkResponse hiveResponse = (HiveSinkResponse) response;
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
index 92b118381..9bbe253b2 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
@@ -59,22 +59,22 @@ public class IcebergStreamSinkServiceTest extends 
ServiceBaseTest {
     public void testSaveAndDelete() {
         Integer id = this.saveSink("default1");
         Assert.assertNotNull(id);
-        boolean result = sinkService.delete(id, SinkType.SINK_ICEBERG, 
globalOperator);
+        boolean result = sinkService.delete(id, globalOperator);
         Assert.assertTrue(result);
     }
 
     @Test
     public void testListByIdentifier() {
         Integer id = this.saveSink("default2");
-        SinkResponse sink = sinkService.get(id, SinkType.SINK_ICEBERG);
+        SinkResponse sink = sinkService.get(id);
         Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
-        sinkService.delete(id, SinkType.SINK_ICEBERG, globalOperator);
+        sinkService.delete(id, globalOperator);
     }
 
     @Test
     public void testGetAndUpdate() {
         Integer id = this.saveSink("default3");
-        SinkResponse response = sinkService.get(id, SinkType.SINK_ICEBERG);
+        SinkResponse response = sinkService.get(id);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         IcebergSinkResponse icebergSinkResponse = (IcebergSinkResponse) 
response;
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
index c77d0e2e3..4f7079e61 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
@@ -64,14 +64,14 @@ public class KafkaStreamSinkServiceTest extends 
ServiceBaseTest {
     }
 
     public void deleteKafkaSink(Integer kafkaSinkId) {
-        boolean result = sinkService.delete(kafkaSinkId, SinkType.SINK_KAFKA, 
globalOperator);
+        boolean result = sinkService.delete(kafkaSinkId, globalOperator);
         Assert.assertTrue(result);
     }
 
     @Test
     public void testListByIdentifier() {
         Integer kafkaSinkId = this.saveSink("default1");
-        SinkResponse sink = sinkService.get(kafkaSinkId, SinkType.SINK_KAFKA);
+        SinkResponse sink = sinkService.get(kafkaSinkId);
         Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
         deleteKafkaSink(kafkaSinkId);
     }
@@ -79,7 +79,7 @@ public class KafkaStreamSinkServiceTest extends 
ServiceBaseTest {
     @Test
     public void testGetAndUpdate() {
         Integer kafkaSinkId = this.saveSink("default2");
-        SinkResponse response = sinkService.get(kafkaSinkId, 
SinkType.SINK_KAFKA);
+        SinkResponse response = sinkService.get(kafkaSinkId);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         KafkaSinkResponse kafkaSinkResponse = (KafkaSinkResponse) response;
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
index 10682ca24..dff9e8f04 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
@@ -60,7 +60,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         Integer id = this.saveSource();
         Assert.assertNotNull(id);
 
-        boolean result = sourceService.delete(id, SourceType.BINLOG.getType(), 
globalOperator);
+        boolean result = sourceService.delete(id, globalOperator);
         Assert.assertTrue(result);
     }
 
@@ -68,16 +68,16 @@ public class StreamSourceServiceTest extends 
ServiceBaseTest {
     public void testListByIdentifier() {
         Integer id = this.saveSource();
 
-        SourceResponse source = sourceService.get(id, 
SourceType.BINLOG.getType());
+        SourceResponse source = sourceService.get(id);
         Assert.assertEquals(globalGroupId, source.getInlongGroupId());
 
-        sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
+        sourceService.delete(id, globalOperator);
     }
 
     @Test
     public void testGetAndUpdate() {
         Integer id = this.saveSource();
-        SourceResponse response = sourceService.get(id, 
SourceType.BINLOG.getType());
+        SourceResponse response = sourceService.get(id);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         BinlogSourceResponse binlogResponse = (BinlogSourceResponse) response;
@@ -86,7 +86,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         boolean result = sourceService.update(request, globalOperator);
         Assert.assertTrue(result);
 
-        sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
+        sourceService.delete(id, globalOperator);
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
index 29d938b13..bf15aaf8e 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
@@ -21,7 +21,6 @@ import 
org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
-import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
@@ -88,7 +87,7 @@ public class DataSourceListenerTest extends 
WorkflowServiceImplTest {
         WorkflowProcess process = context.getProcess();
         WorkflowTask task = process.getTaskByName("stopSource");
         Assert.assertTrue(task instanceof ServiceTask);
-        SourceResponse sourceResponse = streamSourceService.get(sourceId, 
SourceType.BINLOG.toString());
+        SourceResponse sourceResponse = streamSourceService.get(sourceId);
         Assert.assertSame(SourceStatus.forCode(sourceResponse.getStatus()), 
SourceStatus.TO_BE_ISSUED_FROZEN);
     }
 
@@ -119,7 +118,7 @@ public class DataSourceListenerTest extends 
WorkflowServiceImplTest {
         WorkflowProcess process = context.getProcess();
         WorkflowTask task = process.getTaskByName("restartSource");
         Assert.assertTrue(task instanceof ServiceTask);
-        SourceResponse sourceResponse = streamSourceService.get(sourceId, 
SourceType.BINLOG.toString());
+        SourceResponse sourceResponse = streamSourceService.get(sourceId);
         Assert.assertSame(SourceStatus.forCode(sourceResponse.getStatus()), 
SourceStatus.SOURCE_NORMAL);
     }
 
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 5f6314a36..1930fd71a 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.web.controller;
 import com.github.pagehelper.PageInfo;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.enums.OperationType;
@@ -37,7 +36,6 @@ import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 /**
@@ -60,12 +58,9 @@ public class StreamSinkController {
 
     @RequestMapping(value = "/get/{id}", method = RequestMethod.GET)
     @ApiOperation(value = "Query sink information")
-    @ApiImplicitParams({
-            @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, 
required = true),
-            @ApiImplicitParam(name = "sinkType", dataTypeClass = String.class, 
required = true)
-    })
-    public Response<SinkResponse> get(@PathVariable Integer id, @RequestParam 
String sinkType) {
-        return Response.success(sinkService.get(id, sinkType));
+    @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = 
true)
+    public Response<SinkResponse> get(@PathVariable Integer id) {
+        return Response.success(sinkService.get(id));
     }
 
     @RequestMapping(value = "/list", method = RequestMethod.GET)
@@ -84,12 +79,9 @@ public class StreamSinkController {
     @RequestMapping(value = "/delete/{id}", method = RequestMethod.DELETE)
     @OperationLog(operation = OperationType.DELETE)
     @ApiOperation(value = "Delete data sink information")
-    @ApiImplicitParams({
-            @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, 
required = true),
-            @ApiImplicitParam(name = "sinkType", dataTypeClass = String.class, 
required = true)
-    })
-    public Response<Boolean> delete(@PathVariable Integer id, @RequestParam 
String sinkType) {
-        boolean result = sinkService.delete(id, sinkType, 
LoginUserUtils.getLoginUserDetail().getUserName());
+    @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = 
true)
+    public Response<Boolean> delete(@PathVariable Integer id) {
+        boolean result = sinkService.delete(id, 
LoginUserUtils.getLoginUserDetail().getUserName());
         return Response.success(result);
     }
 
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index e486bdcaf..57c82216e 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.web.controller;
 import com.github.pagehelper.PageInfo;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.enums.OperationType;
@@ -37,7 +36,6 @@ import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 /**
@@ -60,12 +58,9 @@ public class StreamSourceController {
 
     @RequestMapping(value = "/get/{id}", method = RequestMethod.GET)
     @ApiOperation(value = "Query stream source")
-    @ApiImplicitParams({
-            @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, 
required = true),
-            @ApiImplicitParam(name = "sourceType", dataTypeClass = 
String.class, required = true)
-    })
-    public Response<SourceResponse> get(@PathVariable Integer id, 
@RequestParam String sourceType) {
-        return Response.success(sourceService.get(id, sourceType));
+    @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = 
true)
+    public Response<SourceResponse> get(@PathVariable Integer id) {
+        return Response.success(sourceService.get(id));
     }
 
     @RequestMapping(value = "/list", method = RequestMethod.GET)
@@ -84,12 +79,9 @@ public class StreamSourceController {
     @RequestMapping(value = "/delete/{id}", method = RequestMethod.DELETE)
     @OperationLog(operation = OperationType.DELETE)
     @ApiOperation(value = "Delete stream source")
-    @ApiImplicitParams({
-            @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, 
required = true),
-            @ApiImplicitParam(name = "sourceType", dataTypeClass = 
String.class, required = true)
-    })
-    public Response<Boolean> delete(@PathVariable Integer id, @RequestParam 
String sourceType) {
-        boolean result = sourceService.delete(id, sourceType, 
LoginUserUtils.getLoginUserDetail().getUserName());
+    @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = 
true)
+    public Response<Boolean> delete(@PathVariable Integer id) {
+        boolean result = sourceService.delete(id, 
LoginUserUtils.getLoginUserDetail().getUserName());
         return Response.success(result);
     }
 

Reply via email to