This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 22392246a [INLONG-7226][Manager] Optimize OpenStreamSourceController implementation (#7227)
22392246a is described below

commit 22392246a7ca9907415c377b04cd700a72f078e1
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Jan 13 12:49:28 2023 +0800

    [INLONG-7226][Manager] Optimize OpenStreamSourceController implementation (#7227)
---
 .../inlong/manager/pojo/source/SourceRequest.java  | 15 ++--
 .../service/source/AbstractSourceOperator.java     | 38 +++++----
 .../service/source/StreamSourceServiceImpl.java    | 99 +++++++++++++++-------
 .../source/autopush/AutoPushSourceOperator.java    |  4 +-
 .../source/binlog/BinlogSourceOperator.java        |  3 +-
 .../service/source/file/FileSourceOperator.java    |  3 +-
 .../service/source/hudi/HudiSourceOperator.java    |  3 +-
 .../service/source/kafka/KafkaSourceOperator.java  |  3 +-
 .../source/mongodb/MongoDBSourceOperator.java      |  3 +-
 .../service/source/mqtt/MqttSourceOperator.java    |  3 +-
 .../source/oracle/OracleSourceOperator.java        |  3 +-
 .../postgresql/PostgreSQLSourceOperator.java       |  3 +-
 .../source/pulsar/PulsarSourceOperator.java        |  3 +-
 .../service/source/redis/RedisSourceOperator.java  |  3 +-
 .../source/sqlserver/SQLServerSourceOperator.java  |  3 +-
 .../source/tubemq/TubeMQSourceOperator.java        |  3 +-
 .../web/controller/StreamSourceController.java     |  3 +-
 .../openapi/OpenStreamSourceController.java        |  3 +-
 18 files changed, 127 insertions(+), 71 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index f0adcdd5e..3cc5c758b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+
+import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.hibernate.validator.constraints.Length;
@@ -41,31 +43,31 @@ import java.util.Map;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = 
"sourceType")
 public class SourceRequest {
 
-    @NotNull(groups = UpdateValidation.class)
     @ApiModelProperty(value = "Primary key")
+    @NotNull(groups = UpdateValidation.class)
     private Integer id;
 
-    @NotBlank(message = "inlongGroupId cannot be blank")
     @ApiModelProperty("Inlong group id")
+    @NotBlank(groups = SaveValidation.class, message = "inlongGroupId cannot 
be blank")
     @Length(min = 4, max = 100, message = "length must be between 4 and 100")
     @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports 
lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
-    @NotBlank(message = "inlongStreamId cannot be blank")
     @ApiModelProperty("Inlong stream id")
+    @NotBlank(groups = SaveValidation.class, message = "inlongStreamId cannot 
be blank")
     @Length(min = 4, max = 100, message = "inlongStreamId length must be 
between 4 and 100")
     @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only 
supports lowercase letters, numbers, '-', or '_'")
     private String inlongStreamId;
 
-    @NotBlank(message = "sourceType cannot be blank")
     @ApiModelProperty("Source type, including: FILE, KAFKA, etc.")
+    @NotBlank(message = "sourceType cannot be blank")
     @Length(min = 1, max = 20, message = "length must be between 1 and 20")
     private String sourceType;
 
-    @NotBlank(message = "sourceName cannot be blank")
+    @ApiModelProperty("Source name, unique in one stream")
+    @NotBlank(groups = SaveValidation.class, message = "sourceName cannot be 
blank")
     @Length(min = 1, max = 100, message = "sourceName length must be between 1 
and 100")
     @Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "sourceName only 
supports lowercase letters, numbers, '-', or '_'")
-    @ApiModelProperty("Source name, unique in one stream")
     private String sourceName;
 
     @ApiModelProperty("Ip of the agent running the task")
@@ -99,6 +101,7 @@ public class SourceRequest {
     private String snapshot;
 
     @ApiModelProperty("Version")
+    @NotNull(groups = UpdateValidation.class, message = "version cannot be 
null")
     private Integer version;
 
     @ApiModelProperty("Field list, only support when inlong group in light 
weight mode")
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 505417a92..704fb30a4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -27,7 +27,6 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity;
@@ -122,30 +121,33 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
     @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ)
     public void updateOpt(SourceRequest request, Integer groupStatus, Integer 
groupMode, String operator) {
         StreamSourceEntity entity = 
sourceMapper.selectByIdForUpdate(request.getId());
-        Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
+                    String.format("not found source record by id=%d", 
request.getId()));
+        }
         if (SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
             updateFieldOpt(entity, request.getFieldList());
             return;
         }
-
         boolean allowUpdate = 
InlongConstants.LIGHTWEIGHT_MODE.equals(groupMode)
                 || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus());
         if (!allowUpdate) {
-            throw new BusinessException(String.format("source=%s is not 
allowed to update, "
-                    + "please wait until its changed to final status or stop / 
frozen / delete it firstly", entity));
+            throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
+                    String.format(
+                            "source=%s is not allowed to update, please wait 
until its changed to final status or stop / frozen / delete it firstly",
+                            entity));
         }
         String errMsg = String.format("source has already updated with 
groupId=%s, streamId=%s, name=%s, curVersion=%s",
                 request.getInlongGroupId(), request.getInlongStreamId(), 
request.getSourceName(), request.getVersion());
         if (!Objects.equals(entity.getVersion(), request.getVersion())) {
-            LOGGER.error(errMsg);
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, errMsg);
         }
 
         // source type cannot be changed
         if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
-            throw new BusinessException(String.format("source type=%s cannot 
change to %s",
-                    entity.getSourceType(), request.getSourceType()));
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    String.format("source type=%s cannot change to %s", 
entity.getSourceType(),
+                            request.getSourceType()));
         }
 
         String groupId = request.getInlongGroupId();
@@ -155,8 +157,9 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         for (StreamSourceEntity sourceEntity : sourceList) {
             Integer sourceId = sourceEntity.getId();
             if (!Objects.equals(sourceId, request.getId())) {
-                String err = "source name=%s already exists with the 
groupId=%s streamId=%s";
-                throw new BusinessException(String.format(err, sourceName, 
groupId, streamId));
+                throw new 
BusinessException(ErrorCodeEnum.SOURCE_ALREADY_EXISTS,
+                        String.format("source name=%s already exists with the 
groupId=%s streamId=%s", sourceName,
+                                groupId, streamId));
             }
         }
 
@@ -187,11 +190,10 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
 
         int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            LOGGER.warn(errMsg);
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, errMsg);
         }
         updateFieldOpt(entity, request.getFieldList());
-        LOGGER.info("success to update source of type={}", 
request.getSourceType());
+        LOGGER.debug("success to update source of type={}", 
request.getSourceType());
     }
 
     @Override
@@ -255,7 +257,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
         saveStreamField(groupId, streamId, fieldInfos);
 
-        LOGGER.info("success to update source fields");
+        LOGGER.debug("success to update source fields");
     }
 
     protected void saveStreamField(String groupId, String streamId, 
List<StreamField> infoList) {
@@ -274,7 +276,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
     }
 
     protected void saveFieldOpt(StreamSourceEntity entity, List<StreamField> 
fieldInfos) {
-        LOGGER.info("begin to save source fields={}", fieldInfos);
+        LOGGER.debug("begin to save source fields={}", fieldInfos);
         if (CollectionUtils.isEmpty(fieldInfos)) {
             return;
         }
@@ -300,6 +302,6 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         }
 
         sourceFieldMapper.insertAll(entityList);
-        LOGGER.info("success to save source fields");
+        LOGGER.debug("success to save source fields");
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index f59c16ba9..8f991bb47 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -33,8 +33,10 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
@@ -63,6 +65,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -79,6 +82,8 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     @Autowired
     private InlongGroupEntityMapper groupMapper;
     @Autowired
+    private InlongStreamEntityMapper streamMapper;
+    @Autowired
     private StreamSourceEntityMapper sourceMapper;
     @Autowired
     private StreamSourceFieldEntityMapper sourceFieldMapper;
@@ -119,8 +124,10 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW)
     public Integer save(SourceRequest request, UserInfo opInfo) {
-        // check request and parameters
-        this.checkRequestParams(request);
+        // check request parameter
+        if (request == null) {
+            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
+        }
         // check operator info
         if (opInfo == null) {
             throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
@@ -138,10 +145,13 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
                 throw new 
BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
             }
         }
-        // check inlong group status
-        GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
-        if (GroupStatus.notAllowedUpdate(status)) {
-            throw new 
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
 status));
+        // get stream information
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
+                request.getInlongGroupId(), request.getInlongStreamId());
+        if (streamEntity == null) {
+            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND,
+                    String.format("InlongStream does not exist with 
InlongGroupId=%s, InLongStreamId=%s",
+                            request.getInlongGroupId(), 
request.getInlongStreamId()));
         }
         // Check if the record to be added exists
         List<StreamSourceEntity> existList = sourceMapper.selectByRelatedId(
@@ -151,6 +161,12 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
                     String.format("source name=%s already exists with 
groupId=%s streamId=%s",
                             request.getSourceName(), 
request.getInlongGroupId(), request.getInlongStreamId()));
         }
+        // check inlong group status
+        GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
+        if (GroupStatus.notAllowedUpdate(status)) {
+            throw new 
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
+                    
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
+        }
         // According to the source type, save source information
         StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(request.getSourceType());
         // Remove id in sourceField when save
@@ -344,8 +360,8 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
     public Boolean update(SourceRequest request, String operator) {
         LOGGER.info("begin to update source info: {}", request);
-        this.checkParams(request);
-        Preconditions.checkNotNull(request.getId(), 
ErrorCodeEnum.ID_IS_EMPTY.getMessage());
+        // check request parameter
+        checkRequestParams(request);
 
         // Check if it can be modified
         String groupId = request.getInlongGroupId();
@@ -366,20 +382,16 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
     public Boolean update(SourceRequest request, UserInfo opInfo) {
-        // check request
-        checkRequestParams(request);
-        // check record id
-        if (request.getId() == null) {
-            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
-        }
         // check operator info
         if (opInfo == null) {
             throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
         }
-        // Check if it can be added
+        // check request parameter
+        checkRequestParams(request);
+        // Check if it can be update
         InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
         if (groupEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
+            throw new 
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
                     String.format("InlongGroup does not exist with 
InlongGroupId=%s", request.getInlongGroupId()));
         }
         // only the person in charges can query
@@ -392,7 +404,8 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         // check inlong group status
         GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
         if (GroupStatus.notAllowedUpdate(status)) {
-            throw new 
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
 status));
+            throw new 
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
+                    
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
         }
         StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(request.getSourceType());
         // Remove id in sourceField when save
@@ -626,21 +639,45 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         if (request == null) {
             throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
         }
-        // check group id
-        if (StringUtils.isBlank(request.getInlongGroupId())) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
-        }
-        // check stream id
-        if (StringUtils.isBlank(request.getInlongStreamId())) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
-        }
-        // check source type
-        if (StringUtils.isBlank(request.getSourceType())) {
-            throw new BusinessException(ErrorCodeEnum.SOURCE_TYPE_IS_NULL);
+        // check record exists
+        StreamSourceEntity entity = sourceMapper.selectById(request.getId());
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
+                    String.format("not found source record by id=%d", 
request.getId()));
         }
-        // check source name
-        if (StringUtils.isBlank(request.getSourceName())) {
-            throw new BusinessException(ErrorCodeEnum.SOURCE_NAME_IS_NULL);
+        // check whether modify sourceType
+        if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "sourceType not allowed modify");
         }
+        // check record version
+        if (!Objects.equals(entity.getVersion(), request.getVersion())) {
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
+                    String.format("source has already updated with groupId=%s, 
streamId=%s, name=%s, curVersion=%s",
+                            request.getInlongGroupId(), 
request.getInlongStreamId(), request.getSourceName(),
+                            request.getVersion()));
+        }
+        // check whether modify groupId
+        if (StringUtils.isNotBlank(request.getInlongGroupId())
+                && 
!entity.getInlongGroupId().equals(request.getInlongGroupId())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "InlongGroupId not allowed modify");
+        }
+        // check whether modify streamId
+        if (StringUtils.isNotBlank(request.getInlongStreamId())
+                && 
!entity.getInlongStreamId().equals(request.getInlongStreamId())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "InlongStreamId not allowed modify");
+        }
+        // check whether modify sourceName
+        if (StringUtils.isNotBlank(request.getSourceName())
+                && !entity.getSourceName().equals(request.getSourceName())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "sourceName not allowed modify");
+        }
+        // set primary field value
+        request.setInlongGroupId(entity.getInlongGroupId());
+        request.setInlongStreamId(entity.getInlongStreamId());
+        request.setSourceName(entity.getSourceName());
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
index e16b6107f..107a12bb9 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
@@ -69,8 +69,8 @@ public class AutoPushSourceOperator extends 
AbstractSourceOperator {
             AutoPushSourceDTO dto = 
AutoPushSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to source info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of AutoPush SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
index 87d412d72..b678765d7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
@@ -62,7 +62,8 @@ public class BinlogSourceOperator extends 
AbstractSourceOperator {
             MySQLBinlogSourceDTO dto = 
MySQLBinlogSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of MySQLBinlog 
SourceDTO failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index 6f410a6b4..b217cce3a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -68,7 +68,8 @@ public class FileSourceOperator extends 
AbstractSourceOperator {
             FileSourceDTO dto = FileSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of File SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/hudi/HudiSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/hudi/HudiSourceOperator.java
index 2b341ef69..300fe04c6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/hudi/HudiSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/hudi/HudiSourceOperator.java
@@ -61,7 +61,8 @@ public class HudiSourceOperator extends 
AbstractSourceOperator {
             HudiSourceDTO dto = HudiSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of Hudi SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 686b81c39..97cedf6a2 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -77,7 +77,8 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
             KafkaSourceDTO dto = KafkaSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of Kafka SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
index 16cce1f62..a379669c7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
@@ -62,7 +62,8 @@ public class MongoDBSourceOperator extends 
AbstractSourceOperator {
             MongoDBSourceDTO dto = 
MongoDBSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of MongoDB SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
index a573c9a85..efc24a204 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
@@ -59,7 +59,8 @@ public class MqttSourceOperator extends 
AbstractSourceOperator {
             MqttSourceDTO dto = MqttSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of Mqtt SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
index 81307860b..c5a9191e4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
@@ -62,7 +62,8 @@ public class OracleSourceOperator extends 
AbstractSourceOperator {
             OracleSourceDTO dto = 
OracleSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of Oracle SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
index a8c0dbb72..85092f513 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
@@ -62,7 +62,8 @@ public class PostgreSQLSourceOperator extends 
AbstractSourceOperator {
             PostgreSQLSourceDTO dto = 
PostgreSQLSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of PostgreSQL SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index a20629e4e..f9091b782 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -85,7 +85,8 @@ public class PulsarSourceOperator extends 
AbstractSourceOperator {
             PulsarSourceDTO dto = 
PulsarSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of Pulsar SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java
index 17f338ae3..aefe889ce 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java
@@ -62,7 +62,8 @@ public class RedisSourceOperator extends 
AbstractSourceOperator {
             RedisSourceDTO dto = RedisSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of Redis SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
index 09f4821f7..c24b1c48f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
@@ -62,7 +62,8 @@ public class SQLServerSourceOperator extends 
AbstractSourceOperator {
             SQLServerSourceDTO dto = 
SQLServerSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of SQLServer SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 488c866d4..bbba5c576 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -75,7 +75,8 @@ public class TubeMQSourceOperator extends 
AbstractSourceOperator {
             TubeMQSourceDTO dto = 
TubeMQSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of TubeMQ SourceDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index fd605db9d..7db5ea233 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
@@ -54,7 +55,7 @@ public class StreamSourceController {
     @RequestMapping(value = "/source/save", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save stream source")
-    public Response<Integer> save(@Validated @RequestBody SourceRequest 
request) {
+    public Response<Integer> save(@Validated(SaveValidation.class) 
@RequestBody SourceRequest request) {
         return Response.success(sourceService.save(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
index 23ff37076..292a543d7 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.web.controller.openapi;
 
 import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
@@ -66,7 +67,7 @@ public class OpenStreamSourceController {
     @RequestMapping(value = "/source/save", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save stream source")
-    public Response<Integer> save(@Validated @RequestBody SourceRequest 
request) {
+    public Response<Integer> save(@Validated(SaveValidation.class) 
@RequestBody SourceRequest request) {
         return Response.success(sourceService.save(request, 
LoginUserUtils.getLoginUser()));
     }
 


Reply via email to