This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b1c0beb955 [INLONG-9237][Agent] Move addictive fields to package attributes (#9238)
b1c0beb955 is described below
commit b1c0beb955763dd9e3b9fdefafa9d1d36d51486a
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Nov 8 15:25:52 2023 +0800
[INLONG-9237][Agent] Move addictive fields to package attributes (#9238)
---
.../inlong/agent/constant/TaskConstants.java | 2 +-
.../message/filecollect/ProxyMessageCache.java | 1 +
.../org/apache/inlong/agent/utils/AgentUtils.java | 25 ++++++++++++++++++++++
.../inlong/agent/plugin/sources/LogFileSource.java | 2 --
.../inlong/agent/plugin/utils/MetaDataUtils.java | 25 ----------------------
5 files changed, 27 insertions(+), 28 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index facefd011c..3285936951 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -80,7 +80,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_START_TIME = "task.fileTask.startTime";
public static final String TASK_END_TIME = "task.fileTask.endTime";
public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
- public static final String PREDEFINE_FIELDS = "task.predefineFields";
+ public static final String PREDEFINE_FIELDS = "task.predefinedFields";
// Binlog job
public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 5426c2eb54..7ca74fb603 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -79,6 +79,7 @@ public class ProxyMessageCache {
this.streamId = streamId;
this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
+ extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
}
public void generateExtraMap(String dataKey) {
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index fd87faca40..01c1567726 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -40,6 +40,7 @@ import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+import java.util.StringTokenizer;
import java.util.TimeZone;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -251,6 +252,30 @@ public class AgentUtils {
return Pair.of(mValue, attr);
}
+ public static Map<String, String> parseAddAttrToMap(String addictiveAttr) {
+ StringTokenizer token = new StringTokenizer(addictiveAttr, "&");
+ Map<String, String> attr = new HashMap<String, String>();
+ while (token.hasMoreTokens()) {
+ String value = token.nextToken().trim();
+ if (value.contains("=")) {
+ String[] pairs = value.split("=");
+
+ if (pairs[0].equalsIgnoreCase("m")) {
+ continue;
+ }
+
+ // when addictiveattr like "m=10&__addcol1__worldid="
+ if (value.endsWith("=") && pairs.length == 1) {
+ attr.put(pairs[0], "");
+ } else {
+ attr.put(pairs[0], pairs[1]);
+ }
+
+ }
+ }
+ return attr;
+ }
+
/**
* Get the attrs in pairs can be complicated in online env
*/
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 5cbec1efda..ab96b6a8e8 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -32,7 +32,6 @@ import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
-import org.apache.inlong.agent.plugin.utils.MetaDataUtils;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -349,7 +348,6 @@ public class LogFileSource extends AbstractSource {
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);
header.put(OFFSET, sourceData.offset.toString());
- header.putAll(MetaDataUtils.parseAddAttr(profile.getPredefineFields()));
Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
index d2028bfc52..aa215bb539 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -137,28 +136,4 @@ public class MetaDataUtils {
}).filter(Objects::nonNull).collect(Collectors.toList());
return podName.isEmpty() ? null : podName.get(0);
}
-
- public static Map<String, String> parseAddAttr(String addictiveAttr) {
- StringTokenizer token = new StringTokenizer(addictiveAttr, "&");
- Map<String, String> attr = new HashMap<String, String>();
- while (token.hasMoreTokens()) {
- String value = token.nextToken().trim();
- if (value.contains("=")) {
- String[] pairs = value.split("=");
-
- if (pairs[0].equalsIgnoreCase("m")) {
- continue;
- }
-
- // when addictiveattr like "m=10&__addcol1__worldid="
- if (value.endsWith("=") && pairs.length == 1) {
- attr.put(pairs[0], "");
- } else {
- attr.put(pairs[0], pairs[1]);
- }
-
- }
- }
- return attr;
- }
}