This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6b9da03b43 [INLONG-9523][Manager] Fix the problem of sink remains in
configuration after standalone cluster allocation failure (#9553)
6b9da03b43 is described below
commit 6b9da03b43f83db56cefe28b679aaa04d6a755cf
Author: fuweng11 <[email protected]>
AuthorDate: Tue Jan 9 14:06:43 2024 +0800
[INLONG-9523][Manager] Fix the problem of sink remains in configuration
after standalone cluster allocation failure (#9553)
---
.../main/resources/mappers/AuditEntityMapper.xml | 2 +
.../AbstractStandaloneSinkResourceOperator.java | 46 +++++++++++++---------
.../service/sink/ck/ClickHouseSinkOperator.java | 1 +
.../service/sink/es/ElasticsearchSinkOperator.java | 1 +
.../service/sink/iceberg/IcebergSinkOperator.java | 1 +
.../service/sink/kudu/KuduSinkOperator.java | 1 +
.../sink/starrocks/StarRocksSinkOperator.java | 1 +
7 files changed, 34 insertions(+), 19 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
index 63d6af6c73..f5cce709e3 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
@@ -41,6 +41,7 @@
<result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
<result column="total" property="total" jdbcType="BIGINT"/>
<result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
+ <result column="total_size" property="totalSize" jdbcType="BIGINT"/>
</resultMap>
<resultMap id="SumGroupByIdResultMap" type="java.util.Map">
@@ -50,6 +51,7 @@
<result column="ip" property="ip" jdbcType="VARCHAR"/>
<result column="total" property="total" jdbcType="BIGINT"/>
<result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
+ <result column="total_size" property="totalSize" jdbcType="BIGINT"/>
</resultMap>
<select id="sumByLogTs" resultMap="SumByLogTsResultMap">
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
index 79893ad9c0..75174f120a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.resource.sink;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -55,26 +56,33 @@ public abstract class
AbstractStandaloneSinkResourceOperator implements SinkReso
@VisibleForTesting
protected void assignCluster(SinkInfo sinkInfo) {
- if (StringUtils.isBlank(sinkInfo.getSinkType())) {
- throw new
IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
+ try {
+ if (StringUtils.isBlank(sinkInfo.getSinkType())) {
+ throw new
IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
+ }
+
+ if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) {
+ String info = "no need to auto-assign cluster since the
cluster has already assigned";
+ sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+ return;
+ }
+
+ String targetCluster = assignOneCluster(sinkInfo);
+ Preconditions.expectNotBlank(targetCluster,
+ String.format("find no proper cluster assign to group=%s,
stream=%s, sink type=%s, data node=%s ",
+ sinkInfo.getInlongGroupId(),
sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(),
+ sinkInfo.getDataNodeName()));
+
+ StreamSinkEntity sink =
sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId());
+ sink.setInlongClusterName(targetCluster);
+ sink.setStatus(SinkStatus.CONFIG_SUCCESSFUL.getCode());
+ sinkEntityMapper.updateByIdSelective(sink);
+ } catch (Throwable e) {
+ String errMsg = "assign standalone cluster failed: " +
e.getMessage();
+ log.error(errMsg, e);
+ sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+ throw new WorkflowException(errMsg);
}
-
- if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) {
- String info = "no need to auto-assign cluster since the cluster
has already assigned";
- sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
- return;
- }
-
- String targetCluster = assignOneCluster(sinkInfo);
- Preconditions.expectNotBlank(targetCluster,
- String.format("find no proper cluster assign to group=%s,
stream=%s, sink type=%s, data node=%s ",
- sinkInfo.getInlongGroupId(),
sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(),
- sinkInfo.getDataNodeName()));
-
- StreamSinkEntity sink =
sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId());
- sink.setInlongClusterName(targetCluster);
- sink.setStatus(SinkStatus.CONFIG_SUCCESSFUL.getCode());
- sinkEntityMapper.updateByIdSelective(sink);
}
private String assignOneCluster(SinkInfo sinkInfo) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
index 52618005d7..fa8759d7b6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
@@ -126,6 +126,7 @@ public class ClickHouseSinkOperator extends
AbstractSinkOperator {
Integer sinkId = request.getId();
for (SinkField fieldInfo : fieldList) {
this.checkFieldInfo(fieldInfo);
+ fieldInfo.setExtParams(null);
StreamSinkFieldEntity fieldEntity =
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index 7b2109c352..37b9202e8d 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -139,6 +139,7 @@ public class ElasticsearchSinkOperator extends
AbstractSinkOperator {
Integer sinkId = request.getId();
for (SinkField fieldInfo : fieldList) {
this.checkFieldInfo(fieldInfo);
+ fieldInfo.setExtParams(null);
StreamSinkFieldEntity fieldEntity =
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
index 4001b79981..9ee5fb6d3a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
@@ -147,6 +147,7 @@ public class IcebergSinkOperator extends
AbstractSinkOperator {
Integer sinkId = request.getId();
for (SinkField fieldInfo : fieldList) {
this.checkFieldInfo(fieldInfo);
+ fieldInfo.setExtParams(null);
StreamSinkFieldEntity fieldEntity =
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
index c547930a6d..949f864a50 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kudu/KuduSinkOperator.java
@@ -119,6 +119,7 @@ public class KuduSinkOperator extends AbstractSinkOperator {
Integer sinkId = request.getId();
for (SinkField fieldInfo : fieldList) {
this.checkFieldInfo(fieldInfo);
+ fieldInfo.setExtParams(null);
StreamSinkFieldEntity fieldEntity =
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
index 29accea5e7..a3cc57a091 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
@@ -134,6 +134,7 @@ public class StarRocksSinkOperator extends
AbstractSinkOperator {
Integer sinkId = request.getId();
for (SinkField fieldInfo : fieldList) {
this.checkFieldInfo(fieldInfo);
+ fieldInfo.setExtParams(null);
StreamSinkFieldEntity fieldEntity =
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());