This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e87f3605d4 [INLONG-9366][Agent] Remove useless offset record (#9367)
e87f3605d4 is described below
commit e87f3605d485b14a4c548de3afcb1c375650530d
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Nov 30 14:05:42 2023 +0800
[INLONG-9366][Agent] Remove useless offset record (#9367)
---
.../java/org/apache/inlong/agent/db/OffsetDb.java | 20 +++++--
.../org/apache/inlong/agent/core/AgentManager.java | 2 -
.../agent/core/instance/InstanceManager.java | 2 +-
.../inlong/agent/core/task/OffsetManager.java | 64 ++++++++++++++++++++--
.../inlong/agent/core/task/file/TaskManager.java | 3 +
.../inlong/agent/plugin/instance/FileInstance.java | 2 +-
.../agent/plugin/sinks/filecollect/ProxySink.java | 2 +-
.../inlong/agent/plugin/sources/LogFileSource.java | 1 -
.../agent/plugin/sources/TestLogFileSource.java | 12 ++++
9 files changed, 90 insertions(+), 18 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
index 5ceeb2e4ea..5c31a2f88a 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/OffsetDb.java
@@ -17,9 +17,7 @@
package org.apache.inlong.agent.db;
-import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.OffsetProfile;
-import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -27,6 +25,9 @@ import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* db interface for task profile.
*/
@@ -34,11 +35,9 @@ public class OffsetDb {
private static final Logger LOGGER =
LoggerFactory.getLogger(OffsetDb.class);
private final Db db;
- private final AgentConfiguration agentConf;
- public OffsetDb() {
- agentConf = AgentConfiguration.getAgentConf();
- db = initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
+ public OffsetDb(Db db) {
+ this.db = db;
}
/**
@@ -54,6 +53,15 @@ public class OffsetDb {
}
}
+ public List<OffsetProfile> listAllOffsets() {
+ List<KeyValueEntity> result = this.db.findAll("");
+ List<OffsetProfile> offsetList = new ArrayList<>();
+ for (KeyValueEntity entity : result) {
+ offsetList.add(entity.getAsOffsetProfile());
+ }
+ return offsetList;
+ }
+
public OffsetProfile getOffset(String taskId, String instanceId) {
KeyValueEntity result = db.get(getKey(taskId, instanceId));
if (result == null) {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 5c28e172b6..29c000dcc4 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -21,7 +21,6 @@ import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.ProfileFetcher;
import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.file.TaskManager;
import org.slf4j.Logger;
@@ -48,7 +47,6 @@ public class AgentManager extends AbstractDaemon {
public AgentManager() {
conf = AgentConfiguration.getAgentConf();
agentConfMonitor = Executors.newSingleThreadExecutor();
- OffsetManager.init();
taskManager = new TaskManager();
fetcher = initFetcher(this);
heartbeatManager = HeartbeatManager.getInstance(this);
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 260e5a477f..9f5773d587 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -58,7 +58,7 @@ public class InstanceManager extends AbstractDaemon {
public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 10000;
private long lastCleanTime = 0;
- public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "-3";
+ public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
// instance in db
private final InstanceDb instanceDb;
TaskProfileDb taskProfileDb;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 0c4ca513ce..fca223b873 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -17,38 +17,80 @@
package org.apache.inlong.agent.core.task;
+import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.db.InstanceDb;
import org.apache.inlong.agent.db.OffsetDb;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
/**
* used to store instance offset to db
* where key is task id + read file name and value is instance offset
*/
-public class OffsetManager {
+public class OffsetManager extends AbstractDaemon {
private static final Logger LOGGER =
LoggerFactory.getLogger(OffsetManager.class);
+ public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000;
private static volatile OffsetManager offsetManager = null;
private final OffsetDb offsetDb;
+ // instance in db
+ private final InstanceDb instanceDb;
+
+ private OffsetManager(Db offsetBasicDb, Db instanceBasicDb) {
+ this.offsetDb = new OffsetDb(offsetBasicDb);
+ instanceDb = new InstanceDb(instanceBasicDb);
+ }
- private OffsetManager() {
- this.offsetDb = new OffsetDb();
+ /**
+ * thread for core thread.
+ *
+ * @return runnable profile.
+ */
+ private Runnable coreThread() {
+ return () -> {
+ Thread.currentThread().setName("offset-manager-core");
+ while (isRunnable()) {
+ try {
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+ List<OffsetProfile> offsets = offsetDb.listAllOffsets();
+ offsets.forEach(offset -> {
+ String taskId = offset.getTaskId();
+ String instanceId = offset.getInstanceId();
+ InstanceProfile instanceProfile =
instanceDb.getInstance(taskId, instanceId);
+ if (instanceProfile == null) {
+ deleteOffset(taskId, instanceId);
+ LOGGER.info("instance not found, delete offset
taskId {} instanceId {}", taskId,
+ instanceId);
+ }
+ });
+ LOGGER.info("offsetManager running! offsets count {}",
offsets.size());
+ } catch (Throwable ex) {
+ LOGGER.error("offset-manager-core: ", ex);
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
+ }
+ }
+ };
}
/**
* task position manager singleton, can only generated by agent manager
*/
- public static OffsetManager init() {
+ public static void init(Db offsetBasicDb, Db instanceBasicDb) {
if (offsetManager == null) {
synchronized (OffsetManager.class) {
if (offsetManager == null) {
- offsetManager = new OffsetManager();
+ offsetManager = new OffsetManager(offsetBasicDb,
instanceBasicDb);
}
}
}
- return offsetManager;
}
/**
@@ -72,4 +114,14 @@ public class OffsetManager {
public OffsetProfile getOffset(String taskId, String instanceId) {
return offsetDb.getOffset(taskId, instanceId);
}
+
+ @Override
+ public void start() throws Exception {
+ submitWorker(coreThread());
+ }
+
+ @Override
+ public void stop() throws Exception {
+
+ }
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 7027b798f8..e188e4f207 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskAction;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.RocksDbImp;
@@ -131,6 +132,7 @@ public class TaskManager extends AbstractDaemon {
agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
offsetBasicDb =
initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
+ OffsetManager.init(offsetBasicDb, instanceBasicDb);
this.runningPool = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
@@ -519,6 +521,7 @@ public class TaskManager extends AbstractDaemon {
public void start() throws Exception {
restoreFromDb();
submitWorker(coreThread());
+ OffsetManager.getInstance().start();
}
@Override
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 13aef15f9d..1477ebbbac 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -131,7 +131,7 @@ public class FileInstance extends Instance {
}
private void handleSourceDeleted() {
- OffsetManager.init().deleteOffset(getTaskId(), getInstanceId());
+ OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId());
profile.setState(InstanceStateEnum.DELETE);
profile.setModifyTime(AgentUtils.getCurrentTime());
InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index ad4f07258a..5a01f64fa9 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -182,7 +182,7 @@ public class ProxySink extends AbstractSink {
fieldSplitter = profile.get(CommonConstants.FIELD_SPLITTER,
DEFAULT_FIELD_SPLITTER).getBytes(
StandardCharsets.UTF_8);
sourceName = profile.getInstanceId();
- offsetManager = OffsetManager.init();
+ offsetManager = OffsetManager.getInstance();
senderManager = new SenderManager(profile, inlongGroupId, sourceName);
try {
senderManager.Start();
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 4b61d32fc4..7c719b1e47 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -142,7 +142,6 @@ public class LogFileSource extends AbstractSource {
private boolean isRealTime = false;
public LogFileSource() {
- OffsetManager.init();
}
@Override
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index 3397f9f58a..be69fbcfea 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -19,8 +19,12 @@ package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.file.MemoryManager;
+import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
@@ -52,6 +56,10 @@ public class TestLogFileSource {
private static final String[] check = {"hello line-end-symbol aa", "world
line-end-symbol",
"agent line-end-symbol"};
private static InstanceProfile instanceProfile;
+ // instance basic db
+ private static Db instanceBasicDb;
+ // offset basic db
+ private static Db offsetBasicDb;
@BeforeClass
public static void setup() {
@@ -62,6 +70,10 @@ public class TestLogFileSource {
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING);
instanceProfile = taskProfile.createInstanceProfile("",
fileName, taskProfile.getCycleUnit(), "20230928",
AgentUtils.getCurrentTime());
+ instanceBasicDb =
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE);
+ offsetBasicDb =
+ TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET);
+ OffsetManager.init(offsetBasicDb, instanceBasicDb);
}
private LogFileSource getSource() {