This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 22392246a [INLONG-7226][Manager] Optimize OpenStreamSourceController
implementation (#7227)
22392246a is described below
commit 22392246a7ca9907415c377b04cd700a72f078e1
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Jan 13 12:49:28 2023 +0800
[INLONG-7226][Manager] Optimize OpenStreamSourceController implementation
(#7227)
---
.../inlong/manager/pojo/source/SourceRequest.java | 15 ++--
.../service/source/AbstractSourceOperator.java | 38 +++++----
.../service/source/StreamSourceServiceImpl.java | 99 +++++++++++++++-------
.../source/autopush/AutoPushSourceOperator.java | 4 +-
.../source/binlog/BinlogSourceOperator.java | 3 +-
.../service/source/file/FileSourceOperator.java | 3 +-
.../service/source/hudi/HudiSourceOperator.java | 3 +-
.../service/source/kafka/KafkaSourceOperator.java | 3 +-
.../source/mongodb/MongoDBSourceOperator.java | 3 +-
.../service/source/mqtt/MqttSourceOperator.java | 3 +-
.../source/oracle/OracleSourceOperator.java | 3 +-
.../postgresql/PostgreSQLSourceOperator.java | 3 +-
.../source/pulsar/PulsarSourceOperator.java | 3 +-
.../service/source/redis/RedisSourceOperator.java | 3 +-
.../source/sqlserver/SQLServerSourceOperator.java | 3 +-
.../source/tubemq/TubeMQSourceOperator.java | 3 +-
.../web/controller/StreamSourceController.java | 3 +-
.../openapi/OpenStreamSourceController.java | 3 +-
18 files changed, 127 insertions(+), 71 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index f0adcdd5e..3cc5c758b 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+
+import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.hibernate.validator.constraints.Length;
@@ -41,31 +43,31 @@ import java.util.Map;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property =
"sourceType")
public class SourceRequest {
- @NotNull(groups = UpdateValidation.class)
@ApiModelProperty(value = "Primary key")
+ @NotNull(groups = UpdateValidation.class)
private Integer id;
- @NotBlank(message = "inlongGroupId cannot be blank")
@ApiModelProperty("Inlong group id")
+ @NotBlank(groups = SaveValidation.class, message = "inlongGroupId cannot
be blank")
@Length(min = 4, max = 100, message = "length must be between 4 and 100")
@Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports
lowercase letters, numbers, '-', or '_'")
private String inlongGroupId;
- @NotBlank(message = "inlongStreamId cannot be blank")
@ApiModelProperty("Inlong stream id")
+ @NotBlank(groups = SaveValidation.class, message = "inlongStreamId cannot
be blank")
@Length(min = 4, max = 100, message = "inlongStreamId length must be
between 4 and 100")
@Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only
supports lowercase letters, numbers, '-', or '_'")
private String inlongStreamId;
- @NotBlank(message = "sourceType cannot be blank")
@ApiModelProperty("Source type, including: FILE, KAFKA, etc.")
+ @NotBlank(message = "sourceType cannot be blank")
@Length(min = 1, max = 20, message = "length must be between 1 and 20")
private String sourceType;
- @NotBlank(message = "sourceName cannot be blank")
+ @ApiModelProperty("Source name, unique in one stream")
+ @NotBlank(groups = SaveValidation.class, message = "sourceName cannot be
blank")
@Length(min = 1, max = 100, message = "sourceName length must be between 1
and 100")
@Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "sourceName only
supports lowercase letters, numbers, '-', or '_'")
- @ApiModelProperty("Source name, unique in one stream")
private String sourceName;
@ApiModelProperty("Ip of the agent running the task")
@@ -99,6 +101,7 @@ public class SourceRequest {
private String snapshot;
@ApiModelProperty("Version")
+ @NotNull(groups = UpdateValidation.class, message = "version cannot be
null")
private Integer version;
@ApiModelProperty("Field list, only support when inlong group in light
weight mode")
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 505417a92..704fb30a4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -27,7 +27,6 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity;
@@ -122,30 +121,33 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
@Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
public void updateOpt(SourceRequest request, Integer groupStatus, Integer
groupMode, String operator) {
StreamSourceEntity entity =
sourceMapper.selectByIdForUpdate(request.getId());
- Preconditions.checkNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
+ String.format("not found source record by id=%d",
request.getId()));
+ }
if (SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
updateFieldOpt(entity, request.getFieldList());
return;
}
-
boolean allowUpdate =
InlongConstants.LIGHTWEIGHT_MODE.equals(groupMode)
|| SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus());
if (!allowUpdate) {
- throw new BusinessException(String.format("source=%s is not
allowed to update, "
- + "please wait until its changed to final status or stop /
frozen / delete it firstly", entity));
+ throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
+ String.format(
+ "source=%s is not allowed to update, please wait
until its changed to final status or stop / frozen / delete it firstly",
+ entity));
}
String errMsg = String.format("source has already updated with
groupId=%s, streamId=%s, name=%s, curVersion=%s",
request.getInlongGroupId(), request.getInlongStreamId(),
request.getSourceName(), request.getVersion());
if (!Objects.equals(entity.getVersion(), request.getVersion())) {
- LOGGER.error(errMsg);
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, errMsg);
}
// source type cannot be changed
if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
- throw new BusinessException(String.format("source type=%s cannot
change to %s",
- entity.getSourceType(), request.getSourceType()));
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ String.format("source type=%s cannot change to %s",
entity.getSourceType(),
+ request.getSourceType()));
}
String groupId = request.getInlongGroupId();
@@ -155,8 +157,9 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
for (StreamSourceEntity sourceEntity : sourceList) {
Integer sourceId = sourceEntity.getId();
if (!Objects.equals(sourceId, request.getId())) {
- String err = "source name=%s already exists with the
groupId=%s streamId=%s";
- throw new BusinessException(String.format(err, sourceName,
groupId, streamId));
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_ALREADY_EXISTS,
+ String.format("source name=%s already exists with the
groupId=%s streamId=%s", sourceName,
+ groupId, streamId));
}
}
@@ -187,11 +190,10 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.warn(errMsg);
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, errMsg);
}
updateFieldOpt(entity, request.getFieldList());
- LOGGER.info("success to update source of type={}",
request.getSourceType());
+ LOGGER.debug("success to update source of type={}",
request.getSourceType());
}
@Override
@@ -255,7 +257,7 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
saveStreamField(groupId, streamId, fieldInfos);
- LOGGER.info("success to update source fields");
+ LOGGER.debug("success to update source fields");
}
protected void saveStreamField(String groupId, String streamId,
List<StreamField> infoList) {
@@ -274,7 +276,7 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
}
protected void saveFieldOpt(StreamSourceEntity entity, List<StreamField>
fieldInfos) {
- LOGGER.info("begin to save source fields={}", fieldInfos);
+ LOGGER.debug("begin to save source fields={}", fieldInfos);
if (CollectionUtils.isEmpty(fieldInfos)) {
return;
}
@@ -300,6 +302,6 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
}
sourceFieldMapper.insertAll(entityList);
- LOGGER.info("success to save source fields");
+ LOGGER.debug("success to save source fields");
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index f59c16ba9..8f991bb47 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -33,8 +33,10 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
@@ -63,6 +65,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -79,6 +82,8 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
+ private InlongStreamEntityMapper streamMapper;
+ @Autowired
private StreamSourceEntityMapper sourceMapper;
@Autowired
private StreamSourceFieldEntityMapper sourceFieldMapper;
@@ -119,8 +124,10 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW)
public Integer save(SourceRequest request, UserInfo opInfo) {
- // check request and parameters
- this.checkRequestParams(request);
+ // check request parameter
+ if (request == null) {
+ throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
+ }
// check operator info
if (opInfo == null) {
throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
@@ -138,10 +145,13 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
throw new
BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
}
}
- // check inlong group status
- GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
- if (GroupStatus.notAllowedUpdate(status)) {
- throw new
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
status));
+ // get stream information
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
+ request.getInlongGroupId(), request.getInlongStreamId());
+ if (streamEntity == null) {
+ throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND,
+ String.format("InlongStream does not exist with
InlongGroupId=%s, InLongStreamId=%s",
+ request.getInlongGroupId(),
request.getInlongStreamId()));
}
// Check if the record to be added exists
List<StreamSourceEntity> existList = sourceMapper.selectByRelatedId(
@@ -151,6 +161,12 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
String.format("source name=%s already exists with
groupId=%s streamId=%s",
request.getSourceName(),
request.getInlongGroupId(), request.getInlongStreamId()));
}
+ // check inlong group status
+ GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
+ if (GroupStatus.notAllowedUpdate(status)) {
+ throw new
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
+
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
+ }
// According to the source type, save source information
StreamSourceOperator sourceOperator =
operatorFactory.getInstance(request.getSourceType());
// Remove id in sourceField when save
@@ -344,8 +360,8 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
public Boolean update(SourceRequest request, String operator) {
LOGGER.info("begin to update source info: {}", request);
- this.checkParams(request);
- Preconditions.checkNotNull(request.getId(),
ErrorCodeEnum.ID_IS_EMPTY.getMessage());
+ // check request parameter
+ checkRequestParams(request);
// Check if it can be modified
String groupId = request.getInlongGroupId();
@@ -366,20 +382,16 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
public Boolean update(SourceRequest request, UserInfo opInfo) {
- // check request
- checkRequestParams(request);
- // check record id
- if (request.getId() == null) {
- throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
- }
// check operator info
if (opInfo == null) {
throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
}
- // Check if it can be added
+ // check request parameter
+ checkRequestParams(request);
+ // Check if it can be update
InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(request.getInlongGroupId());
if (groupEntity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
+ throw new
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
String.format("InlongGroup does not exist with
InlongGroupId=%s", request.getInlongGroupId()));
}
// only the person in charges can query
@@ -392,7 +404,8 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
// check inlong group status
GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
if (GroupStatus.notAllowedUpdate(status)) {
- throw new
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
status));
+ throw new
BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
+
String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
}
StreamSourceOperator sourceOperator =
operatorFactory.getInstance(request.getSourceType());
// Remove id in sourceField when save
@@ -626,21 +639,45 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
if (request == null) {
throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
}
- // check group id
- if (StringUtils.isBlank(request.getInlongGroupId())) {
- throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
- }
- // check stream id
- if (StringUtils.isBlank(request.getInlongStreamId())) {
- throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
- }
- // check source type
- if (StringUtils.isBlank(request.getSourceType())) {
- throw new BusinessException(ErrorCodeEnum.SOURCE_TYPE_IS_NULL);
+ // check record exists
+ StreamSourceEntity entity = sourceMapper.selectById(request.getId());
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
+ String.format("not found source record by id=%d",
request.getId()));
}
- // check source name
- if (StringUtils.isBlank(request.getSourceName())) {
- throw new BusinessException(ErrorCodeEnum.SOURCE_NAME_IS_NULL);
+ // check whether modify sourceType
+ if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "sourceType not allowed modify");
}
+ // check record version
+ if (!Objects.equals(entity.getVersion(), request.getVersion())) {
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("source has already updated with groupId=%s,
streamId=%s, name=%s, curVersion=%s",
+ request.getInlongGroupId(),
request.getInlongStreamId(), request.getSourceName(),
+ request.getVersion()));
+ }
+ // check whether modify groupId
+ if (StringUtils.isNotBlank(request.getInlongGroupId())
+ &&
!entity.getInlongGroupId().equals(request.getInlongGroupId())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "InlongGroupId not allowed modify");
+ }
+ // check whether modify streamId
+ if (StringUtils.isNotBlank(request.getInlongStreamId())
+ &&
!entity.getInlongStreamId().equals(request.getInlongStreamId())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "InlongStreamId not allowed modify");
+ }
+ // check whether modify sourceName
+ if (StringUtils.isNotBlank(request.getSourceName())
+ && !entity.getSourceName().equals(request.getSourceName())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "sourceName not allowed modify");
+ }
+ // set primary field value
+ request.setInlongGroupId(entity.getInlongGroupId());
+ request.setInlongStreamId(entity.getInlongStreamId());
+ request.setSourceName(entity.getSourceName());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
index e16b6107f..107a12bb9 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
@@ -69,8 +69,8 @@ public class AutoPushSourceOperator extends
AbstractSourceOperator {
AutoPushSourceDTO dto =
AutoPushSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to source info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of AutoPush SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
index 87d412d72..b678765d7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
@@ -62,7 +62,8 @@ public class BinlogSourceOperator extends
AbstractSourceOperator {
MySQLBinlogSourceDTO dto =
MySQLBinlogSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of MySQLBinlog
SourceDTO failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index 6f410a6b4..b217cce3a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -68,7 +68,8 @@ public class FileSourceOperator extends
AbstractSourceOperator {
FileSourceDTO dto = FileSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of File SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/hudi/HudiSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/hudi/HudiSourceOperator.java
index 2b341ef69..300fe04c6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/hudi/HudiSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/hudi/HudiSourceOperator.java
@@ -61,7 +61,8 @@ public class HudiSourceOperator extends
AbstractSourceOperator {
HudiSourceDTO dto = HudiSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of Hudi SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 686b81c39..97cedf6a2 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -77,7 +77,8 @@ public class KafkaSourceOperator extends
AbstractSourceOperator {
KafkaSourceDTO dto = KafkaSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of Kafka SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
index 16cce1f62..a379669c7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
@@ -62,7 +62,8 @@ public class MongoDBSourceOperator extends
AbstractSourceOperator {
MongoDBSourceDTO dto =
MongoDBSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of MongoDB SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
index a573c9a85..efc24a204 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mqtt/MqttSourceOperator.java
@@ -59,7 +59,8 @@ public class MqttSourceOperator extends
AbstractSourceOperator {
MqttSourceDTO dto = MqttSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of Mqtt SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
index 81307860b..c5a9191e4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
@@ -62,7 +62,8 @@ public class OracleSourceOperator extends
AbstractSourceOperator {
OracleSourceDTO dto =
OracleSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of Oracle SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
index a8c0dbb72..85092f513 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
@@ -62,7 +62,8 @@ public class PostgreSQLSourceOperator extends
AbstractSourceOperator {
PostgreSQLSourceDTO dto =
PostgreSQLSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of PostgreSQL SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index a20629e4e..f9091b782 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -85,7 +85,8 @@ public class PulsarSourceOperator extends
AbstractSourceOperator {
PulsarSourceDTO dto =
PulsarSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of Pulsar SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java
index 17f338ae3..aefe889ce 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/redis/RedisSourceOperator.java
@@ -62,7 +62,8 @@ public class RedisSourceOperator extends
AbstractSourceOperator {
RedisSourceDTO dto = RedisSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of Pulsar SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
index 09f4821f7..c24b1c48f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
@@ -62,7 +62,8 @@ public class SQLServerSourceOperator extends
AbstractSourceOperator {
SQLServerSourceDTO dto =
SQLServerSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of SQLServer SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 488c866d4..bbba5c576 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -75,7 +75,8 @@ public class TubeMQSourceOperator extends
AbstractSourceOperator {
TubeMQSourceDTO dto =
TubeMQSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of TubeMQ SourceDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index fd605db9d..7db5ea233 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
@@ -54,7 +55,7 @@ public class StreamSourceController {
@RequestMapping(value = "/source/save", method = RequestMethod.POST)
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save stream source")
- public Response<Integer> save(@Validated @RequestBody SourceRequest
request) {
+ public Response<Integer> save(@Validated(SaveValidation.class)
@RequestBody SourceRequest request) {
return Response.success(sourceService.save(request,
LoginUserUtils.getLoginUser().getName()));
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
index 23ff37076..292a543d7 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.web.controller.openapi;
import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
@@ -66,7 +67,7 @@ public class OpenStreamSourceController {
@RequestMapping(value = "/source/save", method = RequestMethod.POST)
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save stream source")
- public Response<Integer> save(@Validated @RequestBody SourceRequest
request) {
+ public Response<Integer> save(@Validated(SaveValidation.class)
@RequestBody SourceRequest request) {
return Response.success(sourceService.save(request,
LoginUserUtils.getLoginUser()));
}