This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 764b1e5251 [INLONG-11811][Agent] Increase the retention time of
offset, default to 7 days (#11812)
764b1e5251 is described below
commit 764b1e52517e60413ccec099d6e6f31224d2c868
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Mar 26 17:10:19 2025 +0800
[INLONG-11811][Agent] Increase the retention time of offset, default to 7
days (#11812)
---
.../main/java/org/apache/inlong/agent/constant/AgentConstants.java | 5 +++--
.../main/java/org/apache/inlong/agent/core/task/OffsetManager.java | 6 ++++--
.../inlong/agent/plugin/task/logcollection/LogAbstractTask.java | 4 ++--
3 files changed, 9 insertions(+), 6 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 45ba789250..d9880800e6 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -77,10 +77,11 @@ public class AgentConstants {
public static final boolean DEFAULT_ENABLE_OOM_EXIT = false;
public static final String AGENT_SCAN_RANGE = "agent.scan.range";
+ public static final String AGENT_OFFSET_TTL = "agent.offset.ttl";
public static final String DEFAULT_AGENT_SCAN_RANGE = "-2";
public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2";
- public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10";
- public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600";
+ public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-2";
+ public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-120";
public static final String AGENT_INSTANCE_LIMIT = "agent.instance.limit";
public static final int DEFAULT_AGENT_INSTANCE_LIMIT = 100;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index b82e399c81..bd2925305e 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -53,7 +53,7 @@ public class OffsetManager extends AbstractDaemon {
private static final Logger LOGGER =
LoggerFactory.getLogger(OffsetManager.class);
public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000;
public static final int CLEAN_INSTANCE_ONCE_LIMIT = 1000;
- public static final long TWO_HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
+ public static final long SEVEN_DAY_TIMEOUT_INTERVAL_MS = 7 * 24 * 3600 *
1000;
private static volatile OffsetManager offsetManager = null;
private final OffsetStore offsetStore;
private final InstanceStore instanceStore;
@@ -163,7 +163,9 @@ public class OffsetManager extends AbstractDaemon {
}
}
}
- long expireTime =
Math.abs(getScanCycleRange(instanceFromDb.getCycleUnit())) +
TWO_HOUR_TIMEOUT_INTERVAL;
+ long expireTime =
+ Math.abs(getScanCycleRange(instanceFromDb.getCycleUnit()))
+ AgentConfiguration.getAgentConf()
+ .getLong(AgentConstants.AGENT_OFFSET_TTL,
SEVEN_DAY_TIMEOUT_INTERVAL_MS);
if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() >
expireTime) {
cleanCount.getAndIncrement();
LOGGER.info("instance has expired, delete from instance store
dataTime {} taskId {} instanceId {}",
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
index 49e45ba751..1e354e7c4b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/logcollection/LogAbstractTask.java
@@ -45,7 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue;
public abstract class LogAbstractTask extends AbstractTask {
private static final int INSTANCE_QUEUE_CAPACITY = 10;
- public static final long ONE_HOUR_TIMEOUT_INTERVAL = 3600 * 1000;
+ public static final long ONE_HOUR_TIMEOUT_INTERVAL_MS = 3600 * 1000;
private static final Logger LOGGER =
LoggerFactory.getLogger(LogAbstractTask.class);
protected boolean retry;
protected BlockingQueue<InstanceProfile> instanceQueue;
@@ -210,7 +210,7 @@ public abstract class LogAbstractTask extends AbstractTask {
String dataTime = entry.getKey();
if (!DateUtils.isValidCreationTime(dataTime,
Math.abs(OffsetManager.getScanCycleRange(taskProfile.getCycleUnit()))
- + ONE_HOUR_TIMEOUT_INTERVAL)) {
+ + ONE_HOUR_TIMEOUT_INTERVAL_MS)) {
/* Remove it from memory map. */
eventMap.remove(dataTime);
LOGGER.warn("remove too old event from event map taskId {}
dataTime {}", taskProfile.getTaskId(),