This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 95b11c169 [INLONG-5184][Manager] Add more info for some error logs (#5185)
95b11c169 is described below

commit 95b11c169085f1cb5b8fc3469bd7015de9b17562
Author: healchow <[email protected]>
AuthorDate: Sun Jul 24 16:21:04 2022 +0800

    [INLONG-5184][Manager] Add more info for some error logs (#5185)
---
 .../pojo/cluster/pulsar/PulsarClusterDTO.java      |  2 +-
 .../common/pojo/cluster/tube/TubeClusterDTO.java   |  2 +-
 .../common/pojo/group/pulsar/InlongPulsarDTO.java  |  2 +-
 .../common/pojo/sink/ck/ClickHouseSinkDTO.java     |  2 +-
 .../pojo/sink/dlciceberg/DLCIcebergSinkDTO.java    |  6 +-
 .../pojo/sink/es/ElasticsearchFieldInfo.java       |  2 +-
 .../common/pojo/sink/es/ElasticsearchSinkDTO.java  |  8 +--
 .../pojo/sink/greenplum/GreenplumSinkDTO.java      |  2 +-
 .../pojo/sink/hbase/HBaseColumnFamilyInfo.java     |  2 +-
 .../common/pojo/sink/hbase/HBaseSinkDTO.java       |  2 +-
 .../manager/common/pojo/sink/hdfs/HDFSSinkDTO.java |  2 +-
 .../manager/common/pojo/sink/hive/HiveSinkDTO.java |  2 +-
 .../pojo/sink/iceberg/IcebergColumnInfo.java       |  2 +-
 .../common/pojo/sink/iceberg/IcebergSinkDTO.java   |  2 +-
 .../common/pojo/sink/kafka/KafkaSinkDTO.java       |  2 +-
 .../common/pojo/sink/mysql/MySQLSinkDTO.java       |  2 +-
 .../common/pojo/sink/oracle/OracleSinkDTO.java     |  2 +-
 .../pojo/sink/postgresql/PostgreSQLSinkDTO.java    |  2 +-
 .../pojo/sink/sqlserver/SQLServerSinkDTO.java      |  2 +-
 .../tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java    |  6 +-
 .../pojo/source/autopush/AutoPushSourceDTO.java    |  2 +-
 .../common/pojo/source/file/FileSourceDTO.java     |  2 +-
 .../common/pojo/source/kafka/KafkaSourceDTO.java   |  2 +-
 .../pojo/source/mongodb/MongoDBSourceDTO.java      |  2 +-
 .../pojo/source/mysql/MySQLBinlogSourceDTO.java    |  2 +-
 .../common/pojo/source/oracle/OracleSourceDTO.java |  2 +-
 .../source/postgresql/PostgreSQLSourceDTO.java     |  2 +-
 .../common/pojo/source/pulsar/PulsarSourceDTO.java |  2 +-
 .../pojo/source/sqlserver/SQLServerSourceDTO.java  |  2 +-
 .../common/pojo/source/tubemq/TubeMQSourceDTO.java |  2 +-
 .../service/cluster/PulsarClusterOperator.java     |  2 +-
 .../service/cluster/TubeClusterOperator.java       |  2 +-
 .../service/core/impl/SortClusterServiceImpl.java  | 70 +++++++++----------
 .../operation/InlongStreamProcessOperation.java    | 78 +++++++++-------------
 .../service/group/InlongPulsarOperator.java        |  2 +-
 .../source/autopush/AutoPushSourceOperator.java    |  2 +-
 .../source/binlog/BinlogSourceOperator.java        |  2 +-
 .../service/source/file/FileSourceOperator.java    |  2 +-
 .../service/source/kafka/KafkaSourceOperator.java  |  2 +-
 .../source/mongodb/MongoDBSourceOperator.java      |  2 +-
 .../source/oracle/OracleSourceOperator.java        |  2 +-
 .../postgresql/PostgreSQLSourceOperator.java       |  2 +-
 .../source/pulsar/PulsarSourceOperator.java        |  2 +-
 .../source/sqlserver/SQLServerSourceOperator.java  |  2 +-
 .../source/tubemq/TubeMQSourceOperator.java        |  2 +-
 .../service/core/sink/IcebergSinkServiceTest.java  |  2 +-
 46 files changed, 107 insertions(+), 143 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/pulsar/PulsarClusterDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/pulsar/PulsarClusterDTO.java
index bf34ec7cd..d4730e18f 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/pulsar/PulsarClusterDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/pulsar/PulsarClusterDTO.java
@@ -67,7 +67,7 @@ public class PulsarClusterDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, PulsarClusterDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
index 2cfbb5028..f64ce2afe 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
@@ -57,7 +57,7 @@ public class TubeClusterDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, TubeClusterDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/pulsar/InlongPulsarDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/pulsar/InlongPulsarDTO.java
index 3eeb65726..235c6c273 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/pulsar/InlongPulsarDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/pulsar/InlongPulsarDTO.java
@@ -103,7 +103,7 @@ public class InlongPulsarDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, InlongPulsarDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
index a7e6c3f8e..a860b23d0 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -137,7 +137,7 @@ public class ClickHouseSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, 
ClickHouseSinkDTO.class).decryptPassword();
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
index 1088d2165..6fbb2bbf7 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
@@ -26,8 +26,6 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -42,7 +40,6 @@ import java.util.Map;
 public class DLCIcebergSinkDTO {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(DLCIcebergSinkDTO.class);
 
     @ApiModelProperty("Catalog URI of the DLCIceberg server")
     private String catalogUri;
@@ -84,8 +81,7 @@ public class DLCIcebergSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, DLCIcebergSinkDTO.class);
         } catch (Exception e) {
-            LOGGER.error("fetch DLCIceberg sink info failed from json params: 
" + extParams, e);
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchFieldInfo.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchFieldInfo.java
index c4e43fe6e..40acd867f 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchFieldInfo.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchFieldInfo.java
@@ -67,7 +67,7 @@ public class ElasticsearchFieldInfo {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, 
ElasticsearchFieldInfo.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
index a96beeeb6..ea4624077 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -31,7 +31,6 @@ import org.apache.inlong.manager.common.util.AESUtils;
 
 import javax.validation.constraints.NotNull;
 import java.nio.charset.StandardCharsets;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -121,15 +120,10 @@ public class ElasticsearchSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, 
ElasticsearchSinkDTO.class).decryptPassword();
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
-    public static String getElasticSearchIndexName(ElasticsearchSinkDTO esInfo,
-            List<ElasticsearchFieldInfo> fieldList) {
-        return esInfo.getIndexName();
-    }
-
     private ElasticsearchSinkDTO decryptPassword() throws Exception {
         if (StringUtils.isNotEmpty(this.password)) {
             byte[] passwordBytes = AESUtils.decryptAsString(this.password, 
this.encryptVersion);
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
index 5c3060835..65754e684 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
@@ -81,7 +81,7 @@ public class GreenplumSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, GreenplumSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseColumnFamilyInfo.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseColumnFamilyInfo.java
index cb2c472b8..b4e541ca0 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseColumnFamilyInfo.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseColumnFamilyInfo.java
@@ -58,7 +58,7 @@ public class HBaseColumnFamilyInfo {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, 
HBaseColumnFamilyInfo.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseSinkDTO.java
index 4cfe9f1a4..f088097a2 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseSinkDTO.java
@@ -94,7 +94,7 @@ public class HBaseSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, HBaseSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HDFSSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HDFSSinkDTO.java
index 279d83b2d..8f76262ee 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HDFSSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HDFSSinkDTO.java
@@ -86,7 +86,7 @@ public class HDFSSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, HDFSSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
index d40a4f4cf..0e7b8a036 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
@@ -132,7 +132,7 @@ public class HiveSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, 
HiveSinkDTO.class).decryptPassword();
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
index b7e6a1826..488fbea5b 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
@@ -75,7 +75,7 @@ public class IcebergColumnInfo {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, IcebergColumnInfo.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
index 2d41f9701..057a8ae83 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -94,7 +94,7 @@ public class IcebergSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, IcebergSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
index 0b94793be..1fe0d47d0 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
@@ -83,7 +83,7 @@ public class KafkaSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, KafkaSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/mysql/MySQLSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/mysql/MySQLSinkDTO.java
index 0daf5b98b..3f53262dd 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/mysql/MySQLSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/mysql/MySQLSinkDTO.java
@@ -94,7 +94,7 @@ public class MySQLSinkDTO {
             return OBJECT_MAPPER.readValue(extParams, MySQLSinkDTO.class);
         } catch (Exception e) {
             LOGGER.error("fetch mysql sink info failed from json params: " + 
extParams, e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
index a93e0286d..94dd47925 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
@@ -86,7 +86,7 @@ public class OracleSinkDTO {
             return OBJECT_MAPPER.readValue(extParams, OracleSinkDTO.class);
         } catch (Exception e) {
             LOGGER.error("fetch oracle sink info failed from json params: " + 
extParams, e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
index f59e6ca42..005bd42a4 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
@@ -102,7 +102,7 @@ public class PostgreSQLSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, 
PostgreSQLSinkDTO.class).decryptPassword();
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SQLServerSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SQLServerSinkDTO.java
index 9a19e6918..6f9760962 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SQLServerSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SQLServerSinkDTO.java
@@ -88,7 +88,7 @@ public class SQLServerSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, SQLServerSinkDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
index 42a57785b..7c4dd0655 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
@@ -26,8 +26,6 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.validation.constraints.NotNull;
 import java.util.Map;
@@ -42,7 +40,6 @@ import java.util.Map;
 public class TDSQLPostgreSQLSinkDTO {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(TDSQLPostgreSQLSinkDTO.class);
 
     @ApiModelProperty("TDSQLPostgreSQL jdbc url, such as 
jdbc:postgresql://host:port/database")
     private String jdbcUrl;
@@ -88,8 +85,7 @@ public class TDSQLPostgreSQLSinkDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, 
TDSQLPostgreSQLSinkDTO.class);
         } catch (Exception e) {
-            LOGGER.error("fetch tdsql postgresql sink info failed from json 
params: " + extParams, e);
-            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
index 9d8fcad45..b42bd5d5c 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
@@ -54,7 +54,7 @@ public class AutoPushSourceDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, AutoPushSourceDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
index 0f1e0a478..779557722 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
@@ -71,7 +71,7 @@ public class FileSourceDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, FileSourceDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index 773cb40f9..88884eea6 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -120,7 +120,7 @@ public class KafkaSourceDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, KafkaSourceDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
index 5b21955d3..5935e140a 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
@@ -85,7 +85,7 @@ public class MongoDBSourceDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, MongoDBSourceDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
index ae63bad4a..80f24038e 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
@@ -154,7 +154,7 @@ public class MySQLBinlogSourceDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, 
MySQLBinlogSourceDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
index d34acfbb7..b99353323 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
@@ -94,7 +94,7 @@ public class OracleSourceDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, OracleSourceDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
index 367a7c861..5a7c0b598 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
@@ -95,7 +95,7 @@ public class PostgreSQLSourceDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, 
PostgreSQLSourceDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
index 4b8f292c3..a710cf30b 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
@@ -88,7 +88,7 @@ public class PulsarSourceDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, PulsarSourceDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
index ad412c5b4..5318c0efe 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
@@ -101,7 +101,7 @@ public class SQLServerSourceDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, 
SQLServerSourceDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
index c45f7df0a..fbae9008a 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
@@ -90,7 +90,7 @@ public class TubeMQSourceDTO {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
             return OBJECT_MAPPER.readValue(extParams, TubeMQSourceDTO.class);
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index 29cd9c436..3fb3e420c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -81,7 +81,7 @@ public class PulsarClusterOperator extends 
AbstractClusterOperator {
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
             LOGGER.info("success to set entity for pulsar cluster");
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index cb6eed3bc..833d4e500 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -65,7 +65,7 @@ public class TubeClusterOperator extends 
AbstractClusterOperator {
                 
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
                 LOGGER.info("success to set entity for tube cluster");
             } catch (Exception e) {
-                throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+                throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
             }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index f6d87b8be..cf492425a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
- * Use to cache the sort cluster config and reduce the number of query to 
database.
+ * Used to cache the sort cluster config and reduce the number of queries to 
the database.
  */
 @Lazy
 @Service
@@ -56,9 +56,9 @@ public class SortClusterServiceImpl implements 
SortClusterService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SortClusterServiceImpl.class);
 
-    private static final Gson gson = new Gson();
+    private static final Gson GSON = new Gson();
 
-    public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000;
+    private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000;
 
     private static final int RESPONSE_CODE_SUCCESS = 0;
     private static final int RESPONSE_CODE_NO_UPDATE = 1;
@@ -96,7 +96,7 @@ public class SortClusterServiceImpl implements 
SortClusterService {
 
     @Transactional(rollbackFor = Exception.class)
     public void reload() {
-        LOGGER.debug("start to reload sort config.");
+        LOGGER.debug("start to reload sort config");
         try {
             reloadAllClusterConfig();
         } catch (Throwable t) {
@@ -154,54 +154,48 @@ public class SortClusterServiceImpl implements 
SortClusterService {
 
     /**
      * Reload all cluster config.
-     *
-     * <p>
-     *     The reload results, including config, md5 and error log, will 
replace the older ones.
-     * </p>
+     * The results, including config, md5 and error log, will replace the older 
ones.
      */
     private void reloadAllClusterConfig() {
         // get all task and group by cluster
         List<SortTaskInfo> tasks = streamSinkEntityMapper.selectAllTasks();
-        Map<String, List<SortTaskInfo>> clusterTaskMap =
-                tasks.stream()
-                        .filter(dto -> dto.getSortClusterName() != null)
-                        
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
+        Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
+                .filter(dto -> dto.getSortClusterName() != null)
+                
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
 
         // get all id params and group by task
         List<SortIdInfo> idParams = streamSinkEntityMapper.selectAllIdParams();
-        Map<String, List<SortIdInfo>> taskIdParamMap =
-                idParams.stream()
-                        .filter(dto -> dto.getSortTaskName() != null)
-                        
.collect(Collectors.groupingBy(SortIdInfo::getSortTaskName));
+        Map<String, List<SortIdInfo>> taskIdParamMap = idParams.stream()
+                .filter(dto -> dto.getSortTaskName() != null)
+                .collect(Collectors.groupingBy(SortIdInfo::getSortTaskName));
 
         // get all sink params and group by data node name
         List<SortSinkInfo> sinkParams = 
dataNodeEntityMapper.selectAllSinkParams();
-        Map<String, SortSinkInfo> taskSinkParamMap =
-                sinkParams.stream()
-                        .filter(dto -> dto.getName() != null)
-                        .collect(Collectors.toMap(SortSinkInfo::getName, param 
-> param));
+        Map<String, SortSinkInfo> taskSinkParamMap = sinkParams.stream()
+                .filter(dto -> dto.getName() != null)
+                .collect(Collectors.toMap(SortSinkInfo::getName, param -> 
param));
 
         // update config of each cluster
         Map<String, SortClusterConfig> newConfigMap = new 
ConcurrentHashMap<>();
         Map<String, String> newMd5Map = new ConcurrentHashMap<>();
-        Map<String, String> newErrorlogMap = new ConcurrentHashMap<>();
+        Map<String, String> newErrorLogMap = new ConcurrentHashMap<>();
         clusterTaskMap.forEach((clusterName, taskList) -> {
             try {
                 // get config, then update config map and md5
-                SortClusterConfig clusterConfig =
-                        getConfigByClusterName(clusterName, taskList, 
taskIdParamMap, taskSinkParamMap);
-                String jsonStr = gson.toJson(clusterConfig);
+                SortClusterConfig clusterConfig = 
getConfigByClusterName(clusterName, taskList, taskIdParamMap,
+                        taskSinkParamMap);
+                String jsonStr = GSON.toJson(clusterConfig);
                 String md5 = DigestUtils.md5Hex(jsonStr);
                 newConfigMap.put(clusterName, clusterConfig);
                 newMd5Map.put(clusterName, md5);
             } catch (Throwable e) {
                 // if get config failed, update the err log.
-                newErrorlogMap.put(clusterName, e.getMessage());
+                newErrorLogMap.put(clusterName, e.getMessage());
                 LOGGER.error("Failed to update cluster config of {}, error is 
{}", clusterName, e.getMessage());
                 LOGGER.error(e.getMessage(), e);
             }
         });
-        sortClusterErrorLogMap = newErrorlogMap;
+        sortClusterErrorLogMap = newErrorLogMap;
         sortClusterConfigMap = newConfigMap;
         sortClusterMd5Map = newMd5Map;
     }
@@ -240,9 +234,8 @@ public class SortClusterServiceImpl implements 
SortClusterService {
 
     /**
      * Get task config.
-     * <p>
-     *     If there is no any id or sink params, throw exception to upper 
caller.
-     * </p>
+     * <p>
+     * If there is not any id or sink params, throw exception to upper caller.
      *
      * @param taskName Task name.
      * @param type Type of sink.
@@ -250,12 +243,8 @@ public class SortClusterServiceImpl implements 
SortClusterService {
      * @param sinkParams Sink params.
      * @return Task config.
      */
-    private SortTaskConfig getTaskConfig(
-            String taskName,
-            String type,
-            List<SortIdInfo> idParams,
+    private SortTaskConfig getTaskConfig(String taskName, String type, 
List<SortIdInfo> idParams,
             SortSinkInfo sinkParams) {
-
         // return null if id params or sink params are empty.
         if (idParams == null || sinkParams == null) {
             return null;
@@ -263,8 +252,8 @@ public class SortClusterServiceImpl implements 
SortClusterService {
 
         if (!type.equalsIgnoreCase(sinkParams.getType())) {
             throw new IllegalArgumentException(
-                    String.format("for task %s, task type %s and sink type %s 
are not identical",
-                            taskName, type, sinkParams.getType()));
+                    String.format("task type %s and sink type %s are not 
identical for task name %s",
+                            type, sinkParams.getType(), taskName));
         }
 
         return SortTaskConfig.builder()
@@ -273,18 +262,18 @@ public class SortClusterServiceImpl implements 
SortClusterService {
                 .idParams(this.parseIdParams(idParams))
                 .sinkParams(this.parseSinkParams(sinkParams))
                 .build();
-
     }
 
     /**
      * Parse id params from json.
+     *
      * @param rowIdParams IdParams in json format.
      * @return List of IdParams.
      */
     private List<Map<String, String>> parseIdParams(List<SortIdInfo> 
rowIdParams) {
         return rowIdParams.stream()
                 .map(row -> {
-                    Map<String, String> param = 
gson.fromJson(row.getExtParams(), HashMap.class);
+                    Map<String, String> param = 
GSON.fromJson(row.getExtParams(), HashMap.class);
                     // put group and stream info
                     param.put(KEY_GROUP_ID, row.getInlongGroupId());
                     param.put(KEY_STREAM_ID, row.getInlongStreamId());
@@ -295,11 +284,12 @@ public class SortClusterServiceImpl implements 
SortClusterService {
 
     /**
      * Parse sink params from json.
+     *
      * @param rowSinkParams Sink params in json format.
      * @return Sink params.
      */
     private Map<String, String> parseSinkParams(SortSinkInfo rowSinkParams) {
-        return gson.fromJson(rowSinkParams.getExtParams(), HashMap.class);
+        return GSON.fromJson(rowSinkParams.getExtParams(), HashMap.class);
     }
 
     /**
@@ -309,4 +299,4 @@ public class SortClusterServiceImpl implements 
SortClusterService {
         ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
         executorService.scheduleAtFixedRate(this::reload, reloadInterval, 
reloadInterval, TimeUnit.MILLISECONDS);
     }
-}
\ No newline at end of file
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
index 58f2ccea3..ce6654a5f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.ProcessName;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -31,7 +32,6 @@ import 
org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import 
org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.common.enums.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -60,10 +60,8 @@ public class InlongStreamProcessOperation {
 
     @Autowired
     private InlongGroupService groupService;
-
     @Autowired
     private InlongStreamService streamService;
-
     @Autowired
     private WorkflowService workflowService;
 
@@ -71,7 +69,7 @@ public class InlongStreamProcessOperation {
      * Create stream in synchronous/asynchronous way.
      */
     public boolean startProcess(String groupId, String streamId, String 
operator, boolean sync) {
-        log.info("StartProcess for groupId={}, streamId={}", groupId, 
streamId);
+        log.info("begin to start stream process for groupId={} streamId={}", 
groupId, streamId);
         InlongGroupInfo groupInfo = groupService.get(groupId);
         if (groupInfo == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -79,7 +77,7 @@ public class InlongStreamProcessOperation {
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
         if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != 
GroupStatus.RESTARTED) {
             throw new BusinessException(
-                    String.format("GroupId=%s, status=%s not correct for 
stream start", groupId, groupStatus));
+                    String.format("group status =%s not support start stream 
for groupId=%s", groupStatus, groupId));
         }
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
         if (streamInfo == null) {
@@ -87,25 +85,24 @@ public class InlongStreamProcessOperation {
         }
         StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
         if (status == StreamStatus.CONFIG_ING) {
-            log.warn("GroupId={}, StreamId={} is already in {}", groupId, 
streamId, status);
+            log.warn("stream status={}, no need restart for groupId={}, 
streamId={}", status, groupId, streamId);
             return true;
         }
+        // only new, failed, and success status support update
         if (status != StreamStatus.NEW && status != StreamStatus.CONFIG_FAILED
                 && status != StreamStatus.CONFIG_SUCCESSFUL) {
             throw new BusinessException(
-                    String.format("GroupId=%s, StreamId=%s, status=%s not 
correct for stream start", groupId, streamId,
-                            status));
+                    String.format("stream status=%s not support start stream 
for groupId=%s streamId=%s",
+                            status, groupId, streamId));
         }
         StreamResourceProcessForm processForm = 
genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.INIT);
         ProcessName processName = ProcessName.CREATE_STREAM_RESOURCE;
         if (sync) {
-            WorkflowResult workflowResult = workflowService.start(processName, 
operator,
-                    processForm);
+            WorkflowResult workflowResult = workflowService.start(processName, 
operator, processForm);
             ProcessStatus processStatus = 
workflowResult.getProcessInfo().getStatus();
             return processStatus == ProcessStatus.COMPLETED;
         } else {
-            executorService.execute(
-                    () -> workflowService.start(processName, operator, 
processForm));
+            executorService.execute(() -> workflowService.start(processName, 
operator, processForm));
             return true;
         }
     }
@@ -114,7 +111,7 @@ public class InlongStreamProcessOperation {
      * Suspend stream in synchronous/asynchronous way.
      */
     public boolean suspendProcess(String groupId, String streamId, String 
operator, boolean sync) {
-        log.info("SuspendProcess for groupId={}, streamId={}", groupId, 
streamId);
+        log.info("begin to suspend stream process for groupId={} streamId={}", 
groupId, streamId);
         InlongGroupInfo groupInfo = groupService.get(groupId);
         if (groupInfo == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -124,7 +121,7 @@ public class InlongStreamProcessOperation {
                 && groupStatus != GroupStatus.RESTARTED
                 && groupStatus != GroupStatus.SUSPENDED) {
             throw new BusinessException(
-                    String.format("GroupId=%s, status=%s not correct for 
stream suspend", groupId, groupStatus));
+                    String.format("group status=%s not support suspend stream 
for groupId=%s", groupStatus, groupId));
         }
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
         if (streamInfo == null) {
@@ -137,20 +134,17 @@ public class InlongStreamProcessOperation {
         }
         if (status != StreamStatus.CONFIG_SUCCESSFUL && status != 
StreamStatus.RESTARTED) {
             throw new BusinessException(
-                    String.format("GroupId=%s, StreamId=%s, status=%s not 
correct for stream suspend", groupId,
-                            streamId,
-                            status));
+                    String.format("stream status=%s not support suspend stream 
for groupId=%s streamId=%s",
+                            status, groupId, streamId));
         }
         StreamResourceProcessForm processForm = 
genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.SUSPEND);
         ProcessName processName = ProcessName.SUSPEND_STREAM_RESOURCE;
         if (sync) {
-            WorkflowResult workflowResult = workflowService.start(processName, 
operator,
-                    processForm);
+            WorkflowResult workflowResult = workflowService.start(processName, 
operator, processForm);
             ProcessStatus processStatus = 
workflowResult.getProcessInfo().getStatus();
             return processStatus == ProcessStatus.COMPLETED;
         } else {
-            executorService.execute(
-                    () -> workflowService.start(processName, operator, 
processForm));
+            executorService.execute(() -> workflowService.start(processName, 
operator, processForm));
             return true;
         }
     }
@@ -159,7 +153,7 @@ public class InlongStreamProcessOperation {
      * Restart stream in synchronous/asynchronous way.
      */
     public boolean restartProcess(String groupId, String streamId, String 
operator, boolean sync) {
-        log.info("RestartProcess for groupId={}, streamId={}", groupId, 
streamId);
+        log.info("begin to restart stream process for groupId={} streamId={}", 
groupId, streamId);
         InlongGroupInfo groupInfo = groupService.get(groupId);
         if (groupInfo == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -168,7 +162,7 @@ public class InlongStreamProcessOperation {
         if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL
                 && groupStatus != GroupStatus.RESTARTED) {
             throw new BusinessException(
-                    String.format("GroupId=%s, status=%s not correct for 
stream restart", groupId, groupStatus));
+                    String.format("group status=%s not support restart stream 
for groupId=%s", groupStatus, groupId));
         }
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
         if (streamInfo == null) {
@@ -181,20 +175,17 @@ public class InlongStreamProcessOperation {
         }
         if (status != StreamStatus.SUSPENDED) {
             throw new BusinessException(
-                    String.format("GroupId=%s, StreamId=%s, status=%s not 
correct for stream restart", groupId,
-                            streamId,
-                            status));
+                    String.format("stream status=%s not support restart stream 
for groupId=%s streamId=%s",
+                            status, groupId, streamId));
         }
         StreamResourceProcessForm processForm = 
genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.RESTART);
         ProcessName processName = ProcessName.RESTART_STREAM_RESOURCE;
         if (sync) {
-            WorkflowResult workflowResult = workflowService.start(processName, 
operator,
-                    processForm);
+            WorkflowResult workflowResult = workflowService.start(processName, 
operator, processForm);
             ProcessStatus processStatus = 
workflowResult.getProcessInfo().getStatus();
             return processStatus == ProcessStatus.COMPLETED;
         } else {
-            executorService.execute(
-                    () -> workflowService.start(processName, operator, 
processForm));
+            executorService.execute(() -> workflowService.start(processName, 
operator, processForm));
             return true;
         }
     }
@@ -203,7 +194,7 @@ public class InlongStreamProcessOperation {
      * Restart stream in synchronous/asynchronous way.
      */
     public boolean deleteProcess(String groupId, String streamId, String 
operator, boolean sync) {
-        log.info("DeleteProcess for groupId={}, streamId={}", groupId, 
streamId);
+        log.info("begin to delete stream process for groupId={} streamId={}", 
groupId, streamId);
         InlongGroupInfo groupInfo = groupService.get(groupId);
         if (groupInfo == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -214,7 +205,7 @@ public class InlongStreamProcessOperation {
                 && groupStatus != GroupStatus.SUSPENDED
                 && groupStatus != GroupStatus.DELETING) {
             throw new BusinessException(
-                    String.format("GroupId=%s, status=%s not correct for 
stream delete", groupId, groupStatus));
+                    String.format("group status=%s not support delete stream 
for groupId=%s", groupStatus, groupId));
         }
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
         if (streamInfo == null) {
@@ -229,15 +220,13 @@ public class InlongStreamProcessOperation {
                 || status == StreamStatus.RESTARTING
                 || status == StreamStatus.SUSPENDING) {
             throw new BusinessException(
-                    String.format("GroupId=%s, StreamId=%s, status=%s not 
correct for stream delete", groupId,
-                            streamId,
-                            status));
+                    String.format("stream status=%s not support delete stream 
for groupId=%s streamId=%s",
+                            status, groupId, streamId));
         }
         StreamResourceProcessForm processForm = 
genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.DELETE);
         ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
         if (sync) {
-            WorkflowResult workflowResult = workflowService.start(processName, 
operator,
-                    processForm);
+            WorkflowResult workflowResult = workflowService.start(processName, 
operator, processForm);
             ProcessStatus processStatus = 
workflowResult.getProcessInfo().getStatus();
             if (processStatus == ProcessStatus.COMPLETED) {
                 return streamService.delete(groupId, streamId, operator);
@@ -245,14 +234,13 @@ public class InlongStreamProcessOperation {
                 return false;
             }
         } else {
-            executorService.execute(
-                    () -> {
-                        WorkflowResult workflowResult = 
workflowService.start(processName, operator, processForm);
-                        ProcessStatus processStatus = 
workflowResult.getProcessInfo().getStatus();
-                        if (processStatus == ProcessStatus.COMPLETED) {
-                            streamService.delete(groupId, streamId, operator);
-                        }
-                    });
+            executorService.execute(() -> {
+                WorkflowResult workflowResult = 
workflowService.start(processName, operator, processForm);
+                ProcessStatus processStatus = 
workflowResult.getProcessInfo().getStatus();
+                if (processStatus == ProcessStatus.COMPLETED) {
+                    streamService.delete(groupId, streamId, operator);
+                }
+            });
             return true;
         }
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
index 4d9b700a7..6bec4feb6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
@@ -99,7 +99,7 @@ public class InlongPulsarOperator extends 
AbstractGroupOperator {
             InlongPulsarDTO dto = 
InlongPulsarDTO.getFromRequest(pulsarRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
         LOGGER.info("success set entity for inlong group with Pulsar");
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
index cbcc15f56..43b4ba38e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
@@ -67,7 +67,7 @@ public class AutoPushSourceOperator extends 
AbstractSourceOperator {
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
             LOGGER.error("parsing json string to source info failed", e);
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
index 5f2e73feb..c3b324aae 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
@@ -62,7 +62,7 @@ public class BinlogSourceOperator extends 
AbstractSourceOperator {
             MySQLBinlogSourceDTO dto = 
MySQLBinlogSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index ab4baa544..a561844d5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -62,7 +62,7 @@ public class FileSourceOperator extends 
AbstractSourceOperator {
             FileSourceDTO dto = FileSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 3cb0b2615..28e473163 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -62,7 +62,7 @@ public class KafkaSourceOperator extends 
AbstractSourceOperator {
             KafkaSourceDTO dto = KafkaSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
index aeba33ede..74c0bc466 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
@@ -62,7 +62,7 @@ public class MongoDBSourceOperator extends 
AbstractSourceOperator {
             MongoDBSourceDTO dto = 
MongoDBSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
index f9dd4ee0b..d70b88a12 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
@@ -62,7 +62,7 @@ public class OracleSourceOperator extends 
AbstractSourceOperator {
             OracleSourceDTO dto = 
OracleSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
index 8688a751d..4aba1e868 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
@@ -62,7 +62,7 @@ public class PostgreSQLSourceOperator extends 
AbstractSourceOperator {
             PostgreSQLSourceDTO dto = 
PostgreSQLSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 9a02a1399..21bce91bc 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -79,7 +79,7 @@ public class PulsarSourceOperator extends 
AbstractSourceOperator {
             PulsarSourceDTO dto = 
PulsarSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
index 7dacb20b1..9064249d6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
@@ -62,7 +62,7 @@ public class SQLServerSourceOperator extends 
AbstractSourceOperator {
             SQLServerSourceDTO dto = 
SQLServerSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 6e14acdac..55741fb99 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -75,7 +75,7 @@ public class TubeMQSourceOperator extends 
AbstractSourceOperator {
             TubeMQSourceDTO dto = 
TubeMQSourceDTO.getFromRequest(sourceRequest);
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
-            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergSinkServiceTest.java
index bfd67314b..c9053a8ec 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergSinkServiceTest.java
@@ -31,7 +31,7 @@ import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
 /**
- * Iceberg sink service test..
+ * Iceberg stream sink service test.
  */
 public class IcebergSinkServiceTest extends ServiceBaseTest {
 

Reply via email to