This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ca4bc8dce9 [INLONG-10268][Agent] Get the data version from the auditVersion field (#10269)
ca4bc8dce9 is described below
commit ca4bc8dce95903b4a292356ab0c887e28c85ffde
Author: justinwwhuang <[email protected]>
AuthorDate: Mon May 27 10:36:37 2024 +0800
[INLONG-10268][Agent] Get the data version from the auditVersion field (#10269)
* [INLONG-10268][Agent] Get the data version from the auditVersion field
* Update inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
Co-authored-by: Charles Zhang <[email protected]>
---------
Co-authored-by: Charles Zhang <[email protected]>
---
.../java/org/apache/inlong/agent/constant/TaskConstants.java | 1 +
.../org/apache/inlong/agent/message/file/ProxyMessageCache.java | 4 +++-
.../main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java | 9 ++++++++-
.../org/apache/inlong/agent/core/instance/InstanceManager.java | 3 ++-
.../java/org/apache/inlong/agent/core/task/OffsetManager.java | 4 +++-
.../org/apache/inlong/agent/plugin/instance/CommonInstance.java | 4 +++-
.../inlong/agent/plugin/sinks/filecollect/SenderManager.java | 3 ++-
.../apache/inlong/agent/plugin/sources/file/AbstractSource.java | 3 ++-
.../java/org/apache/inlong/agent/plugin/task/AbstractTask.java | 4 +++-
9 files changed, 27 insertions(+), 8 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 96669e1235..4cb1c70d12 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -69,6 +69,7 @@ public class TaskConstants extends CommonConstants {
public static final String FILE_SOURCE_EXTEND_CLASS =
"task.fileTask.extendedClass";
public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS =
"org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler";
+ public static final String TASK_AUDIT_VERSION = "task.auditVersion";
// Kafka task
public static final String TASK_KAFKA_TOPIC = "task.kafkaTask.topic";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
index 9216461579..8ad4d2b555 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
@@ -39,6 +39,7 @@ import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PAC
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION;
/**
@@ -61,6 +62,7 @@ public class ProxyMessageCache {
private long lastPrintTime = 0;
private long dataTime;
private boolean isRealTime = false;
+ protected long auditVersion;
/**
* extra map used when sending to dataproxy
*/
@@ -78,7 +80,7 @@ public class ProxyMessageCache {
dataTime = instanceProfile.getSinkDataTime();
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
- extraMap.put(AUDIT_VERSION, taskId);
+ extraMap.put(AUDIT_VERSION, instanceProfile.get(TASK_AUDIT_VERSION));
}
public void generateExtraMap(String dataKey) {
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index d0b378dd47..ed6fef26c3 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -99,6 +99,8 @@ public class TaskProfileDto {
public static final String deafult_time_offset = "0";
+ private static final String DEFAULT_AUDIT_VERSION = "0";
+
private Task task;
private Proxy proxy;
@@ -417,7 +419,11 @@ public class TaskProfileDto {
task.setPredefinedFields(dataConfig.getPredefinedFields());
task.setCycleUnit(CycleUnitType.REAL_TIME);
task.setTimeZone(dataConfig.getTimeZone());
-
+ if (dataConfig.getAuditVersion() == null) {
+ task.setAuditVersion(DEFAULT_AUDIT_VERSION);
+ } else {
+ task.setAuditVersion(dataConfig.getAuditVersion());
+ }
// set sink type
if (dataConfig.getDataReportType() ==
NORMAL_SEND_TO_DATAPROXY.ordinal()) {
task.setSink(DEFAULT_DATA_PROXY_SINK);
@@ -537,6 +543,7 @@ public class TaskProfileDto {
private Integer state;
private String cycleUnit;
private String timeZone;
+ private String auditVersion;
private FileTask fileTask;
private BinlogTask binlogTask;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 25ce136ddf..0642e325a2 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
/**
* handle the instance created by task, including add, delete, update etc.
@@ -117,7 +118,6 @@ public class InstanceManager extends AbstractDaemon {
*/
public InstanceManager(String taskId, int instanceLimit, Db basicDb,
TaskProfileDb taskProfileDb) {
this.taskId = taskId;
- this.auditVersion = Long.parseLong(taskId);
instanceDb = new InstanceDb(basicDb);
this.taskProfileDb = taskProfileDb;
this.agentConf = AgentConfiguration.getAgentConf();
@@ -298,6 +298,7 @@ public class InstanceManager extends AbstractDaemon {
private void restoreFromDb() {
taskFromDb = taskProfileDb.getTask(taskId);
+ auditVersion = Long.parseLong(taskFromDb.get(TASK_AUDIT_VERSION));
List<InstanceProfile> profileList = instanceDb.getInstances(taskId);
profileList.forEach((profile) -> {
InstanceStateEnum state = profile.getState();
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 41dc95ca95..fc7c41a7e7 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -40,6 +40,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
+
/**
* used to store instance offset to db
* where key is task id + read file name and value is instance offset
@@ -168,7 +170,7 @@ public class OffsetManager extends AbstractDaemon {
instanceDb.deleteInstance(taskId, instanceId);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB,
instanceFromDb.getInlongGroupId(),
instanceFromDb.getInlongStreamId(),
instanceFromDb.getSinkDataTime(), 1, 1,
- Long.parseLong(taskId));
+ Long.parseLong(taskFromDb.get(TASK_AUDIT_VERSION)));
iterator.remove();
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
index 566fb7be44..7eb77c7237 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
+
/**
* common instance contains source and sink.
* main job is to read from source and write to sink
@@ -66,7 +68,7 @@ public abstract class CommonInstance extends Instance {
try {
instanceManager = (InstanceManager) srcManager;
profile = srcProfile;
- auditVersion = Long.parseLong(getTaskId());
+ auditVersion = Long.parseLong(srcProfile.get(TASK_AUDIT_VERSION));
setInodeInfo(profile);
LOGGER.info("task id: {} submit new instance {} profile detail
{}.", profile.getTaskId(),
profile.getInstanceId(), profile.toJsonStr());
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 0eba81e4d2..3422f3b05e 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -56,6 +56,7 @@ import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AD
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_TASK_PROXY_SEND;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_PROXY_SEND;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
@@ -110,7 +111,7 @@ public class SenderManager {
public SenderManager(InstanceProfile profile, String inlongGroupId, String
sourcePath) {
this.profile = profile;
- auditVersion = Long.parseLong(profile.getTaskId());
+ auditVersion = Long.parseLong(profile.get(TASK_AUDIT_VERSION));
managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
proxySend = profile.getBoolean(TASK_PROXY_SEND,
DEFAULT_TASK_PROXY_SEND);
totalAsyncBufSize = profile
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index bc5225206a..4b36ad30cb 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -64,6 +64,7 @@ import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_REA
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
import static
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
@@ -119,7 +120,7 @@ public abstract class AbstractSource implements Source {
public void init(InstanceProfile profile) {
this.profile = profile;
taskId = profile.getTaskId();
- auditVersion = Long.parseLong(taskId);
+ auditVersion = Long.parseLong(profile.get(TASK_AUDIT_VERSION));
instanceId = profile.getInstanceId();
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index b288ff1946..a6d8d03482 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -36,6 +36,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
+import static
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
+
public abstract class AbstractTask extends Task {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractTask.class);
@@ -55,7 +57,7 @@ public abstract class AbstractTask extends Task {
taskManager = (TaskManager) srcManager;
this.taskProfile = taskProfile;
this.basicDb = basicDb;
- auditVersion = Long.parseLong(taskProfile.getTaskId());
+ auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION));
instanceManager = new InstanceManager(taskProfile.getTaskId(),
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
basicDb, taskManager.getTaskDb());
try {