This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 95b11c169 [INLONG-5184][Manager] Add more info for some error logs (#5185)
95b11c169 is described below
commit 95b11c169085f1cb5b8fc3469bd7015de9b17562
Author: healchow <[email protected]>
AuthorDate: Sun Jul 24 16:21:04 2022 +0800
[INLONG-5184][Manager] Add more info for some error logs (#5185)
---
.../pojo/cluster/pulsar/PulsarClusterDTO.java | 2 +-
.../common/pojo/cluster/tube/TubeClusterDTO.java | 2 +-
.../common/pojo/group/pulsar/InlongPulsarDTO.java | 2 +-
.../common/pojo/sink/ck/ClickHouseSinkDTO.java | 2 +-
.../pojo/sink/dlciceberg/DLCIcebergSinkDTO.java | 6 +-
.../pojo/sink/es/ElasticsearchFieldInfo.java | 2 +-
.../common/pojo/sink/es/ElasticsearchSinkDTO.java | 8 +--
.../pojo/sink/greenplum/GreenplumSinkDTO.java | 2 +-
.../pojo/sink/hbase/HBaseColumnFamilyInfo.java | 2 +-
.../common/pojo/sink/hbase/HBaseSinkDTO.java | 2 +-
.../manager/common/pojo/sink/hdfs/HDFSSinkDTO.java | 2 +-
.../manager/common/pojo/sink/hive/HiveSinkDTO.java | 2 +-
.../pojo/sink/iceberg/IcebergColumnInfo.java | 2 +-
.../common/pojo/sink/iceberg/IcebergSinkDTO.java | 2 +-
.../common/pojo/sink/kafka/KafkaSinkDTO.java | 2 +-
.../common/pojo/sink/mysql/MySQLSinkDTO.java | 2 +-
.../common/pojo/sink/oracle/OracleSinkDTO.java | 2 +-
.../pojo/sink/postgresql/PostgreSQLSinkDTO.java | 2 +-
.../pojo/sink/sqlserver/SQLServerSinkDTO.java | 2 +-
.../tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java | 6 +-
.../pojo/source/autopush/AutoPushSourceDTO.java | 2 +-
.../common/pojo/source/file/FileSourceDTO.java | 2 +-
.../common/pojo/source/kafka/KafkaSourceDTO.java | 2 +-
.../pojo/source/mongodb/MongoDBSourceDTO.java | 2 +-
.../pojo/source/mysql/MySQLBinlogSourceDTO.java | 2 +-
.../common/pojo/source/oracle/OracleSourceDTO.java | 2 +-
.../source/postgresql/PostgreSQLSourceDTO.java | 2 +-
.../common/pojo/source/pulsar/PulsarSourceDTO.java | 2 +-
.../pojo/source/sqlserver/SQLServerSourceDTO.java | 2 +-
.../common/pojo/source/tubemq/TubeMQSourceDTO.java | 2 +-
.../service/cluster/PulsarClusterOperator.java | 2 +-
.../service/cluster/TubeClusterOperator.java | 2 +-
.../service/core/impl/SortClusterServiceImpl.java | 70 +++++++++----------
.../operation/InlongStreamProcessOperation.java | 78 +++++++++-------------
.../service/group/InlongPulsarOperator.java | 2 +-
.../source/autopush/AutoPushSourceOperator.java | 2 +-
.../source/binlog/BinlogSourceOperator.java | 2 +-
.../service/source/file/FileSourceOperator.java | 2 +-
.../service/source/kafka/KafkaSourceOperator.java | 2 +-
.../source/mongodb/MongoDBSourceOperator.java | 2 +-
.../source/oracle/OracleSourceOperator.java | 2 +-
.../postgresql/PostgreSQLSourceOperator.java | 2 +-
.../source/pulsar/PulsarSourceOperator.java | 2 +-
.../source/sqlserver/SQLServerSourceOperator.java | 2 +-
.../source/tubemq/TubeMQSourceOperator.java | 2 +-
.../service/core/sink/IcebergSinkServiceTest.java | 2 +-
46 files changed, 107 insertions(+), 143 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/pulsar/PulsarClusterDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/pulsar/PulsarClusterDTO.java
index bf34ec7cd..d4730e18f 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/pulsar/PulsarClusterDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/pulsar/PulsarClusterDTO.java
@@ -67,7 +67,7 @@ public class PulsarClusterDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, PulsarClusterDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
index 2cfbb5028..f64ce2afe 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
@@ -57,7 +57,7 @@ public class TubeClusterDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, TubeClusterDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/pulsar/InlongPulsarDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/pulsar/InlongPulsarDTO.java
index 3eeb65726..235c6c273 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/pulsar/InlongPulsarDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/pulsar/InlongPulsarDTO.java
@@ -103,7 +103,7 @@ public class InlongPulsarDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, InlongPulsarDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
index a7e6c3f8e..a860b23d0 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -137,7 +137,7 @@ public class ClickHouseSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams,
ClickHouseSinkDTO.class).decryptPassword();
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
index 1088d2165..6fbb2bbf7 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/dlciceberg/DLCIcebergSinkDTO.java
@@ -26,8 +26,6 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.validation.constraints.NotNull;
import java.util.Map;
@@ -42,7 +40,6 @@ import java.util.Map;
public class DLCIcebergSinkDTO {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final Logger LOGGER =
LoggerFactory.getLogger(DLCIcebergSinkDTO.class);
@ApiModelProperty("Catalog URI of the DLCIceberg server")
private String catalogUri;
@@ -84,8 +81,7 @@ public class DLCIcebergSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, DLCIcebergSinkDTO.class);
} catch (Exception e) {
- LOGGER.error("fetch DLCIceberg sink info failed from json params:
" + extParams, e);
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchFieldInfo.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchFieldInfo.java
index c4e43fe6e..40acd867f 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchFieldInfo.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchFieldInfo.java
@@ -67,7 +67,7 @@ public class ElasticsearchFieldInfo {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams,
ElasticsearchFieldInfo.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
index a96beeeb6..ea4624077 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -31,7 +31,6 @@ import org.apache.inlong.manager.common.util.AESUtils;
import javax.validation.constraints.NotNull;
import java.nio.charset.StandardCharsets;
-import java.util.List;
import java.util.Map;
/**
@@ -121,15 +120,10 @@ public class ElasticsearchSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams,
ElasticsearchSinkDTO.class).decryptPassword();
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
- public static String getElasticSearchIndexName(ElasticsearchSinkDTO esInfo,
- List<ElasticsearchFieldInfo> fieldList) {
- return esInfo.getIndexName();
- }
-
private ElasticsearchSinkDTO decryptPassword() throws Exception {
if (StringUtils.isNotEmpty(this.password)) {
byte[] passwordBytes = AESUtils.decryptAsString(this.password,
this.encryptVersion);
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
index 5c3060835..65754e684 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/greenplum/GreenplumSinkDTO.java
@@ -81,7 +81,7 @@ public class GreenplumSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, GreenplumSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseColumnFamilyInfo.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseColumnFamilyInfo.java
index cb2c472b8..b4e541ca0 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseColumnFamilyInfo.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseColumnFamilyInfo.java
@@ -58,7 +58,7 @@ public class HBaseColumnFamilyInfo {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams,
HBaseColumnFamilyInfo.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseSinkDTO.java
index 4cfe9f1a4..f088097a2 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hbase/HBaseSinkDTO.java
@@ -94,7 +94,7 @@ public class HBaseSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, HBaseSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HDFSSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HDFSSinkDTO.java
index 279d83b2d..8f76262ee 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HDFSSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hdfs/HDFSSinkDTO.java
@@ -86,7 +86,7 @@ public class HDFSSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, HDFSSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
index d40a4f4cf..0e7b8a036 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
@@ -132,7 +132,7 @@ public class HiveSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams,
HiveSinkDTO.class).decryptPassword();
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
index b7e6a1826..488fbea5b 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergColumnInfo.java
@@ -75,7 +75,7 @@ public class IcebergColumnInfo {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, IcebergColumnInfo.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
index 2d41f9701..057a8ae83 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -94,7 +94,7 @@ public class IcebergSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, IcebergSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
index 0b94793be..1fe0d47d0 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
@@ -83,7 +83,7 @@ public class KafkaSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, KafkaSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/mysql/MySQLSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/mysql/MySQLSinkDTO.java
index 0daf5b98b..3f53262dd 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/mysql/MySQLSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/mysql/MySQLSinkDTO.java
@@ -94,7 +94,7 @@ public class MySQLSinkDTO {
return OBJECT_MAPPER.readValue(extParams, MySQLSinkDTO.class);
} catch (Exception e) {
LOGGER.error("fetch mysql sink info failed from json params: " +
extParams, e);
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
index a93e0286d..94dd47925 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
@@ -86,7 +86,7 @@ public class OracleSinkDTO {
return OBJECT_MAPPER.readValue(extParams, OracleSinkDTO.class);
} catch (Exception e) {
LOGGER.error("fetch oracle sink info failed from json params: " +
extParams, e);
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
index f59e6ca42..005bd42a4 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgresql/PostgreSQLSinkDTO.java
@@ -102,7 +102,7 @@ public class PostgreSQLSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams,
PostgreSQLSinkDTO.class).decryptPassword();
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SQLServerSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SQLServerSinkDTO.java
index 9a19e6918..6f9760962 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SQLServerSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/sqlserver/SQLServerSinkDTO.java
@@ -88,7 +88,7 @@ public class SQLServerSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, SQLServerSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
index 42a57785b..7c4dd0655 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
@@ -26,8 +26,6 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.validation.constraints.NotNull;
import java.util.Map;
@@ -42,7 +40,6 @@ import java.util.Map;
public class TDSQLPostgreSQLSinkDTO {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final Logger LOGGER =
LoggerFactory.getLogger(TDSQLPostgreSQLSinkDTO.class);
@ApiModelProperty("TDSQLPostgreSQL jdbc url, such as
jdbc:postgresql://host:port/database")
private String jdbcUrl;
@@ -88,8 +85,7 @@ public class TDSQLPostgreSQLSinkDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams,
TDSQLPostgreSQLSinkDTO.class);
} catch (Exception e) {
- LOGGER.error("fetch tdsql postgresql sink info failed from json
params: " + extParams, e);
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
index 9d8fcad45..b42bd5d5c 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/autopush/AutoPushSourceDTO.java
@@ -54,7 +54,7 @@ public class AutoPushSourceDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, AutoPushSourceDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
index 0f1e0a478..779557722 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
@@ -71,7 +71,7 @@ public class FileSourceDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, FileSourceDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index 773cb40f9..88884eea6 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -120,7 +120,7 @@ public class KafkaSourceDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, KafkaSourceDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
index 5b21955d3..5935e140a 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
@@ -85,7 +85,7 @@ public class MongoDBSourceDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, MongoDBSourceDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
index ae63bad4a..80f24038e 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mysql/MySQLBinlogSourceDTO.java
@@ -154,7 +154,7 @@ public class MySQLBinlogSourceDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams,
MySQLBinlogSourceDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
index d34acfbb7..b99353323 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
@@ -94,7 +94,7 @@ public class OracleSourceDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, OracleSourceDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
index 367a7c861..5a7c0b598 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgresql/PostgreSQLSourceDTO.java
@@ -95,7 +95,7 @@ public class PostgreSQLSourceDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams,
PostgreSQLSourceDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
index 4b8f292c3..a710cf30b 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/pulsar/PulsarSourceDTO.java
@@ -88,7 +88,7 @@ public class PulsarSourceDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, PulsarSourceDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
index ad412c5b4..5318c0efe 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/sqlserver/SQLServerSourceDTO.java
@@ -101,7 +101,7 @@ public class SQLServerSourceDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams,
SQLServerSourceDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
index c45f7df0a..fbae9008a 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSourceDTO.java
@@ -90,7 +90,7 @@ public class TubeMQSourceDTO {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
return OBJECT_MAPPER.readValue(extParams, TubeMQSourceDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index 29cd9c436..3fb3e420c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -81,7 +81,7 @@ public class PulsarClusterOperator extends
AbstractClusterOperator {
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
LOGGER.info("success to set entity for pulsar cluster");
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index cb6eed3bc..833d4e500 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -65,7 +65,7 @@ public class TubeClusterOperator extends
AbstractClusterOperator {
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
LOGGER.info("success to set entity for tube cluster");
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index f6d87b8be..cf492425a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
- * Use to cache the sort cluster config and reduce the number of query to
database.
+ * Used to cache the sort cluster config and reduce the number of queries to the
database.
*/
@Lazy
@Service
@@ -56,9 +56,9 @@ public class SortClusterServiceImpl implements
SortClusterService {
private static final Logger LOGGER =
LoggerFactory.getLogger(SortClusterServiceImpl.class);
- private static final Gson gson = new Gson();
+ private static final Gson GSON = new Gson();
- public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000;
+ private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000;
private static final int RESPONSE_CODE_SUCCESS = 0;
private static final int RESPONSE_CODE_NO_UPDATE = 1;
@@ -96,7 +96,7 @@ public class SortClusterServiceImpl implements
SortClusterService {
@Transactional(rollbackFor = Exception.class)
public void reload() {
- LOGGER.debug("start to reload sort config.");
+ LOGGER.debug("start to reload sort config");
try {
reloadAllClusterConfig();
} catch (Throwable t) {
@@ -154,54 +154,48 @@ public class SortClusterServiceImpl implements
SortClusterService {
/**
* Reload all cluster config.
- *
- * <p>
- * The reload results, including config, md5 and error log, will
replace the older ones.
- * </p>
+ * The results, including config, md5 and error log, will replace the older
ones.
*/
private void reloadAllClusterConfig() {
// get all task and group by cluster
List<SortTaskInfo> tasks = streamSinkEntityMapper.selectAllTasks();
- Map<String, List<SortTaskInfo>> clusterTaskMap =
- tasks.stream()
- .filter(dto -> dto.getSortClusterName() != null)
-
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
+ Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
+ .filter(dto -> dto.getSortClusterName() != null)
+
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
// get all id params and group by task
List<SortIdInfo> idParams = streamSinkEntityMapper.selectAllIdParams();
- Map<String, List<SortIdInfo>> taskIdParamMap =
- idParams.stream()
- .filter(dto -> dto.getSortTaskName() != null)
-
.collect(Collectors.groupingBy(SortIdInfo::getSortTaskName));
+ Map<String, List<SortIdInfo>> taskIdParamMap = idParams.stream()
+ .filter(dto -> dto.getSortTaskName() != null)
+ .collect(Collectors.groupingBy(SortIdInfo::getSortTaskName));
// get all sink params and group by data node name
List<SortSinkInfo> sinkParams =
dataNodeEntityMapper.selectAllSinkParams();
- Map<String, SortSinkInfo> taskSinkParamMap =
- sinkParams.stream()
- .filter(dto -> dto.getName() != null)
- .collect(Collectors.toMap(SortSinkInfo::getName, param
-> param));
+ Map<String, SortSinkInfo> taskSinkParamMap = sinkParams.stream()
+ .filter(dto -> dto.getName() != null)
+ .collect(Collectors.toMap(SortSinkInfo::getName, param ->
param));
// update config of each cluster
Map<String, SortClusterConfig> newConfigMap = new
ConcurrentHashMap<>();
Map<String, String> newMd5Map = new ConcurrentHashMap<>();
- Map<String, String> newErrorlogMap = new ConcurrentHashMap<>();
+ Map<String, String> newErrorLogMap = new ConcurrentHashMap<>();
clusterTaskMap.forEach((clusterName, taskList) -> {
try {
// get config, then update config map and md5
- SortClusterConfig clusterConfig =
- getConfigByClusterName(clusterName, taskList,
taskIdParamMap, taskSinkParamMap);
- String jsonStr = gson.toJson(clusterConfig);
+ SortClusterConfig clusterConfig =
getConfigByClusterName(clusterName, taskList, taskIdParamMap,
+ taskSinkParamMap);
+ String jsonStr = GSON.toJson(clusterConfig);
String md5 = DigestUtils.md5Hex(jsonStr);
newConfigMap.put(clusterName, clusterConfig);
newMd5Map.put(clusterName, md5);
} catch (Throwable e) {
// if get config failed, update the err log.
- newErrorlogMap.put(clusterName, e.getMessage());
+ newErrorLogMap.put(clusterName, e.getMessage());
LOGGER.error("Failed to update cluster config of {}, error is
{}", clusterName, e.getMessage());
LOGGER.error(e.getMessage(), e);
}
});
- sortClusterErrorLogMap = newErrorlogMap;
+ sortClusterErrorLogMap = newErrorLogMap;
sortClusterConfigMap = newConfigMap;
sortClusterMd5Map = newMd5Map;
}
@@ -240,9 +234,8 @@ public class SortClusterServiceImpl implements
SortClusterService {
/**
* Get task config.
- * <p>
- * If there is no any id or sink params, throw exception to upper
caller.
- * </p>
+ * <p>
+ * If there is not any id or sink params, throw exception to upper caller.
*
* @param taskName Task name.
* @param type Type of sink.
@@ -250,12 +243,8 @@ public class SortClusterServiceImpl implements
SortClusterService {
* @param sinkParams Sink params.
* @return Task config.
*/
- private SortTaskConfig getTaskConfig(
- String taskName,
- String type,
- List<SortIdInfo> idParams,
+ private SortTaskConfig getTaskConfig(String taskName, String type,
List<SortIdInfo> idParams,
SortSinkInfo sinkParams) {
-
// return null if id params or sink params are empty.
if (idParams == null || sinkParams == null) {
return null;
@@ -263,8 +252,8 @@ public class SortClusterServiceImpl implements
SortClusterService {
if (!type.equalsIgnoreCase(sinkParams.getType())) {
throw new IllegalArgumentException(
- String.format("for task %s, task type %s and sink type %s
are not identical",
- taskName, type, sinkParams.getType()));
+ String.format("task type %s and sink type %s are not
identical for task name %s",
+ type, sinkParams.getType(), taskName));
}
return SortTaskConfig.builder()
@@ -273,18 +262,18 @@ public class SortClusterServiceImpl implements
SortClusterService {
.idParams(this.parseIdParams(idParams))
.sinkParams(this.parseSinkParams(sinkParams))
.build();
-
}
/**
* Parse id params from json.
+ *
* @param rowIdParams IdParams in json format.
* @return List of IdParams.
*/
private List<Map<String, String>> parseIdParams(List<SortIdInfo>
rowIdParams) {
return rowIdParams.stream()
.map(row -> {
- Map<String, String> param =
gson.fromJson(row.getExtParams(), HashMap.class);
+ Map<String, String> param =
GSON.fromJson(row.getExtParams(), HashMap.class);
// put group and stream info
param.put(KEY_GROUP_ID, row.getInlongGroupId());
param.put(KEY_STREAM_ID, row.getInlongStreamId());
@@ -295,11 +284,12 @@ public class SortClusterServiceImpl implements
SortClusterService {
/**
* Parse sink params from json.
+ *
* @param rowSinkParams Sink params in json format.
* @return Sink params.
*/
private Map<String, String> parseSinkParams(SortSinkInfo rowSinkParams) {
- return gson.fromJson(rowSinkParams.getExtParams(), HashMap.class);
+ return GSON.fromJson(rowSinkParams.getExtParams(), HashMap.class);
}
/**
@@ -309,4 +299,4 @@ public class SortClusterServiceImpl implements
SortClusterService {
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(this::reload, reloadInterval,
reloadInterval, TimeUnit.MILLISECONDS);
}
-}
\ No newline at end of file
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
index 58f2ccea3..ce6654a5f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -31,7 +32,6 @@ import
org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -60,10 +60,8 @@ public class InlongStreamProcessOperation {
@Autowired
private InlongGroupService groupService;
-
@Autowired
private InlongStreamService streamService;
-
@Autowired
private WorkflowService workflowService;
@@ -71,7 +69,7 @@ public class InlongStreamProcessOperation {
* Create stream in synchronous/asynchronous way.
*/
public boolean startProcess(String groupId, String streamId, String
operator, boolean sync) {
- log.info("StartProcess for groupId={}, streamId={}", groupId,
streamId);
+ log.info("begin to start stream process for groupId={} streamId={}",
groupId, streamId);
InlongGroupInfo groupInfo = groupService.get(groupId);
if (groupInfo == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -79,7 +77,7 @@ public class InlongStreamProcessOperation {
GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus !=
GroupStatus.RESTARTED) {
throw new BusinessException(
- String.format("GroupId=%s, status=%s not correct for
stream start", groupId, groupStatus));
+ String.format("group status =%s not support start stream
for groupId=%s", groupStatus, groupId));
}
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
if (streamInfo == null) {
@@ -87,25 +85,24 @@ public class InlongStreamProcessOperation {
}
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
if (status == StreamStatus.CONFIG_ING) {
- log.warn("GroupId={}, StreamId={} is already in {}", groupId,
streamId, status);
+ log.warn("stream status={}, no need restart for groupId={},
streamId={}", status, groupId, streamId);
return true;
}
+ // only the new, failed, and successful statuses support starting the stream
if (status != StreamStatus.NEW && status != StreamStatus.CONFIG_FAILED
&& status != StreamStatus.CONFIG_SUCCESSFUL) {
throw new BusinessException(
- String.format("GroupId=%s, StreamId=%s, status=%s not
correct for stream start", groupId, streamId,
- status));
+ String.format("stream status=%s not support start stream
for groupId=%s streamId=%s",
+ status, groupId, streamId));
}
StreamResourceProcessForm processForm =
genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.INIT);
ProcessName processName = ProcessName.CREATE_STREAM_RESOURCE;
if (sync) {
- WorkflowResult workflowResult = workflowService.start(processName,
operator,
- processForm);
+ WorkflowResult workflowResult = workflowService.start(processName,
operator, processForm);
ProcessStatus processStatus =
workflowResult.getProcessInfo().getStatus();
return processStatus == ProcessStatus.COMPLETED;
} else {
- executorService.execute(
- () -> workflowService.start(processName, operator,
processForm));
+ executorService.execute(() -> workflowService.start(processName,
operator, processForm));
return true;
}
}
@@ -114,7 +111,7 @@ public class InlongStreamProcessOperation {
* Suspend stream in synchronous/asynchronous way.
*/
public boolean suspendProcess(String groupId, String streamId, String
operator, boolean sync) {
- log.info("SuspendProcess for groupId={}, streamId={}", groupId,
streamId);
+ log.info("begin to suspend stream process for groupId={} streamId={}",
groupId, streamId);
InlongGroupInfo groupInfo = groupService.get(groupId);
if (groupInfo == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -124,7 +121,7 @@ public class InlongStreamProcessOperation {
&& groupStatus != GroupStatus.RESTARTED
&& groupStatus != GroupStatus.SUSPENDED) {
throw new BusinessException(
- String.format("GroupId=%s, status=%s not correct for
stream suspend", groupId, groupStatus));
+ String.format("group status=%s not support suspend stream
for groupId=%s", groupStatus, groupId));
}
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
if (streamInfo == null) {
@@ -137,20 +134,17 @@ public class InlongStreamProcessOperation {
}
if (status != StreamStatus.CONFIG_SUCCESSFUL && status !=
StreamStatus.RESTARTED) {
throw new BusinessException(
- String.format("GroupId=%s, StreamId=%s, status=%s not
correct for stream suspend", groupId,
- streamId,
- status));
+ String.format("stream status=%s not support suspend stream
for groupId=%s streamId=%s",
+ status, groupId, streamId));
}
StreamResourceProcessForm processForm =
genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.SUSPEND);
ProcessName processName = ProcessName.SUSPEND_STREAM_RESOURCE;
if (sync) {
- WorkflowResult workflowResult = workflowService.start(processName,
operator,
- processForm);
+ WorkflowResult workflowResult = workflowService.start(processName,
operator, processForm);
ProcessStatus processStatus =
workflowResult.getProcessInfo().getStatus();
return processStatus == ProcessStatus.COMPLETED;
} else {
- executorService.execute(
- () -> workflowService.start(processName, operator,
processForm));
+ executorService.execute(() -> workflowService.start(processName,
operator, processForm));
return true;
}
}
@@ -159,7 +153,7 @@ public class InlongStreamProcessOperation {
* Restart stream in synchronous/asynchronous way.
*/
public boolean restartProcess(String groupId, String streamId, String
operator, boolean sync) {
- log.info("RestartProcess for groupId={}, streamId={}", groupId,
streamId);
+ log.info("begin to restart stream process for groupId={} streamId={}",
groupId, streamId);
InlongGroupInfo groupInfo = groupService.get(groupId);
if (groupInfo == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -168,7 +162,7 @@ public class InlongStreamProcessOperation {
if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL
&& groupStatus != GroupStatus.RESTARTED) {
throw new BusinessException(
- String.format("GroupId=%s, status=%s not correct for
stream restart", groupId, groupStatus));
+ String.format("group status=%s not support restart stream
for groupId=%s", groupStatus, groupId));
}
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
if (streamInfo == null) {
@@ -181,20 +175,17 @@ public class InlongStreamProcessOperation {
}
if (status != StreamStatus.SUSPENDED) {
throw new BusinessException(
- String.format("GroupId=%s, StreamId=%s, status=%s not
correct for stream restart", groupId,
- streamId,
- status));
+ String.format("stream status=%s not support restart stream
for groupId=%s streamId=%s",
+ status, groupId, streamId));
}
StreamResourceProcessForm processForm =
genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.RESTART);
ProcessName processName = ProcessName.RESTART_STREAM_RESOURCE;
if (sync) {
- WorkflowResult workflowResult = workflowService.start(processName,
operator,
- processForm);
+ WorkflowResult workflowResult = workflowService.start(processName,
operator, processForm);
ProcessStatus processStatus =
workflowResult.getProcessInfo().getStatus();
return processStatus == ProcessStatus.COMPLETED;
} else {
- executorService.execute(
- () -> workflowService.start(processName, operator,
processForm));
+ executorService.execute(() -> workflowService.start(processName,
operator, processForm));
return true;
}
}
@@ -203,7 +194,7 @@ public class InlongStreamProcessOperation {
* Restart stream in synchronous/asynchronous way.
*/
public boolean deleteProcess(String groupId, String streamId, String
operator, boolean sync) {
- log.info("DeleteProcess for groupId={}, streamId={}", groupId,
streamId);
+ log.info("begin to delete stream process for groupId={} streamId={}",
groupId, streamId);
InlongGroupInfo groupInfo = groupService.get(groupId);
if (groupInfo == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -214,7 +205,7 @@ public class InlongStreamProcessOperation {
&& groupStatus != GroupStatus.SUSPENDED
&& groupStatus != GroupStatus.DELETING) {
throw new BusinessException(
- String.format("GroupId=%s, status=%s not correct for
stream delete", groupId, groupStatus));
+ String.format("group status=%s not support delete stream
for groupId=%s", groupStatus, groupId));
}
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
if (streamInfo == null) {
@@ -229,15 +220,13 @@ public class InlongStreamProcessOperation {
|| status == StreamStatus.RESTARTING
|| status == StreamStatus.SUSPENDING) {
throw new BusinessException(
- String.format("GroupId=%s, StreamId=%s, status=%s not
correct for stream delete", groupId,
- streamId,
- status));
+ String.format("stream status=%s not support delete stream
for groupId=%s streamId=%s",
+ status, groupId, streamId));
}
StreamResourceProcessForm processForm =
genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.DELETE);
ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
if (sync) {
- WorkflowResult workflowResult = workflowService.start(processName,
operator,
- processForm);
+ WorkflowResult workflowResult = workflowService.start(processName,
operator, processForm);
ProcessStatus processStatus =
workflowResult.getProcessInfo().getStatus();
if (processStatus == ProcessStatus.COMPLETED) {
return streamService.delete(groupId, streamId, operator);
@@ -245,14 +234,13 @@ public class InlongStreamProcessOperation {
return false;
}
} else {
- executorService.execute(
- () -> {
- WorkflowResult workflowResult =
workflowService.start(processName, operator, processForm);
- ProcessStatus processStatus =
workflowResult.getProcessInfo().getStatus();
- if (processStatus == ProcessStatus.COMPLETED) {
- streamService.delete(groupId, streamId, operator);
- }
- });
+ executorService.execute(() -> {
+ WorkflowResult workflowResult =
workflowService.start(processName, operator, processForm);
+ ProcessStatus processStatus =
workflowResult.getProcessInfo().getStatus();
+ if (processStatus == ProcessStatus.COMPLETED) {
+ streamService.delete(groupId, streamId, operator);
+ }
+ });
return true;
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
index 4d9b700a7..6bec4feb6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
@@ -99,7 +99,7 @@ public class InlongPulsarOperator extends
AbstractGroupOperator {
InlongPulsarDTO dto =
InlongPulsarDTO.getFromRequest(pulsarRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
LOGGER.info("success set entity for inlong group with Pulsar");
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
index cbcc15f56..43b4ba38e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperator.java
@@ -67,7 +67,7 @@ public class AutoPushSourceOperator extends
AbstractSourceOperator {
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
LOGGER.error("parsing json string to source info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
index 5f2e73feb..c3b324aae 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
@@ -62,7 +62,7 @@ public class BinlogSourceOperator extends
AbstractSourceOperator {
MySQLBinlogSourceDTO dto =
MySQLBinlogSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index ab4baa544..a561844d5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -62,7 +62,7 @@ public class FileSourceOperator extends
AbstractSourceOperator {
FileSourceDTO dto = FileSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 3cb0b2615..28e473163 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -62,7 +62,7 @@ public class KafkaSourceOperator extends
AbstractSourceOperator {
KafkaSourceDTO dto = KafkaSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
index aeba33ede..74c0bc466 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongodb/MongoDBSourceOperator.java
@@ -62,7 +62,7 @@ public class MongoDBSourceOperator extends
AbstractSourceOperator {
MongoDBSourceDTO dto =
MongoDBSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
index f9dd4ee0b..d70b88a12 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperator.java
@@ -62,7 +62,7 @@ public class OracleSourceOperator extends
AbstractSourceOperator {
OracleSourceDTO dto =
OracleSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
index 8688a751d..4aba1e868 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgresql/PostgreSQLSourceOperator.java
@@ -62,7 +62,7 @@ public class PostgreSQLSourceOperator extends
AbstractSourceOperator {
PostgreSQLSourceDTO dto =
PostgreSQLSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 9a02a1399..21bce91bc 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -79,7 +79,7 @@ public class PulsarSourceOperator extends
AbstractSourceOperator {
PulsarSourceDTO dto =
PulsarSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
index 7dacb20b1..9064249d6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sqlserver/SQLServerSourceOperator.java
@@ -62,7 +62,7 @@ public class SQLServerSourceOperator extends
AbstractSourceOperator {
SQLServerSourceDTO dto =
SQLServerSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 6e14acdac..55741fb99 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -75,7 +75,7 @@ public class TubeMQSourceOperator extends
AbstractSourceOperator {
TubeMQSourceDTO dto =
TubeMQSourceDTO.getFromRequest(sourceRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergSinkServiceTest.java
index bfd67314b..c9053a8ec 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergSinkServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergSinkServiceTest.java
@@ -31,7 +31,7 @@ import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
/**
- * Iceberg sink service test..
+ * Iceberg stream sink service test.
*/
public class IcebergSinkServiceTest extends ServiceBaseTest {