This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a0f123b676 [INLONG-10384][Agent] Add functions to the Store interface
to extend new storage plugins (#10385)
a0f123b676 is described below
commit a0f123b676691a24aa01433d621a9406d7601d13
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Jun 12 17:33:14 2024 +0800
[INLONG-10384][Agent] Add functions to the Store interface to extend new
storage plugins (#10385)
---
.../org/apache/inlong/agent/plugin/file/Task.java | 4 +-
.../InstanceDb.java => store/InstanceStore.java} | 45 +++--
.../inlong/agent/{db => store}/KeyValueEntity.java | 2 +-
.../inlong/agent/{db => store}/LocalProfile.java | 2 +-
.../{db/OffsetDb.java => store/OffsetStore.java} | 46 ++---
.../RocksDbImp.java => store/RocksStoreImp.java} | 29 ++-
.../inlong/agent/{db => store}/StateSearchKey.java | 2 +-
.../inlong/agent/{db/Db.java => store/Store.java} | 12 +-
.../TaskProfileDb.java => store/TaskStore.java} | 36 ++--
.../TestRocksStoreImp.java} | 30 +--
.../org/apache/inlong/agent/core/AgentManager.java | 2 +-
.../agent/core/instance/InstanceManager.java | 132 ++++++-------
.../inlong/agent/core/task/OffsetManager.java | 53 +++---
.../apache/inlong/agent/core/task/TaskManager.java | 208 ++++++++++-----------
.../agent/core/instance/TestInstanceManager.java | 20 +-
.../apache/inlong/agent/core/task/MockTask.java | 4 +-
.../inlong/agent/core/task/TestTaskManager.java | 6 +-
.../org/apache/inlong/agent/installer/Manager.java | 2 +-
.../inlong/agent/installer/ModuleManager.java | 3 +-
.../installer/conf/InstallerConfiguration.java | 2 +-
.../inlong/agent/plugin/sources/LogFileSource.java | 2 +-
.../inlong/agent/plugin/task/AbstractTask.java | 10 +-
.../apache/inlong/agent/plugin/task/CronTask.java | 4 +-
.../inlong/agent/plugin/utils/RocksDBUtils.java | 16 +-
.../agent/plugin/sources/TestLogFileSource.java | 24 +--
.../inlong/agent/plugin/task/TestLogFileTask.java | 4 +-
26 files changed, 366 insertions(+), 334 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
index e10d872ca3..40ad855db7 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
@@ -18,8 +18,8 @@
package org.apache.inlong.agent.plugin.file;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.state.AbstractStateWrapper;
+import org.apache.inlong.agent.store.Store;
import java.io.IOException;
@@ -33,7 +33,7 @@ public abstract class Task extends AbstractStateWrapper {
*
* @throws IOException
*/
- public abstract void init(Object srcManager, TaskProfile profile, Db
basicDb) throws IOException;
+ public abstract void init(Object srcManager, TaskProfile profile, Store
basicStore) throws IOException;
/**
* destroy task.
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
similarity index 70%
rename from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
index acc0fbfee3..2551734134 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/InstanceStore.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.CommonConstants;
@@ -28,25 +28,25 @@ import java.util.ArrayList;
import java.util.List;
/**
- * db interface for instance profile.
+ * Store for instance profile
*/
-public class InstanceDb {
+public class InstanceStore {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TaskProfileDb.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InstanceStore.class);
- private final Db db;
+ private final Store store;
- public InstanceDb(Db db) {
- this.db = db;
+ public InstanceStore(Store store) {
+ this.store = store;
}
/**
- * list all instance from db.
+ * list all instance from instance store.
*
* @return list of task
*/
public List<InstanceProfile> listAllInstances() {
- List<KeyValueEntity> result = this.db.findAll("");
+ List<KeyValueEntity> result = this.store.findAll(store.getUniqueKey());
List<InstanceProfile> instanceList = new ArrayList<>();
for (KeyValueEntity entity : result) {
instanceList.add(entity.getAsInstanceProfile());
@@ -55,12 +55,12 @@ public class InstanceDb {
}
/**
- * get instance list from db.
+ * get instance list from instance store.
*
* @return list of task
*/
public List<InstanceProfile> getInstances(String taskId) {
- List<KeyValueEntity> result = this.db.findAll(getKeyByTaskId(taskId));
+ List<KeyValueEntity> result =
this.store.findAll(getKeyByTaskId(taskId));
List<InstanceProfile> instanceList = new ArrayList<>();
for (KeyValueEntity entity : result) {
instanceList.add(entity.getAsInstanceProfile());
@@ -79,7 +79,7 @@ public class InstanceDb {
instance.get(TaskConstants.INSTANCE_ID));
KeyValueEntity entity = new KeyValueEntity(keyName,
instance.toJsonStr(),
instance.get(TaskConstants.INSTANCE_ID));
- db.put(entity);
+ store.put(entity);
} else {
LOGGER.error("instance profile invalid!");
}
@@ -92,7 +92,7 @@ public class InstanceDb {
* @param instanceId it can be file name(file collect), table name(db
sync) etc
*/
public InstanceProfile getInstance(String taskId, String instanceId) {
- KeyValueEntity result = this.db.get(getKeyByTaskAndInstanceId(taskId,
instanceId));
+ KeyValueEntity result =
this.store.get(getKeyByTaskAndInstanceId(taskId, instanceId));
if (result == null) {
return null;
}
@@ -106,18 +106,23 @@ public class InstanceDb {
* @param instanceId it can be file name(file collect), table name(db
sync) etc
*/
public void deleteInstance(String taskId, String instanceId) {
- db.remove(getKeyByTaskAndInstanceId(taskId, instanceId));
+ store.remove(getKeyByTaskAndInstanceId(taskId, instanceId));
}
- private String getKey() {
- return CommonConstants.INSTANCE_ID_PREFIX;
+ public String getKey() {
+ if (store.getUniqueKey().isEmpty()) {
+ return CommonConstants.INSTANCE_ID_PREFIX + store.getSplitter();
+ } else {
+ return store.getUniqueKey() + store.getSplitter() +
CommonConstants.INSTANCE_ID_PREFIX
+ + store.getSplitter();
+ }
}
- private String getKeyByTaskId(String taskId) {
- return CommonConstants.INSTANCE_ID_PREFIX + taskId;
+ public String getKeyByTaskId(String taskId) {
+ return getKey() + taskId;
}
- private String getKeyByTaskAndInstanceId(String taskId, String instanceId)
{
- return CommonConstants.INSTANCE_ID_PREFIX + taskId + "_" + instanceId;
+ public String getKeyByTaskAndInstanceId(String taskId, String instanceId) {
+ return getKeyByTaskId(taskId) + store.getSplitter() +
store.replaceKeywords(instanceId);
}
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/KeyValueEntity.java
similarity index 98%
rename from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/KeyValueEntity.java
index e9ecc80b56..cbce2f7f7f 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/KeyValueEntity.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.JobProfile;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/LocalProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/LocalProfile.java
similarity index 98%
rename from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/LocalProfile.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/LocalProfile.java
index 99f03bea68..c293853ab1 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/LocalProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/LocalProfile.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
import org.apache.inlong.agent.conf.JobProfile;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/OffsetStore.java
similarity index 69%
rename from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/OffsetStore.java
index 5c31a2f88a..93059f70ab 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/OffsetStore.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.constant.CommonConstants;
@@ -29,32 +29,19 @@ import java.util.ArrayList;
import java.util.List;
/**
- * db interface for task profile.
+ * Store for offset
*/
-public class OffsetDb {
+public class OffsetStore {
- private static final Logger LOGGER =
LoggerFactory.getLogger(OffsetDb.class);
- private final Db db;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OffsetStore.class);
+ private final Store store;
- public OffsetDb(Db db) {
- this.db = db;
- }
-
- /**
- * init db by class name
- *
- * @return db
- */
- private Db initDb(String childPath) {
- try {
- return new RocksDbImp(childPath);
- } catch (Exception ex) {
- throw new UnsupportedClassVersionError(ex.getMessage());
- }
+ public OffsetStore(Store store) {
+ this.store = store;
}
public List<OffsetProfile> listAllOffsets() {
- List<KeyValueEntity> result = this.db.findAll("");
+ List<KeyValueEntity> result = this.store.findAll(store.getUniqueKey());
List<OffsetProfile> offsetList = new ArrayList<>();
for (KeyValueEntity entity : result) {
offsetList.add(entity.getAsOffsetProfile());
@@ -63,7 +50,7 @@ public class OffsetDb {
}
public OffsetProfile getOffset(String taskId, String instanceId) {
- KeyValueEntity result = db.get(getKey(taskId, instanceId));
+ KeyValueEntity result = store.get(getKey(taskId, instanceId));
if (result == null) {
return null;
}
@@ -71,7 +58,7 @@ public class OffsetDb {
}
public void deleteOffset(String taskId, String instanceId) {
- db.remove(getKey(taskId, instanceId));
+ store.remove(getKey(taskId, instanceId));
}
public void setOffset(OffsetProfile offsetProfile) {
@@ -81,11 +68,18 @@ public class OffsetDb {
offsetProfile.getInstanceId());
KeyValueEntity entity = new KeyValueEntity(keyName,
offsetProfile.toJsonStr(),
offsetProfile.get(TaskConstants.INSTANCE_ID));
- db.put(entity);
+ store.put(entity);
}
}
- private String getKey(String taskId, String instanceId) {
- return CommonConstants.OFFSET_ID_PREFIX + taskId + "_" + instanceId;
+ public String getKey(String taskId, String instanceId) {
+ if (store.getUniqueKey().isEmpty()) {
+ return CommonConstants.OFFSET_ID_PREFIX + store.getSplitter() +
taskId
+ + store.getSplitter() + store.replaceKeywords(instanceId);
+ } else {
+ return store.getUniqueKey() + store.getSplitter() +
CommonConstants.OFFSET_ID_PREFIX
+ + store.getSplitter() + taskId
+ + store.getSplitter() + store.replaceKeywords(instanceId);
+ }
}
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
similarity index 92%
rename from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
index 02edcc6165..47caee3703 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/RocksStoreImp.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.constant.AgentConstants;
@@ -45,12 +45,14 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
/**
- * DB implement based on the Rocks DB.
+ * Store implement based on the Rocks DB.
*/
-public class RocksDbImp implements Db {
+public class RocksStoreImp implements Store {
- private static final Logger LOGGER =
LoggerFactory.getLogger(RocksDbImp.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RocksStoreImp.class);
private static final Gson GSON = new Gson();
+ public static final String SPLITTER = "_";
+ public static final String UNIQUE_KEY = "";
private final AgentConfiguration conf;
private final RocksDB db;
@@ -60,7 +62,7 @@ public class RocksDbImp implements Db {
private ConcurrentHashMap<String, ColumnFamilyDescriptor>
columnDescriptorMap;
private String storePath;
- public RocksDbImp(String childPath) {
+ public RocksStoreImp(String childPath) {
// init rocks db
this.conf = AgentConfiguration.getAgentConf();
this.db = initEnv(childPath);
@@ -124,7 +126,7 @@ public class RocksDbImp implements Db {
} else {
LOGGER.info("loading column families :" +
existing.stream().map(String::new).collect(Collectors.toList()));
managedColumnFamilies.addAll(
-
existing.stream().map(RocksDbImp::getColumnFamilyDescriptor).collect(Collectors.toList()));
+
existing.stream().map(RocksStoreImp::getColumnFamilyDescriptor).collect(Collectors.toList()));
}
return managedColumnFamilies;
}
@@ -198,6 +200,21 @@ public class RocksDbImp implements Db {
return results;
}
+ @Override
+ public String getSplitter() {
+ return SPLITTER;
+ }
+
+ @Override
+ public String getUniqueKey() {
+ return UNIQUE_KEY;
+ }
+
+ @Override
+ public String replaceKeywords(String source) {
+ return source;
+ }
+
@Override
public void close() throws IOException {
db.close();
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/StateSearchKey.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/StateSearchKey.java
similarity index 96%
rename from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/StateSearchKey.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/StateSearchKey.java
index e7472799c7..ea49a39cbb 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/StateSearchKey.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/StateSearchKey.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
/**
* search key for state.
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/Store.java
similarity index 86%
rename from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/Store.java
index 8043c911fc..a0babfbc96 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/Db.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/Store.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
import java.io.Closeable;
import java.util.List;
/**
- * local storage for key/value.
+ * Store for task, instance and offset info
*/
-public interface Db extends Closeable {
+public interface Store extends Closeable {
KeyValueEntity get(String key);
@@ -50,4 +50,10 @@ public interface Db extends Closeable {
* @return list of k/v
*/
List<KeyValueEntity> findAll(String prefix);
+
+ String getSplitter();
+
+ String getUniqueKey();
+
+ String replaceKeywords(String source);
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/TaskStore.java
similarity index 71%
rename from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/TaskStore.java
index 02c616ff83..649d3c38ae 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/store/TaskStore.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CommonConstants;
@@ -27,25 +27,25 @@ import java.util.ArrayList;
import java.util.List;
/**
- * db interface for task profile.
+ * Store for task profile
*/
-public class TaskProfileDb {
+public class TaskStore {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TaskProfileDb.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TaskStore.class);
- private final Db db;
+ private final Store store;
- public TaskProfileDb(Db db) {
- this.db = db;
+ public TaskStore(Store store) {
+ this.store = store;
}
/**
- * get task list from db.
+ * get task list from task store.
*
* @return list of task
*/
public List<TaskProfile> getTasks() {
- List<KeyValueEntity> result = this.db.findAll(getKey());
+ List<KeyValueEntity> result = this.store.findAll(getKey());
List<TaskProfile> taskList = new ArrayList<>();
for (KeyValueEntity entity : result) {
taskList.add(entity.getAsTaskProfile());
@@ -63,7 +63,7 @@ public class TaskProfileDb {
String keyName = getKeyByTaskId(task.getTaskId());
KeyValueEntity entity = new KeyValueEntity(keyName,
task.toJsonStr(), "");
- db.put(entity);
+ store.put(entity);
}
}
@@ -73,7 +73,7 @@ public class TaskProfileDb {
* @param taskId taskId
*/
public TaskProfile getTask(String taskId) {
- KeyValueEntity result = this.db.get(getKeyByTaskId(taskId));
+ KeyValueEntity result = this.store.get(getKeyByTaskId(taskId));
if (result == null) {
return null;
}
@@ -84,14 +84,18 @@ public class TaskProfileDb {
* delete task by id.
*/
public void deleteTask(String taskId) {
- db.remove(getKeyByTaskId(taskId));
+ store.remove(getKeyByTaskId(taskId));
}
- private String getKey() {
- return CommonConstants.TASK_ID_PREFIX;
+ public String getKey() {
+ if (store.getUniqueKey().isEmpty()) {
+ return CommonConstants.TASK_ID_PREFIX;
+ } else {
+ return store.getUniqueKey() + store.getSplitter() +
CommonConstants.TASK_ID_PREFIX;
+ }
}
- private String getKeyByTaskId(String taskId) {
- return CommonConstants.TASK_ID_PREFIX + taskId;
+ public String getKeyByTaskId(String taskId) {
+ return getKey() + store.getSplitter() + taskId;
}
}
diff --git
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
similarity index 76%
rename from
inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
rename to
inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
index 3e51c4a891..e9dd639b0a 100644
---
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
+++
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/store/TestRocksStoreImp.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.db;
+package org.apache.inlong.agent.store;
import org.apache.inlong.agent.AgentBaseTestsHelper;
@@ -26,33 +26,33 @@ import org.junit.Test;
import java.io.IOException;
-public class TestRocksDbImp {
+public class TestRocksStoreImp {
- private static RocksDbImp db;
+ private static RocksStoreImp store;
private static AgentBaseTestsHelper helper;
@BeforeClass
public static void setup() throws Exception {
- helper = new
AgentBaseTestsHelper(TestRocksDbImp.class.getName()).setupAgentHome();
- db = new RocksDbImp("/localdb");
+ helper = new
AgentBaseTestsHelper(TestRocksStoreImp.class.getName()).setupAgentHome();
+ store = new RocksStoreImp("/localdb");
}
@AfterClass
public static void teardown() throws IOException {
- db.close();
+ store.close();
helper.teardownAgentHome();
}
@Test
public void testKeyValueDB() {
KeyValueEntity entity = new KeyValueEntity("test1", "testA", "test");
- db.put(entity);
- KeyValueEntity ret = db.get("test1");
+ store.put(entity);
+ KeyValueEntity ret = store.get("test1");
Assert.assertEquals("test1", ret.getKey());
Assert.assertEquals("testA", ret.getJsonValue());
- db.remove("test1");
- ret = db.get("test1");
+ store.remove("test1");
+ ret = store.get("test1");
Assert.assertNull(ret);
StateSearchKey keys = StateSearchKey.SUCCESS;
@@ -61,17 +61,17 @@ public class TestRocksDbImp {
entity1.setStateSearchKey(keys);
entity.setJsonValue("testC");
- db.put(entity);
- KeyValueEntity newEntity = db.get("test1");
+ store.put(entity);
+ KeyValueEntity newEntity = store.get("test1");
Assert.assertEquals("testC", newEntity.getJsonValue());
}
@Test
public void testDeleteEntity() {
KeyValueEntity entity = new KeyValueEntity("searchKey1",
"searchResult1", "test");
- db.put(entity);
- db.remove("searchKey1");
- KeyValueEntity entityResult = db.get("searchKey1");
+ store.put(entity);
+ store.remove("searchKey1");
+ KeyValueEntity entityResult = store.get("searchKey1");
Assert.assertNull(entityResult);
}
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 3c73f6696d..ca5247fe59 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -32,7 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
- * Agent Manager, the bridge for job manager, task manager, db e.t.c it
manages agent level operations and communicates
+ * Agent Manager, the bridge for task manager, task store e.t.c it manages
agent level operations and communicates
* with outside system.
*/
public class AgentManager extends AbstractDaemon {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 0642e325a2..333c12a7d2 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -22,11 +22,11 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.db.InstanceDb;
-import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
+import org.apache.inlong.agent.store.InstanceStore;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.store.TaskStore;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
@@ -47,7 +47,7 @@ import static
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
/**
* handle the instance created by task, including add, delete, update etc.
- * the instance info is store in both db and memory.
+ * the instance info is saved in both store and memory.
*/
public class InstanceManager extends AbstractDaemon {
@@ -57,10 +57,10 @@ public class InstanceManager extends AbstractDaemon {
public static final int INSTANCE_PRINT_INTERVAL_MS = 10000;
public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000;
private long lastPrintTime = 0;
- // instance in db
- private final InstanceDb instanceDb;
- private TaskProfileDb taskProfileDb;
- private TaskProfile taskFromDb;
+ // instance in instance store
+ private final InstanceStore instanceStore;
+ private TaskStore taskStore;
+ private TaskProfile taskFromStore;
// task in memory
private final ConcurrentHashMap<String, Instance> instanceMap;
// instance profile queue.
@@ -116,10 +116,10 @@ public class InstanceManager extends AbstractDaemon {
/**
* Init task manager.
*/
- public InstanceManager(String taskId, int instanceLimit, Db basicDb,
TaskProfileDb taskProfileDb) {
+ public InstanceManager(String taskId, int instanceLimit, Store basicStore,
TaskStore taskStore) {
this.taskId = taskId;
- instanceDb = new InstanceDb(basicDb);
- this.taskProfileDb = taskProfileDb;
+ instanceStore = new InstanceStore(basicStore);
+ this.taskStore = taskStore;
this.agentConf = AgentConfiguration.getAgentConf();
instanceMap = new ConcurrentHashMap<>();
this.instanceLimit = instanceLimit;
@@ -130,8 +130,8 @@ public class InstanceManager extends AbstractDaemon {
return taskId;
}
- public InstanceDb getInstanceDb() {
- return instanceDb;
+ public InstanceStore getInstanceStore() {
+ return instanceStore;
}
public Instance getInstance(String instanceId) {
@@ -139,7 +139,7 @@ public class InstanceManager extends AbstractDaemon {
}
public InstanceProfile getInstanceProfile(String instanceId) {
- return instanceDb.getInstance(taskId, instanceId);
+ return instanceStore.getInstance(taskId, instanceId);
}
public boolean submitAction(InstanceAction action) {
@@ -163,9 +163,9 @@ public class InstanceManager extends AbstractDaemon {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
printInstanceState();
dealWithActionQueue(actionQueue);
- keepPaceWithDb();
- String inlongGroupId = taskFromDb.getInlongGroupId();
- String inlongStreamId = taskFromDb.getInlongStreamId();
+ keepPaceWithStore();
+ String inlongGroupId = taskFromStore.getInlongGroupId();
+ String inlongStreamId = taskFromStore.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId,
inlongStreamId,
AgentUtils.getCurrentTime(), 1, 1, auditVersion);
} catch (Throwable ex) {
@@ -181,64 +181,65 @@ public class InstanceManager extends AbstractDaemon {
private void printInstanceState() {
long currentTime = AgentUtils.getCurrentTime();
if (currentTime - lastPrintTime > INSTANCE_PRINT_INTERVAL_MS) {
- List<InstanceProfile> instances = instanceDb.getInstances(taskId);
+ List<InstanceProfile> instances =
instanceStore.getInstances(taskId);
InstancePrintStat stat = new InstancePrintStat();
for (int i = 0; i < instances.size(); i++) {
InstanceProfile instance = instances.get(i);
stat.stat(instance.getState());
}
LOGGER.info(
- "instanceManager running! taskId {} mem {} db total {} {}
action count {}",
+ "instanceManager running! taskId {} mem {} total {} {}
action count {}",
taskId, instanceMap.size(), instances.size(), stat,
actionQueue.size());
lastPrintTime = currentTime;
}
}
- private void keepPaceWithDb() {
- traverseDbTasksToMemory();
- traverseMemoryTasksToDb();
+ private void keepPaceWithStore() {
+ traverseStoreTasksToMemory();
+ traverseMemoryTasksToStore();
}
- private void traverseDbTasksToMemory() {
- instanceDb.getInstances(taskId).forEach((profileFromDb) -> {
- InstanceStateEnum dbState = profileFromDb.getState();
- Instance instance = instanceMap.get(profileFromDb.getInstanceId());
- switch (dbState) {
+ private void traverseStoreTasksToMemory() {
+ instanceStore.getInstances(taskId).forEach((profileFromStore) -> {
+ InstanceStateEnum storeState = profileFromStore.getState();
+ Instance instance =
instanceMap.get(profileFromStore.getInstanceId());
+ switch (storeState) {
case DEFAULT: {
if (instance == null) {
- LOGGER.info("traverseDbTasksToMemory add instance to
mem taskId {} instanceId {}",
- profileFromDb.getTaskId(),
profileFromDb.getInstanceId());
- addToMemory(profileFromDb);
+ LOGGER.info("traverseStoreTasksToMemory add instance
to mem taskId {} instanceId {}",
+ profileFromStore.getTaskId(),
profileFromStore.getInstanceId());
+ addToMemory(profileFromStore);
}
break;
}
case FINISHED:
DELETE: {
if (instance != null) {
- LOGGER.info("traverseDbTasksToMemory delete
instance from mem taskId {} instanceId {}",
- profileFromDb.getTaskId(),
profileFromDb.getInstanceId());
- deleteFromMemory(profileFromDb.getInstanceId());
+ LOGGER.info("traverseStoreTasksToMemory delete
instance from mem taskId {} instanceId {}",
+ profileFromStore.getTaskId(),
profileFromStore.getInstanceId());
+ deleteFromMemory(profileFromStore.getInstanceId());
}
break;
}
default: {
- LOGGER.error("instance invalid state {} taskId {}
instanceId {}", dbState,
- profileFromDb.getTaskId(),
- profileFromDb.getInstanceId());
+ LOGGER.error("instance invalid state {} taskId {}
instanceId {}", storeState,
+ profileFromStore.getTaskId(),
+ profileFromStore.getInstanceId());
}
}
});
}
- private void traverseMemoryTasksToDb() {
+ private void traverseMemoryTasksToStore() {
instanceMap.values().forEach((instance) -> {
- InstanceProfile profileFromDb =
instanceDb.getInstance(instance.getTaskId(), instance.getInstanceId());
- if (profileFromDb == null) {
+ InstanceProfile profileFromStore =
+ instanceStore.getInstance(instance.getTaskId(),
instance.getInstanceId());
+ if (profileFromStore == null) {
deleteFromMemory(instance.getInstanceId());
return;
}
- InstanceStateEnum stateFromDb = profileFromDb.getState();
- if (stateFromDb != InstanceStateEnum.DEFAULT) {
+ InstanceStateEnum stateFromStore = profileFromStore.getState();
+ if (stateFromStore != InstanceStateEnum.DEFAULT) {
deleteFromMemory(instance.getInstanceId());
}
if (AgentUtils.getCurrentTime() - instance.getLastHeartbeatTime()
> INSTANCE_KEEP_ALIVE_MS) {
@@ -279,7 +280,7 @@ public class InstanceManager extends AbstractDaemon {
@Override
public void start() {
- restoreFromDb();
+ restoreFromStore();
submitWorker(coreThread());
}
@@ -296,19 +297,19 @@ public class InstanceManager extends AbstractDaemon {
}
}
- private void restoreFromDb() {
- taskFromDb = taskProfileDb.getTask(taskId);
- auditVersion = Long.parseLong(taskFromDb.get(TASK_AUDIT_VERSION));
- List<InstanceProfile> profileList = instanceDb.getInstances(taskId);
+ private void restoreFromStore() {
+ taskFromStore = taskStore.getTask(taskId);
+ auditVersion = Long.parseLong(taskFromStore.get(TASK_AUDIT_VERSION));
+ List<InstanceProfile> profileList = instanceStore.getInstances(taskId);
profileList.forEach((profile) -> {
InstanceStateEnum state = profile.getState();
if (state == InstanceStateEnum.DEFAULT) {
- LOGGER.info("instance restoreFromDb addToMem state {} taskId
{} instanceId {}", state, taskId,
+ LOGGER.info("instance restoreFromStore addToMem state {}
taskId {} instanceId {}", state, taskId,
profile.getInstanceId());
profile.setBoolean(RESTORE_FROM_DB, true);
addToMemory(profile);
} else {
- LOGGER.info("instance restoreFromDb ignore state {} taskId {}
instanceId {}", state, taskId,
+ LOGGER.info("instance restoreFromStore ignore state {} taskId
{} instanceId {}", state, taskId,
profile.getInstanceId());
}
});
@@ -325,31 +326,31 @@ public class InstanceManager extends AbstractDaemon {
profile.getInstanceId());
return;
}
- addToDb(profile, true);
+ addToStore(profile, true);
addToMemory(profile);
}
private void finishInstance(InstanceProfile profile) {
profile.setState(InstanceStateEnum.FINISHED);
profile.setModifyTime(AgentUtils.getCurrentTime());
- addToDb(profile, false);
+ addToStore(profile, false);
deleteFromMemory(profile.getInstanceId());
LOGGER.info("finished instance state {} taskId {} instanceId {}",
profile.getState(),
profile.getTaskId(), profile.getInstanceId());
}
private void deleteInstance(String instanceId) {
- deleteFromDb(instanceId);
+ deleteFromStore(instanceId);
deleteFromMemory(instanceId);
}
- private void deleteFromDb(String instanceId) {
- InstanceProfile profile = instanceDb.getInstance(taskId, instanceId);
+ private void deleteFromStore(String instanceId) {
+ InstanceProfile profile = instanceStore.getInstance(taskId,
instanceId);
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
- instanceDb.deleteInstance(taskId, instanceId);
- LOGGER.info("delete instance from db: taskId {} instanceId {} result
{}", taskId,
- instanceId, instanceDb.getInstance(taskId, instanceId));
+ instanceStore.deleteInstance(taskId, instanceId);
+ LOGGER.info("delete instance from instance store: taskId {} instanceId
{} result {}", taskId,
+ instanceId, instanceStore.getInstance(taskId, instanceId));
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB,
inlongGroupId, inlongStreamId,
profile.getSinkDataTime(), 1, 1, auditVersion);
}
@@ -370,9 +371,10 @@ public class InstanceManager extends AbstractDaemon {
instance.getProfile().getSinkDataTime(), 1, 1, auditVersion);
}
- private void addToDb(InstanceProfile profile, boolean addNew) {
- LOGGER.info("add instance to db state {} instanceId {}",
profile.getState(), profile.getInstanceId());
- instanceDb.storeInstance(profile);
+ private void addToStore(InstanceProfile profile, boolean addNew) {
+ LOGGER.info("add instance to instance store state {} instanceId {}",
profile.getState(),
+ profile.getInstanceId());
+ instanceStore.storeInstance(profile);
if (addNew) {
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
@@ -434,13 +436,13 @@ public class InstanceManager extends AbstractDaemon {
}
public boolean shouldAddAgain(String fileName, long lastModifyTime) {
- InstanceProfile profileFromDb = instanceDb.getInstance(taskId,
fileName);
- if (profileFromDb == null) {
- LOGGER.debug("not in db should add {}", fileName);
+ InstanceProfile profileFromStore = instanceStore.getInstance(taskId,
fileName);
+ if (profileFromStore == null) {
+ LOGGER.debug("not in instance store should add {}", fileName);
return true;
} else {
- InstanceStateEnum state = profileFromDb.getState();
- if (state == InstanceStateEnum.FINISHED && lastModifyTime >
profileFromDb.getModifyTime()) {
+ InstanceStateEnum state = profileFromStore.getState();
+ if (state == InstanceStateEnum.FINISHED && lastModifyTime >
profileFromStore.getModifyTime()) {
LOGGER.debug("finished but file update again {}", fileName);
return true;
}
@@ -466,7 +468,7 @@ public class InstanceManager extends AbstractDaemon {
if (!actionQueue.isEmpty()) {
return false;
}
- List<InstanceProfile> instances = instanceDb.getInstances(taskId);
+ List<InstanceProfile> instances = instanceStore.getInstances(taskId);
for (int i = 0; i < instances.size(); i++) {
InstanceProfile profile = instances.get(i);
if (profile.getState() != InstanceStateEnum.FINISHED) {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 55b2ae3e4f..024a846e39 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -22,11 +22,11 @@ import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.db.InstanceDb;
-import org.apache.inlong.agent.db.OffsetDb;
-import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.store.InstanceStore;
+import org.apache.inlong.agent.store.OffsetStore;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.store.TaskStore;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
/**
- * used to store instance offset to db
+ * used to save instance offset to offset store
* where key is task id + read file name and value is instance offset
*/
public class OffsetManager extends AbstractDaemon {
@@ -53,14 +53,16 @@ public class OffsetManager extends AbstractDaemon {
public static final int CLEAN_INSTANCE_ONCE_LIMIT = 100;
public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
private static volatile OffsetManager offsetManager = null;
- private final OffsetDb offsetDb;
- private final InstanceDb instanceDb;
- private final TaskProfileDb taskProfileDb;
-
- private OffsetManager(Db taskBasicDb, Db instanceBasicDb, Db
offsetBasicDb) {
- taskProfileDb = new TaskProfileDb(taskBasicDb);
- instanceDb = new InstanceDb(instanceBasicDb);
- offsetDb = new OffsetDb(offsetBasicDb);
+ private final OffsetStore offsetStore;
+ private final InstanceStore instanceStore;
+ private final TaskStore taskStore;
+
+ private OffsetManager(
+ Store taskBasicStore, Store instanceBasicStore,
+ Store offsetBasicStore) {
+ taskStore = new TaskStore(taskBasicStore);
+ instanceStore = new InstanceStore(instanceBasicStore);
+ offsetStore = new OffsetStore(offsetBasicStore);
}
/**
@@ -87,11 +89,14 @@ public class OffsetManager extends AbstractDaemon {
/**
* task position manager singleton, can only generated by agent manager
*/
- public static void init(Db taskBasicDb, Db instanceBasicDb, Db
offsetBasicDb) {
+ public static void init(
+ Store taskBasicStore, Store instanceBasicStore,
+ Store offsetBasicStore) {
if (offsetManager == null) {
synchronized (OffsetManager.class) {
if (offsetManager == null) {
- offsetManager = new OffsetManager(taskBasicDb,
instanceBasicDb, offsetBasicDb);
+ offsetManager = new OffsetManager(taskBasicStore,
instanceBasicStore,
+ offsetBasicStore);
}
}
}
@@ -108,23 +113,23 @@ public class OffsetManager extends AbstractDaemon {
}
public void setOffset(OffsetProfile profile) {
- offsetDb.setOffset(profile);
+ offsetStore.setOffset(profile);
}
public void deleteOffset(String taskId, String instanceId) {
- offsetDb.deleteOffset(taskId, instanceId);
+ offsetStore.deleteOffset(taskId, instanceId);
}
public OffsetProfile getOffset(String taskId, String instanceId) {
- return offsetDb.getOffset(taskId, instanceId);
+ return offsetStore.getOffset(taskId, instanceId);
}
private void cleanDbOffset() {
- List<OffsetProfile> offsets = offsetDb.listAllOffsets();
+ List<OffsetProfile> offsets = offsetStore.listAllOffsets();
offsets.forEach(offset -> {
String taskId = offset.getTaskId();
String instanceId = offset.getInstanceId();
- InstanceProfile instanceProfile = instanceDb.getInstance(taskId,
instanceId);
+ InstanceProfile instanceProfile =
instanceStore.getInstance(taskId, instanceId);
if (instanceProfile == null) {
deleteOffset(taskId, instanceId);
LOGGER.info("instance not found, delete offset taskId {}
instanceId {}", taskId,
@@ -136,7 +141,7 @@ public class OffsetManager extends AbstractDaemon {
private void cleanDbInstance() {
AtomicInteger cleanCount = new AtomicInteger();
- Iterator<InstanceProfile> iterator =
instanceDb.listAllInstances().listIterator();
+ Iterator<InstanceProfile> iterator =
instanceStore.listAllInstances().listIterator();
while (iterator.hasNext()) {
if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) {
return;
@@ -147,7 +152,7 @@ public class OffsetManager extends AbstractDaemon {
if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) {
continue;
}
- TaskProfile taskFromDb = taskProfileDb.getTask(taskId);
+ TaskProfile taskFromDb = taskStore.getTask(taskId);
if (taskFromDb != null) {
if
(taskFromDb.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
continue;
@@ -166,9 +171,9 @@ public class OffsetManager extends AbstractDaemon {
DB_INSTANCE_EXPIRE_CYCLE_COUNT +
instanceFromDb.getCycleUnit());
if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() >
expireTime) {
cleanCount.getAndIncrement();
- LOGGER.info("instance has expired, delete from db dataTime {}
taskId {} instanceId {}",
+ LOGGER.info("instance has expired, delete from instance store
dataTime {} taskId {} instanceId {}",
instanceFromDb.getSourceDataTime(), taskId,
instanceId);
- instanceDb.deleteInstance(taskId, instanceId);
+ instanceStore.deleteInstance(taskId, instanceId);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB,
instanceFromDb.getInlongGroupId(),
instanceFromDb.getInlongStreamId(),
instanceFromDb.getSinkDataTime(), 1, 1,
Long.parseLong(instanceFromDb.get(TASK_AUDIT_VERSION)));
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 58bd274c8b..5bbdf43432 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -22,11 +22,11 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.db.RocksDbImp;
-import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.store.RocksStoreImp;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.store.TaskStore;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
@@ -50,7 +50,7 @@ import static
org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
/**
* handle the task config from manager, including add, delete, update etc.
- * the task config is store in both db and memory.
+ * the task config is store in both task store and memory.
*/
public class TaskManager extends AbstractDaemon {
@@ -60,14 +60,14 @@ public class TaskManager extends AbstractDaemon {
public static final int CORE_THREAD_PRINT_TIME = 10000;
private static final int ACTION_QUEUE_CAPACITY = 1000;
private long lastPrintTime = 0;
- // task basic db
- private final Db taskBasicDb;
- // instance basic db
- private final Db instanceBasicDb;
- // offset basic db
- private final Db offsetBasicDb;
- // task in db
- private final TaskProfileDb taskDb;
+ // task basic store
+ private final Store taskBasicStore;
+ // instance basic store
+ private final Store instanceBasicStore;
+ // offset basic store
+ private final Store offsetBasicStore;
+ // task in task store
+ private final TaskStore taskStore;
// task in memory
private final ConcurrentHashMap<String, Task> taskMap;
// task config from manager.
@@ -125,14 +125,14 @@ public class TaskManager extends AbstractDaemon {
*/
public TaskManager() {
this.agentConf = AgentConfiguration.getAgentConf();
- taskBasicDb = initDb(
+ taskBasicStore = initStore(
agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_TASK));
- taskDb = new TaskProfileDb(taskBasicDb);
- instanceBasicDb = initDb(
+ taskStore = new TaskStore(taskBasicStore);
+ instanceBasicStore = initStore(
agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
- offsetBasicDb =
- initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
- OffsetManager.init(taskBasicDb, instanceBasicDb, offsetBasicDb);
+ offsetBasicStore =
+ initStore(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
+ OffsetManager.init(taskBasicStore, instanceBasicStore,
offsetBasicStore);
this.runningPool = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
@@ -145,22 +145,22 @@ public class TaskManager extends AbstractDaemon {
actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
}
- public TaskProfileDb getTaskDb() {
- return taskDb;
+ public TaskStore getTaskStore() {
+ return taskStore;
}
- public Db getInstanceBasicDb() {
- return instanceBasicDb;
+ public Store getInstanceBasicStore() {
+ return instanceBasicStore;
}
/**
- * init db by class name
+ * init store by class name
*
- * @return db
+ * @return store
*/
- public static Db initDb(String childPath) {
+ public static Store initStore(String childPath) {
try {
- return new RocksDbImp(childPath);
+ return new RocksStoreImp(childPath);
} catch (Exception ex) {
throw new UnsupportedClassVersionError(ex.getMessage());
}
@@ -213,13 +213,13 @@ public class TaskManager extends AbstractDaemon {
private void printTaskDetail() {
if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
- List<TaskProfile> tasksInDb = taskDb.getTasks();
+ List<TaskProfile> tasksInStore = taskStore.getTasks();
TaskPrintStat stat = new TaskPrintStat();
- for (int i = 0; i < tasksInDb.size(); i++) {
- TaskProfile task = tasksInDb.get(i);
+ for (int i = 0; i < tasksInStore.size(); i++) {
+ TaskProfile task = tasksInStore.get(i);
stat.stat(task.getState());
}
- LOGGER.info("taskManager running! mem {} db total {} {} ",
taskMap.size(), tasksInDb.size(), stat);
+ LOGGER.info("taskManager running! mem {} total {} {} ",
taskMap.size(), tasksInStore.size(), stat);
lastPrintTime = AgentUtils.getCurrentTime();
}
}
@@ -230,7 +230,7 @@ public class TaskManager extends AbstractDaemon {
return;
}
keepPaceWithManager(dataConfigs);
- keepPaceWithDb();
+ keepPaceWithStore();
}
private void dealWithActionQueue(BlockingQueue<TaskAction> queue) {
@@ -271,48 +271,48 @@ public class TaskManager extends AbstractDaemon {
LOGGER.error("task {} invalid task state {}", profile, state);
}
});
- traverseManagerTasksToDb(tasksFromManager);
- traverseDbTasksToManager(tasksFromManager);
+ traverseManagerTasksToStore(tasksFromManager);
+ traverseStoreTasksToManager(tasksFromManager);
}
/**
- * keep pace with task in db
+ * keep pace with task in task store
*/
- private void keepPaceWithDb() {
- traverseDbTasksToMemory();
- traverseMemoryTasksToDb();
+ private void keepPaceWithStore() {
+ traverseStoreTasksToMemory();
+ traverseMemoryTasksToStore();
}
/**
- * keep pace with task in db
+ * keep pace with task in task store
*/
- private void traverseManagerTasksToDb(Map<String, TaskProfile>
tasksFromManager) {
+ private void traverseManagerTasksToStore(Map<String, TaskProfile>
tasksFromManager) {
tasksFromManager.values().forEach((profileFromManager) -> {
- TaskProfile taskFromDb =
taskDb.getTask(profileFromManager.getTaskId());
- if (taskFromDb == null) {
- LOGGER.info("traverseManagerTasksToDb task {} not found in db
retry {} state {}, add it",
+ TaskProfile taskFromStore =
taskStore.getTask(profileFromManager.getTaskId());
+ if (taskFromStore == null) {
+ LOGGER.info("traverseManagerTasksToStore task {} not found in
task store retry {} state {}, add it",
profileFromManager.getTaskId(),
profileFromManager.isRetry(),
profileFromManager.getState());
addTask(profileFromManager);
} else {
TaskStateEnum managerState = profileFromManager.getState();
- TaskStateEnum dbState = taskFromDb.getState();
- if (managerState == dbState) {
+ TaskStateEnum storeState = taskFromStore.getState();
+ if (managerState == storeState) {
return;
}
- if (dbState == TaskStateEnum.RETRY_FINISH) {
- LOGGER.info("traverseManagerTasksToDb task {} dbState {}
retry {}, do nothing",
- taskFromDb.getTaskId(), dbState,
- taskFromDb.isRetry());
+ if (storeState == TaskStateEnum.RETRY_FINISH) {
+ LOGGER.info("traverseManagerTasksToStore task {}
storeState {} retry {}, do nothing",
+ taskFromStore.getTaskId(), storeState,
+ taskFromStore.isRetry());
return;
}
if (managerState == TaskStateEnum.RUNNING) {
- LOGGER.info("traverseManagerTasksToDb task {} dbState {}
retry {}, active it",
- taskFromDb.getTaskId(), dbState,
taskFromDb.isRetry());
+ LOGGER.info("traverseManagerTasksToStore task {}
storeState {} retry {}, active it",
+ taskFromStore.getTaskId(), storeState,
taskFromStore.isRetry());
activeTask(profileFromManager);
} else {
- LOGGER.info("traverseManagerTasksToDb task {} dbState {}
retry {}, freeze it",
- taskFromDb.getTaskId(), dbState,
taskFromDb.isRetry());
+ LOGGER.info("traverseManagerTasksToStore task {}
storeState {} retry {}, freeze it",
+ taskFromStore.getTaskId(), storeState,
taskFromStore.isRetry());
freezeTask(profileFromManager);
}
}
@@ -320,13 +320,13 @@ public class TaskManager extends AbstractDaemon {
}
/**
- * traverse tasks in db, if not found in tasks from manager then delete it
+ * traverse tasks in task store, if not found in tasks from manager then
delete it
*/
- private void traverseDbTasksToManager(Map<String, TaskProfile>
tasksFromManager) {
- taskDb.getTasks().forEach((profileFromDb) -> {
- if (!tasksFromManager.containsKey(profileFromDb.getTaskId())) {
- LOGGER.info("traverseDbTasksToManager try to delete task {}",
profileFromDb.getTaskId());
- deleteTask(profileFromDb);
+ private void traverseStoreTasksToManager(Map<String, TaskProfile>
tasksFromManager) {
+ taskStore.getTasks().forEach((profileFromStore) -> {
+ if (!tasksFromManager.containsKey(profileFromStore.getTaskId())) {
+ LOGGER.info("traverseStoreTasksToManager try to delete task
{}", profileFromStore.getTaskId());
+ deleteTask(profileFromStore);
}
});
}
@@ -335,49 +335,49 @@ public class TaskManager extends AbstractDaemon {
* manager task state is RUNNING and taskMap not found then add
* manager task state is FROZE and taskMap found thrn delete
*/
- private void traverseDbTasksToMemory() {
- taskDb.getTasks().forEach((profileFromDb) -> {
- TaskStateEnum dbState = profileFromDb.getState();
- Task task = taskMap.get(profileFromDb.getTaskId());
- if (dbState == TaskStateEnum.RUNNING) {
+ private void traverseStoreTasksToMemory() {
+ taskStore.getTasks().forEach((profileFromStore) -> {
+ TaskStateEnum storeState = profileFromStore.getState();
+ Task task = taskMap.get(profileFromStore.getTaskId());
+ if (storeState == TaskStateEnum.RUNNING) {
if (task == null) {
- LOGGER.info("traverseDbTasksToMemory add task to mem
taskId {}", profileFromDb.getTaskId());
- addToMemory(profileFromDb);
+ LOGGER.info("traverseStoreTasksToMemory add task to mem
taskId {}", profileFromStore.getTaskId());
+ addToMemory(profileFromStore);
}
- } else if (dbState == TaskStateEnum.FROZEN) {
+ } else if (storeState == TaskStateEnum.FROZEN) {
if (task != null) {
- LOGGER.info("traverseDbTasksToMemory delete task from mem
taskId {}",
- profileFromDb.getTaskId());
- deleteFromMemory(profileFromDb.getTaskId());
+ LOGGER.info("traverseStoreTasksToMemory delete task from
mem taskId {}",
+ profileFromStore.getTaskId());
+ deleteFromMemory(profileFromStore.getTaskId());
}
} else {
- if (dbState != TaskStateEnum.RETRY_FINISH) {
- LOGGER.error("task {} invalid state {}",
profileFromDb.getTaskId(), dbState);
+ if (storeState != TaskStateEnum.RETRY_FINISH) {
+ LOGGER.error("task {} invalid state {}",
profileFromStore.getTaskId(), storeState);
}
}
});
}
/**
- * task in taskMap but not in taskDb then delete
- * task in taskMap but task state from db is FROZEN then delete
+ * task in taskMap but not in task store then delete
+ * task in taskMap but task state from task store is FROZEN then delete
*/
- private void traverseMemoryTasksToDb() {
+ private void traverseMemoryTasksToStore() {
taskMap.values().forEach((task) -> {
- TaskProfile profileFromDb = taskDb.getTask(task.getTaskId());
- if (profileFromDb == null) {
+ TaskProfile profileFromStore = taskStore.getTask(task.getTaskId());
+ if (profileFromStore == null) {
deleteFromMemory(task.getTaskId());
return;
}
- TaskStateEnum stateFromDb = profileFromDb.getState();
- if (stateFromDb != TaskStateEnum.RUNNING) {
+ TaskStateEnum stateFromStore = profileFromStore.getState();
+ if (stateFromStore != TaskStateEnum.RUNNING) {
deleteFromMemory(task.getTaskId());
}
});
}
/**
- * add task profile to db
+ * add task profile to task store
* if task state is RUNNING then add task to memory
*/
private void addTask(TaskProfile taskProfile) {
@@ -389,7 +389,7 @@ public class TaskManager extends AbstractDaemon {
LOGGER.error("task profile invalid {}", taskProfile.toJsonStr());
return;
}
- addToDb(taskProfile);
+ addToStore(taskProfile);
TaskStateEnum state =
TaskStateEnum.getTaskState(taskProfile.getInt(TASK_STATE));
if (state == TaskStateEnum.RUNNING) {
addToMemory(taskProfile);
@@ -400,37 +400,37 @@ public class TaskManager extends AbstractDaemon {
}
private void deleteTask(TaskProfile taskProfile) {
- deleteFromDb(taskProfile);
+ deleteFromStore(taskProfile);
deleteFromMemory(taskProfile.getTaskId());
}
private void freezeTask(TaskProfile taskProfile) {
- updateToDb(taskProfile);
+ updateToStore(taskProfile);
deleteFromMemory(taskProfile.getTaskId());
}
private void finishTask(TaskProfile taskProfile) {
taskProfile.setState(TaskStateEnum.RETRY_FINISH);
- updateToDb(taskProfile);
+ updateToStore(taskProfile);
deleteFromMemory(taskProfile.getTaskId());
}
private void activeTask(TaskProfile taskProfile) {
- updateToDb(taskProfile);
+ updateToStore(taskProfile);
addToMemory(taskProfile);
}
- private void restoreFromDb() {
- LOGGER.info("restore from db start");
- List<TaskProfile> taskProfileList = taskDb.getTasks();
+ private void restoreFromStore() {
+ LOGGER.info("restore from task store start");
+ List<TaskProfile> taskProfileList = taskStore.getTasks();
taskProfileList.forEach((profile) -> {
if (profile.getState() == TaskStateEnum.RUNNING) {
- LOGGER.info("restore from db taskId {}", profile.getTaskId());
+ LOGGER.info("restore from task store taskId {}",
profile.getTaskId());
profile.setBoolean(RESTORE_FROM_DB, true);
addToMemory(profile);
}
});
- LOGGER.info("restore from db end");
+ LOGGER.info("restore from task store end");
}
private void stopAllTasks() {
@@ -452,30 +452,30 @@ public class TaskManager extends AbstractDaemon {
}
/**
- * add task to db, it was expected that there is no record refer the task
id.
+ * add task to task store, it was expected that there is no record refer
the task id.
* cause the task id will change if the task content changes, replace the
record
- * if it is found, the memory record will be updated by the db.
+ * if it is found, the memory record will be updated by the task store.
*/
- private void addToDb(TaskProfile taskProfile) {
- if (taskDb.getTask(taskProfile.getTaskId()) != null) {
+ private void addToStore(TaskProfile taskProfile) {
+ if (taskStore.getTask(taskProfile.getTaskId()) != null) {
LOGGER.error("task {} should not exist", taskProfile.getTaskId());
}
- taskDb.storeTask(taskProfile);
+ taskStore.storeTask(taskProfile);
}
- private void deleteFromDb(TaskProfile taskProfile) {
- if (taskDb.getTask(taskProfile.getTaskId()) == null) {
- LOGGER.error("try to delete task {} but not found in db",
taskProfile);
+ private void deleteFromStore(TaskProfile taskProfile) {
+ if (taskStore.getTask(taskProfile.getTaskId()) == null) {
+ LOGGER.error("try to delete task {} but not found in task store",
taskProfile);
return;
}
- taskDb.deleteTask(taskProfile.getTaskId());
+ taskStore.deleteTask(taskProfile.getTaskId());
}
- private void updateToDb(TaskProfile taskProfile) {
- if (taskDb.getTask(taskProfile.getTaskId()) == null) {
+ private void updateToStore(TaskProfile taskProfile) {
+ if (taskStore.getTask(taskProfile.getTaskId()) == null) {
LOGGER.error("task {} not found, agent may have been reinstalled",
taskProfile);
}
- taskDb.storeTask(taskProfile);
+ taskStore.storeTask(taskProfile);
}
/**
@@ -492,7 +492,7 @@ public class TaskManager extends AbstractDaemon {
try {
Class<?> taskClass = Class.forName(taskProfile.getTaskClass());
Task task = (Task) taskClass.newInstance();
- task.init(this, taskProfile, instanceBasicDb);
+ task.init(this, taskProfile, instanceBasicStore);
taskMap.put(taskProfile.getTaskId(), task);
runningPool.submit(task);
LOGGER.info(
@@ -523,12 +523,12 @@ public class TaskManager extends AbstractDaemon {
}
public TaskProfile getTaskProfile(String taskId) {
- return taskDb.getTask(taskId);
+ return taskStore.getTask(taskId);
}
@Override
public void start() throws Exception {
- restoreFromDb();
+ restoreFromStore();
submitWorker(coreThread());
OffsetManager.getInstance().start();
}
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 7c21666a9e..f45e4c55e0 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -22,9 +22,9 @@ import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentBaseTestsHelper;
import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.db.InstanceDb;
-import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.store.InstanceStore;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.store.TaskStore;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
@@ -54,12 +54,12 @@ public class TestInstanceManager {
public static void setup() {
helper = new
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
- Db basicDb = TaskManager.initDb("/localdb");
+ Store basicStore = TaskManager.initStore("/localdb");
taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L,
TaskStateEnum.RUNNING, "GMT+6:00");
- Db taskBasicDb =
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
- TaskProfileDb taskDb = new TaskProfileDb(taskBasicDb);
- taskDb.storeTask(taskProfile);
- manager = new InstanceManager("1", 20, basicDb, taskDb);
+ Store taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
+ TaskStore taskStore = new TaskStore(taskBasicStore);
+ taskStore.storeTask(taskProfile);
+ manager = new InstanceManager("1", 20, basicStore, taskStore);
manager.CORE_THREAD_SLEEP_TIME_MS = 100;
}
@@ -71,12 +71,12 @@ public class TestInstanceManager {
@Test
public void testInstanceManager() {
- InstanceDb instanceDb = manager.getInstanceDb();
+ InstanceStore instanceStore = manager.getInstanceStore();
for (int i = 1; i <= 10; i++) {
InstanceProfile profile =
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
String.valueOf(i), taskProfile.getCycleUnit(),
"2023092710",
AgentUtils.getCurrentTime());
- instanceDb.storeInstance(profile);
+ instanceStore.storeInstance(profile);
}
manager.start();
for (int i = 1; i <= 10; i++) {
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java
index 03f2b3b8fa..a5a572dc65 100644
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java
@@ -18,8 +18,8 @@
package org.apache.inlong.agent.core.task;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.store.Store;
import java.io.IOException;
@@ -36,7 +36,7 @@ public class MockTask extends Task {
private TaskManager manager;
@Override
- public void init(Object srcManager, TaskProfile profile, Db basicDb)
throws IOException {
+ public void init(Object srcManager, TaskProfile profile, Store basicStore)
throws IOException {
manager = (TaskManager) srcManager;
this.profile = profile;
}
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
index 11cc413ed1..61c255bbdf 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
@@ -19,7 +19,7 @@ package org.apache.inlong.agent.core.task;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.core.AgentBaseTestsHelper;
-import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.store.TaskStore;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.junit.AfterClass;
@@ -55,12 +55,12 @@ public class TestTaskManager {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
try {
manager = new TaskManager();
- TaskProfileDb taskProfileDb = manager.getTaskDb();
+ TaskStore taskStore = manager.getTaskStore();
for (int i = 1; i <= 10; i++) {
TaskProfile taskProfile = helper.getTaskProfile(i, pattern,
false, 0L, 0L, TaskStateEnum.RUNNING,
"GMT+8:00");
taskProfile.setTaskClass(MockTask.class.getCanonicalName());
- taskProfileDb.storeTask(taskProfile);
+ taskStore.storeTask(taskProfile);
}
manager.start();
for (int i = 1; i <= 10; i++) {
diff --git
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/Manager.java
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/Manager.java
index d89b342c8d..cd1317c6da 100755
---
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/Manager.java
+++
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/Manager.java
@@ -25,7 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Installer Manager, the bridge for job manager, task manager, db e.t.c it
manages agent level operations and
+ * Installer Manager, the bridge for job manager, task manager, store e.t.c it
manages agent level operations and
* communicates with outside system.
*/
public class Manager extends AbstractDaemon {
diff --git
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
index cd693436bf..c79d31f753 100755
---
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
+++
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
@@ -70,8 +70,7 @@ import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MA
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
/**
- * Installer Manager, the bridge for job manager, task manager, db e.t.c it
manages agent level operations and
- * communicates with outside system.
+ * module manager, deal with module add, delete and modify
*/
public class ModuleManager extends AbstractDaemon {
diff --git
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/conf/InstallerConfiguration.java
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/conf/InstallerConfiguration.java
index a9b3fa9bf8..e26a9e24bc 100644
---
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/conf/InstallerConfiguration.java
+++
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/conf/InstallerConfiguration.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
/**
* Installer configuration. Only one instance in the process.
- * Basically it use properties file to store configurations.
+ * Basically it use properties file to save configurations.
*/
public class InstallerConfiguration extends AbstractConfiguration {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index c187c4d143..376200e475 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -155,7 +155,7 @@ public class LogFileSource extends AbstractSource {
fileName);
offset = 0;
} else {
- LOGGER.info("getInitLineOffset inode no change taskId {} from
db {}, file {}", taskId, offset,
+ LOGGER.info("getInitLineOffset inode no change taskId {} from
offset store {}, file {}", taskId, offset,
fileName);
}
} else {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index 2de853b596..c463543bf5 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -23,10 +23,10 @@ import org.apache.inlong.agent.core.instance.ActionType;
import org.apache.inlong.agent.core.instance.InstanceAction;
import org.apache.inlong.agent.core.instance.InstanceManager;
import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.file.Task;
import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.store.Store;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
@@ -44,7 +44,7 @@ public abstract class AbstractTask extends Task {
public static final int CORE_THREAD_PRINT_TIME = 10000;
protected static final int DEFAULT_INSTANCE_LIMIT = 1;
protected TaskProfile taskProfile;
- protected Db basicDb;
+ protected Store basicStore;
protected TaskManager taskManager;
private InstanceManager instanceManager;
protected volatile boolean running = false;
@@ -53,13 +53,13 @@ public abstract class AbstractTask extends Task {
protected long auditVersion;
@Override
- public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
+ public void init(Object srcManager, TaskProfile taskProfile, Store
basicStore) throws IOException {
taskManager = (TaskManager) srcManager;
this.taskProfile = taskProfile;
- this.basicDb = basicDb;
+ this.basicStore = basicStore;
auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION));
instanceManager = new InstanceManager(taskProfile.getTaskId(),
getInstanceLimit(),
- basicDb, taskManager.getTaskDb());
+ basicStore, taskManager.getTaskStore());
try {
instanceManager.start();
} catch (Exception e) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
index fad0f98f2a..8f333b8d46 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java
@@ -18,8 +18,8 @@
package org.apache.inlong.agent.plugin.task;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.store.Store;
/**
* Generate job by crontab expression.
@@ -27,7 +27,7 @@ import org.apache.inlong.agent.plugin.file.Task;
public class CronTask extends Task {
@Override
- public void init(Object srcManager, TaskProfile profile, Db basicDb) {
+ public void init(Object srcManager, TaskProfile profile, Store basicStore)
{
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
index 14a86dba65..740d2364aa 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
@@ -21,9 +21,9 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.db.Db;
-import org.apache.inlong.agent.db.RocksDbImp;
-import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.store.RocksStoreImp;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.store.TaskStore;
import java.util.List;
@@ -31,13 +31,13 @@ public class RocksDBUtils {
public static void main(String[] args) {
AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
- Db db = new RocksDbImp(
+ Store store = new RocksStoreImp(
agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH));
- upgrade(db);
+ upgrade(store);
}
- public static void upgrade(Db db) {
- TaskProfileDb triggerProfileDb = new TaskProfileDb(db);
+ public static void upgrade(Store store) {
+ TaskStore triggerProfileDb = new TaskStore(store);
List<TaskProfile> allTaskProfiles = triggerProfileDb.getTasks();
allTaskProfiles.forEach(triggerProfile -> {
if (triggerProfile.hasKey(TaskConstants.TASK_DIR_FILTER_PATTERN)) {
@@ -50,6 +50,6 @@ public class RocksDBUtils {
});
}
- public static void printTrigger(Db db) {
+ public static void printTrigger(Store store) {
}
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 1cd3a2b927..70ab3170f9 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -25,10 +25,10 @@ import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.core.task.MemoryManager;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskManager;
-import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.store.Store;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
@@ -57,21 +57,21 @@ public class TestLogFileSource {
private static final Gson GSON = new Gson();
private static final String[] check = {"hello line-end-symbol aa", "world
line-end-symbol",
"agent line-end-symbol"};
- // task basic db
- private static Db taskBasicDb;
- // instance basic db
- private static Db instanceBasicDb;
- // offset basic db
- private static Db offsetBasicDb;
+ // task basic store
+ private static Store taskBasicStore;
+ // instance basic store
+ private static Store instanceBasicStore;
+ // offset basic store
+ private static Store offsetBasicStore;
@BeforeClass
public static void setup() {
helper = new
AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome();
- taskBasicDb =
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
- instanceBasicDb =
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
- offsetBasicDb =
- TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
- OffsetManager.init(taskBasicDb, instanceBasicDb, offsetBasicDb);
+ taskBasicStore =
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
+ instanceBasicStore =
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
+ offsetBasicStore =
+
TaskManager.initStore(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
+ OffsetManager.init(taskBasicStore, instanceBasicStore,
offsetBasicStore);
}
private LogFileSource getSource(int taskId, long offset) {
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index b538c69f61..c1b8fe5c0b 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -95,8 +95,8 @@ public class TestLogFileTask {
return null;
}).when(task, "addToEvenMap", Mockito.anyString(),
Mockito.anyString());
Assert.assertTrue(task.isProfileValid(taskProfile));
- manager.getTaskDb().storeTask(taskProfile);
- task.init(manager, taskProfile, manager.getInstanceBasicDb());
+ manager.getTaskStore().storeTask(taskProfile);
+ task.init(manager, taskProfile, manager.getInstanceBasicStore());
EXECUTOR_SERVICE.submit(task);
} catch (Exception e) {
LOGGER.error("source init error {}", e);