This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fabb0c466b [INLONG-10094][agent] Fix the bug: the task record for data
supplementation has not expired (#10095)
fabb0c466b is described below
commit fabb0c466b8ef0e895162c13968464559eaef709
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Apr 29 16:49:50 2024 +0800
[INLONG-10094][agent] Fix the bug: the task record for data supplementation
has not expired (#10095)
* [INLONG-10094][agent] Fix the bug: the task record for data
supplementation has not expired
* [INLONG-10094][agent] Modify based on comments
---
.../org/apache/inlong/agent/db/InstanceDb.java | 14 ++++
.../agent/core/instance/InstanceManager.java | 59 ++------------
.../inlong/agent/core/task/OffsetManager.java | 93 ++++++++++++++++++----
.../apache/inlong/agent/core/task/TaskManager.java | 2 +-
.../agent/plugin/sources/TestLogFileSource.java | 6 +-
5 files changed, 102 insertions(+), 72 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
index db41243fbf..acc0fbfee3 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
@@ -40,6 +40,20 @@ public class InstanceDb {
this.db = db;
}
+ /**
+ * Lists every instance profile stored in the instance db, regardless of task.
+ *
+ * @return list of all instance profiles read from the db
+ */
+ public List<InstanceProfile> listAllInstances() {
+ // an empty key prefix presumably makes findAll match every key in the db — confirm against Db
+ List<KeyValueEntity> result = this.db.findAll("");
+ List<InstanceProfile> instanceList = new ArrayList<>(result.size());
+ for (KeyValueEntity entity : result) {
+ instanceList.add(entity.getAsInstanceProfile());
+ }
+ return instanceList;
+ }
+
/**
* get instance list from db.
*
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 5545039f5e..d388f2293e 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -22,22 +22,18 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.InstanceDb;
import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
-import org.apache.inlong.common.enums.TaskStateEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +41,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
@@ -57,11 +52,9 @@ public class InstanceManager extends AbstractDaemon {
private static final Logger LOGGER =
LoggerFactory.getLogger(InstanceManager.class);
private static final int ACTION_QUEUE_CAPACITY = 100;
- public static final int CLEAN_INSTANCE_ONCE_LIMIT = 10;
public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
- public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 10000;
- private long lastCleanTime = 0;
- public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
+ public static final int INSTANCE_PRINT_INTERVAL_MS = 10000;
+ private long lastPrintTime = 0;
// instance in db
private final InstanceDb instanceDb;
private TaskProfileDb taskProfileDb;
@@ -167,7 +160,7 @@ public class InstanceManager extends AbstractDaemon {
while (isRunnable()) {
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
- cleanDbInstance();
+ printInstanceState();
dealWithActionQueue(actionQueue);
keepPaceWithDb();
String inlongGroupId = taskFromDb.getInlongGroupId();
@@ -184,10 +177,10 @@ public class InstanceManager extends AbstractDaemon {
};
}
- private void cleanDbInstance() {
- if (AgentUtils.getCurrentTime() - lastCleanTime >
INSTANCE_DB_CLEAN_INTERVAL_MS) {
+ private void printInstanceState() {
+ long currentTime = AgentUtils.getCurrentTime();
+ if (currentTime - lastPrintTime > INSTANCE_PRINT_INTERVAL_MS) {
List<InstanceProfile> instances = instanceDb.getInstances(taskId);
- doCleanDbInstance(instances);
InstancePrintStat stat = new InstancePrintStat();
for (int i = 0; i < instances.size(); i++) {
InstanceProfile instance = instances.get(i);
@@ -196,45 +189,7 @@ public class InstanceManager extends AbstractDaemon {
LOGGER.info(
"instanceManager running! taskId {} mem {} db total {} {}
action count {}",
taskId, instanceMap.size(), instances.size(), stat,
actionQueue.size());
- lastCleanTime = AgentUtils.getCurrentTime();
- }
- }
-
- private void doCleanDbInstance(List<InstanceProfile> instances) {
- AtomicInteger cleanCount = new AtomicInteger();
- Iterator<InstanceProfile> iterator = instances.iterator();
- while (iterator.hasNext()) {
- if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) {
- return;
- }
- InstanceProfile instanceFromDb = iterator.next();
- if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) {
- return;
- }
- TaskProfile taskFromDb = taskProfileDb.getTask(taskId);
- if (taskFromDb != null) {
- if
(taskFromDb.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
- return;
- }
- if (taskFromDb.isRetry()) {
- if (taskFromDb.getState() != TaskStateEnum.RETRY_FINISH) {
- return;
- }
- } else {
- if (instanceFromDb.getState() !=
InstanceStateEnum.FINISHED) {
- return;
- }
- }
- }
- long expireTime =
DateTransUtils.calcOffset(DB_INSTANCE_EXPIRE_CYCLE_COUNT +
taskFromDb.getCycleUnit());
- if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() >
expireTime) {
- cleanCount.getAndIncrement();
- LOGGER.info("instance has expired, delete from db dataTime {}
taskId {} instanceId {}",
- instanceFromDb.getSourceDataTime(),
instanceFromDb.getTaskId(),
- instanceFromDb.getInstanceId());
- deleteFromDb(instanceFromDb.getInstanceId());
- iterator.remove();
- }
+ lastPrintTime = currentTime;
}
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index fca223b873..41dc95ca95 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -20,16 +20,25 @@ package org.apache.inlong.agent.core.task;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.InstanceDb;
import org.apache.inlong.agent.db.OffsetDb;
+import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.apache.inlong.common.enums.TaskStateEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* used to store instance offset to db
@@ -39,14 +48,17 @@ public class OffsetManager extends AbstractDaemon {
private static final Logger LOGGER =
LoggerFactory.getLogger(OffsetManager.class);
public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000;
+ public static final int CLEAN_INSTANCE_ONCE_LIMIT = 100;
+ public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
private static volatile OffsetManager offsetManager = null;
private final OffsetDb offsetDb;
- // instance in db
private final InstanceDb instanceDb;
+ private final TaskProfileDb taskProfileDb;
- private OffsetManager(Db offsetBasicDb, Db instanceBasicDb) {
- this.offsetDb = new OffsetDb(offsetBasicDb);
+ /**
+ * Builds the manager on top of three raw stores, wrapping each in its typed accessor.
+ *
+ * @param taskBasicDb store behind the task profiles; cleanDbInstance reads each task's
+ * cycle unit and state from it when deciding whether an instance record has expired
+ * @param instanceBasicDb store behind the instance profiles
+ * @param offsetBasicDb store behind the instance offsets
+ */
+ private OffsetManager(Db taskBasicDb, Db instanceBasicDb, Db
offsetBasicDb) {
+ taskProfileDb = new TaskProfileDb(taskBasicDb);
instanceDb = new InstanceDb(instanceBasicDb);
+ offsetDb = new OffsetDb(offsetBasicDb);
}
/**
@@ -60,18 +72,8 @@ public class OffsetManager extends AbstractDaemon {
while (isRunnable()) {
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
- List<OffsetProfile> offsets = offsetDb.listAllOffsets();
- offsets.forEach(offset -> {
- String taskId = offset.getTaskId();
- String instanceId = offset.getInstanceId();
- InstanceProfile instanceProfile =
instanceDb.getInstance(taskId, instanceId);
- if (instanceProfile == null) {
- deleteOffset(taskId, instanceId);
- LOGGER.info("instance not found, delete offset
taskId {} instanceId {}", taskId,
- instanceId);
- }
- });
- LOGGER.info("offsetManager running! offsets count {}",
offsets.size());
+ cleanDbInstance();
+ cleanDbOffset();
} catch (Throwable ex) {
LOGGER.error("offset-manager-core: ", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
@@ -83,11 +85,11 @@ public class OffsetManager extends AbstractDaemon {
/**
* task position manager singleton, can only generated by agent manager
*/
- public static void init(Db offsetBasicDb, Db instanceBasicDb) {
+ public static void init(Db taskBasicDb, Db instanceBasicDb, Db
offsetBasicDb) {
if (offsetManager == null) {
synchronized (OffsetManager.class) {
if (offsetManager == null) {
- offsetManager = new OffsetManager(offsetBasicDb,
instanceBasicDb);
+ offsetManager = new OffsetManager(taskBasicDb,
instanceBasicDb, offsetBasicDb);
}
}
}
@@ -115,6 +117,63 @@ public class OffsetManager extends AbstractDaemon {
return offsetDb.getOffset(taskId, instanceId);
}
+ /**
+ * Deletes every offset record whose owning instance is no longer present in the
+ * instance db, then logs how many offset records were scanned.
+ */
+ private void cleanDbOffset() {
+ List<OffsetProfile> allOffsets = offsetDb.listAllOffsets();
+ for (OffsetProfile offsetProfile : allOffsets) {
+ String ownerTaskId = offsetProfile.getTaskId();
+ String ownerInstanceId = offsetProfile.getInstanceId();
+ boolean instanceStillExists = instanceDb.getInstance(ownerTaskId, ownerInstanceId) != null;
+ if (instanceStillExists) {
+ continue;
+ }
+ deleteOffset(ownerTaskId, ownerInstanceId);
+ LOGGER.info("instance not found, delete offset taskId {} instanceId {}", ownerTaskId, ownerInstanceId);
+ }
+ LOGGER.info("offsetManager running! offsets count {}", allOffsets.size());
+ }
+
+ /**
+ * Deletes expired instance records from the instance db, at most
+ * {@link #CLEAN_INSTANCE_ONCE_LIMIT} of them per call.
+ *
+ * <p>An instance is eligible only when it is FINISHED, its task still exists in the task db,
+ * the task is not real-time, and (for retry tasks) the task has reached RETRY_FINISH. An
+ * eligible instance is deleted once its modify time is older than
+ * {@link #DB_INSTANCE_EXPIRE_CYCLE_COUNT} cycles of the task's cycle unit. Each deletion is
+ * logged and reported to audit.
+ */
+ private void cleanDbInstance() {
+ AtomicInteger cleanCount = new AtomicInteger();
+ Iterator<InstanceProfile> iterator = instanceDb.listAllInstances().iterator();
+ while (iterator.hasNext()) {
+ // stop once CLEAN_INSTANCE_ONCE_LIMIT records have been deleted in this pass
+ if (cleanCount.get() >= CLEAN_INSTANCE_ONCE_LIMIT) {
+ return;
+ }
+ InstanceProfile instanceFromDb = iterator.next();
+ String taskId = instanceFromDb.getTaskId();
+ String instanceId = instanceFromDb.getInstanceId();
+ if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) {
+ continue;
+ }
+ TaskProfile taskFromDb = taskProfileDb.getTask(taskId);
+ if (taskFromDb == null) {
+ // The expire window is derived from the task's cycle unit, so without the task we
+ // cannot decide. Skip instead of throwing an NPE, which would abort this pass and
+ // the offset cleanup that runs after it, again on every cycle.
+ continue;
+ }
+ if (taskFromDb.getCycleUnit().equalsIgnoreCase(CycleUnitType.REAL_TIME)) {
+ continue;
+ }
+ if (taskFromDb.isRetry() && taskFromDb.getState() != TaskStateEnum.RETRY_FINISH) {
+ continue;
+ }
+ long expireTime = DateTransUtils.calcOffset(DB_INSTANCE_EXPIRE_CYCLE_COUNT + taskFromDb.getCycleUnit());
+ if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > expireTime) {
+ cleanCount.getAndIncrement();
+ LOGGER.info("instance has expired, delete from db dataTime {} taskId {} instanceId {}",
+ instanceFromDb.getSourceDataTime(), taskId, instanceId);
+ instanceDb.deleteInstance(taskId, instanceId);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, instanceFromDb.getInlongGroupId(),
+ instanceFromDb.getInlongStreamId(), instanceFromDb.getSinkDataTime(), 1, 1,
+ Long.parseLong(taskId));
+ }
+ }
+ }
+
@Override
public void start() throws Exception {
submitWorker(coreThread());
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index e3815f725e..eca7f6b25f 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -132,7 +132,7 @@ public class TaskManager extends AbstractDaemon {
agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
offsetBasicDb =
initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
- OffsetManager.init(offsetBasicDb, instanceBasicDb);
+ OffsetManager.init(taskBasicDb, instanceBasicDb, offsetBasicDb);
this.runningPool = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index a9d683750c..1923dc5f1c 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -57,6 +57,8 @@ public class TestLogFileSource {
private static final Gson GSON = new Gson();
private static final String[] check = {"hello line-end-symbol aa", "world
line-end-symbol",
"agent line-end-symbol"};
+ // task basic db
+ private static Db taskBasicDb;
// instance basic db
private static Db instanceBasicDb;
// offset basic db
@@ -64,12 +66,12 @@ public class TestLogFileSource {
+ /**
+ * Sets up the agent home once for the whole test class and opens the task, instance and
+ * offset dbs that are handed to the OffsetManager singleton.
+ */
@BeforeClass
public static void setup() {
-
helper = new
AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome();
+ taskBasicDb =
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
instanceBasicDb =
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
offsetBasicDb =
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
- OffsetManager.init(offsetBasicDb, instanceBasicDb);
+ OffsetManager.init(taskBasicDb, instanceBasicDb, offsetBasicDb);
}
private LogFileSource getSource(int taskId, long offset) {