This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a0f123b676 [INLONG-10384][Agent] Add functions to the Store interface 
to extend new storage plugins (#10385)
a0f123b676 is described below

commit a0f123b676691a24aa01433d621a9406d7601d13
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Jun 12 17:33:14 2024 +0800

    [INLONG-10384][Agent] Add functions to the Store interface to extend new 
storage plugins (#10385)
---
 .../org/apache/inlong/agent/plugin/file/Task.java  |   4 +-
 .../InstanceDb.java => store/InstanceStore.java}   |  45 +++--
 .../inlong/agent/{db => store}/KeyValueEntity.java |   2 +-
 .../inlong/agent/{db => store}/LocalProfile.java   |   2 +-
 .../{db/OffsetDb.java => store/OffsetStore.java}   |  46 ++---
 .../RocksDbImp.java => store/RocksStoreImp.java}   |  29 ++-
 .../inlong/agent/{db => store}/StateSearchKey.java |   2 +-
 .../inlong/agent/{db/Db.java => store/Store.java}  |  12 +-
 .../TaskProfileDb.java => store/TaskStore.java}    |  36 ++--
 .../TestRocksStoreImp.java}                        |  30 +--
 .../org/apache/inlong/agent/core/AgentManager.java |   2 +-
 .../agent/core/instance/InstanceManager.java       | 132 ++++++-------
 .../inlong/agent/core/task/OffsetManager.java      |  53 +++---
 .../apache/inlong/agent/core/task/TaskManager.java | 208 ++++++++++-----------
 .../agent/core/instance/TestInstanceManager.java   |  20 +-
 .../apache/inlong/agent/core/task/MockTask.java    |   4 +-
 .../inlong/agent/core/task/TestTaskManager.java    |   6 +-
 .../org/apache/inlong/agent/installer/Manager.java |   2 +-
 .../inlong/agent/installer/ModuleManager.java      |   3 +-
 .../installer/conf/InstallerConfiguration.java     |   2 +-
 .../inlong/agent/plugin/sources/LogFileSource.java |   2 +-
 .../inlong/agent/plugin/task/AbstractTask.java     |  10 +-
 .../apache/inlong/agent/plugin/task/CronTask.java  |   4 +-
 .../inlong/agent/plugin/utils/RocksDBUtils.java    |  16 +-
 .../agent/plugin/sources/TestLogFileSource.java    |  24 +--
 .../inlong/agent/plugin/task/TestLogFileTask.java  |   4 +-
 26 files changed, 366 insertions(+), 334 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
index e10d872ca3..40ad855db7 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.agent.plugin.file;
 
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.state.AbstractStateWrapper;
+import org.apache.inlong.agent.store.Store;
 
 import java.io.IOException;
 
@@ -33,7 +33,7 @@ public abstract class Task extends AbstractStateWrapper {
      *
      * @throws IOException
      */
-    public abstract void init(Object srcManager, TaskProfile profile, Db 
basicDb) throws IOException;
+    public abstract void init(Object srcManager, TaskProfile profile, Store 
basicStore) throws IOException;
 
     /**
      * destroy task.
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
similarity index 70%
rename from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
index acc0fbfee3..2551734134 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
@@ -28,25 +28,25 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * db interface for instance profile.
+ * Store for instance profile
  */
-public class InstanceDb {
+public class InstanceStore {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskProfileDb.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceStore.class);
 
-    private final Db db;
+    private final Store store;
 
-    public InstanceDb(Db db) {
-        this.db = db;
+    public InstanceStore(Store store) {
+        this.store = store;
     }
 
     /**
-     * list all instance from db.
+     * list all instance from instance store.
      *
      * @return list of task
      */
     public List<InstanceProfile> listAllInstances() {
-        List<KeyValueEntity> result = this.db.findAll("");
+        List<KeyValueEntity> result = this.store.findAll(store.getUniqueKey());
         List<InstanceProfile> instanceList = new ArrayList<>();
         for (KeyValueEntity entity : result) {
             instanceList.add(entity.getAsInstanceProfile());
@@ -55,12 +55,12 @@ public class InstanceDb {
     }
 
     /**
-     * get instance list from db.
+     * get instance list from instance store.
      *
      * @return list of task
      */
     public List<InstanceProfile> getInstances(String taskId) {
-        List<KeyValueEntity> result = this.db.findAll(getKeyByTaskId(taskId));
+        List<KeyValueEntity> result = 
this.store.findAll(getKeyByTaskId(taskId));
         List<InstanceProfile> instanceList = new ArrayList<>();
         for (KeyValueEntity entity : result) {
             instanceList.add(entity.getAsInstanceProfile());
@@ -79,7 +79,7 @@ public class InstanceDb {
                     instance.get(TaskConstants.INSTANCE_ID));
             KeyValueEntity entity = new KeyValueEntity(keyName,
                     instance.toJsonStr(), 
instance.get(TaskConstants.INSTANCE_ID));
-            db.put(entity);
+            store.put(entity);
         } else {
             LOGGER.error("instance profile invalid!");
         }
@@ -92,7 +92,7 @@ public class InstanceDb {
      * @param instanceId it can be file name(file collect), table name(db 
sync) etc
      */
     public InstanceProfile getInstance(String taskId, String instanceId) {
-        KeyValueEntity result = this.db.get(getKeyByTaskAndInstanceId(taskId, 
instanceId));
+        KeyValueEntity result = 
this.store.get(getKeyByTaskAndInstanceId(taskId, instanceId));
         if (result == null) {
             return null;
         }
@@ -106,18 +106,23 @@ public class InstanceDb {
      * @param instanceId it can be file name(file collect), table name(db 
sync) etc
      */
     public void deleteInstance(String taskId, String instanceId) {
-        db.remove(getKeyByTaskAndInstanceId(taskId, instanceId));
+        store.remove(getKeyByTaskAndInstanceId(taskId, instanceId));
     }
 
-    private String getKey() {
-        return CommonConstants.INSTANCE_ID_PREFIX;
+    public String getKey() {
+        if (store.getUniqueKey().isEmpty()) {
+            return CommonConstants.INSTANCE_ID_PREFIX + store.getSplitter();
+        } else {
+            return store.getUniqueKey() + store.getSplitter() + 
CommonConstants.INSTANCE_ID_PREFIX
+                    + store.getSplitter();
+        }
     }
 
-    private String getKeyByTaskId(String taskId) {
-        return CommonConstants.INSTANCE_ID_PREFIX + taskId;
+    public String getKeyByTaskId(String taskId) {
+        return getKey() + taskId;
     }
 
-    private String getKeyByTaskAndInstanceId(String taskId, String instanceId) 
{
-        return CommonConstants.INSTANCE_ID_PREFIX + taskId + "_" + instanceId;
+    public String getKeyByTaskAndInstanceId(String taskId, String instanceId) {
+        return getKeyByTaskId(taskId) + store.getSplitter() + 
store.replaceKeywords(instanceId);
     }
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/KeyValueEntity.java
similarity index 98%
rename from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/KeyValueEntity.java
index e9ecc80b56..cbce2f7f7f 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/KeyValueEntity.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.JobProfile;
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/LocalProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/LocalProfile.java
similarity index 98%
rename from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/LocalProfile.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/LocalProfile.java
index 99f03bea68..c293853ab1 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/LocalProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/LocalProfile.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
 
 import org.apache.inlong.agent.conf.JobProfile;
 
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/OffsetStore.java
similarity index 69%
rename from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/OffsetStore.java
index 5c31a2f88a..93059f70ab 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/OffsetStore.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
 
 import org.apache.inlong.agent.conf.OffsetProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
@@ -29,32 +29,19 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * db interface for task profile.
+ * Store for offset
  */
-public class OffsetDb {
+public class OffsetStore {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(OffsetDb.class);
-    private final Db db;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OffsetStore.class);
+    private final Store store;
 
-    public OffsetDb(Db db) {
-        this.db = db;
-    }
-
-    /**
-     * init db by class name
-     *
-     * @return db
-     */
-    private Db initDb(String childPath) {
-        try {
-            return new RocksDbImp(childPath);
-        } catch (Exception ex) {
-            throw new UnsupportedClassVersionError(ex.getMessage());
-        }
+    public OffsetStore(Store store) {
+        this.store = store;
     }
 
     public List<OffsetProfile> listAllOffsets() {
-        List<KeyValueEntity> result = this.db.findAll("");
+        List<KeyValueEntity> result = this.store.findAll(store.getUniqueKey());
         List<OffsetProfile> offsetList = new ArrayList<>();
         for (KeyValueEntity entity : result) {
             offsetList.add(entity.getAsOffsetProfile());
@@ -63,7 +50,7 @@ public class OffsetDb {
     }
 
     public OffsetProfile getOffset(String taskId, String instanceId) {
-        KeyValueEntity result = db.get(getKey(taskId, instanceId));
+        KeyValueEntity result = store.get(getKey(taskId, instanceId));
         if (result == null) {
             return null;
         }
@@ -71,7 +58,7 @@ public class OffsetDb {
     }
 
     public void deleteOffset(String taskId, String instanceId) {
-        db.remove(getKey(taskId, instanceId));
+        store.remove(getKey(taskId, instanceId));
     }
 
     public void setOffset(OffsetProfile offsetProfile) {
@@ -81,11 +68,18 @@ public class OffsetDb {
                     offsetProfile.getInstanceId());
             KeyValueEntity entity = new KeyValueEntity(keyName,
                     offsetProfile.toJsonStr(), 
offsetProfile.get(TaskConstants.INSTANCE_ID));
-            db.put(entity);
+            store.put(entity);
         }
     }
 
-    private String getKey(String taskId, String instanceId) {
-        return CommonConstants.OFFSET_ID_PREFIX + taskId + "_" + instanceId;
+    public String getKey(String taskId, String instanceId) {
+        if (store.getUniqueKey().isEmpty()) {
+            return CommonConstants.OFFSET_ID_PREFIX + store.getSplitter() + 
taskId
+                    + store.getSplitter() + store.replaceKeywords(instanceId);
+        } else {
+            return store.getUniqueKey() + store.getSplitter() + 
CommonConstants.OFFSET_ID_PREFIX
+                    + store.getSplitter() + taskId
+                    + store.getSplitter() + store.replaceKeywords(instanceId);
+        }
     }
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
similarity index 92%
rename from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
index 02edcc6165..47caee3703 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.constant.AgentConstants;
@@ -45,12 +45,14 @@ import java.util.stream.Collectors;
 import static java.util.Objects.requireNonNull;
 
 /**
- * DB implement based on the Rocks DB.
+ * Store implement based on the Rocks DB.
  */
-public class RocksDbImp implements Db {
+public class RocksStoreImp implements Store {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(RocksDbImp.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RocksStoreImp.class);
     private static final Gson GSON = new Gson();
+    public static final String SPLITTER = "_";
+    public static final String UNIQUE_KEY = "";
 
     private final AgentConfiguration conf;
     private final RocksDB db;
@@ -60,7 +62,7 @@ public class RocksDbImp implements Db {
     private ConcurrentHashMap<String, ColumnFamilyDescriptor> 
columnDescriptorMap;
     private String storePath;
 
-    public RocksDbImp(String childPath) {
+    public RocksStoreImp(String childPath) {
         // init rocks db
         this.conf = AgentConfiguration.getAgentConf();
         this.db = initEnv(childPath);
@@ -124,7 +126,7 @@ public class RocksDbImp implements Db {
         } else {
             LOGGER.info("loading column families :" + 
existing.stream().map(String::new).collect(Collectors.toList()));
             managedColumnFamilies.addAll(
-                    
existing.stream().map(RocksDbImp::getColumnFamilyDescriptor).collect(Collectors.toList()));
+                    
existing.stream().map(RocksStoreImp::getColumnFamilyDescriptor).collect(Collectors.toList()));
         }
         return managedColumnFamilies;
     }
@@ -198,6 +200,21 @@ public class RocksDbImp implements Db {
         return results;
     }
 
+    @Override
+    public String getSplitter() {
+        return SPLITTER;
+    }
+
+    @Override
+    public String getUniqueKey() {
+        return UNIQUE_KEY;
+    }
+
+    @Override
+    public String replaceKeywords(String source) {
+        return source;
+    }
+
     @Override
     public void close() throws IOException {
         db.close();
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/StateSearchKey.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/StateSearchKey.java
similarity index 96%
rename from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/StateSearchKey.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/StateSearchKey.java
index e7472799c7..ea49a39cbb 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/StateSearchKey.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/StateSearchKey.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
 
 /**
  * search key for state.
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/Store.java
similarity index 86%
rename from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/Store.java
index 8043c911fc..a0babfbc96 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/Store.java
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
 
 import java.io.Closeable;
 import java.util.List;
 
 /**
- * local storage for key/value.
+ * Store for task, instance and offset info
  */
-public interface Db extends Closeable {
+public interface Store extends Closeable {
 
     KeyValueEntity get(String key);
 
@@ -50,4 +50,10 @@ public interface Db extends Closeable {
      * @return list of k/v
      */
     List<KeyValueEntity> findAll(String prefix);
+
+    String getSplitter();
+
+    String getUniqueKey();
+
+    String replaceKeywords(String source);
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/TaskStore.java
similarity index 71%
rename from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/TaskStore.java
index 02c616ff83..649d3c38ae 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/TaskStore.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
 
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
@@ -27,25 +27,25 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * db interface for task profile.
+ * Store for task profile
  */
-public class TaskProfileDb {
+public class TaskStore {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskProfileDb.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskStore.class);
 
-    private final Db db;
+    private final Store store;
 
-    public TaskProfileDb(Db db) {
-        this.db = db;
+    public TaskStore(Store store) {
+        this.store = store;
     }
 
     /**
-     * get task list from db.
+     * get task list from task store.
      *
      * @return list of task
      */
     public List<TaskProfile> getTasks() {
-        List<KeyValueEntity> result = this.db.findAll(getKey());
+        List<KeyValueEntity> result = this.store.findAll(getKey());
         List<TaskProfile> taskList = new ArrayList<>();
         for (KeyValueEntity entity : result) {
             taskList.add(entity.getAsTaskProfile());
@@ -63,7 +63,7 @@ public class TaskProfileDb {
             String keyName = getKeyByTaskId(task.getTaskId());
             KeyValueEntity entity = new KeyValueEntity(keyName,
                     task.toJsonStr(), "");
-            db.put(entity);
+            store.put(entity);
         }
     }
 
@@ -73,7 +73,7 @@ public class TaskProfileDb {
      * @param taskId taskId
      */
     public TaskProfile getTask(String taskId) {
-        KeyValueEntity result = this.db.get(getKeyByTaskId(taskId));
+        KeyValueEntity result = this.store.get(getKeyByTaskId(taskId));
         if (result == null) {
             return null;
         }
@@ -84,14 +84,18 @@ public class TaskProfileDb {
      * delete task by id.
      */
     public void deleteTask(String taskId) {
-        db.remove(getKeyByTaskId(taskId));
+        store.remove(getKeyByTaskId(taskId));
     }
 
-    private String getKey() {
-        return CommonConstants.TASK_ID_PREFIX;
+    public String getKey() {
+        if (store.getUniqueKey().isEmpty()) {
+            return CommonConstants.TASK_ID_PREFIX;
+        } else {
+            return store.getUniqueKey() + store.getSplitter() + 
CommonConstants.TASK_ID_PREFIX;
+        }
     }
 
-    private String getKeyByTaskId(String taskId) {
-        return CommonConstants.TASK_ID_PREFIX + taskId;
+    public String getKeyByTaskId(String taskId) {
+        return getKey() + store.getSplitter() + taskId;
     }
 }
diff --git 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
similarity index 76%
rename from 
inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
rename to 
inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
index 3e51c4a891..e9dd639b0a 100644
--- 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
+++ 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
 
 import org.apache.inlong.agent.AgentBaseTestsHelper;
 
@@ -26,33 +26,33 @@ import org.junit.Test;
 
 import java.io.IOException;
 
-public class TestRocksDbImp {
+public class TestRocksStoreImp {
 
-    private static RocksDbImp db;
+    private static RocksStoreImp store;
     private static AgentBaseTestsHelper helper;
 
     @BeforeClass
     public static void setup() throws Exception {
-        helper = new 
AgentBaseTestsHelper(TestRocksDbImp.class.getName()).setupAgentHome();
-        db = new RocksDbImp("/localdb");
+        helper = new 
AgentBaseTestsHelper(TestRocksStoreImp.class.getName()).setupAgentHome();
+        store = new RocksStoreImp("/localdb");
     }
 
     @AfterClass
     public static void teardown() throws IOException {
-        db.close();
+        store.close();
         helper.teardownAgentHome();
     }
 
     @Test
     public void testKeyValueDB() {
         KeyValueEntity entity = new KeyValueEntity("test1", "testA", "test");
-        db.put(entity);
-        KeyValueEntity ret = db.get("test1");
+        store.put(entity);
+        KeyValueEntity ret = store.get("test1");
         Assert.assertEquals("test1", ret.getKey());
         Assert.assertEquals("testA", ret.getJsonValue());
 
-        db.remove("test1");
-        ret = db.get("test1");
+        store.remove("test1");
+        ret = store.get("test1");
         Assert.assertNull(ret);
 
         StateSearchKey keys = StateSearchKey.SUCCESS;
@@ -61,17 +61,17 @@ public class TestRocksDbImp {
         entity1.setStateSearchKey(keys);
 
         entity.setJsonValue("testC");
-        db.put(entity);
-        KeyValueEntity newEntity = db.get("test1");
+        store.put(entity);
+        KeyValueEntity newEntity = store.get("test1");
         Assert.assertEquals("testC", newEntity.getJsonValue());
     }
 
     @Test
     public void testDeleteEntity() {
         KeyValueEntity entity = new KeyValueEntity("searchKey1", 
"searchResult1", "test");
-        db.put(entity);
-        db.remove("searchKey1");
-        KeyValueEntity entityResult = db.get("searchKey1");
+        store.put(entity);
+        store.remove("searchKey1");
+        KeyValueEntity entityResult = store.get("searchKey1");
         Assert.assertNull(entityResult);
     }
 }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 3c73f6696d..ca5247fe59 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -32,7 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 /**
- * Agent Manager, the bridge for job manager, task manager, db e.t.c it 
manages agent level operations and communicates
+ * Agent Manager, the bridge for task manager, task store e.t.c it manages 
agent level operations and communicates
  * with outside system.
  */
 public class AgentManager extends AbstractDaemon {
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 0642e325a2..333c12a7d2 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -22,11 +22,11 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.db.InstanceDb;
-import org.apache.inlong.agent.db.TaskProfileDb;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Instance;
+import org.apache.inlong.agent.store.InstanceStore;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.store.TaskStore;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.enums.InstanceStateEnum;
@@ -47,7 +47,7 @@ import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
 
 /**
  * handle the instance created by task, including add, delete, update etc.
- * the instance info is store in both db and memory.
+ * the instance info is saved in both store and memory.
  */
 public class InstanceManager extends AbstractDaemon {
 
@@ -57,10 +57,10 @@ public class InstanceManager extends AbstractDaemon {
     public static final int INSTANCE_PRINT_INTERVAL_MS = 10000;
     public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000;
     private long lastPrintTime = 0;
-    // instance in db
-    private final InstanceDb instanceDb;
-    private TaskProfileDb taskProfileDb;
-    private TaskProfile taskFromDb;
+    // instance in instance store
+    private final InstanceStore instanceStore;
+    private TaskStore taskStore;
+    private TaskProfile taskFromStore;
     // task in memory
     private final ConcurrentHashMap<String, Instance> instanceMap;
     // instance profile queue.
@@ -116,10 +116,10 @@ public class InstanceManager extends AbstractDaemon {
     /**
      * Init task manager.
      */
-    public InstanceManager(String taskId, int instanceLimit, Db basicDb, 
TaskProfileDb taskProfileDb) {
+    public InstanceManager(String taskId, int instanceLimit, Store basicStore, 
TaskStore taskStore) {
         this.taskId = taskId;
-        instanceDb = new InstanceDb(basicDb);
-        this.taskProfileDb = taskProfileDb;
+        instanceStore = new InstanceStore(basicStore);
+        this.taskStore = taskStore;
         this.agentConf = AgentConfiguration.getAgentConf();
         instanceMap = new ConcurrentHashMap<>();
         this.instanceLimit = instanceLimit;
@@ -130,8 +130,8 @@ public class InstanceManager extends AbstractDaemon {
         return taskId;
     }
 
-    public InstanceDb getInstanceDb() {
-        return instanceDb;
+    public InstanceStore getInstanceStore() {
+        return instanceStore;
     }
 
     public Instance getInstance(String instanceId) {
@@ -139,7 +139,7 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     public InstanceProfile getInstanceProfile(String instanceId) {
-        return instanceDb.getInstance(taskId, instanceId);
+        return instanceStore.getInstance(taskId, instanceId);
     }
 
     public boolean submitAction(InstanceAction action) {
@@ -163,9 +163,9 @@ public class InstanceManager extends AbstractDaemon {
                     AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
                     printInstanceState();
                     dealWithActionQueue(actionQueue);
-                    keepPaceWithDb();
-                    String inlongGroupId = taskFromDb.getInlongGroupId();
-                    String inlongStreamId = taskFromDb.getInlongStreamId();
+                    keepPaceWithStore();
+                    String inlongGroupId = taskFromStore.getInlongGroupId();
+                    String inlongStreamId = taskFromStore.getInlongStreamId();
                     
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, 
inlongStreamId,
                             AgentUtils.getCurrentTime(), 1, 1, auditVersion);
                 } catch (Throwable ex) {
@@ -181,64 +181,65 @@ public class InstanceManager extends AbstractDaemon {
     private void printInstanceState() {
         long currentTime = AgentUtils.getCurrentTime();
         if (currentTime - lastPrintTime > INSTANCE_PRINT_INTERVAL_MS) {
-            List<InstanceProfile> instances = instanceDb.getInstances(taskId);
+            List<InstanceProfile> instances = instanceStore.getInstances(taskId);
             InstancePrintStat stat = new InstancePrintStat();
             for (int i = 0; i < instances.size(); i++) {
                 InstanceProfile instance = instances.get(i);
                 stat.stat(instance.getState());
             }
             LOGGER.info(
-                    "instanceManager running! taskId {} mem {} db total {} {} action count {}",
+                    "instanceManager running! taskId {} mem {} total {} {} action count {}",
                     taskId, instanceMap.size(), instances.size(), stat, actionQueue.size());
             lastPrintTime = currentTime;
         }
     }
 
-    private void keepPaceWithDb() {
-        traverseDbTasksToMemory();
-        traverseMemoryTasksToDb();
+    private void keepPaceWithStore() {
+        traverseStoreTasksToMemory();
+        traverseMemoryTasksToStore();
     }
 
-    private void traverseDbTasksToMemory() {
-        instanceDb.getInstances(taskId).forEach((profileFromDb) -> {
-            InstanceStateEnum dbState = profileFromDb.getState();
-            Instance instance = instanceMap.get(profileFromDb.getInstanceId());
-            switch (dbState) {
+    private void traverseStoreTasksToMemory() {
+        instanceStore.getInstances(taskId).forEach((profileFromStore) -> {
+            InstanceStateEnum storeState = profileFromStore.getState();
+            Instance instance = instanceMap.get(profileFromStore.getInstanceId());
+            switch (storeState) {
                 case DEFAULT: {
                     if (instance == null) {
-                        LOGGER.info("traverseDbTasksToMemory add instance to mem taskId {} instanceId {}",
-                                profileFromDb.getTaskId(), profileFromDb.getInstanceId());
-                        addToMemory(profileFromDb);
+                        LOGGER.info("traverseStoreTasksToMemory add instance to mem taskId {} instanceId {}",
+                                profileFromStore.getTaskId(), profileFromStore.getInstanceId());
+                        addToMemory(profileFromStore);
                     }
                     break;
                 }
                 case FINISHED:
                     DELETE: {
                         if (instance != null) {
-                            LOGGER.info("traverseDbTasksToMemory delete instance from mem taskId {} instanceId {}",
-                                    profileFromDb.getTaskId(), profileFromDb.getInstanceId());
-                            deleteFromMemory(profileFromDb.getInstanceId());
+                            LOGGER.info("traverseStoreTasksToMemory delete instance from mem taskId {} instanceId {}",
+                                    profileFromStore.getTaskId(), profileFromStore.getInstanceId());
+                            deleteFromMemory(profileFromStore.getInstanceId());
                         }
                         break;
                     }
                 default: {
-                    LOGGER.error("instance invalid state {} taskId {} instanceId {}", dbState,
-                            profileFromDb.getTaskId(),
-                            profileFromDb.getInstanceId());
+                    LOGGER.error("instance invalid state {} taskId {} instanceId {}", storeState,
+                            profileFromStore.getTaskId(),
+                            profileFromStore.getInstanceId());
                 }
             }
         });
     }
 
-    private void traverseMemoryTasksToDb() {
+    private void traverseMemoryTasksToStore() {
         instanceMap.values().forEach((instance) -> {
-            InstanceProfile profileFromDb = instanceDb.getInstance(instance.getTaskId(), instance.getInstanceId());
-            if (profileFromDb == null) {
+            InstanceProfile profileFromStore =
+                    instanceStore.getInstance(instance.getTaskId(), instance.getInstanceId());
+            if (profileFromStore == null) {
                 deleteFromMemory(instance.getInstanceId());
                 return;
             }
-            InstanceStateEnum stateFromDb = profileFromDb.getState();
-            if (stateFromDb != InstanceStateEnum.DEFAULT) {
+            InstanceStateEnum stateFromStore = profileFromStore.getState();
+            if (stateFromStore != InstanceStateEnum.DEFAULT) {
                 deleteFromMemory(instance.getInstanceId());
             }
             if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime() 
> INSTANCE_KEEP_ALIVE_MS) {
@@ -279,7 +280,7 @@ public class InstanceManager extends AbstractDaemon {
 
     @Override
     public void start() {
-        restoreFromDb();
+        restoreFromStore();
         submitWorker(coreThread());
     }
 
@@ -296,19 +297,19 @@ public class InstanceManager extends AbstractDaemon {
         }
     }
 
-    private void restoreFromDb() {
-        taskFromDb = taskProfileDb.getTask(taskId);
-        auditVersion = Long.parseLong(taskFromDb.get(TASK_AUDIT_VERSION));
-        List<InstanceProfile> profileList = instanceDb.getInstances(taskId);
+    private void restoreFromStore() {
+        taskFromStore = taskStore.getTask(taskId);
+        auditVersion = Long.parseLong(taskFromStore.get(TASK_AUDIT_VERSION));
+        List<InstanceProfile> profileList = instanceStore.getInstances(taskId);
         profileList.forEach((profile) -> {
             InstanceStateEnum state = profile.getState();
             if (state == InstanceStateEnum.DEFAULT) {
-                LOGGER.info("instance restoreFromDb addToMem state {} taskId {} instanceId {}", state, taskId,
+                LOGGER.info("instance restoreFromStore addToMem state {} taskId {} instanceId {}", state, taskId,
                         profile.getInstanceId());
                 profile.setBoolean(RESTORE_FROM_DB, true);
                 addToMemory(profile);
             } else {
-                LOGGER.info("instance restoreFromDb ignore state {} taskId {} instanceId {}", state, taskId,
+                LOGGER.info("instance restoreFromStore ignore state {} taskId {} instanceId {}", state, taskId,
                         profile.getInstanceId());
             }
         });
@@ -325,31 +326,31 @@ public class InstanceManager extends AbstractDaemon {
                     profile.getInstanceId());
             return;
         }
-        addToDb(profile, true);
+        addToStore(profile, true);
         addToMemory(profile);
     }
 
     private void finishInstance(InstanceProfile profile) {
         profile.setState(InstanceStateEnum.FINISHED);
         profile.setModifyTime(AgentUtils.getCurrentTime());
-        addToDb(profile, false);
+        addToStore(profile, false);
         deleteFromMemory(profile.getInstanceId());
         LOGGER.info("finished instance state {} taskId {} instanceId {}", 
profile.getState(),
                 profile.getTaskId(), profile.getInstanceId());
     }
 
     private void deleteInstance(String instanceId) {
-        deleteFromDb(instanceId);
+        deleteFromStore(instanceId);
         deleteFromMemory(instanceId);
     }
 
-    private void deleteFromDb(String instanceId) {
-        InstanceProfile profile = instanceDb.getInstance(taskId, instanceId);
+    private void deleteFromStore(String instanceId) {
+        InstanceProfile profile = instanceStore.getInstance(taskId, 
instanceId);
         String inlongGroupId = profile.getInlongGroupId();
         String inlongStreamId = profile.getInlongStreamId();
-        instanceDb.deleteInstance(taskId, instanceId);
-        LOGGER.info("delete instance from db: taskId {} instanceId {} result {}", taskId,
-                instanceId, instanceDb.getInstance(taskId, instanceId));
+        instanceStore.deleteInstance(taskId, instanceId);
+        LOGGER.info("delete instance from instance store: taskId {} instanceId {} result {}", taskId,
+                instanceId, instanceStore.getInstance(taskId, instanceId));
         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, inlongGroupId, inlongStreamId,
                 profile.getSinkDataTime(), 1, 1, auditVersion);
     }
@@ -370,9 +371,10 @@ public class InstanceManager extends AbstractDaemon {
                 instance.getProfile().getSinkDataTime(), 1, 1, auditVersion);
     }
 
-    private void addToDb(InstanceProfile profile, boolean addNew) {
-        LOGGER.info("add instance to db state {} instanceId {}", 
profile.getState(), profile.getInstanceId());
-        instanceDb.storeInstance(profile);
+    private void addToStore(InstanceProfile profile, boolean addNew) {
+        LOGGER.info("add instance to instance store state {} instanceId {}", 
profile.getState(),
+                profile.getInstanceId());
+        instanceStore.storeInstance(profile);
         if (addNew) {
             String inlongGroupId = profile.getInlongGroupId();
             String inlongStreamId = profile.getInlongStreamId();
@@ -434,13 +436,13 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     public boolean shouldAddAgain(String fileName, long lastModifyTime) {
-        InstanceProfile profileFromDb = instanceDb.getInstance(taskId, 
fileName);
-        if (profileFromDb == null) {
-            LOGGER.debug("not in db should add {}", fileName);
+        InstanceProfile profileFromStore = instanceStore.getInstance(taskId, 
fileName);
+        if (profileFromStore == null) {
+            LOGGER.debug("not in instance store should add {}", fileName);
             return true;
         } else {
-            InstanceStateEnum state = profileFromDb.getState();
-            if (state == InstanceStateEnum.FINISHED && lastModifyTime > 
profileFromDb.getModifyTime()) {
+            InstanceStateEnum state = profileFromStore.getState();
+            if (state == InstanceStateEnum.FINISHED && lastModifyTime > 
profileFromStore.getModifyTime()) {
                 LOGGER.debug("finished but file update again {}", fileName);
                 return true;
             }
@@ -466,7 +468,7 @@ public class InstanceManager extends AbstractDaemon {
         if (!actionQueue.isEmpty()) {
             return false;
         }
-        List<InstanceProfile> instances = instanceDb.getInstances(taskId);
+        List<InstanceProfile> instances = instanceStore.getInstances(taskId);
         for (int i = 0; i < instances.size(); i++) {
             InstanceProfile profile = instances.get(i);
             if (profile.getState() != InstanceStateEnum.FINISHED) {
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 55b2ae3e4f..024a846e39 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -22,11 +22,11 @@ import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.OffsetProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.db.InstanceDb;
-import org.apache.inlong.agent.db.OffsetDb;
-import org.apache.inlong.agent.db.TaskProfileDb;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.store.InstanceStore;
+import org.apache.inlong.agent.store.OffsetStore;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.store.TaskStore;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
 
 /**
- * used to store instance offset to db
+ * used to save instance offset to offset store
  * where key is task id + read file name and value is instance offset
  */
 public class OffsetManager extends AbstractDaemon {
@@ -53,14 +53,16 @@ public class OffsetManager extends AbstractDaemon {
     public static final int CLEAN_INSTANCE_ONCE_LIMIT = 100;
     public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
     private static volatile OffsetManager offsetManager = null;
-    private final OffsetDb offsetDb;
-    private final InstanceDb instanceDb;
-    private final TaskProfileDb taskProfileDb;
-
-    private OffsetManager(Db taskBasicDb, Db instanceBasicDb, Db 
offsetBasicDb) {
-        taskProfileDb = new TaskProfileDb(taskBasicDb);
-        instanceDb = new InstanceDb(instanceBasicDb);
-        offsetDb = new OffsetDb(offsetBasicDb);
+    private final OffsetStore offsetStore;
+    private final InstanceStore instanceStore;
+    private final TaskStore taskStore;
+
+    private OffsetManager(
+            Store taskBasicStore, Store instanceBasicStore,
+            Store offsetBasicStore) {
+        taskStore = new TaskStore(taskBasicStore);
+        instanceStore = new InstanceStore(instanceBasicStore);
+        offsetStore = new OffsetStore(offsetBasicStore);
     }
 
     /**
@@ -87,11 +89,14 @@ public class OffsetManager extends AbstractDaemon {
     /**
      * task position manager singleton, can only generated by agent manager
      */
-    public static void init(Db taskBasicDb, Db instanceBasicDb, Db 
offsetBasicDb) {
+    public static void init(
+            Store taskBasicStore, Store instanceBasicStore,
+            Store offsetBasicStore) {
         if (offsetManager == null) {
             synchronized (OffsetManager.class) {
                 if (offsetManager == null) {
-                    offsetManager = new OffsetManager(taskBasicDb, 
instanceBasicDb, offsetBasicDb);
+                    offsetManager = new OffsetManager(taskBasicStore, 
instanceBasicStore,
+                            offsetBasicStore);
                 }
             }
         }
@@ -108,23 +113,23 @@ public class OffsetManager extends AbstractDaemon {
     }
 
     public void setOffset(OffsetProfile profile) {
-        offsetDb.setOffset(profile);
+        offsetStore.setOffset(profile);
     }
 
     public void deleteOffset(String taskId, String instanceId) {
-        offsetDb.deleteOffset(taskId, instanceId);
+        offsetStore.deleteOffset(taskId, instanceId);
     }
 
     public OffsetProfile getOffset(String taskId, String instanceId) {
-        return offsetDb.getOffset(taskId, instanceId);
+        return offsetStore.getOffset(taskId, instanceId);
     }
 
     private void cleanDbOffset() {
-        List<OffsetProfile> offsets = offsetDb.listAllOffsets();
+        List<OffsetProfile> offsets = offsetStore.listAllOffsets();
         offsets.forEach(offset -> {
             String taskId = offset.getTaskId();
             String instanceId = offset.getInstanceId();
-            InstanceProfile instanceProfile = instanceDb.getInstance(taskId, 
instanceId);
+            InstanceProfile instanceProfile = 
instanceStore.getInstance(taskId, instanceId);
             if (instanceProfile == null) {
                 deleteOffset(taskId, instanceId);
                 LOGGER.info("instance not found, delete offset taskId {} 
instanceId {}", taskId,
@@ -136,7 +141,7 @@ public class OffsetManager extends AbstractDaemon {
 
     private void cleanDbInstance() {
         AtomicInteger cleanCount = new AtomicInteger();
-        Iterator<InstanceProfile> iterator = 
instanceDb.listAllInstances().listIterator();
+        Iterator<InstanceProfile> iterator = 
instanceStore.listAllInstances().listIterator();
         while (iterator.hasNext()) {
             if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) {
                 return;
@@ -147,7 +152,7 @@ public class OffsetManager extends AbstractDaemon {
             if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) {
                 continue;
             }
-            TaskProfile taskFromDb = taskProfileDb.getTask(taskId);
+            TaskProfile taskFromDb = taskStore.getTask(taskId);
             if (taskFromDb != null) {
                 if 
(taskFromDb.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
                     continue;
@@ -166,9 +171,9 @@ public class OffsetManager extends AbstractDaemon {
                     DB_INSTANCE_EXPIRE_CYCLE_COUNT + 
instanceFromDb.getCycleUnit());
             if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > 
expireTime) {
                 cleanCount.getAndIncrement();
-                LOGGER.info("instance has expired, delete from db dataTime {} 
taskId {} instanceId {}",
+                LOGGER.info("instance has expired, delete from instance store 
dataTime {} taskId {} instanceId {}",
                         instanceFromDb.getSourceDataTime(), taskId, 
instanceId);
-                instanceDb.deleteInstance(taskId, instanceId);
+                instanceStore.deleteInstance(taskId, instanceId);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, 
instanceFromDb.getInlongGroupId(),
                         instanceFromDb.getInlongStreamId(), 
instanceFromDb.getSinkDataTime(), 1, 1,
                         
Long.parseLong(instanceFromDb.get(TASK_AUDIT_VERSION)));
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 58bd274c8b..5bbdf43432 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -22,11 +22,11 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.db.RocksDbImp;
-import org.apache.inlong.agent.db.TaskProfileDb;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.store.RocksStoreImp;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.store.TaskStore;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
@@ -50,7 +50,7 @@ import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
 
 /**
  * handle the task config from manager, including add, delete, update etc.
- * the task config is store in both db and memory.
+ * the task config is store in both task store and memory.
  */
 public class TaskManager extends AbstractDaemon {
 
@@ -60,14 +60,14 @@ public class TaskManager extends AbstractDaemon {
     public static final int CORE_THREAD_PRINT_TIME = 10000;
     private static final int ACTION_QUEUE_CAPACITY = 1000;
     private long lastPrintTime = 0;
-    // task basic db
-    private final Db taskBasicDb;
-    // instance basic db
-    private final Db instanceBasicDb;
-    // offset basic db
-    private final Db offsetBasicDb;
-    // task in db
-    private final TaskProfileDb taskDb;
+    // task basic store
+    private final Store taskBasicStore;
+    // instance basic store
+    private final Store instanceBasicStore;
+    // offset basic store
+    private final Store offsetBasicStore;
+    // task in task store
+    private final TaskStore taskStore;
     // task in memory
     private final ConcurrentHashMap<String, Task> taskMap;
     // task config from manager.
@@ -125,14 +125,14 @@ public class TaskManager extends AbstractDaemon {
      */
     public TaskManager() {
         this.agentConf = AgentConfiguration.getAgentConf();
-        taskBasicDb = initDb(
+        taskBasicStore = initStore(
                 agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_TASK));
-        taskDb = new TaskProfileDb(taskBasicDb);
-        instanceBasicDb = initDb(
+        taskStore = new TaskStore(taskBasicStore);
+        instanceBasicStore = initStore(
                 agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
-        offsetBasicDb =
-                initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
-        OffsetManager.init(taskBasicDb, instanceBasicDb, offsetBasicDb);
+        offsetBasicStore =
+                initStore(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
+        OffsetManager.init(taskBasicStore, instanceBasicStore, 
offsetBasicStore);
         this.runningPool = new ThreadPoolExecutor(
                 0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
@@ -145,22 +145,22 @@ public class TaskManager extends AbstractDaemon {
         actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
     }
 
-    public TaskProfileDb getTaskDb() {
-        return taskDb;
+    public TaskStore getTaskStore() {
+        return taskStore;
     }
 
-    public Db getInstanceBasicDb() {
-        return instanceBasicDb;
+    public Store getInstanceBasicStore() {
+        return instanceBasicStore;
     }
 
     /**
-     * init db by class name
+     * init store by class name
      *
-     * @return db
+     * @return store
      */
-    public static Db initDb(String childPath) {
+    public static Store initStore(String childPath) {
         try {
-            return new RocksDbImp(childPath);
+            return new RocksStoreImp(childPath);
         } catch (Exception ex) {
             throw new UnsupportedClassVersionError(ex.getMessage());
         }
@@ -213,13 +213,13 @@ public class TaskManager extends AbstractDaemon {
 
     private void printTaskDetail() {
         if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
-            List<TaskProfile> tasksInDb = taskDb.getTasks();
+            List<TaskProfile> tasksInStore = taskStore.getTasks();
             TaskPrintStat stat = new TaskPrintStat();
-            for (int i = 0; i < tasksInDb.size(); i++) {
-                TaskProfile task = tasksInDb.get(i);
+            for (int i = 0; i < tasksInStore.size(); i++) {
+                TaskProfile task = tasksInStore.get(i);
                 stat.stat(task.getState());
             }
-            LOGGER.info("taskManager running! mem {} db total {} {} ", 
taskMap.size(), tasksInDb.size(), stat);
+            LOGGER.info("taskManager running! mem {} total {} {} ", 
taskMap.size(), tasksInStore.size(), stat);
             lastPrintTime = AgentUtils.getCurrentTime();
         }
     }
@@ -230,7 +230,7 @@ public class TaskManager extends AbstractDaemon {
             return;
         }
         keepPaceWithManager(dataConfigs);
-        keepPaceWithDb();
+        keepPaceWithStore();
     }
 
     private void dealWithActionQueue(BlockingQueue<TaskAction> queue) {
@@ -271,48 +271,48 @@ public class TaskManager extends AbstractDaemon {
                 LOGGER.error("task {} invalid task state {}", profile, state);
             }
         });
-        traverseManagerTasksToDb(tasksFromManager);
-        traverseDbTasksToManager(tasksFromManager);
+        traverseManagerTasksToStore(tasksFromManager);
+        traverseStoreTasksToManager(tasksFromManager);
     }
 
     /**
-     * keep pace with task in db
+     * keep pace with task in task store
      */
-    private void keepPaceWithDb() {
-        traverseDbTasksToMemory();
-        traverseMemoryTasksToDb();
+    private void keepPaceWithStore() {
+        traverseStoreTasksToMemory();
+        traverseMemoryTasksToStore();
     }
 
     /**
-     * keep pace with task in db
+     * keep pace with task in task store
      */
-    private void traverseManagerTasksToDb(Map<String, TaskProfile> 
tasksFromManager) {
+    private void traverseManagerTasksToStore(Map<String, TaskProfile> 
tasksFromManager) {
         tasksFromManager.values().forEach((profileFromManager) -> {
-            TaskProfile taskFromDb = 
taskDb.getTask(profileFromManager.getTaskId());
-            if (taskFromDb == null) {
-                LOGGER.info("traverseManagerTasksToDb task {} not found in db retry {} state {}, add it",
+            TaskProfile taskFromStore = taskStore.getTask(profileFromManager.getTaskId());
+            if (taskFromStore == null) {
+                LOGGER.info("traverseManagerTasksToStore task {} not found in task store retry {} state {}, add it",
                         profileFromManager.getTaskId(),
                         profileFromManager.isRetry(), profileFromManager.getState());
                 addTask(profileFromManager);
             } else {
                 TaskStateEnum managerState = profileFromManager.getState();
-                TaskStateEnum dbState = taskFromDb.getState();
-                if (managerState == dbState) {
+                TaskStateEnum storeState = taskFromStore.getState();
+                if (managerState == storeState) {
                     return;
                 }
-                if (dbState == TaskStateEnum.RETRY_FINISH) {
-                    LOGGER.info("traverseManagerTasksToDb task {} dbState {} 
retry {}, do nothing",
-                            taskFromDb.getTaskId(), dbState,
-                            taskFromDb.isRetry());
+                if (storeState == TaskStateEnum.RETRY_FINISH) {
+                    LOGGER.info("traverseManagerTasksToStore task {} 
storeState {} retry {}, do nothing",
+                            taskFromStore.getTaskId(), storeState,
+                            taskFromStore.isRetry());
                     return;
                 }
                 if (managerState == TaskStateEnum.RUNNING) {
-                    LOGGER.info("traverseManagerTasksToDb task {} dbState {} 
retry {}, active it",
-                            taskFromDb.getTaskId(), dbState, 
taskFromDb.isRetry());
+                    LOGGER.info("traverseManagerTasksToStore task {} 
storeState {} retry {}, active it",
+                            taskFromStore.getTaskId(), storeState, 
taskFromStore.isRetry());
                     activeTask(profileFromManager);
                 } else {
-                    LOGGER.info("traverseManagerTasksToDb task {} dbState {} 
retry {}, freeze it",
-                            taskFromDb.getTaskId(), dbState, 
taskFromDb.isRetry());
+                    LOGGER.info("traverseManagerTasksToStore task {} 
storeState {} retry {}, freeze it",
+                            taskFromStore.getTaskId(), storeState, 
taskFromStore.isRetry());
                     freezeTask(profileFromManager);
                 }
             }
@@ -320,13 +320,13 @@ public class TaskManager extends AbstractDaemon {
     }
 
     /**
-     * traverse tasks in db, if not found in tasks from manager then delete it
+     * traverse tasks in task store, if not found in tasks from manager then delete it
      */
-    private void traverseDbTasksToManager(Map<String, TaskProfile> 
tasksFromManager) {
-        taskDb.getTasks().forEach((profileFromDb) -> {
-            if (!tasksFromManager.containsKey(profileFromDb.getTaskId())) {
-                LOGGER.info("traverseDbTasksToManager try to delete task {}", 
profileFromDb.getTaskId());
-                deleteTask(profileFromDb);
+    private void traverseStoreTasksToManager(Map<String, TaskProfile> 
tasksFromManager) {
+        taskStore.getTasks().forEach((profileFromStore) -> {
+            if (!tasksFromManager.containsKey(profileFromStore.getTaskId())) {
+                LOGGER.info("traverseStoreTasksToManager try to delete task 
{}", profileFromStore.getTaskId());
+                deleteTask(profileFromStore);
             }
         });
     }
@@ -335,49 +335,49 @@ public class TaskManager extends AbstractDaemon {
      * manager task state is RUNNING and taskMap not found then add
      * manager task state is FROZE and taskMap found thrn delete
      */
-    private void traverseDbTasksToMemory() {
-        taskDb.getTasks().forEach((profileFromDb) -> {
-            TaskStateEnum dbState = profileFromDb.getState();
-            Task task = taskMap.get(profileFromDb.getTaskId());
-            if (dbState == TaskStateEnum.RUNNING) {
+    private void traverseStoreTasksToMemory() {
+        taskStore.getTasks().forEach((profileFromStore) -> {
+            TaskStateEnum storeState = profileFromStore.getState();
+            Task task = taskMap.get(profileFromStore.getTaskId());
+            if (storeState == TaskStateEnum.RUNNING) {
                 if (task == null) {
-                    LOGGER.info("traverseDbTasksToMemory add task to mem 
taskId {}", profileFromDb.getTaskId());
-                    addToMemory(profileFromDb);
+                    LOGGER.info("traverseStoreTasksToMemory add task to mem 
taskId {}", profileFromStore.getTaskId());
+                    addToMemory(profileFromStore);
                 }
-            } else if (dbState == TaskStateEnum.FROZEN) {
+            } else if (storeState == TaskStateEnum.FROZEN) {
                 if (task != null) {
-                    LOGGER.info("traverseDbTasksToMemory delete task from mem 
taskId {}",
-                            profileFromDb.getTaskId());
-                    deleteFromMemory(profileFromDb.getTaskId());
+                    LOGGER.info("traverseStoreTasksToMemory delete task from 
mem taskId {}",
+                            profileFromStore.getTaskId());
+                    deleteFromMemory(profileFromStore.getTaskId());
                 }
             } else {
-                if (dbState != TaskStateEnum.RETRY_FINISH) {
-                    LOGGER.error("task {} invalid state {}", 
profileFromDb.getTaskId(), dbState);
+                if (storeState != TaskStateEnum.RETRY_FINISH) {
+                    LOGGER.error("task {} invalid state {}", 
profileFromStore.getTaskId(), storeState);
                 }
             }
         });
     }
 
     /**
-     * task in taskMap but not in taskDb then delete
-     * task in taskMap but task state from db is FROZEN then delete
+     * task in taskMap but not in task store then delete
+     * task in taskMap but task state from task store is FROZEN then delete
      */
-    private void traverseMemoryTasksToDb() {
+    private void traverseMemoryTasksToStore() {
         taskMap.values().forEach((task) -> {
-            TaskProfile profileFromDb = taskDb.getTask(task.getTaskId());
-            if (profileFromDb == null) {
+            TaskProfile profileFromStore = taskStore.getTask(task.getTaskId());
+            if (profileFromStore == null) {
                 deleteFromMemory(task.getTaskId());
                 return;
             }
-            TaskStateEnum stateFromDb = profileFromDb.getState();
-            if (stateFromDb != TaskStateEnum.RUNNING) {
+            TaskStateEnum stateFromStore = profileFromStore.getState();
+            if (stateFromStore != TaskStateEnum.RUNNING) {
                 deleteFromMemory(task.getTaskId());
             }
         });
     }
 
     /**
-     * add task profile to db
+     * add task profile to task store
      * if task state is RUNNING then add task to memory
      */
     private void addTask(TaskProfile taskProfile) {
@@ -389,7 +389,7 @@ public class TaskManager extends AbstractDaemon {
             LOGGER.error("task profile invalid {}", taskProfile.toJsonStr());
             return;
         }
-        addToDb(taskProfile);
+        addToStore(taskProfile);
         TaskStateEnum state = TaskStateEnum.getTaskState(taskProfile.getInt(TASK_STATE));
         if (state == TaskStateEnum.RUNNING) {
             addToMemory(taskProfile);
@@ -400,37 +400,37 @@ public class TaskManager extends AbstractDaemon {
     }
 
     private void deleteTask(TaskProfile taskProfile) {
-        deleteFromDb(taskProfile);
+        deleteFromStore(taskProfile);
         deleteFromMemory(taskProfile.getTaskId());
     }
 
     private void freezeTask(TaskProfile taskProfile) {
-        updateToDb(taskProfile);
+        updateToStore(taskProfile);
         deleteFromMemory(taskProfile.getTaskId());
     }
 
     private void finishTask(TaskProfile taskProfile) {
         taskProfile.setState(TaskStateEnum.RETRY_FINISH);
-        updateToDb(taskProfile);
+        updateToStore(taskProfile);
         deleteFromMemory(taskProfile.getTaskId());
     }
 
     private void activeTask(TaskProfile taskProfile) {
-        updateToDb(taskProfile);
+        updateToStore(taskProfile);
         addToMemory(taskProfile);
     }
 
-    private void restoreFromDb() {
-        LOGGER.info("restore from db start");
-        List<TaskProfile> taskProfileList = taskDb.getTasks();
+    private void restoreFromStore() {
+        LOGGER.info("restore from task store start");
+        List<TaskProfile> taskProfileList = taskStore.getTasks();
         taskProfileList.forEach((profile) -> {
             if (profile.getState() == TaskStateEnum.RUNNING) {
-                LOGGER.info("restore from db taskId {}", profile.getTaskId());
+                LOGGER.info("restore from task store taskId {}", profile.getTaskId());
                 profile.setBoolean(RESTORE_FROM_DB, true);
                 addToMemory(profile);
             }
         });
-        LOGGER.info("restore from db end");
+        LOGGER.info("restore from task store end");
     }
 
     private void stopAllTasks() {
@@ -452,30 +452,30 @@ public class TaskManager extends AbstractDaemon {
     }
 
     /**
-     * add task to db, it was expected that there is no record refer the task id.
+     * add task to task store, it was expected that there is no record refer the task id.
      * cause the task id will change if the task content changes, replace the record
-     * if it is found, the memory record will be updated by the db.
+     * if it is found, the memory record will be updated by the task store.
      */
-    private void addToDb(TaskProfile taskProfile) {
-        if (taskDb.getTask(taskProfile.getTaskId()) != null) {
+    private void addToStore(TaskProfile taskProfile) {
+        if (taskStore.getTask(taskProfile.getTaskId()) != null) {
             LOGGER.error("task {} should not exist", taskProfile.getTaskId());
         }
-        taskDb.storeTask(taskProfile);
+        taskStore.storeTask(taskProfile);
     }
 
-    private void deleteFromDb(TaskProfile taskProfile) {
-        if (taskDb.getTask(taskProfile.getTaskId()) == null) {
-            LOGGER.error("try to delete task {} but not found in db", taskProfile);
+    private void deleteFromStore(TaskProfile taskProfile) {
+        if (taskStore.getTask(taskProfile.getTaskId()) == null) {
+            LOGGER.error("try to delete task {} but not found in task store", taskProfile);
             return;
         }
-        taskDb.deleteTask(taskProfile.getTaskId());
+        taskStore.deleteTask(taskProfile.getTaskId());
     }
 
-    private void updateToDb(TaskProfile taskProfile) {
-        if (taskDb.getTask(taskProfile.getTaskId()) == null) {
+    private void updateToStore(TaskProfile taskProfile) {
+        if (taskStore.getTask(taskProfile.getTaskId()) == null) {
             LOGGER.error("task {} not found, agent may have been reinstalled", taskProfile);
         }
-        taskDb.storeTask(taskProfile);
+        taskStore.storeTask(taskProfile);
     }
 
     /**
@@ -492,7 +492,7 @@ public class TaskManager extends AbstractDaemon {
         try {
             Class<?> taskClass = Class.forName(taskProfile.getTaskClass());
             Task task = (Task) taskClass.newInstance();
-            task.init(this, taskProfile, instanceBasicDb);
+            task.init(this, taskProfile, instanceBasicStore);
             taskMap.put(taskProfile.getTaskId(), task);
             runningPool.submit(task);
             LOGGER.info(
@@ -523,12 +523,12 @@ public class TaskManager extends AbstractDaemon {
     }
 
     public TaskProfile getTaskProfile(String taskId) {
-        return taskDb.getTask(taskId);
+        return taskStore.getTask(taskId);
     }
 
     @Override
     public void start() throws Exception {
-        restoreFromDb();
+        restoreFromStore();
         submitWorker(coreThread());
         OffsetManager.getInstance().start();
     }
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 7c21666a9e..f45e4c55e0 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -22,9 +22,9 @@ import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.core.AgentBaseTestsHelper;
 import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.db.InstanceDb;
-import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.store.InstanceStore;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.store.TaskStore;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.enums.InstanceStateEnum;
@@ -54,12 +54,12 @@ public class TestInstanceManager {
     public static void setup() {
         helper = new AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
-        Db basicDb = TaskManager.initDb("/localdb");
+        Store basicStore = TaskManager.initStore("/localdb");
         taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+6:00");
-        Db taskBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
-        TaskProfileDb taskDb = new TaskProfileDb(taskBasicDb);
-        taskDb.storeTask(taskProfile);
-        manager = new InstanceManager("1", 20, basicDb, taskDb);
+        Store taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
+        TaskStore taskStore = new TaskStore(taskBasicStore);
+        taskStore.storeTask(taskProfile);
+        manager = new InstanceManager("1", 20, basicStore, taskStore);
         manager.CORE_THREAD_SLEEP_TIME_MS = 100;
     }
 
@@ -71,12 +71,12 @@ public class TestInstanceManager {
 
     @Test
     public void testInstanceManager() {
-        InstanceDb instanceDb = manager.getInstanceDb();
+        InstanceStore instanceStore = manager.getInstanceStore();
         for (int i = 1; i <= 10; i++) {
             InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
                     String.valueOf(i), taskProfile.getCycleUnit(), "2023092710",
                     AgentUtils.getCurrentTime());
-            instanceDb.storeInstance(profile);
+            instanceStore.storeInstance(profile);
         }
         manager.start();
         for (int i = 1; i <= 10; i++) {
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java
index 03f2b3b8fa..a5a572dc65 100644
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.agent.core.task;
 
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.store.Store;
 
 import java.io.IOException;
 
@@ -36,7 +36,7 @@ public class MockTask extends Task {
     private TaskManager manager;
 
     @Override
-    public void init(Object srcManager, TaskProfile profile, Db basicDb) throws IOException {
+    public void init(Object srcManager, TaskProfile profile, Store basicStore) throws IOException {
         manager = (TaskManager) srcManager;
         this.profile = profile;
     }
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
index 11cc413ed1..61c255bbdf 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
@@ -19,7 +19,7 @@ package org.apache.inlong.agent.core.task;
 
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.core.AgentBaseTestsHelper;
-import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.store.TaskStore;
 import org.apache.inlong.common.enums.TaskStateEnum;
 
 import org.junit.AfterClass;
@@ -55,12 +55,12 @@ public class TestTaskManager {
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
         try {
             manager = new TaskManager();
-            TaskProfileDb taskProfileDb = manager.getTaskDb();
+            TaskStore taskStore = manager.getTaskStore();
             for (int i = 1; i <= 10; i++) {
                 TaskProfile taskProfile = helper.getTaskProfile(i, pattern, false, 0L, 0L, TaskStateEnum.RUNNING,
                         "GMT+8:00");
                 taskProfile.setTaskClass(MockTask.class.getCanonicalName());
-                taskProfileDb.storeTask(taskProfile);
+                taskStore.storeTask(taskProfile);
             }
             manager.start();
             for (int i = 1; i <= 10; i++) {
diff --git a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/Manager.java b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/Manager.java
index d89b342c8d..cd1317c6da 100755
--- a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/Manager.java
+++ b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/Manager.java
@@ -25,7 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Installer Manager, the bridge for job manager, task manager, db e.t.c it manages agent level operations and
+ * Installer Manager, the bridge for job manager, task manager, store e.t.c it manages agent level operations and
  * communicates with outside system.
  */
 public class Manager extends AbstractDaemon {
diff --git a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
index cd693436bf..c79d31f753 100755
--- a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
+++ b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
@@ -70,8 +70,7 @@ import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MA
 import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
 
 /**
- * Installer Manager, the bridge for job manager, task manager, db e.t.c it manages agent level operations and
- * communicates with outside system.
+ * module manager, deal with module add, delete and modify
  */
 public class ModuleManager extends AbstractDaemon {
 
diff --git a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/conf/InstallerConfiguration.java b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/conf/InstallerConfiguration.java
index a9b3fa9bf8..e26a9e24bc 100644
--- a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/conf/InstallerConfiguration.java
+++ b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/conf/InstallerConfiguration.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 
 /**
  * Installer configuration. Only one instance in the process.
- * Basically it use properties file to store configurations.
+ * Basically it use properties file to save configurations.
  */
 public class InstallerConfiguration extends AbstractConfiguration {
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index c187c4d143..376200e475 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -155,7 +155,7 @@ public class LogFileSource extends AbstractSource {
                         fileName);
                 offset = 0;
             } else {
-                LOGGER.info("getInitLineOffset inode no change taskId {} from db {}, file {}", taskId, offset,
+                LOGGER.info("getInitLineOffset inode no change taskId {} from offset store {}, file {}", taskId, offset,
                         fileName);
             }
         } else {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index 2de853b596..c463543bf5 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -23,10 +23,10 @@ import org.apache.inlong.agent.core.instance.ActionType;
 import org.apache.inlong.agent.core.instance.InstanceAction;
 import org.apache.inlong.agent.core.instance.InstanceManager;
 import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.file.Task;
 import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.store.Store;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.slf4j.Logger;
@@ -44,7 +44,7 @@ public abstract class AbstractTask extends Task {
     public static final int CORE_THREAD_PRINT_TIME = 10000;
     protected static final int DEFAULT_INSTANCE_LIMIT = 1;
     protected TaskProfile taskProfile;
-    protected Db basicDb;
+    protected Store basicStore;
     protected TaskManager taskManager;
     private InstanceManager instanceManager;
     protected volatile boolean running = false;
@@ -53,13 +53,13 @@ public abstract class AbstractTask extends Task {
     protected long auditVersion;
 
     @Override
-    public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException {
+    public void init(Object srcManager, TaskProfile taskProfile, Store basicStore) throws IOException {
         taskManager = (TaskManager) srcManager;
         this.taskProfile = taskProfile;
-        this.basicDb = basicDb;
+        this.basicStore = basicStore;
         auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION));
         instanceManager = new InstanceManager(taskProfile.getTaskId(), getInstanceLimit(),
-                basicDb, taskManager.getTaskDb());
+                basicStore, taskManager.getTaskStore());
         try {
             instanceManager.start();
         } catch (Exception e) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
index fad0f98f2a..8f333b8d46 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.agent.plugin.task;
 
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.store.Store;
 
 /**
  * Generate job by crontab expression.
@@ -27,7 +27,7 @@ import org.apache.inlong.agent.plugin.file.Task;
 public class CronTask extends Task {
 
     @Override
-    public void init(Object srcManager, TaskProfile profile, Db basicDb) {
+    public void init(Object srcManager, TaskProfile profile, Store basicStore) {
 
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
index 14a86dba65..740d2364aa 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
@@ -21,9 +21,9 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.db.RocksDbImp;
-import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.store.RocksStoreImp;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.store.TaskStore;
 
 import java.util.List;
 
@@ -31,13 +31,13 @@ public class RocksDBUtils {
 
     public static void main(String[] args) {
         AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
-        Db db = new RocksDbImp(
+        Store store = new RocksStoreImp(
                 agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH));
-        upgrade(db);
+        upgrade(store);
     }
 
-    public static void upgrade(Db db) {
-        TaskProfileDb triggerProfileDb = new TaskProfileDb(db);
+    public static void upgrade(Store store) {
+        TaskStore triggerProfileDb = new TaskStore(store);
         List<TaskProfile> allTaskProfiles = triggerProfileDb.getTasks();
         allTaskProfiles.forEach(triggerProfile -> {
             if (triggerProfile.hasKey(TaskConstants.TASK_DIR_FILTER_PATTERN)) {
@@ -50,6 +50,6 @@ public class RocksDBUtils {
         });
     }
 
-    public static void printTrigger(Db db) {
+    public static void printTrigger(Store store) {
     }
 }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 1cd3a2b927..70ab3170f9 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -25,10 +25,10 @@ import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.core.task.MemoryManager;
 import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.store.Store;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
 
@@ -57,21 +57,21 @@ public class TestLogFileSource {
     private static final Gson GSON = new Gson();
     private static final String[] check = {"hello line-end-symbol aa", "world line-end-symbol",
             "agent line-end-symbol"};
-    // task basic db
-    private static Db taskBasicDb;
-    // instance basic db
-    private static Db instanceBasicDb;
-    // offset basic db
-    private static Db offsetBasicDb;
+    // task basic store
+    private static Store taskBasicStore;
+    // instance basic store
+    private static Store instanceBasicStore;
+    // offset basic store
+    private static Store offsetBasicStore;
 
     @BeforeClass
     public static void setup() {
         helper = new AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome();
-        taskBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
-        instanceBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
-        offsetBasicDb =
-                TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
-        OffsetManager.init(taskBasicDb, instanceBasicDb, offsetBasicDb);
+        taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
+        instanceBasicStore = TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
+        offsetBasicStore =
+                TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
+        OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore);
     }
 
     private LogFileSource getSource(int taskId, long offset) {
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index b538c69f61..c1b8fe5c0b 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -95,8 +95,8 @@ public class TestLogFileTask {
                 return null;
             }).when(task, "addToEvenMap", Mockito.anyString(), Mockito.anyString());
             Assert.assertTrue(task.isProfileValid(taskProfile));
-            manager.getTaskDb().storeTask(taskProfile);
-            task.init(manager, taskProfile, manager.getInstanceBasicDb());
+            manager.getTaskStore().storeTask(taskProfile);
+            task.init(manager, taskProfile, manager.getInstanceBasicStore());
             EXECUTOR_SERVICE.submit(task);
         } catch (Exception e) {
             LOGGER.error("source init error {}", e);

Reply via email to