This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b020fd2f65 [INLONG-9280][Manager] Support different size of extended fields of InlongStream (#9283)
b020fd2f65 is described below
commit b020fd2f657f850143c2c8af59cad9be57ecc0a9
Author: vernedeng <[email protected]>
AuthorDate: Wed Nov 15 10:46:21 2023 +0800
[INLONG-9280][Manager] Support different size of extended fields of InlongStream (#9283)
---
.../inlong/manager/pojo/sink/cls/ClsSinkDTO.java | 9 +++++++++
.../manager/pojo/sink/es/ElasticsearchSinkDTO.java | 2 +-
.../pojo/sink/es/ElasticsearchSinkRequest.java | 9 ---------
.../manager/pojo/stream/InlongStreamExtParam.java | 3 +++
.../manager/pojo/stream/InlongStreamInfo.java | 3 +++
.../manager/pojo/stream/InlongStreamRequest.java | 3 +++
.../service/core/impl/SortClusterServiceImpl.java | 15 ---------------
.../manager/service/sink/cls/ClsSinkOperator.java | 21 ++++++++++++++++-----
.../service/sink/es/ElasticsearchSinkOperator.java | 13 ++++++++++++-
9 files changed, 47 insertions(+), 31 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
index f76f5bfb01..87508b5e1c 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/cls/ClsSinkDTO.java
@@ -55,6 +55,15 @@ public class ClsSinkDTO {
@ApiModelProperty("Cloud log service index tokenizer")
private String tokenizer;
+ @ApiModelProperty("contentOffset")
+ private Integer contentOffset = 0;
+
+ @ApiModelProperty("fieldOffset")
+ private Integer fieldOffset;
+
+ @ApiModelProperty("separator")
+ private String separator;
+
/**
* Get the dto instance from the request
*/
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
index 35565f2cd5..4045231768 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkDTO.java
@@ -44,7 +44,7 @@ public class ElasticsearchSinkDTO {
private String indexNamePattern;
@ApiModelProperty("contentOffset")
- private Integer contentOffset;
+ private Integer contentOffset = 0;
@ApiModelProperty("fieldOffset")
private Integer fieldOffset;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
index 182308fe0a..0f8d756c5a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/es/ElasticsearchSinkRequest.java
@@ -40,13 +40,4 @@ public class ElasticsearchSinkRequest extends SinkRequest {
@ApiModelProperty("indexNamePattern")
private String indexNamePattern;
- @ApiModelProperty("contentOffset")
- private Integer contentOffset;
-
- @ApiModelProperty("fieldOffset")
- private Integer fieldOffset;
-
- @ApiModelProperty("separator")
- private String separator;
-
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index ad69b997c9..2690576aad 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -51,6 +51,9 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "Predefined fields")
private String predefinedFields;
+ @ApiModelProperty(value = "Extended field size")
+ private Integer extendedFieldSize = 0;
+
/**
* Pack extended attributes into ExtParams
*
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
index f1c305c475..06a441336f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamInfo.java
@@ -136,6 +136,9 @@ public class InlongStreamInfo extends BaseInlongStream {
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;
+ @ApiModelProperty(value = "Extended field size")
+ private Integer extendedFieldSize = 0;
+
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private Boolean ignoreParseError = true;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index 4ad91f17af..bb6a0d2964 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -124,6 +124,9 @@ public class InlongStreamRequest extends BaseInlongStream {
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;
+ @ApiModelProperty(value = "Extended field size")
+ private Integer extendedFieldSize = 0;
+
@ApiModelProperty(value = "The message body wrap type, including: RAW,
INLONG_MSG_V0, INLONG_MSG_V1, PB, etc")
private String wrapType;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index 170b295249..24ee49f757 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -20,14 +20,12 @@ package org.apache.inlong.manager.service.core.impl;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
-import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.node.DataNodeOperator;
@@ -37,7 +35,6 @@ import
org.apache.inlong.manager.service.sink.StreamSinkOperator;
import com.google.gson.Gson;
import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -281,7 +278,6 @@ public class SortClusterServiceImpl implements
SortClusterService {
StreamSinkOperator operator =
sinkOperatorFactory.getInstance(streamSink.getSinkType());
List<String> fields = fieldMap.get(streamSink.getId());
Map<String, String> params =
operator.parse2IdParams(streamSink, fields, dataNodeInfo);
- setFiledOffset(streamSink, params);
return params;
} catch (Exception e) {
LOGGER.error("fail to parse id params of groupId={},
streamId={} name={}, type={}}",
@@ -294,17 +290,6 @@ public class SortClusterServiceImpl implements
SortClusterService {
.collect(Collectors.toList());
}
- private void setFiledOffset(StreamSinkEntity streamSink, Map<String,
String> params) {
-
- SortSourceStreamInfo sortSourceStreamInfo =
allStreams.get(streamSink.getInlongGroupId())
- .get(streamSink.getInlongStreamId());
- InlongStreamExtParam inlongStreamExtParam = JsonUtils.parseObject(
- sortSourceStreamInfo.getExtParams(),
InlongStreamExtParam.class);
- if (ObjectUtils.anyNotNull(inlongStreamExtParam) &&
!inlongStreamExtParam.getUseExtendedFields()) {
- params.put(FILED_OFFSET, String.valueOf(0));
- }
- }
-
private Map<String, String> parseSinkParams(DataNodeInfo nodeInfo) {
DataNodeOperator operator =
dataNodeOperatorFactory.getInstance(nodeInfo.getType());
return operator.parse2SinkParams(nodeInfo);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
index a2ec9e1958..cc0dafc91c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/cls/ClsSinkOperator.java
@@ -25,6 +25,7 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
@@ -36,6 +37,7 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sink.cls.ClsSink;
import org.apache.inlong.manager.pojo.sink.cls.ClsSinkDTO;
import org.apache.inlong.manager.pojo.sink.cls.ClsSinkRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -74,6 +76,15 @@ public class ClsSinkOperator extends AbstractSinkOperator {
ClsSinkRequest sinkRequest = (ClsSinkRequest) request;
try {
ClsSinkDTO dto = ClsSinkDTO.getFromRequest(sinkRequest,
targetEntity.getExtParams());
+
+ InlongStreamEntity stream = inlongStreamEntityMapper
+ .selectByIdentifier(request.getInlongGroupId(),
request.getInlongStreamId());
+ dto.setSeparator(String.valueOf((char)
(Integer.parseInt(stream.getDataSeparator()))));
+
+ InlongStreamExtParam streamExt =
+ JsonUtils.parseObject(stream.getExtParams(),
InlongStreamExtParam.class);
+ dto.setFieldOffset(streamExt.getExtendedFieldSize());
+
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
@@ -116,16 +127,16 @@ public class ClsSinkOperator extends AbstractSinkOperator
{
DataNodeInfo dataNodeInfo) {
Map<String, String> params = super.parse2IdParams(streamSink, fields,
dataNodeInfo);
ClsSinkDTO clsSinkDTO =
JsonUtils.parseObject(streamSink.getExtParams(), ClsSinkDTO.class);
- params.put(TOPIC_ID, clsSinkDTO.getTopicId());
+ params.computeIfAbsent(TOPIC_ID, k -> clsSinkDTO.getTopicId());
ClsDataNodeInfo clsDataNodeInfo = (ClsDataNodeInfo) dataNodeInfo;
- params.put(SECRET_ID, clsDataNodeInfo.getSendSecretId());
- params.put(SECRET_KEY, clsDataNodeInfo.getSendSecretKey());
- params.put(END_POINT, clsDataNodeInfo.getEndpoint());
+ params.computeIfAbsent(SECRET_ID, k ->
clsDataNodeInfo.getSendSecretId());
+ params.computeIfAbsent(SECRET_KEY, k ->
clsDataNodeInfo.getSendSecretKey());
+ params.computeIfAbsent(END_POINT, k -> clsDataNodeInfo.getEndpoint());
StringBuilder fieldNames = new StringBuilder();
for (String field : fields) {
fieldNames.append(field).append(InlongConstants.BLANK);
}
- params.put(KEY_FIELDS, fieldNames.toString());
+ params.computeIfAbsent(KEY_FIELDS, k -> fieldNames.toString());
return params;
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
index fba388029f..7b2109c352 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/es/ElasticsearchSinkOperator.java
@@ -22,6 +22,8 @@ import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
@@ -32,6 +34,7 @@ import
org.apache.inlong.manager.pojo.sink.es.ElasticsearchFieldInfo;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkDTO;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSinkRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtParam;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -78,6 +81,14 @@ public class ElasticsearchSinkOperator extends
AbstractSinkOperator {
try {
ElasticsearchSinkDTO dto =
ElasticsearchSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams());
+ InlongStreamEntity stream = inlongStreamEntityMapper
+ .selectByIdentifier(request.getInlongGroupId(),
request.getInlongStreamId());
+ dto.setSeparator(String.valueOf((char)
(Integer.parseInt(stream.getDataSeparator()))));
+
+ InlongStreamExtParam streamExt =
+ JsonUtils.parseObject(stream.getExtParams(),
InlongStreamExtParam.class);
+ dto.setFieldOffset(streamExt.getExtendedFieldSize());
+
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
@@ -108,7 +119,7 @@ public class ElasticsearchSinkOperator extends
AbstractSinkOperator {
for (String field : fields) {
sb.append(field).append(" ");
}
- idParams.put(KEY_FIELDS, sb.toString());
+ idParams.computeIfAbsent(KEY_FIELDS, k -> sb.toString());
return idParams;
}