This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3596862db0 [INLONG-9215][Agent] Add predefine fields (#9217)
3596862db0 is described below
commit 3596862db00db0a9ce11e406896a4f9d362ab3ee
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Nov 3 17:39:01 2023 +0800
[INLONG-9215][Agent] Add predefine fields (#9217)
* [INLONG-9215][Agent] Add predefine fields
* [INLONG-9215][Agent] Add predefine fields
* [INLONG-9215][Agent] Add predefine fields
---
.../apache/inlong/agent/conf/InstanceProfile.java | 4 ++++
.../inlong/agent/constant/TaskConstants.java | 1 +
.../apache/inlong/agent/pojo/TaskProfileDto.java | 4 +++-
.../inlong/agent/plugin/sources/LogFileSource.java | 4 +++-
.../inlong/agent/plugin/utils/MetaDataUtils.java | 25 ++++++++++++++++++++++
5 files changed, 36 insertions(+), 2 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index 5592008085..5c3e74fe86 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -101,6 +101,10 @@ public class InstanceProfile extends AbstractConfiguration
implements Comparable
setLong(TaskConstants.FILE_UPDATE_TIME, lastUpdateTime);
}
+ public String getPredefineFields() {
+ return get(TaskConstants.PREDEFINE_FIELDS, "");
+ }
+
@Override
public boolean allRequiredKeyExist() {
return hasKey(TaskConstants.FILE_UPDATE_TIME);
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index fa2ac856fd..facefd011c 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -80,6 +80,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_START_TIME = "task.fileTask.startTime";
public static final String TASK_END_TIME = "task.fileTask.endTime";
public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
+ public static final String PREDEFINE_FIELDS = "task.predefineFields";
// Binlog job
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 964ac1cf02..0fcf7a95f4 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -154,7 +154,7 @@ public class TaskProfileDto {
}
if (null != taskConfig.getLineEndPattern()) {
- FileTask.Line line = new Line();
+ Line line = new Line();
line.setEndPattern(taskConfig.getLineEndPattern());
fileTask.setLine(line);
}
@@ -410,6 +410,7 @@ public class TaskProfileDto {
task.setUuid(dataConfig.getUuid());
task.setVersion(dataConfig.getVersion());
task.setState(dataConfig.getState());
+ task.setPredefinedFields(dataConfig.getPredefinedFields());
// set sink type
if (dataConfig.getDataReportType() ==
NORMAL_SEND_TO_DATAPROXY.ordinal()) {
@@ -516,6 +517,7 @@ public class TaskProfileDto {
private String mqClusters;
private String topicInfo;
private String taskClass;
+ private String predefinedFields;
private Integer state;
private FileTask fileTask;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 1e187e2bf4..c056b3aae1 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -32,6 +32,7 @@ import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import
org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
+import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -137,7 +138,7 @@ public class LogFileSource extends AbstractSource {
@Override
public void init(InstanceProfile profile) {
try {
- LOGGER.info("FileReaderOperator init: {}", profile.toJsonStr());
+ LOGGER.info("LogFileSource init: {}", profile.toJsonStr());
this.profile = profile;
super.init(profile);
taskId = profile.getTaskId();
@@ -348,6 +349,7 @@ public class LogFileSource extends AbstractSource {
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);
header.put(OFFSET, sourceData.offset.toString());
+
header.putAll(MetaDataUtils.parseAddAttr(profile.getPredefineFields()));
Message finalMsg = new
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index aa215bb539..d2028bfc52 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -136,4 +137,28 @@ public class MetaDataUtils {
}).filter(Objects::nonNull).collect(Collectors.toList());
return podName.isEmpty() ? null : podName.get(0);
}
+
+ public static Map<String, String> parseAddAttr(String addictiveAttr) {
+ StringTokenizer token = new StringTokenizer(addictiveAttr, "&");
+ Map<String, String> attr = new HashMap<String, String>();
+ while (token.hasMoreTokens()) {
+ String value = token.nextToken().trim();
+ if (value.contains("=")) {
+ String[] pairs = value.split("=");
+
+ if (pairs[0].equalsIgnoreCase("m")) {
+ continue;
+ }
+
+ // when addictiveattr like "m=10&__addcol1__worldid="
+ if (value.endsWith("=") && pairs.length == 1) {
+ attr.put(pairs[0], "");
+ } else {
+ attr.put(pairs[0], pairs[1]);
+ }
+
+ }
+ }
+ return attr;
+ }
}