This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ff41d6aa2b [INLONG-9209][Manager] Support configuring predefined
fields and issuing agents (#9210)
ff41d6aa2b is described below
commit ff41d6aa2bd7e4ec21f5bae53b51b27a0d568d5d
Author: fuweng11 <[email protected]>
AuthorDate: Fri Nov 3 16:22:30 2023 +0800
[INLONG-9209][Manager] Support configuring predefined fields and issuing
agents (#9210)
* [INLONG-9209][Manager] Support configuring predefined fields and issuing
agents
* [INLONG-9209][Manager] Fix UT
---
.../java/org/apache/inlong/common/pojo/agent/DataConfig.java | 1 +
.../org/apache/inlong/manager/pojo/stream/BaseInlongStream.java | 3 +++
.../apache/inlong/manager/pojo/stream/InlongStreamExtParam.java | 3 +++
.../inlong/manager/service/core/impl/AgentServiceImpl.java | 9 ++++++++-
4 files changed, 15 insertions(+), 1 deletion(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index 975f74d128..30bbafab5e 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -48,6 +48,7 @@ public class DataConfig {
private Integer syncSend;
private String syncPartitionKey;
private Integer state;
+ private String predefinedFields;
private String extParams;
/**
* The task version.
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
index 1327c9ac7b..4abd2f836f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
@@ -20,15 +20,18 @@ package org.apache.inlong.manager.pojo.stream;
import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
/**
* The base parameter class of InlongStream, support user extend their own
business params.
*/
@Data
@AllArgsConstructor
+@NoArgsConstructor
@ApiModel("Base info of inlong stream")
public class BaseInlongStream {
// you can add extend parameters in this class
+ private String predefinedFields;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 0dba8034af..ad69b997c9 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -48,6 +48,9 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;
+ @ApiModelProperty(value = "Predefined fields")
+ private String predefinedFields;
+
/**
* Pack extended attributes into ExtParams
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 0d4ff83a37..d4731d4e55 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -55,6 +55,7 @@ import
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
@@ -101,6 +102,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.inlong.manager.common.consts.InlongConstants.DOT;
+import static
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
/**
* Agent service layer implementation
@@ -161,7 +163,7 @@ public class AgentServiceImpl implements AgentService {
// because the eviction handler needs to query cluster info cache
long expireTime = 10 * 5;
taskCache = Caffeine.newBuilder()
- .expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS)
+ .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
.build(this::fetchTask);
if (updateTaskTimeoutEnabled) {
@@ -601,6 +603,11 @@ public class AgentServiceImpl implements AgentService {
extParams = (null != dataSeparator ? getExtParams(extParams,
dataSeparator) : extParams);
}
+ InlongStreamInfo streamInfo =
CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
+ // Processing extParams
+ unpackExtParams(streamEntity.getExtParams(), streamInfo);
+ dataConfig.setPredefinedFields(streamInfo.getPredefinedFields());
+
int dataReportType = groupEntity.getDataReportType();
dataConfig.setDataReportType(dataReportType);
if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {