This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ff68b8d0e [INLONG-7220][Manager] Optimize OpenStreamSinkController 
implementation (#7224)
ff68b8d0e is described below

commit ff68b8d0ef6f4927bbdeb5e9f5983389c7c59364
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Jan 12 16:01:30 2023 +0800

    [INLONG-7220][Manager] Optimize OpenStreamSinkController implementation 
(#7224)
---
 .../inlong/manager/pojo/sink/SinkRequest.java      | 16 +++++---
 .../manager/pojo/sink/doris/DorisSinkDTO.java      |  3 +-
 .../manager/pojo/sink/es/ElasticsearchSinkDTO.java |  3 +-
 .../pojo/sink/greenplum/GreenplumSinkDTO.java      |  3 +-
 .../manager/pojo/sink/hbase/HBaseSinkDTO.java      |  3 +-
 .../inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java |  3 +-
 .../inlong/manager/pojo/sink/hive/HiveSinkDTO.java |  3 +-
 .../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java |  3 +-
 .../manager/pojo/sink/iceberg/IcebergSinkDTO.java  |  3 +-
 .../manager/pojo/sink/kafka/KafkaSinkDTO.java      |  3 +-
 .../manager/pojo/sink/mysql/MySQLSinkDTO.java      |  4 +-
 .../manager/pojo/sink/oracle/OracleSinkDTO.java    |  4 +-
 .../pojo/sink/postgresql/PostgreSQLSinkDTO.java    |  3 +-
 .../pojo/sink/sqlserver/SQLServerSinkDTO.java      |  3 +-
 .../pojo/sink/starrocks/StarRocksSinkDTO.java      |  7 +++-
 .../tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java    |  3 +-
 .../manager/service/sink/AbstractSinkOperator.java | 31 +++++++++-------
 .../service/sink/StreamSinkServiceImpl.java        | 43 +++++++++++++++++-----
 .../service/sink/ck/ClickHouseSinkOperator.java    | 11 +++---
 .../service/sink/doris/DorisSinkOperator.java      | 18 ++++++---
 .../service/sink/es/ElasticsearchSinkOperator.java | 19 +++++-----
 .../sink/greenplum/GreenplumSinkOperator.java      | 11 +++---
 .../service/sink/hbase/HBaseSinkOperator.java      | 11 +++---
 .../service/sink/hdfs/HDFSSinkOperator.java        | 11 +++---
 .../service/sink/hive/HiveSinkOperator.java        | 17 +++++----
 .../service/sink/hudi/HudiSinkOperator.java        | 38 ++++++++++---------
 .../service/sink/iceberg/IcebergSinkOperator.java  | 17 +++++----
 .../service/sink/kafka/KafkaSinkOperator.java      | 11 +++---
 .../service/sink/mysql/MySQLSinkOperator.java      | 20 ++++++----
 .../service/sink/oracle/OracleSinkOperator.java    | 11 +++---
 .../sink/postgresql/PostgreSQLSinkOperator.java    | 11 +++---
 .../postgresql/TDSQLPostgreSQLSinkOperator.java    | 11 +++---
 .../sink/sqlserver/SQLServerSinkOperator.java      | 11 +++---
 .../sink/starrocks/StarRocksSinkOperator.java      | 35 +++++++++++-------
 .../service/stream/InlongStreamProcessService.java | 22 ++++++-----
 .../web/controller/StreamSinkController.java       |  8 ++--
 .../openapi/OpenStreamSinkController.java          |  4 +-
 37 files changed, 263 insertions(+), 175 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
index d8ca5f02d..545563f4b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
@@ -22,7 +22,10 @@ import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import org.apache.inlong.manager.common.validation.SaveValidation;
+import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
+import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
 import org.hibernate.validator.constraints.Length;
 import org.hibernate.validator.constraints.Range;
 
@@ -40,29 +43,29 @@ import java.util.Map;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = 
"sinkType")
 public abstract class SinkRequest {
 
-    @NotNull(groups = UpdateValidation.class)
     @ApiModelProperty(value = "Primary key")
+    @NotNull(groups = UpdateByIdValidation.class, message = "id cannot be 
null")
     private Integer id;
 
-    @NotBlank(message = "inlongGroupId cannot be blank")
     @ApiModelProperty("Inlong group id")
+    @NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class}, 
message = "inlongGroupId cannot be blank")
     @Length(min = 4, max = 100, message = "length must be between 4 and 100")
     @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports 
lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
-    @NotBlank(message = "inlongStreamId cannot be blank")
     @ApiModelProperty("Inlong stream id")
+    @NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class}, 
message = "inlongStreamId cannot be blank")
     @Length(min = 4, max = 100, message = "inlongStreamId length must be 
between 4 and 100")
     @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only 
supports lowercase letters, numbers, '-', or '_'")
     private String inlongStreamId;
 
-    @NotBlank(message = "sinkType cannot be blank")
     @ApiModelProperty("Sink type, including: HIVE, ES, etc.")
+    @NotBlank(message = "sinkType cannot be blank")
     @Length(max = 15, message = "length must be less than or equal to 15")
     private String sinkType;
 
-    @NotBlank(message = "sinkName cannot be blank")
     @ApiModelProperty("Sink name, unique in one stream")
+    @NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class}, 
message = "sinkName cannot be blank")
     @Length(min = 1, max = 100, message = "sinkName length must be between 1 
and 100")
     @Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "sinkName only supports 
lowercase letters, numbers, '-', or '_'")
     private String sinkName;
@@ -103,6 +106,7 @@ public abstract class SinkRequest {
     private Map<String, Object> properties = Maps.newHashMap();
 
     @ApiModelProperty(value = "Version number")
+    @NotNull(groups = {UpdateByIdValidation.class, 
UpdateByKeyValidation.class}, message = "version cannot be null")
     private Integer version;
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
index 60dfb1a7b..b8a379230 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
@@ -106,7 +106,8 @@ public class DorisSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, 
DorisSinkDTO.class).decryptPassword();
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of Doris SinkDTO failure: 
%s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
index 0828b119f..fb7297319 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -103,7 +103,8 @@ public class ElasticsearchSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, 
ElasticsearchSinkDTO.class).decryptPassword();
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of Elasticsearch SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
index d46970b64..fa178134d 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
@@ -78,7 +78,8 @@ public class GreenplumSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, GreenplumSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of Greenplum SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
index bbdd924e3..3a417a397 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
@@ -90,7 +90,8 @@ public class HBaseSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, HBaseSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of HBase SinkDTO failure: 
%s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
index 25b604bd8..ba7c05e37 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
@@ -82,7 +82,8 @@ public class HDFSSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, HDFSSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of HDFS SinkDTO failure: 
%s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
index 4111cbf88..b9dc4197b 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
@@ -128,7 +128,8 @@ public class HiveSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, 
HiveSinkDTO.class).decryptPassword();
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of Hive SinkDTO failure: 
%s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
index 32b214060..f42bad925 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
@@ -99,7 +99,8 @@ public class HudiSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, HudiSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of Hudi SinkDTO failure: 
%s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
index 800da63b2..14acc52b8 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -91,7 +91,8 @@ public class IcebergSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, IcebergSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of Iceberg SinkDTO failure: 
%s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
index 5eebb5b69..cd16b98a7 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
@@ -78,7 +78,8 @@ public class KafkaSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, KafkaSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of Kafka SinkDTO failure: 
%s", e.getMessage()));
         }
     }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
index ca6752a6a..443e8f9c6 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
@@ -108,8 +108,8 @@ public class MySQLSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, MySQLSinkDTO.class);
         } catch (Exception e) {
-            LOGGER.error("fetch mysql sink info failed from json params: " + 
extParams, e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of MySQL SinkDTO failure: 
%s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
index 553a534ce..1016b88e0 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
@@ -83,8 +83,8 @@ public class OracleSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, OracleSinkDTO.class);
         } catch (Exception e) {
-            LOGGER.error("fetch oracle sink info failed from json params: " + 
extParams, e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of Oracle SinkDTO failure: 
%s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
index 7fd5b45d9..b66a38d0e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
@@ -98,7 +98,8 @@ public class PostgreSQLSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, 
PostgreSQLSinkDTO.class).decryptPassword();
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of PostgreSQL SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
index 00c9b9ca3..b1a99ca1a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
@@ -85,7 +85,8 @@ public class SQLServerSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, SQLServerSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of SQLServer SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
index 9b627c400..74c3e9990 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
@@ -32,6 +32,7 @@ import javax.validation.constraints.NotNull;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Sink info of StarRocks
@@ -121,9 +122,11 @@ public class StarRocksSinkDTO {
 
     public static StarRocksSinkDTO getFromJson(@NotNull String extParams) {
         try {
-            return JsonUtils.parseObject(extParams, 
StarRocksSinkDTO.class).decryptPassword();
+            return Objects.requireNonNull(JsonUtils.parseObject(
+                    extParams, StarRocksSinkDTO.class)).decryptPassword();
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of StarRocks SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
index 569606737..b46988514 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
@@ -81,7 +81,8 @@ public class TDSQLPostgreSQLSinkDTO {
         try {
             return JsonUtils.parseObject(extParams, 
TDSQLPostgreSQLSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of TDSQLPostgreSQL SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index c1331bec1..f47db9c67 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -26,7 +26,6 @@ import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
@@ -116,13 +115,14 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
     @Override
     public void updateOpt(SinkRequest request, SinkStatus nextStatus, String 
operator) {
         StreamSinkEntity entity = 
sinkMapper.selectByPrimaryKey(request.getId());
-        Preconditions.checkNotNull(entity, 
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
-
-        String errMsg = String.format("sink has already updated with 
groupId=%s, streamId=%s, name=%s, curVersion=%s",
-                request.getInlongGroupId(), request.getInlongStreamId(), 
request.getSinkName(), request.getVersion());
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
+        }
         if (!Objects.equals(entity.getVersion(), request.getVersion())) {
-            LOGGER.error(errMsg);
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
+                    String.format("sink has already updated with groupId=%s, 
streamId=%s, name=%s, curVersion=%s",
+                            request.getInlongGroupId(), 
request.getInlongStreamId(), request.getSinkName(),
+                            request.getVersion()));
         }
         CommonBeanUtils.copyProperties(request, entity, true);
         setTargetEntity(request, entity);
@@ -133,8 +133,10 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
         entity.setModifier(operator);
         int rowCount = sinkMapper.updateByIdSelective(entity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            LOGGER.error(errMsg);
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
+                    String.format("sink has already updated with groupId=%s, 
streamId=%s, name=%s, curVersion=%s",
+                            request.getInlongGroupId(), 
request.getInlongStreamId(), request.getSinkName(),
+                            request.getVersion()));
         }
 
         boolean onlyAdd = 
SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
@@ -171,7 +173,7 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
     @Override
     public void saveFieldOpt(SinkRequest request) {
         List<SinkField> fieldList = request.getSinkFieldList();
-        LOGGER.info("begin to save sink fields={}", fieldList);
+        LOGGER.debug("begin to save sink fields={}", fieldList);
         if (CollectionUtils.isEmpty(fieldList)) {
             return;
         }
@@ -197,7 +199,7 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
         }
 
         sinkFieldMapper.insertAll(entityList);
-        LOGGER.info("success to save sink fields");
+        LOGGER.debug("success to save sink fields");
     }
 
     @Override
@@ -208,9 +210,10 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
         entity.setModifier(operator);
         int rowCount = sinkMapper.updateByIdSelective(entity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            LOGGER.error("sink has already updated with groupId={}, 
streamId={}, name={}, curVersion={}",
-                    entity.getInlongGroupId(), entity.getInlongStreamId(), 
entity.getSinkName(), entity.getVersion());
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
+                    String.format("sink has already updated with groupId=%s, 
streamId=%s, name=%s, curVersion=%s",
+                            entity.getInlongGroupId(), 
entity.getInlongStreamId(), entity.getSinkName(),
+                            entity.getVersion()));
         }
         sinkFieldMapper.logicDeleteAll(entity.getId());
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 7cf87e495..fa1afaa0a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -68,6 +68,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -413,8 +414,6 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean update(SinkRequest request, UserInfo opInfo) {
-        // check request parameter
-        checkSinkRequestParams(request);
         if (request.getId() == null) {
             throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
         }
@@ -422,21 +421,43 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         if (opInfo == null) {
             throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
         }
+        StreamSinkEntity curEntity = 
sinkMapper.selectByPrimaryKey(request.getId());
+        if (curEntity == null) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
+        }
+        if (!Objects.equals(curEntity.getVersion(), request.getVersion())) {
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
+                    String.format("sink has already updated with groupId=%s, 
streamId=%s, name=%s, curVersion=%s",
+                            curEntity.getInlongGroupId(), 
curEntity.getInlongStreamId(), curEntity.getSinkName(),
+                            curEntity.getVersion()));
+        }
+        if (StringUtils.isNotBlank(request.getInlongGroupId())
+                && 
!curEntity.getInlongGroupId().equals(request.getInlongGroupId())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "InlongGroupId not allowed modify");
+        }
+        if (StringUtils.isNotBlank(request.getInlongStreamId())
+                && 
!curEntity.getInlongStreamId().equals(request.getInlongStreamId())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "InlongStreamId not allowed modify");
+        }
+        request.setInlongGroupId(curEntity.getInlongGroupId());
+        request.setInlongStreamId(curEntity.getInlongStreamId());
         // check group record
-        InlongGroupEntity entity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
-        if (entity == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", request.getInlongGroupId()));
+        InlongGroupEntity curGroupEntity = 
groupMapper.selectByGroupId(curEntity.getInlongGroupId());
+        if (curGroupEntity == null) {
+            throw new 
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
+                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", curEntity.getInlongGroupId()));
         }
         // only the person in charges can query
         if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
-            List<String> inCharges = 
Arrays.asList(entity.getInCharges().split(InlongConstants.COMMA));
+            List<String> inCharges = 
Arrays.asList(curGroupEntity.getInCharges().split(InlongConstants.COMMA));
             if (!inCharges.contains(opInfo.getName())) {
                 throw new 
BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
             }
         }
         // Check if group status can be modified
-        GroupStatus curState = GroupStatus.forCode(entity.getStatus());
+        GroupStatus curState = GroupStatus.forCode(curEntity.getStatus());
         if (GroupStatus.notAllowedUpdate(curState)) {
             throw new 
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
 curState));
         }
@@ -444,7 +465,9 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
                 request.getInlongGroupId(), request.getInlongStreamId());
         if (streamEntity == null) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
+            throw new 
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
+                    String.format("stream record not found with the groupId=%s 
streamId=%s",
+                            curEntity.getInlongGroupId(), 
curEntity.getInlongStreamId()));
         }
         // Check whether the sink name exists with the same groupId and 
streamId
         StreamSinkEntity existEntity = sinkMapper.selectByUniqueKey(
@@ -762,6 +785,6 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         }
 
         streamProcessOperation.deleteProcess(groupId, streamId, operator, 
false);
-        LOGGER.info("success to start the delete-stream-process for groupId={} 
streamId={}", groupId, streamId);
+        LOGGER.debug("success to start the delete-stream-process for 
groupId={} streamId={}", groupId, streamId);
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
index 3e405bd6d..6a7e80410 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -64,15 +63,17 @@ public class ClickHouseSinkOperator extends 
AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         ClickHouseSinkRequest sinkRequest = (ClickHouseSinkRequest) request;
         try {
             ClickHouseSinkDTO dto = 
ClickHouseSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of ClickHouse SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
index 0bcb38cab..30a268b22 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
@@ -20,11 +20,12 @@ package org.apache.inlong.manager.service.sink.doris;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.List;
 import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -61,15 +62,17 @@ public class DorisSinkOperator extends AbstractSinkOperator 
{
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         DorisSinkRequest sinkRequest = (DorisSinkRequest) request;
         try {
             DorisSinkDTO dto = DorisSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Doris SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
@@ -81,7 +84,10 @@ public class DorisSinkOperator extends AbstractSinkOperator {
         }
 
         DorisSinkDTO dto = DorisSinkDTO.getFromJson(entity.getExtParams());
-        Preconditions.checkNotEmpty(dto.getFeNodes(), "doris fe nodes is 
empty");
+        if (StringUtils.isBlank(dto.getFeNodes())) {
+            throw new 
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
+                    "doris fe nodes is blank");
+        }
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index 19196a4dd..1516a456e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -27,7 +27,6 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
@@ -73,8 +72,10 @@ public class ElasticsearchSinkOperator extends 
AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         ElasticsearchSinkRequest sinkRequest = (ElasticsearchSinkRequest) 
request;
         try {
             ElasticsearchSinkDTO dto = 
ElasticsearchSinkDTO.getFromRequest(sinkRequest);
@@ -95,8 +96,8 @@ public class ElasticsearchSinkOperator extends 
AbstractSinkOperator {
             dto.setEncryptVersion(encryptVersion);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Elasticsearch 
SinkDTO failure: %s", e.getMessage()));
         }
     }
 
@@ -129,7 +130,7 @@ public class ElasticsearchSinkOperator extends 
AbstractSinkOperator {
     @Override
     public void saveFieldOpt(SinkRequest request) {
         List<SinkField> fieldList = request.getSinkFieldList();
-        LOGGER.info("begin to save es sink fields={}", fieldList);
+        LOGGER.debug("begin to save es sink fields={}", fieldList);
         if (CollectionUtils.isEmpty(fieldList)) {
             return;
         }
@@ -150,8 +151,8 @@ public class ElasticsearchSinkOperator extends 
AbstractSinkOperator {
                 ElasticsearchFieldInfo dto = 
ElasticsearchFieldInfo.getFromRequest(fieldInfo);
                 fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
             } catch (Exception e) {
-                LOGGER.error("parsing json string to sink field info failed", 
e);
-                throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+                throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                        String.format("serialize extParams of Elasticsearch 
FieldInfo failure: %s", e.getMessage()));
             }
             fieldEntity.setInlongGroupId(groupId);
             fieldEntity.setInlongStreamId(streamId);
@@ -162,7 +163,7 @@ public class ElasticsearchSinkOperator extends 
AbstractSinkOperator {
         }
 
         sinkFieldMapper.insertAll(entityList);
-        LOGGER.info("success to save es sink fields");
+        LOGGER.debug("success to save es sink fields");
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperator.java
index a7d0c7d07..a9a7fdb12 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperator.java
@@ -28,7 +28,6 @@ import 
org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSink;
 import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSinkDTO;
 import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class GreenplumSinkOperator extends 
AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         GreenplumSinkRequest sinkRequest = (GreenplumSinkRequest) request;
         try {
             GreenplumSinkDTO dto = 
GreenplumSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Greenplum SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hbase/HBaseSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hbase/HBaseSinkOperator.java
index 6801abce5..917ea3f5d 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hbase/HBaseSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hbase/HBaseSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sink.hbase.HBaseSink;
 import org.apache.inlong.manager.pojo.sink.hbase.HBaseSinkDTO;
 import org.apache.inlong.manager.pojo.sink.hbase.HBaseSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class HBaseSinkOperator extends AbstractSinkOperator 
{
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         HBaseSinkRequest sinkRequest = (HBaseSinkRequest) request;
         try {
             HBaseSinkDTO dto = HBaseSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of HBase SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HDFSSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HDFSSinkOperator.java
index 245709ac6..576239670 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HDFSSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HDFSSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSink;
 import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSinkDTO;
 import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class HDFSSinkOperator extends AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         HDFSSinkRequest sinkRequest = (HDFSSinkRequest) request;
         try {
             HDFSSinkDTO dto = HDFSSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of HDFS SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
index 67b9e133a..4ffc02163 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
 import org.apache.inlong.manager.pojo.sink.hive.HiveSinkDTO;
 import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -63,15 +62,17 @@ public class HiveSinkOperator extends AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         HiveSinkRequest sinkRequest = (HiveSinkRequest) request;
         try {
             HiveSinkDTO dto = HiveSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Hive SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
@@ -84,8 +85,10 @@ public class HiveSinkOperator extends AbstractSinkOperator {
 
         HiveSinkDTO dto = HiveSinkDTO.getFromJson(entity.getExtParams());
         if (StringUtils.isBlank(dto.getJdbcUrl())) {
-            Preconditions.checkNotEmpty(entity.getDataNodeName(),
-                    "hive jdbc url unspecified and data node is empty");
+            if (StringUtils.isBlank(entity.getDataNodeName())) {
+                throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                        "hive jdbc url unspecified and data node is blank");
+            }
             HiveDataNodeInfo dataNodeInfo = (HiveDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
                     entity.getDataNodeName(), entity.getSinkType());
             CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
index 734697296..e5613d4c9 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.service.sink.hudi;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.List;
 import java.util.Set;
@@ -29,7 +27,6 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -72,8 +69,10 @@ public class HudiSinkOperator extends AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         HudiSinkRequest sinkRequest = (HudiSinkRequest) request;
 
         String partitionKey = sinkRequest.getPartitionKey();
@@ -84,17 +83,18 @@ public class HudiSinkOperator extends AbstractSinkOperator {
             Set<String> fieldNames = 
sinkRequest.getSinkFieldList().stream().map(SinkField::getFieldName)
                     .collect(Collectors.toSet());
             if (primaryKeyExist) {
-                checkState(
-                        fieldNames.contains(partitionKey),
-                        "The partitionKey({}) must be included in the 
sinkFieldList({})",
-                        partitionKey, fieldNames);
+                if (!fieldNames.contains(partitionKey)) {
+                    throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                            String.format("The partitionKey(%s) must be 
included in the sinkFieldList(%s)",
+                                    partitionKey, fieldNames));
+                }
             }
             if (partitionKeyExist) {
-                checkState(
-                        fieldNames.contains(partitionKey),
-                        "The primaryKey({}) must be included in the 
sinkFieldList({})",
-                        primaryKey,
-                        fieldNames);
+                if (!fieldNames.contains(primaryKey)) {
+                    throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                            String.format("The primaryKey(%s) must be included 
in the sinkFieldList(%s)",
+                                    primaryKey, fieldNames));
+                }
             }
         }
 
@@ -102,8 +102,8 @@ public class HudiSinkOperator extends AbstractSinkOperator {
             HudiSinkDTO dto = HudiSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Hudi SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
@@ -116,8 +116,10 @@ public class HudiSinkOperator extends AbstractSinkOperator 
{
 
         HudiSinkDTO dto = HudiSinkDTO.getFromJson(entity.getExtParams());
         if (StringUtils.isBlank(dto.getCatalogUri()) && 
CATALOG_TYPE_HIVE.equals(dto.getCatalogType())) {
-            Preconditions.checkNotEmpty(entity.getDataNodeName(),
-                    "hudi catalog uri unspecified and data node is empty");
+            if (StringUtils.isBlank(entity.getDataNodeName())) {
+                throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                        "hudi catalog uri unspecified and data node is blank");
+            }
             HudiDataNodeInfo dataNodeInfo = (HudiDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
                     entity.getDataNodeName(), entity.getSinkType());
             CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
index e8da4f8a4..f778df3b9 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
@@ -32,7 +32,6 @@ import 
org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSinkDTO;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -67,15 +66,17 @@ public class IcebergSinkOperator extends 
AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         IcebergSinkRequest sinkRequest = (IcebergSinkRequest) request;
         try {
             IcebergSinkDTO dto = IcebergSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Iceberg SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
@@ -88,8 +89,10 @@ public class IcebergSinkOperator extends 
AbstractSinkOperator {
 
         IcebergSinkDTO dto = IcebergSinkDTO.getFromJson(entity.getExtParams());
         if (StringUtils.isBlank(dto.getCatalogUri()) && 
CATALOG_TYPE_HIVE.equals(dto.getCatalogType())) {
-            Preconditions.checkNotEmpty(entity.getDataNodeName(),
-                    "iceberg catalog uri unspecified and data node is empty");
+            if (StringUtils.isBlank(entity.getDataNodeName())) {
+                throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                        "iceberg catalog uri unspecified and data node is 
blank");
+            }
             IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
                     entity.getDataNodeName(), entity.getSinkType());
             CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
index 91b35c53a..bf6638123 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class KafkaSinkOperator extends AbstractSinkOperator 
{
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         KafkaSinkRequest sinkRequest = (KafkaSinkRequest) request;
         try {
             KafkaSinkDTO dto = KafkaSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Kafka SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index a81b9ff34..2d5a07eb9 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -63,15 +62,17 @@ public class MySQLSinkOperator extends AbstractSinkOperator 
{
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         MySQLSinkRequest sinkRequest = (MySQLSinkRequest) request;
         try {
             MySQLSinkDTO dto = MySQLSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of MySQL SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
@@ -84,9 +85,12 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
 
         MySQLSinkDTO dto = MySQLSinkDTO.getFromJson(entity.getExtParams());
         if (StringUtils.isBlank(dto.getJdbcUrl())) {
-            String dataNodeName = entity.getDataNodeName();
-            Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not 
specified and data node is empty");
-            DataNodeInfo dataNodeInfo = 
dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType());
+            if (StringUtils.isBlank(entity.getDataNodeName())) {
+                throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                        "mysql jdbc url not specified and data node is blank");
+            }
+            DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(
+                    entity.getDataNodeName(), entity.getSinkType());
             CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
             dto.setJdbcUrl(dataNodeInfo.getUrl());
             dto.setPassword(dataNodeInfo.getToken());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oracle/OracleSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oracle/OracleSinkOperator.java
index a24f8a81c..561404b4a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oracle/OracleSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oracle/OracleSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
 import org.apache.inlong.manager.pojo.sink.oracle.OracleSinkDTO;
 import org.apache.inlong.manager.pojo.sink.oracle.OracleSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class OracleSinkOperator extends 
AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         OracleSinkRequest sinkRequest = (OracleSinkRequest) request;
         try {
             OracleSinkDTO dto = OracleSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of Oracle SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/PostgreSQLSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/PostgreSQLSinkOperator.java
index 256d61ed0..7f4c6f8d5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/PostgreSQLSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/PostgreSQLSinkOperator.java
@@ -28,7 +28,6 @@ import 
org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
 import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSinkDTO;
 import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class PostgreSQLSinkOperator extends 
AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         PostgreSQLSinkRequest sinkRequest = (PostgreSQLSinkRequest) request;
         try {
             PostgreSQLSinkDTO dto = 
PostgreSQLSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of PostgreSQL SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/TDSQLPostgreSQLSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/TDSQLPostgreSQLSinkOperator.java
index d76edc151..1cd089a26 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/TDSQLPostgreSQLSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/TDSQLPostgreSQLSinkOperator.java
@@ -28,7 +28,6 @@ import 
org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
 import 
org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSinkDTO;
 import 
org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
@@ -64,15 +63,17 @@ public class TDSQLPostgreSQLSinkOperator extends 
AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        Preconditions.checkTrue(getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         TDSQLPostgreSQLSinkRequest sinkRequest = (TDSQLPostgreSQLSinkRequest) 
request;
         try {
             TDSQLPostgreSQLSinkDTO dto = 
TDSQLPostgreSQLSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of TDSQLPostgreSQL 
SinkDTO failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SQLServerSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SQLServerSinkOperator.java
index 257909b57..91e6e0e32 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SQLServerSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SQLServerSinkOperator.java
@@ -28,7 +28,6 @@ import 
org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSink;
 import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSinkDTO;
 import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSinkRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class SQLServerSinkOperator extends 
AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        Preconditions.checkTrue(getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         SQLServerSinkRequest sinkRequest = (SQLServerSinkRequest) request;
         try {
             SQLServerSinkDTO dto = 
SQLServerSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of SQLServer SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
index 5b690fc19..a86f35c76 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
@@ -25,7 +25,6 @@ import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
@@ -69,15 +68,17 @@ public class StarRocksSinkOperator extends 
AbstractSinkOperator {
 
     @Override
     protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
-        
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
-                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
         StarRocksSinkRequest sinkRequest = (StarRocksSinkRequest) request;
         try {
             StarRocksSinkDTO dto = 
StarRocksSinkDTO.getFromRequest(sinkRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            LOGGER.error("parsing json string to sink info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of StarRocks SinkDTO 
failure: %s", e.getMessage()));
         }
     }
 
@@ -90,16 +91,24 @@ public class StarRocksSinkOperator extends 
AbstractSinkOperator {
 
         StarRocksSinkDTO dto = 
StarRocksSinkDTO.getFromJson(entity.getExtParams());
         if (StringUtils.isBlank(dto.getJdbcUrl())) {
-            Preconditions.checkNotEmpty(entity.getDataNodeName(),
-                    "starRocks jdbc url unspecified and data node is empty");
+            if (StringUtils.isBlank(entity.getDataNodeName())) {
+                throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                        "starRocks jdbc url unspecified and data node is 
blank");
+            }
             StarRocksDataNodeInfo dataNodeInfo = (StarRocksDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
                     entity.getDataNodeName(), entity.getSinkType());
             CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
             dto.setJdbcUrl(dataNodeInfo.getUrl());
             dto.setPassword(dataNodeInfo.getToken());
         }
-        Preconditions.checkNotEmpty(dto.getLoadUrl(), "StarRocks load url is 
empty");
-        Preconditions.checkNotEmpty(dto.getJdbcUrl(), "StarRocks jdbc url is 
empty");
+        if (StringUtils.isBlank(dto.getLoadUrl())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    "StarRocks load url is blank");
+        }
+        if (StringUtils.isBlank(dto.getJdbcUrl())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    "StarRocks jdbc url is blank");
+        }
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         List<SinkField> sinkFields = super.getSinkFields(entity.getId());
@@ -110,7 +119,7 @@ public class StarRocksSinkOperator extends 
AbstractSinkOperator {
     @Override
     public void saveFieldOpt(SinkRequest request) {
         List<SinkField> fieldList = request.getSinkFieldList();
-        LOGGER.info("begin to save es sink fields={}", fieldList);
+        LOGGER.debug("begin to save es sink fields={}", fieldList);
         if (CollectionUtils.isEmpty(fieldList)) {
             return;
         }
@@ -131,8 +140,8 @@ public class StarRocksSinkOperator extends 
AbstractSinkOperator {
                 StarRocksColumnInfo dto = 
StarRocksColumnInfo.getFromRequest(fieldInfo);
                 fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
             } catch (Exception e) {
-                LOGGER.error("parsing json string to sink field info failed", 
e);
-                throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+                throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                        String.format("serialize extParams of StarRocks 
ColumnInfo failure: %s", e.getMessage()));
             }
             fieldEntity.setInlongGroupId(groupId);
             fieldEntity.setInlongStreamId(streamId);
@@ -143,7 +152,7 @@ public class StarRocksSinkOperator extends 
AbstractSinkOperator {
         }
 
         sinkFieldMapper.insertAll(entityList);
-        LOGGER.info("success to save starRock sink fields");
+        LOGGER.debug("success to save starRock sink fields");
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index abc8d79de..934eb7404 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -196,31 +196,35 @@ public class InlongStreamProcessService {
      * Restart stream in synchronous/asynchronous way.
      */
     public boolean deleteProcess(String groupId, String streamId, String 
operator, boolean sync) {
-        log.info("begin to delete stream process for groupId={} streamId={}", 
groupId, streamId);
+        log.debug("begin to delete stream process for groupId={} streamId={}", 
groupId, streamId);
 
         InlongGroupInfo groupInfo = groupService.get(groupId);
-        Preconditions.checkNotNull(groupInfo, 
ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
+        if (groupInfo == null) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
+                    ErrorCodeEnum.GROUP_NOT_FOUND.getMessage() + " : " + 
groupId);
+        }
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
         if (GroupStatus.notAllowedTransition(groupStatus, 
GroupStatus.DELETING)) {
-            throw new BusinessException(String.format("group status=%s not 
support delete stream"
-                    + " for groupId=%s", groupStatus, groupId));
+            throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED,
+                    String.format("group status=%s not support delete stream 
for groupId=%s", groupStatus, groupId));
         }
 
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
         Preconditions.checkNotNull(streamInfo, 
ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
         StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
         if (status == StreamStatus.DELETED || status == StreamStatus.DELETING) 
{
-            log.warn("groupId={}, streamId={} is already in {}", groupId, 
streamId, status);
+            log.debug("groupId={}, streamId={} is already in {}", groupId, 
streamId, status);
             return true;
         }
 
         if (StreamStatus.notAllowedDelete(status)) {
-            throw new BusinessException(String.format("stream status=%s not 
support delete stream"
-                    + " for groupId=%s streamId=%s", status, groupId, 
streamId));
+            throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED,
+                    String.format("stream status=%s not support delete stream 
for groupId=%s streamId=%s", status,
+                            groupId, streamId));
         }
 
-        StreamResourceProcessForm processForm = 
StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
-                GroupOperateType.DELETE);
+        StreamResourceProcessForm processForm =
+                StreamResourceProcessForm.getProcessForm(groupInfo, 
streamInfo, GroupOperateType.DELETE);
         ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
         if (sync) {
             WorkflowResult workflowResult = workflowService.start(processName, 
operator, processForm);
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 40d202479..949e943ba 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -22,7 +22,8 @@ import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
+import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
@@ -75,14 +76,15 @@ public class StreamSinkController {
     @RequestMapping(value = "/sink/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update stream sink")
-    public Response<Boolean> update(@Validated(UpdateValidation.class) 
@RequestBody SinkRequest request) {
+    public Response<Boolean> update(@Validated(UpdateByIdValidation.class) 
@RequestBody SinkRequest request) {
         return Response.success(sinkService.update(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/sink/updateByKey", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update stream sink by key")
-    public Response<UpdateResult> updateByKey(@RequestBody SinkRequest 
request) {
+    public Response<UpdateResult> updateByKey(
+            @Validated(UpdateByKeyValidation.class) @RequestBody SinkRequest 
request) {
         return Response.success(sinkService.updateByKey(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
index b12c47be3..c6048694c 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.web.controller.openapi;
 
 import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -76,7 +76,7 @@ public class OpenStreamSinkController {
     @RequestMapping(value = "/sink/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update stream sink")
-    public Response<Boolean> update(@Validated(UpdateValidation.class) 
@RequestBody SinkRequest request) {
+    public Response<Boolean> update(@Validated(UpdateByIdValidation.class) 
@RequestBody SinkRequest request) {
         return Response.success(sinkService.update(request, 
LoginUserUtils.getLoginUser()));
     }
 

Reply via email to