This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ef594ccf4a [INLONG-9467][Agent] Improve code exception detection to ensure task and instance state transitions (#9468)
ef594ccf4a is described below
commit ef594ccf4a5e2f48fbea039e5db68e1ce225516e
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Dec 14 10:28:46 2023 +0800
[INLONG-9467][Agent] Improve code exception detection to ensure task and instance state transitions (#9468)
---
.../inlong/agent/metrics/audit/AuditUtils.java | 1 +
.../org/apache/inlong/agent/plugin/Instance.java | 2 +-
.../agent/core/instance/InstanceManager.java | 25 +++--
.../inlong/agent/core/instance/MockInstance.java | 3 +-
.../inlong/agent/plugin/instance/FileInstance.java | 22 +++--
.../inlong/agent/plugin/sources/LogFileSource.java | 110 +++++++++++----------
.../task/filecollect/LogFileCollectTask.java | 10 +-
7 files changed, 104 insertions(+), 69 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index 290d3b71bb..c2d946b923 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -55,6 +55,7 @@ public class AuditUtils {
public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
+ public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30015;
private static boolean IS_AUDIT = true;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
index 90bac4c94f..990d7e60b2 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
@@ -32,7 +32,7 @@ public abstract class Instance extends AbstractStateWrapper {
*
* @throws IOException
*/
- public abstract void init(Object instanceManager, InstanceProfile profile);
+ public abstract boolean init(Object instanceManager, InstanceProfile
profile);
/**
* destroy instance.
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 3a86f32fc2..3b74cf4e48 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -429,15 +429,22 @@ public class InstanceManager extends AbstractDaemon {
try {
Class<?> taskClass =
Class.forName(instanceProfile.getInstanceClass());
Instance instance = (Instance) taskClass.newInstance();
- instance.init(this, instanceProfile);
- instanceMap.put(instanceProfile.getInstanceId(), instance);
- EXECUTOR_SERVICE.submit(instance);
- LOGGER.info(
- "add instance to memory instanceId {} instanceMap size {},
runningPool instance total {}, runningPool instance active {}",
- instance.getInstanceId(), instanceMap.size(),
EXECUTOR_SERVICE.getTaskCount(),
- EXECUTOR_SERVICE.getActiveCount());
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM,
inlongGroupId, inlongStreamId,
- instanceProfile.getSinkDataTime(), 1, 1);
+ boolean initSuc = instance.init(this, instanceProfile);
+ if (initSuc) {
+ instanceMap.put(instanceProfile.getInstanceId(), instance);
+ EXECUTOR_SERVICE.submit(instance);
+ LOGGER.info(
+ "add instance to memory instanceId {} instanceMap size
{}, runningPool instance total {}, runningPool instance active {}",
+ instance.getInstanceId(), instanceMap.size(),
EXECUTOR_SERVICE.getTaskCount(),
+ EXECUTOR_SERVICE.getActiveCount());
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM,
inlongGroupId, inlongStreamId,
+ instanceProfile.getSinkDataTime(), 1, 1);
+ } else {
+ LOGGER.error(
+ "add instance to memory init failed instanceId {}",
instance.getInstanceId());
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED,
inlongGroupId, inlongStreamId,
+ instanceProfile.getSinkDataTime(), 1, 1);
+ }
} catch (Throwable t) {
LOGGER.error("add instance error {}", t.getMessage());
}
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
index 5e9bbbab03..dc4e16bebc 100644
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
@@ -37,11 +37,12 @@ public class MockInstance extends Instance {
private InstanceManager instanceManager;
@Override
- public void init(Object instanceManager, InstanceProfile profile) {
+ public boolean init(Object instanceManager, InstanceProfile profile) {
this.instanceManager = (InstanceManager) instanceManager;
this.profile = profile;
LOGGER.info("init called " + index);
initTime = index.getAndAdd(1);
+ return true;
}
@Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 23566acd9f..1f2a2cfc40 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -56,7 +56,7 @@ public class FileInstance extends Instance {
private volatile int checkFinishCount = 0;
@Override
- public void init(Object srcManager, InstanceProfile srcProfile) {
+ public boolean init(Object srcManager, InstanceProfile srcProfile) {
try {
instanceManager = (InstanceManager) srcManager;
profile = srcProfile;
@@ -68,11 +68,13 @@ public class FileInstance extends Instance {
sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
sink.init(profile);
inited = true;
- } catch (Throwable ex) {
+ return true;
+ } catch (Throwable e) {
+ handleSourceDeleted();
doChangeState(State.FATAL);
- LOGGER.error("init instance {} for task {} failed",
profile.getInstanceId(), profile.getInstanceId(),
- ex);
- ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+ LOGGER.error("init instance {} for task {} failed",
profile.getInstanceId(), profile.getInstanceId(), e);
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+ return false;
}
}
@@ -93,6 +95,15 @@ public class FileInstance extends Instance {
public void run() {
Thread.currentThread().setName("file-instance-core-" + getTaskId() +
"-" + getInstanceId());
running = true;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error: ", e);
+ }
+ running = false;
+ }
+
+ private void doRun() {
while (!isFinished()) {
if (!source.sourceExist()) {
handleSourceDeleted();
@@ -118,7 +129,6 @@ public class FileInstance extends Instance {
sink.write(msg);
}
}
- running = false;
}
private void handleReadEnd() {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index e4de812a8a..808edb7bde 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -181,7 +181,7 @@ public class LogFileSource extends AbstractSource {
} catch (Exception ex) {
LOGGER.error("init metadata error", ex);
}
- EXECUTOR_SERVICE.execute(coreThread());
+ EXECUTOR_SERVICE.execute(run());
} catch (Exception ex) {
stopRunning();
throw new FileException("error init stream for " + file.getPath(),
ex);
@@ -435,63 +435,71 @@ public class LogFileSource extends AbstractSource {
return false;
}
- public Runnable coreThread() {
+ private Runnable run() {
return () -> {
AgentThreadFactory.nameThread("log-file-source-" + taskId + "-" +
file);
running = true;
- long lastPrintTime = 0;
- while (isRunnable() && fileExist) {
- if (isInodeChanged()) {
- fileExist = false;
- LOGGER.info("inode changed, instance will restart and
offset will be clean, file {}",
- fileName);
- break;
- }
- if (file.length() < bytePosition) {
- fileExist = false;
- LOGGER.info("file rotate, instance will restart and offset
will be clean, file {}",
- fileName);
- break;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error maybe file deleted: ", e);
+ }
+ running = false;
+ };
+ }
+
+ private void doRun() {
+ long lastPrintTime = 0;
+ while (isRunnable() && fileExist) {
+ if (isInodeChanged()) {
+ fileExist = false;
+ LOGGER.info("inode changed, instance will restart and offset
will be clean, file {}",
+ fileName);
+ break;
+ }
+ if (file.length() < bytePosition) {
+ fileExist = false;
+ LOGGER.info("file rotate, instance will restart and offset
will be clean, file {}",
+ fileName);
+ break;
+ }
+ boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
+ if (!suc) {
+ break;
+ }
+ List<SourceData> lines = null;
+ try {
+ lines = readFromPos(bytePosition);
+ } catch (FileNotFoundException e) {
+ fileExist = false;
+ LOGGER.error("readFromPos file deleted error: ", e);
+ } catch (IOException e) {
+ LOGGER.error("readFromPos error: ", e);
+ }
+
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
+ if (lines.isEmpty()) {
+ if (queue.isEmpty()) {
+ emptyCount++;
+ } else {
+ emptyCount = 0;
}
- boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
- if (!suc) {
+ AgentUtils.silenceSleepInSeconds(1);
+ continue;
+ }
+ emptyCount = 0;
+ for (int i = 0; i < lines.size(); i++) {
+ boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length());
+ if (!suc4Queue) {
break;
}
- List<SourceData> lines = null;
- try {
- lines = readFromPos(bytePosition);
- } catch (FileNotFoundException e) {
- fileExist = false;
- LOGGER.error("readFromPos file deleted {}",
e.getMessage());
- } catch (IOException e) {
- LOGGER.error("readFromPos error {}", e.getMessage());
- }
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT,
BATCH_READ_LINE_TOTAL_LEN);
- if (lines.isEmpty()) {
- if (queue.isEmpty()) {
- emptyCount++;
- } else {
- emptyCount = 0;
- }
- AgentUtils.silenceSleepInSeconds(1);
- continue;
- }
- emptyCount = 0;
- for (int i = 0; i < lines.size(); i++) {
- boolean suc4Queue =
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length());
- if (!suc4Queue) {
- break;
- }
- putIntoQueue(lines.get(i));
- }
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
- lastPrintTime = AgentUtils.getCurrentTime();
- LOGGER.info("path is {}, linePosition {}, bytePosition is
{} file len {}, reads lines size {}",
- file.getName(), linePosition, bytePosition,
file.length(), lines.size());
- }
+ putIntoQueue(lines.get(i));
}
- running = false;
- };
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
+ lastPrintTime = AgentUtils.getCurrentTime();
+ LOGGER.info("path is {}, linePosition {}, bytePosition is {}
file len {}, reads lines size {}",
+ file.getName(), linePosition, bytePosition,
file.length(), lines.size());
+ }
+ }
}
private void putIntoQueue(SourceData sourceData) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index c506d698d0..52eba9ea80 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -248,6 +248,15 @@ public class LogFileCollectTask extends Task {
public void run() {
Thread.currentThread().setName("directory-task-core-" + getTaskId());
running = true;
+ try {
+ doRun();
+ } catch (Throwable e) {
+ LOGGER.error("do run error: ", e);
+ }
+ running = false;
+ }
+
+ private void doRun() {
while (!isFinished()) {
if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
LOGGER.info("log file task running! taskId {}", getTaskId());
@@ -268,7 +277,6 @@ public class LogFileCollectTask extends Task {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT,
inlongGroupId, inlongStreamId,
AgentUtils.getCurrentTime(), 1, 1);
}
- running = false;
}
private void runForRetry() {