This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ff68b8d0e [INLONG-7220][Manager] Optimize OpenStreamSinkController
implementation (#7224)
ff68b8d0e is described below
commit ff68b8d0ef6f4927bbdeb5e9f5983389c7c59364
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Jan 12 16:01:30 2023 +0800
[INLONG-7220][Manager] Optimize OpenStreamSinkController implementation
(#7224)
---
.../inlong/manager/pojo/sink/SinkRequest.java | 16 +++++---
.../manager/pojo/sink/doris/DorisSinkDTO.java | 3 +-
.../manager/pojo/sink/es/ElasticsearchSinkDTO.java | 3 +-
.../pojo/sink/greenplum/GreenplumSinkDTO.java | 3 +-
.../manager/pojo/sink/hbase/HBaseSinkDTO.java | 3 +-
.../inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java | 3 +-
.../inlong/manager/pojo/sink/hive/HiveSinkDTO.java | 3 +-
.../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java | 3 +-
.../manager/pojo/sink/iceberg/IcebergSinkDTO.java | 3 +-
.../manager/pojo/sink/kafka/KafkaSinkDTO.java | 3 +-
.../manager/pojo/sink/mysql/MySQLSinkDTO.java | 4 +-
.../manager/pojo/sink/oracle/OracleSinkDTO.java | 4 +-
.../pojo/sink/postgresql/PostgreSQLSinkDTO.java | 3 +-
.../pojo/sink/sqlserver/SQLServerSinkDTO.java | 3 +-
.../pojo/sink/starrocks/StarRocksSinkDTO.java | 7 +++-
.../tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java | 3 +-
.../manager/service/sink/AbstractSinkOperator.java | 31 +++++++++-------
.../service/sink/StreamSinkServiceImpl.java | 43 +++++++++++++++++-----
.../service/sink/ck/ClickHouseSinkOperator.java | 11 +++---
.../service/sink/doris/DorisSinkOperator.java | 18 ++++++---
.../service/sink/es/ElasticsearchSinkOperator.java | 19 +++++-----
.../sink/greenplum/GreenplumSinkOperator.java | 11 +++---
.../service/sink/hbase/HBaseSinkOperator.java | 11 +++---
.../service/sink/hdfs/HDFSSinkOperator.java | 11 +++---
.../service/sink/hive/HiveSinkOperator.java | 17 +++++----
.../service/sink/hudi/HudiSinkOperator.java | 38 ++++++++++---------
.../service/sink/iceberg/IcebergSinkOperator.java | 17 +++++----
.../service/sink/kafka/KafkaSinkOperator.java | 11 +++---
.../service/sink/mysql/MySQLSinkOperator.java | 20 ++++++----
.../service/sink/oracle/OracleSinkOperator.java | 11 +++---
.../sink/postgresql/PostgreSQLSinkOperator.java | 11 +++---
.../postgresql/TDSQLPostgreSQLSinkOperator.java | 11 +++---
.../sink/sqlserver/SQLServerSinkOperator.java | 11 +++---
.../sink/starrocks/StarRocksSinkOperator.java | 35 +++++++++++-------
.../service/stream/InlongStreamProcessService.java | 22 ++++++-----
.../web/controller/StreamSinkController.java | 8 ++--
.../openapi/OpenStreamSinkController.java | 4 +-
37 files changed, 263 insertions(+), 175 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
index d8ca5f02d..545563f4b 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
@@ -22,7 +22,10 @@ import com.google.common.collect.Maps;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import org.apache.inlong.manager.common.validation.SaveValidation;
+import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
+import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
import org.hibernate.validator.constraints.Length;
import org.hibernate.validator.constraints.Range;
@@ -40,29 +43,29 @@ import java.util.Map;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property =
"sinkType")
public abstract class SinkRequest {
- @NotNull(groups = UpdateValidation.class)
@ApiModelProperty(value = "Primary key")
+ @NotNull(groups = UpdateByIdValidation.class, message = "id cannot be
null")
private Integer id;
- @NotBlank(message = "inlongGroupId cannot be blank")
@ApiModelProperty("Inlong group id")
+ @NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class},
message = "inlongGroupId cannot be blank")
@Length(min = 4, max = 100, message = "length must be between 4 and 100")
@Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports
lowercase letters, numbers, '-', or '_'")
private String inlongGroupId;
- @NotBlank(message = "inlongStreamId cannot be blank")
@ApiModelProperty("Inlong stream id")
+ @NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class},
message = "inlongStreamId cannot be blank")
@Length(min = 4, max = 100, message = "inlongStreamId length must be
between 4 and 100")
@Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only
supports lowercase letters, numbers, '-', or '_'")
private String inlongStreamId;
- @NotBlank(message = "sinkType cannot be blank")
@ApiModelProperty("Sink type, including: HIVE, ES, etc.")
+ @NotBlank(message = "sinkType cannot be blank")
@Length(max = 15, message = "length must be less than or equal to 15")
private String sinkType;
- @NotBlank(message = "sinkName cannot be blank")
@ApiModelProperty("Sink name, unique in one stream")
+ @NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class},
message = "sinkName cannot be blank")
@Length(min = 1, max = 100, message = "sinkName length must be between 1
and 100")
@Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "sinkName only supports
lowercase letters, numbers, '-', or '_'")
private String sinkName;
@@ -103,6 +106,7 @@ public abstract class SinkRequest {
private Map<String, Object> properties = Maps.newHashMap();
@ApiModelProperty(value = "Version number")
+ @NotNull(groups = {UpdateByIdValidation.class,
UpdateByKeyValidation.class}, message = "version cannot be null")
private Integer version;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
index 60dfb1a7b..b8a379230 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSinkDTO.java
@@ -106,7 +106,8 @@ public class DorisSinkDTO {
try {
return JsonUtils.parseObject(extParams,
DorisSinkDTO.class).decryptPassword();
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of Doris SinkDTO failure:
%s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
index 0828b119f..fb7297319 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -103,7 +103,8 @@ public class ElasticsearchSinkDTO {
try {
return JsonUtils.parseObject(extParams,
ElasticsearchSinkDTO.class).decryptPassword();
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of Elasticsearch SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
index d46970b64..fa178134d 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
@@ -78,7 +78,8 @@ public class GreenplumSinkDTO {
try {
return JsonUtils.parseObject(extParams, GreenplumSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of Greenplum SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
index bbdd924e3..3a417a397 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hbase/HBaseSinkDTO.java
@@ -90,7 +90,8 @@ public class HBaseSinkDTO {
try {
return JsonUtils.parseObject(extParams, HBaseSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of HBase SinkDTO failure:
%s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
index 25b604bd8..ba7c05e37 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hdfs/HDFSSinkDTO.java
@@ -82,7 +82,8 @@ public class HDFSSinkDTO {
try {
return JsonUtils.parseObject(extParams, HDFSSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of HDFS SinkDTO failure:
%s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
index 4111cbf88..b9dc4197b 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hive/HiveSinkDTO.java
@@ -128,7 +128,8 @@ public class HiveSinkDTO {
try {
return JsonUtils.parseObject(extParams,
HiveSinkDTO.class).decryptPassword();
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of Hive SinkDTO failure:
%s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
index 32b214060..f42bad925 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
@@ -99,7 +99,8 @@ public class HudiSinkDTO {
try {
return JsonUtils.parseObject(extParams, HudiSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of Hudi SinkDTO failure:
%s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
index 800da63b2..14acc52b8 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -91,7 +91,8 @@ public class IcebergSinkDTO {
try {
return JsonUtils.parseObject(extParams, IcebergSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of Iceberg SinkDTO failure:
%s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
index 5eebb5b69..cd16b98a7 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/kafka/KafkaSinkDTO.java
@@ -78,7 +78,8 @@ public class KafkaSinkDTO {
try {
return JsonUtils.parseObject(extParams, KafkaSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of Kafka SinkDTO failure:
%s", e.getMessage()));
}
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
index ca6752a6a..443e8f9c6 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTO.java
@@ -108,8 +108,8 @@ public class MySQLSinkDTO {
try {
return JsonUtils.parseObject(extParams, MySQLSinkDTO.class);
} catch (Exception e) {
- LOGGER.error("fetch mysql sink info failed from json params: " +
extParams, e);
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of MySQL SinkDTO failure:
%s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
index 553a534ce..1016b88e0 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oracle/OracleSinkDTO.java
@@ -83,8 +83,8 @@ public class OracleSinkDTO {
try {
return JsonUtils.parseObject(extParams, OracleSinkDTO.class);
} catch (Exception e) {
- LOGGER.error("fetch oracle sink info failed from json params: " +
extParams, e);
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of Oracle SinkDTO failure:
%s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
index 7fd5b45d9..b66a38d0e 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/postgresql/PostgreSQLSinkDTO.java
@@ -98,7 +98,8 @@ public class PostgreSQLSinkDTO {
try {
return JsonUtils.parseObject(extParams,
PostgreSQLSinkDTO.class).decryptPassword();
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of PostgreSQL SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
index 00c9b9ca3..b1a99ca1a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/sqlserver/SQLServerSinkDTO.java
@@ -85,7 +85,8 @@ public class SQLServerSinkDTO {
try {
return JsonUtils.parseObject(extParams, SQLServerSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of SQLServer SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
index 9b627c400..74c3e9990 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
@@ -32,6 +32,7 @@ import javax.validation.constraints.NotNull;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
* Sink info of StarRocks
@@ -121,9 +122,11 @@ public class StarRocksSinkDTO {
public static StarRocksSinkDTO getFromJson(@NotNull String extParams) {
try {
- return JsonUtils.parseObject(extParams,
StarRocksSinkDTO.class).decryptPassword();
+ return Objects.requireNonNull(JsonUtils.parseObject(
+ extParams, StarRocksSinkDTO.class)).decryptPassword();
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of StarRocks SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
index 569606737..b46988514 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/tdsqlpostgresql/TDSQLPostgreSQLSinkDTO.java
@@ -81,7 +81,8 @@ public class TDSQLPostgreSQLSinkDTO {
try {
return JsonUtils.parseObject(extParams,
TDSQLPostgreSQLSinkDTO.class);
} catch (Exception e) {
- throw new
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ String.format("parse extParams of TDSQLPostgreSQL SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index c1331bec1..f47db9c67 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -26,7 +26,6 @@ import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
@@ -116,13 +115,14 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
@Override
public void updateOpt(SinkRequest request, SinkStatus nextStatus, String
operator) {
StreamSinkEntity entity =
sinkMapper.selectByPrimaryKey(request.getId());
- Preconditions.checkNotNull(entity,
ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
-
- String errMsg = String.format("sink has already updated with
groupId=%s, streamId=%s, name=%s, curVersion=%s",
- request.getInlongGroupId(), request.getInlongStreamId(),
request.getSinkName(), request.getVersion());
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
+ }
if (!Objects.equals(entity.getVersion(), request.getVersion())) {
- LOGGER.error(errMsg);
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("sink has already updated with groupId=%s,
streamId=%s, name=%s, curVersion=%s",
+ request.getInlongGroupId(),
request.getInlongStreamId(), request.getSinkName(),
+ request.getVersion()));
}
CommonBeanUtils.copyProperties(request, entity, true);
setTargetEntity(request, entity);
@@ -133,8 +133,10 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
entity.setModifier(operator);
int rowCount = sinkMapper.updateByIdSelective(entity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.error(errMsg);
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("sink has already updated with groupId=%s,
streamId=%s, name=%s, curVersion=%s",
+ request.getInlongGroupId(),
request.getInlongStreamId(), request.getSinkName(),
+ request.getVersion()));
}
boolean onlyAdd =
SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
@@ -171,7 +173,7 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
@Override
public void saveFieldOpt(SinkRequest request) {
List<SinkField> fieldList = request.getSinkFieldList();
- LOGGER.info("begin to save sink fields={}", fieldList);
+ LOGGER.debug("begin to save sink fields={}", fieldList);
if (CollectionUtils.isEmpty(fieldList)) {
return;
}
@@ -197,7 +199,7 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
}
sinkFieldMapper.insertAll(entityList);
- LOGGER.info("success to save sink fields");
+ LOGGER.debug("success to save sink fields");
}
@Override
@@ -208,9 +210,10 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
entity.setModifier(operator);
int rowCount = sinkMapper.updateByIdSelective(entity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.error("sink has already updated with groupId={},
streamId={}, name={}, curVersion={}",
- entity.getInlongGroupId(), entity.getInlongStreamId(),
entity.getSinkName(), entity.getVersion());
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("sink has already updated with groupId=%s,
streamId=%s, name=%s, curVersion=%s",
+ entity.getInlongGroupId(),
entity.getInlongStreamId(), entity.getSinkName(),
+ entity.getVersion()));
}
sinkFieldMapper.logicDeleteAll(entity.getId());
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 7cf87e495..fa1afaa0a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -68,6 +68,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -413,8 +414,6 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean update(SinkRequest request, UserInfo opInfo) {
- // check request parameter
- checkSinkRequestParams(request);
if (request.getId() == null) {
throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
}
@@ -422,21 +421,43 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
if (opInfo == null) {
throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
}
+ StreamSinkEntity curEntity =
sinkMapper.selectByPrimaryKey(request.getId());
+ if (curEntity == null) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
+ }
+ if (!Objects.equals(curEntity.getVersion(), request.getVersion())) {
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("sink has already updated with groupId=%s,
streamId=%s, name=%s, curVersion=%s",
+ curEntity.getInlongGroupId(),
curEntity.getInlongStreamId(), curEntity.getSinkName(),
+ curEntity.getVersion()));
+ }
+ if (StringUtils.isNotBlank(request.getInlongGroupId())
+ &&
!curEntity.getInlongGroupId().equals(request.getInlongGroupId())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "InlongGroupId not allowed modify");
+ }
+ if (StringUtils.isNotBlank(request.getInlongStreamId())
+ &&
!curEntity.getInlongStreamId().equals(request.getInlongStreamId())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "InlongStreamId not allowed modify");
+ }
+ request.setInlongGroupId(curEntity.getInlongGroupId());
+ request.setInlongStreamId(curEntity.getInlongStreamId());
// check group record
- InlongGroupEntity entity =
groupMapper.selectByGroupId(request.getInlongGroupId());
- if (entity == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
- String.format("InlongGroup does not exist with
InlongGroupId=%s", request.getInlongGroupId()));
+ InlongGroupEntity curGroupEntity =
groupMapper.selectByGroupId(curEntity.getInlongGroupId());
+ if (curGroupEntity == null) {
+ throw new
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
+ String.format("InlongGroup does not exist with
InlongGroupId=%s", curEntity.getInlongGroupId()));
}
// only the person in charges can query
if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
- List<String> inCharges =
Arrays.asList(entity.getInCharges().split(InlongConstants.COMMA));
+ List<String> inCharges =
Arrays.asList(curGroupEntity.getInCharges().split(InlongConstants.COMMA));
if (!inCharges.contains(opInfo.getName())) {
throw new
BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED);
}
}
// Check if group status can be modified
- GroupStatus curState = GroupStatus.forCode(entity.getStatus());
+ GroupStatus curState = GroupStatus.forCode(curEntity.getStatus());
if (GroupStatus.notAllowedUpdate(curState)) {
throw new
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
curState));
}
@@ -444,7 +465,9 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
request.getInlongGroupId(), request.getInlongStreamId());
if (streamEntity == null) {
- throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
+ throw new
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
+ String.format("stream record not found with the groupId=%s
streamId=%s",
+ curEntity.getInlongGroupId(),
curEntity.getInlongStreamId()));
}
// Check whether the sink name exists with the same groupId and
streamId
StreamSinkEntity existEntity = sinkMapper.selectByUniqueKey(
@@ -762,6 +785,6 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
}
streamProcessOperation.deleteProcess(groupId, streamId, operator,
false);
- LOGGER.info("success to start the delete-stream-process for groupId={}
streamId={}", groupId, streamId);
+ LOGGER.debug("success to start the delete-stream-process for
groupId={} streamId={}", groupId, streamId);
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
index 3e405bd6d..6a7e80410 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO;
import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -64,15 +63,17 @@ public class ClickHouseSinkOperator extends
AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
ClickHouseSinkRequest sinkRequest = (ClickHouseSinkRequest) request;
try {
ClickHouseSinkDTO dto =
ClickHouseSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of ClickHouse SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
index 0bcb38cab..30a268b22 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/doris/DorisSinkOperator.java
@@ -20,11 +20,12 @@ package org.apache.inlong.manager.service.sink.doris;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -61,15 +62,17 @@ public class DorisSinkOperator extends AbstractSinkOperator
{
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
DorisSinkRequest sinkRequest = (DorisSinkRequest) request;
try {
DorisSinkDTO dto = DorisSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Doris SinkDTO
failure: %s", e.getMessage()));
}
}
@@ -81,7 +84,10 @@ public class DorisSinkOperator extends AbstractSinkOperator {
}
DorisSinkDTO dto = DorisSinkDTO.getFromJson(entity.getExtParams());
- Preconditions.checkNotEmpty(dto.getFeNodes(), "doris fe nodes is
empty");
+ if (StringUtils.isBlank(dto.getFeNodes())) {
+ throw new
BusinessException(ErrorCodeEnum.ILLEGAL_RECORD_FIELD_VALUE,
+ "doris fe nodes is blank");
+ }
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index 19196a4dd..1516a456e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -27,7 +27,6 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.AESUtils;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
@@ -73,8 +72,10 @@ public class ElasticsearchSinkOperator extends
AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
ElasticsearchSinkRequest sinkRequest = (ElasticsearchSinkRequest)
request;
try {
ElasticsearchSinkDTO dto =
ElasticsearchSinkDTO.getFromRequest(sinkRequest);
@@ -95,8 +96,8 @@ public class ElasticsearchSinkOperator extends
AbstractSinkOperator {
dto.setEncryptVersion(encryptVersion);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Elasticsearch
SinkDTO failure: %s", e.getMessage()));
}
}
@@ -129,7 +130,7 @@ public class ElasticsearchSinkOperator extends
AbstractSinkOperator {
@Override
public void saveFieldOpt(SinkRequest request) {
List<SinkField> fieldList = request.getSinkFieldList();
- LOGGER.info("begin to save es sink fields={}", fieldList);
+ LOGGER.debug("begin to save es sink fields={}", fieldList);
if (CollectionUtils.isEmpty(fieldList)) {
return;
}
@@ -150,8 +151,8 @@ public class ElasticsearchSinkOperator extends
AbstractSinkOperator {
ElasticsearchFieldInfo dto =
ElasticsearchFieldInfo.getFromRequest(fieldInfo);
fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink field info failed",
e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Elasticsearch
FieldInfo failure: %s", e.getMessage()));
}
fieldEntity.setInlongGroupId(groupId);
fieldEntity.setInlongStreamId(streamId);
@@ -162,7 +163,7 @@ public class ElasticsearchSinkOperator extends
AbstractSinkOperator {
}
sinkFieldMapper.insertAll(entityList);
- LOGGER.info("success to save es sink fields");
+ LOGGER.debug("success to save es sink fields");
}
@Override
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperator.java
index a7d0c7d07..a9a7fdb12 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/greenplum/GreenplumSinkOperator.java
@@ -28,7 +28,6 @@ import
org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSink;
import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSinkDTO;
import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class GreenplumSinkOperator extends
AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
GreenplumSinkRequest sinkRequest = (GreenplumSinkRequest) request;
try {
GreenplumSinkDTO dto =
GreenplumSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Greenplum SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hbase/HBaseSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hbase/HBaseSinkOperator.java
index 6801abce5..917ea3f5d 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hbase/HBaseSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hbase/HBaseSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sink.hbase.HBaseSink;
import org.apache.inlong.manager.pojo.sink.hbase.HBaseSinkDTO;
import org.apache.inlong.manager.pojo.sink.hbase.HBaseSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class HBaseSinkOperator extends AbstractSinkOperator
{
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
HBaseSinkRequest sinkRequest = (HBaseSinkRequest) request;
try {
HBaseSinkDTO dto = HBaseSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of HBase SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HDFSSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HDFSSinkOperator.java
index 245709ac6..576239670 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HDFSSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hdfs/HDFSSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSink;
import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSinkDTO;
import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class HDFSSinkOperator extends AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
HDFSSinkRequest sinkRequest = (HDFSSinkRequest) request;
try {
HDFSSinkDTO dto = HDFSSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of HDFS SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
index 67b9e133a..4ffc02163 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
import org.apache.inlong.manager.pojo.sink.hive.HiveSinkDTO;
import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -63,15 +62,17 @@ public class HiveSinkOperator extends AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
HiveSinkRequest sinkRequest = (HiveSinkRequest) request;
try {
HiveSinkDTO dto = HiveSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Hive SinkDTO
failure: %s", e.getMessage()));
}
}
@@ -84,8 +85,10 @@ public class HiveSinkOperator extends AbstractSinkOperator {
HiveSinkDTO dto = HiveSinkDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getJdbcUrl())) {
- Preconditions.checkNotEmpty(entity.getDataNodeName(),
- "hive jdbc url unspecified and data node is empty");
+ if (StringUtils.isBlank(entity.getDataNodeName())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ "hive jdbc url unspecified and data node is blank");
+ }
HiveDataNodeInfo dataNodeInfo = (HiveDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
entity.getDataNodeName(), entity.getSinkType());
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
index 734697296..e5613d4c9 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
@@ -17,8 +17,6 @@
package org.apache.inlong.manager.service.sink.hudi;
-import static com.google.common.base.Preconditions.checkState;
-
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Set;
@@ -29,7 +27,6 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -72,8 +69,10 @@ public class HudiSinkOperator extends AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
HudiSinkRequest sinkRequest = (HudiSinkRequest) request;
String partitionKey = sinkRequest.getPartitionKey();
@@ -84,17 +83,18 @@ public class HudiSinkOperator extends AbstractSinkOperator {
Set<String> fieldNames =
sinkRequest.getSinkFieldList().stream().map(SinkField::getFieldName)
.collect(Collectors.toSet());
if (primaryKeyExist) {
- checkState(
- fieldNames.contains(partitionKey),
- "The partitionKey({}) must be included in the
sinkFieldList({})",
- partitionKey, fieldNames);
+ if (!fieldNames.contains(partitionKey)) {
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("The partitionKey(%s) must be
included in the sinkFieldList(%s)",
+ partitionKey, fieldNames));
+ }
}
if (partitionKeyExist) {
- checkState(
- fieldNames.contains(partitionKey),
- "The primaryKey({}) must be included in the
sinkFieldList({})",
- primaryKey,
- fieldNames);
+ if (!fieldNames.contains(primaryKey)) {
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("The primaryKey(%s) must be included
in the sinkFieldList(%s)",
+ primaryKey, fieldNames));
+ }
}
}
@@ -102,8 +102,8 @@ public class HudiSinkOperator extends AbstractSinkOperator {
HudiSinkDTO dto = HudiSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Hudi SinkDTO
failure: %s", e.getMessage()));
}
}
@@ -116,8 +116,10 @@ public class HudiSinkOperator extends AbstractSinkOperator
{
HudiSinkDTO dto = HudiSinkDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getCatalogUri()) &&
CATALOG_TYPE_HIVE.equals(dto.getCatalogType())) {
- Preconditions.checkNotEmpty(entity.getDataNodeName(),
- "hudi catalog uri unspecified and data node is empty");
+ if (StringUtils.isBlank(entity.getDataNodeName())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ "hudi catalog uri unspecified and data node is blank");
+ }
HudiDataNodeInfo dataNodeInfo = (HudiDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
entity.getDataNodeName(), entity.getSinkType());
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
index e8da4f8a4..f778df3b9 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
@@ -32,7 +32,6 @@ import
org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSinkDTO;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -67,15 +66,17 @@ public class IcebergSinkOperator extends
AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
IcebergSinkRequest sinkRequest = (IcebergSinkRequest) request;
try {
IcebergSinkDTO dto = IcebergSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Iceberg SinkDTO
failure: %s", e.getMessage()));
}
}
@@ -88,8 +89,10 @@ public class IcebergSinkOperator extends
AbstractSinkOperator {
IcebergSinkDTO dto = IcebergSinkDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getCatalogUri()) &&
CATALOG_TYPE_HIVE.equals(dto.getCatalogType())) {
- Preconditions.checkNotEmpty(entity.getDataNodeName(),
- "iceberg catalog uri unspecified and data node is empty");
+ if (StringUtils.isBlank(entity.getDataNodeName())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ "iceberg catalog uri unspecified and data node is
blank");
+ }
IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
entity.getDataNodeName(), entity.getSinkType());
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
index 91b35c53a..bf6638123 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class KafkaSinkOperator extends AbstractSinkOperator
{
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
KafkaSinkRequest sinkRequest = (KafkaSinkRequest) request;
try {
KafkaSinkDTO dto = KafkaSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Kafka SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index a81b9ff34..2d5a07eb9 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -63,15 +62,17 @@ public class MySQLSinkOperator extends AbstractSinkOperator
{
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
MySQLSinkRequest sinkRequest = (MySQLSinkRequest) request;
try {
MySQLSinkDTO dto = MySQLSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of MySQL SinkDTO
failure: %s", e.getMessage()));
}
}
@@ -84,9 +85,12 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
MySQLSinkDTO dto = MySQLSinkDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getJdbcUrl())) {
- String dataNodeName = entity.getDataNodeName();
- Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not
specified and data node is empty");
- DataNodeInfo dataNodeInfo =
dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType());
+ if (StringUtils.isBlank(entity.getDataNodeName())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ "mysql jdbc url not specified and data node is blank");
+ }
+ DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(
+ entity.getDataNodeName(), entity.getSinkType());
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
dto.setJdbcUrl(dataNodeInfo.getUrl());
dto.setPassword(dataNodeInfo.getToken());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oracle/OracleSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oracle/OracleSinkOperator.java
index a24f8a81c..561404b4a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oracle/OracleSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oracle/OracleSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
import org.apache.inlong.manager.pojo.sink.oracle.OracleSinkDTO;
import org.apache.inlong.manager.pojo.sink.oracle.OracleSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class OracleSinkOperator extends
AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
OracleSinkRequest sinkRequest = (OracleSinkRequest) request;
try {
OracleSinkDTO dto = OracleSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of Oracle SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/PostgreSQLSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/PostgreSQLSinkOperator.java
index 256d61ed0..7f4c6f8d5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/PostgreSQLSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/PostgreSQLSinkOperator.java
@@ -28,7 +28,6 @@ import
org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSinkDTO;
import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class PostgreSQLSinkOperator extends
AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
PostgreSQLSinkRequest sinkRequest = (PostgreSQLSinkRequest) request;
try {
PostgreSQLSinkDTO dto =
PostgreSQLSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of PostgreSQL SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/TDSQLPostgreSQLSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/TDSQLPostgreSQLSinkOperator.java
index d76edc151..1cd089a26 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/TDSQLPostgreSQLSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgresql/TDSQLPostgreSQLSinkOperator.java
@@ -28,7 +28,6 @@ import
org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
import
org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSinkDTO;
import
org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
@@ -64,15 +63,17 @@ public class TDSQLPostgreSQLSinkOperator extends
AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
- Preconditions.checkTrue(getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
TDSQLPostgreSQLSinkRequest sinkRequest = (TDSQLPostgreSQLSinkRequest)
request;
try {
TDSQLPostgreSQLSinkDTO dto =
TDSQLPostgreSQLSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of TDSQLPostgreSQL
SinkDTO failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SQLServerSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SQLServerSinkOperator.java
index 257909b57..91e6e0e32 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SQLServerSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/sqlserver/SQLServerSinkOperator.java
@@ -28,7 +28,6 @@ import
org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSink;
import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSinkDTO;
import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSinkRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import org.slf4j.Logger;
@@ -61,15 +60,17 @@ public class SQLServerSinkOperator extends
AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
- Preconditions.checkTrue(getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
SQLServerSinkRequest sinkRequest = (SQLServerSinkRequest) request;
try {
SQLServerSinkDTO dto =
SQLServerSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of SQLServer SinkDTO
failure: %s", e.getMessage()));
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
index 5b690fc19..a86f35c76 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
@@ -25,7 +25,6 @@ import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.pojo.node.starrocks.StarRocksDataNodeInfo;
@@ -69,15 +68,17 @@ public class StarRocksSinkOperator extends
AbstractSinkOperator {
@Override
protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
-
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
- ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ if (!this.getSinkType().equals(request.getSinkType())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ }
StarRocksSinkRequest sinkRequest = (StarRocksSinkRequest) request;
try {
StarRocksSinkDTO dto =
StarRocksSinkDTO.getFromRequest(sinkRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink info failed", e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of StarRocks SinkDTO
failure: %s", e.getMessage()));
}
}
@@ -90,16 +91,24 @@ public class StarRocksSinkOperator extends
AbstractSinkOperator {
StarRocksSinkDTO dto =
StarRocksSinkDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getJdbcUrl())) {
- Preconditions.checkNotEmpty(entity.getDataNodeName(),
- "starRocks jdbc url unspecified and data node is empty");
+ if (StringUtils.isBlank(entity.getDataNodeName())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ "starRocks jdbc url unspecified and data node is
blank");
+ }
StarRocksDataNodeInfo dataNodeInfo = (StarRocksDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
entity.getDataNodeName(), entity.getSinkType());
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
dto.setJdbcUrl(dataNodeInfo.getUrl());
dto.setPassword(dataNodeInfo.getToken());
}
- Preconditions.checkNotEmpty(dto.getLoadUrl(), "StarRocks load url is
empty");
- Preconditions.checkNotEmpty(dto.getJdbcUrl(), "StarRocks jdbc url is
empty");
+ if (StringUtils.isBlank(dto.getLoadUrl())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ "StarRocks load url is blank");
+ }
+ if (StringUtils.isBlank(dto.getJdbcUrl())) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+ "StarRocks jdbc url is blank");
+ }
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
List<SinkField> sinkFields = super.getSinkFields(entity.getId());
@@ -110,7 +119,7 @@ public class StarRocksSinkOperator extends
AbstractSinkOperator {
@Override
public void saveFieldOpt(SinkRequest request) {
List<SinkField> fieldList = request.getSinkFieldList();
- LOGGER.info("begin to save es sink fields={}", fieldList);
+ LOGGER.debug("begin to save es sink fields={}", fieldList);
if (CollectionUtils.isEmpty(fieldList)) {
return;
}
@@ -131,8 +140,8 @@ public class StarRocksSinkOperator extends
AbstractSinkOperator {
StarRocksColumnInfo dto =
StarRocksColumnInfo.getFromRequest(fieldInfo);
fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
- LOGGER.error("parsing json string to sink field info failed",
e);
- throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+ String.format("serialize extParams of StarRocks
ColumnInfo failure: %s", e.getMessage()));
}
fieldEntity.setInlongGroupId(groupId);
fieldEntity.setInlongStreamId(streamId);
@@ -143,7 +152,7 @@ public class StarRocksSinkOperator extends
AbstractSinkOperator {
}
sinkFieldMapper.insertAll(entityList);
- LOGGER.info("success to save starRock sink fields");
+ LOGGER.debug("success to save starRock sink fields");
}
@Override
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index abc8d79de..934eb7404 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -196,31 +196,35 @@ public class InlongStreamProcessService {
* Restart stream in synchronous/asynchronous way.
*/
public boolean deleteProcess(String groupId, String streamId, String
operator, boolean sync) {
- log.info("begin to delete stream process for groupId={} streamId={}",
groupId, streamId);
+ log.debug("begin to delete stream process for groupId={} streamId={}",
groupId, streamId);
InlongGroupInfo groupInfo = groupService.get(groupId);
- Preconditions.checkNotNull(groupInfo,
ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
+ if (groupInfo == null) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
+ ErrorCodeEnum.GROUP_NOT_FOUND.getMessage() + " : " +
groupId);
+ }
GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
if (GroupStatus.notAllowedTransition(groupStatus,
GroupStatus.DELETING)) {
- throw new BusinessException(String.format("group status=%s not
support delete stream"
- + " for groupId=%s", groupStatus, groupId));
+ throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED,
+ String.format("group status=%s not support delete stream
for groupId=%s", groupStatus, groupId));
}
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
Preconditions.checkNotNull(streamInfo,
ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
if (status == StreamStatus.DELETED || status == StreamStatus.DELETING)
{
- log.warn("groupId={}, streamId={} is already in {}", groupId,
streamId, status);
+ log.debug("groupId={}, streamId={} is already in {}", groupId,
streamId, status);
return true;
}
if (StreamStatus.notAllowedDelete(status)) {
- throw new BusinessException(String.format("stream status=%s not
support delete stream"
- + " for groupId=%s streamId=%s", status, groupId,
streamId));
+ throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED,
+ String.format("stream status=%s not support delete stream
for groupId=%s streamId=%s", status,
+ groupId, streamId));
}
- StreamResourceProcessForm processForm =
StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
- GroupOperateType.DELETE);
+ StreamResourceProcessForm processForm =
+ StreamResourceProcessForm.getProcessForm(groupInfo,
streamInfo, GroupOperateType.DELETE);
ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
if (sync) {
WorkflowResult workflowResult = workflowService.start(processName,
operator, processForm);
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 40d202479..949e943ba 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -22,7 +22,8 @@ import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
+import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
@@ -75,14 +76,15 @@ public class StreamSinkController {
@RequestMapping(value = "/sink/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update stream sink")
- public Response<Boolean> update(@Validated(UpdateValidation.class)
@RequestBody SinkRequest request) {
+ public Response<Boolean> update(@Validated(UpdateByIdValidation.class)
@RequestBody SinkRequest request) {
return Response.success(sinkService.update(request,
LoginUserUtils.getLoginUser().getName()));
}
@RequestMapping(value = "/sink/updateByKey", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update stream sink by key")
- public Response<UpdateResult> updateByKey(@RequestBody SinkRequest
request) {
+ public Response<UpdateResult> updateByKey(
+ @Validated(UpdateByKeyValidation.class) @RequestBody SinkRequest
request) {
return Response.success(sinkService.updateByKey(request,
LoginUserUtils.getLoginUser().getName()));
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
index b12c47be3..c6048694c 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.web.controller.openapi;
import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -76,7 +76,7 @@ public class OpenStreamSinkController {
@RequestMapping(value = "/sink/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update stream sink")
- public Response<Boolean> update(@Validated(UpdateValidation.class)
@RequestBody SinkRequest request) {
+ public Response<Boolean> update(@Validated(UpdateByIdValidation.class)
@RequestBody SinkRequest request) {
return Response.success(sinkService.update(request,
LoginUserUtils.getLoginUser()));
}