This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fabb0c466b [INLONG-10094][agent] Fix the bug: the task record for data supplementation has not expired (#10095)
fabb0c466b is described below

commit fabb0c466b8ef0e895162c13968464559eaef709
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Apr 29 16:49:50 2024 +0800

    [INLONG-10094][agent] Fix the bug: the task record for data supplementation has not expired (#10095)
    
    * [INLONG-10094][agent] Fix the bug: the task record for data supplementation has not expired
    
    * [INLONG-10094][agent] Modify based on comments
---
 .../org/apache/inlong/agent/db/InstanceDb.java     | 14 ++++
 .../agent/core/instance/InstanceManager.java       | 59 ++------------
 .../inlong/agent/core/task/OffsetManager.java      | 93 ++++++++++++++++++----
 .../apache/inlong/agent/core/task/TaskManager.java |  2 +-
 .../agent/plugin/sources/TestLogFileSource.java    |  6 +-
 5 files changed, 102 insertions(+), 72 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
index db41243fbf..acc0fbfee3 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
@@ -40,6 +40,20 @@ public class InstanceDb {
         this.db = db;
     }
 
+    /**
+     * list all instance from db.
+     *
+     * @return list of task
+     */
+    public List<InstanceProfile> listAllInstances() {
+        List<KeyValueEntity> result = this.db.findAll("");
+        List<InstanceProfile> instanceList = new ArrayList<>();
+        for (KeyValueEntity entity : result) {
+            instanceList.add(entity.getAsInstanceProfile());
+        }
+        return instanceList;
+    }
+
     /**
      * get instance list from db.
      *
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 5545039f5e..d388f2293e 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -22,22 +22,18 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.InstanceDb;
 import org.apache.inlong.agent.db.TaskProfileDb;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Instance;
 import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.enums.InstanceStateEnum;
-import org.apache.inlong.common.enums.TaskStateEnum;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +41,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
 
@@ -57,11 +52,9 @@ public class InstanceManager extends AbstractDaemon {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceManager.class);
     private static final int ACTION_QUEUE_CAPACITY = 100;
-    public static final int CLEAN_INSTANCE_ONCE_LIMIT = 10;
     public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
-    public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 10000;
-    private long lastCleanTime = 0;
-    public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
+    public static final int INSTANCE_PRINT_INTERVAL_MS = 10000;
+    private long lastPrintTime = 0;
     // instance in db
     private final InstanceDb instanceDb;
     private TaskProfileDb taskProfileDb;
@@ -167,7 +160,7 @@ public class InstanceManager extends AbstractDaemon {
             while (isRunnable()) {
                 try {
                     AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
-                    cleanDbInstance();
+                    printInstanceState();
                     dealWithActionQueue(actionQueue);
                     keepPaceWithDb();
                     String inlongGroupId = taskFromDb.getInlongGroupId();
@@ -184,10 +177,10 @@ public class InstanceManager extends AbstractDaemon {
         };
     }
 
-    private void cleanDbInstance() {
-        if (AgentUtils.getCurrentTime() - lastCleanTime > 
INSTANCE_DB_CLEAN_INTERVAL_MS) {
+    private void printInstanceState() {
+        long currentTime = AgentUtils.getCurrentTime();
+        if (currentTime - lastPrintTime > INSTANCE_PRINT_INTERVAL_MS) {
             List<InstanceProfile> instances = instanceDb.getInstances(taskId);
-            doCleanDbInstance(instances);
             InstancePrintStat stat = new InstancePrintStat();
             for (int i = 0; i < instances.size(); i++) {
                 InstanceProfile instance = instances.get(i);
@@ -196,45 +189,7 @@ public class InstanceManager extends AbstractDaemon {
             LOGGER.info(
                     "instanceManager running! taskId {} mem {} db total {} {} 
action count {}",
                     taskId, instanceMap.size(), instances.size(), stat, 
actionQueue.size());
-            lastCleanTime = AgentUtils.getCurrentTime();
-        }
-    }
-
-    private void doCleanDbInstance(List<InstanceProfile> instances) {
-        AtomicInteger cleanCount = new AtomicInteger();
-        Iterator<InstanceProfile> iterator = instances.iterator();
-        while (iterator.hasNext()) {
-            if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) {
-                return;
-            }
-            InstanceProfile instanceFromDb = iterator.next();
-            if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) {
-                return;
-            }
-            TaskProfile taskFromDb = taskProfileDb.getTask(taskId);
-            if (taskFromDb != null) {
-                if 
(taskFromDb.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
-                    return;
-                }
-                if (taskFromDb.isRetry()) {
-                    if (taskFromDb.getState() != TaskStateEnum.RETRY_FINISH) {
-                        return;
-                    }
-                } else {
-                    if (instanceFromDb.getState() != 
InstanceStateEnum.FINISHED) {
-                        return;
-                    }
-                }
-            }
-            long expireTime = 
DateTransUtils.calcOffset(DB_INSTANCE_EXPIRE_CYCLE_COUNT + 
taskFromDb.getCycleUnit());
-            if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > 
expireTime) {
-                cleanCount.getAndIncrement();
-                LOGGER.info("instance has expired, delete from db dataTime {} 
taskId {} instanceId {}",
-                        instanceFromDb.getSourceDataTime(), 
instanceFromDb.getTaskId(),
-                        instanceFromDb.getInstanceId());
-                deleteFromDb(instanceFromDb.getInstanceId());
-                iterator.remove();
-            }
+            lastPrintTime = currentTime;
         }
     }
 
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index fca223b873..41dc95ca95 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -20,16 +20,25 @@ package org.apache.inlong.agent.core.task;
 import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.InstanceDb;
 import org.apache.inlong.agent.db.OffsetDb;
+import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.apache.inlong.common.enums.TaskStateEnum;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * used to store instance offset to db
@@ -39,14 +48,17 @@ public class OffsetManager extends AbstractDaemon {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(OffsetManager.class);
     public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000;
+    public static final int CLEAN_INSTANCE_ONCE_LIMIT = 100;
+    public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
     private static volatile OffsetManager offsetManager = null;
     private final OffsetDb offsetDb;
-    // instance in db
     private final InstanceDb instanceDb;
+    private final TaskProfileDb taskProfileDb;
 
-    private OffsetManager(Db offsetBasicDb, Db instanceBasicDb) {
-        this.offsetDb = new OffsetDb(offsetBasicDb);
+    private OffsetManager(Db taskBasicDb, Db instanceBasicDb, Db 
offsetBasicDb) {
+        taskProfileDb = new TaskProfileDb(taskBasicDb);
         instanceDb = new InstanceDb(instanceBasicDb);
+        offsetDb = new OffsetDb(offsetBasicDb);
     }
 
     /**
@@ -60,18 +72,8 @@ public class OffsetManager extends AbstractDaemon {
             while (isRunnable()) {
                 try {
                     AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-                    List<OffsetProfile> offsets = offsetDb.listAllOffsets();
-                    offsets.forEach(offset -> {
-                        String taskId = offset.getTaskId();
-                        String instanceId = offset.getInstanceId();
-                        InstanceProfile instanceProfile = 
instanceDb.getInstance(taskId, instanceId);
-                        if (instanceProfile == null) {
-                            deleteOffset(taskId, instanceId);
-                            LOGGER.info("instance not found, delete offset 
taskId {} instanceId {}", taskId,
-                                    instanceId);
-                        }
-                    });
-                    LOGGER.info("offsetManager running! offsets count {}", 
offsets.size());
+                    cleanDbInstance();
+                    cleanDbOffset();
                 } catch (Throwable ex) {
                     LOGGER.error("offset-manager-core: ", ex);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
@@ -83,11 +85,11 @@ public class OffsetManager extends AbstractDaemon {
     /**
      * task position manager singleton, can only generated by agent manager
      */
-    public static void init(Db offsetBasicDb, Db instanceBasicDb) {
+    public static void init(Db taskBasicDb, Db instanceBasicDb, Db 
offsetBasicDb) {
         if (offsetManager == null) {
             synchronized (OffsetManager.class) {
                 if (offsetManager == null) {
-                    offsetManager = new OffsetManager(offsetBasicDb, 
instanceBasicDb);
+                    offsetManager = new OffsetManager(taskBasicDb, 
instanceBasicDb, offsetBasicDb);
                 }
             }
         }
@@ -115,6 +117,63 @@ public class OffsetManager extends AbstractDaemon {
         return offsetDb.getOffset(taskId, instanceId);
     }
 
+    private void cleanDbOffset() {
+        List<OffsetProfile> offsets = offsetDb.listAllOffsets();
+        offsets.forEach(offset -> {
+            String taskId = offset.getTaskId();
+            String instanceId = offset.getInstanceId();
+            InstanceProfile instanceProfile = instanceDb.getInstance(taskId, 
instanceId);
+            if (instanceProfile == null) {
+                deleteOffset(taskId, instanceId);
+                LOGGER.info("instance not found, delete offset taskId {} 
instanceId {}", taskId,
+                        instanceId);
+            }
+        });
+        LOGGER.info("offsetManager running! offsets count {}", offsets.size());
+    }
+
+    private void cleanDbInstance() {
+        AtomicInteger cleanCount = new AtomicInteger();
+        Iterator<InstanceProfile> iterator = 
instanceDb.listAllInstances().listIterator();
+        while (iterator.hasNext()) {
+            if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) {
+                return;
+            }
+            InstanceProfile instanceFromDb = iterator.next();
+            String taskId = instanceFromDb.getTaskId();
+            String instanceId = instanceFromDb.getInstanceId();
+            if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) {
+                continue;
+            }
+            TaskProfile taskFromDb = taskProfileDb.getTask(taskId);
+            if (taskFromDb != null) {
+                if 
(taskFromDb.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+                    continue;
+                }
+                if (taskFromDb.isRetry()) {
+                    if (taskFromDb.getState() != TaskStateEnum.RETRY_FINISH) {
+                        continue;
+                    }
+                } else {
+                    if (instanceFromDb.getState() != 
InstanceStateEnum.FINISHED) {
+                        continue;
+                    }
+                }
+            }
+            long expireTime = 
DateTransUtils.calcOffset(DB_INSTANCE_EXPIRE_CYCLE_COUNT + 
taskFromDb.getCycleUnit());
+            if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > 
expireTime) {
+                cleanCount.getAndIncrement();
+                LOGGER.info("instance has expired, delete from db dataTime {} 
taskId {} instanceId {}",
+                        instanceFromDb.getSourceDataTime(), taskId, 
instanceId);
+                instanceDb.deleteInstance(taskId, instanceId);
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, 
instanceFromDb.getInlongGroupId(),
+                        instanceFromDb.getInlongStreamId(), 
instanceFromDb.getSinkDataTime(), 1, 1,
+                        Long.parseLong(taskId));
+                iterator.remove();
+            }
+        }
+    }
+
     @Override
     public void start() throws Exception {
         submitWorker(coreThread());
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index e3815f725e..eca7f6b25f 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -132,7 +132,7 @@ public class TaskManager extends AbstractDaemon {
                 agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
         offsetBasicDb =
                 initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
-        OffsetManager.init(offsetBasicDb, instanceBasicDb);
+        OffsetManager.init(taskBasicDb, instanceBasicDb, offsetBasicDb);
         this.runningPool = new ThreadPoolExecutor(
                 0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index a9d683750c..1923dc5f1c 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -57,6 +57,8 @@ public class TestLogFileSource {
     private static final Gson GSON = new Gson();
     private static final String[] check = {"hello line-end-symbol aa", "world 
line-end-symbol",
             "agent line-end-symbol"};
+    // task basic db
+    private static Db taskBasicDb;
     // instance basic db
     private static Db instanceBasicDb;
     // offset basic db
@@ -64,12 +66,12 @@ public class TestLogFileSource {
 
     @BeforeClass
     public static void setup() {
-
         helper = new 
AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome();
+        taskBasicDb = 
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
         instanceBasicDb = 
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
         offsetBasicDb =
                 TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
-        OffsetManager.init(offsetBasicDb, instanceBasicDb);
+        OffsetManager.init(taskBasicDb, instanceBasicDb, offsetBasicDb);
     }
 
     private LogFileSource getSource(int taskId, long offset) {

Reply via email to