This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f01af15cc8 [INLONG-9415][Agent] Increase audit reports related to instance maintenance (#9416)
f01af15cc8 is described below
commit f01af15cc84a0deaf33b8cb045c4bcb72a673f43
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Dec 5 14:10:41 2023 +0800
[INLONG-9415][Agent] Increase audit reports related to instance maintenance (#9416)
---
.../apache/inlong/agent/conf/InstanceProfile.java | 12 +++++++++
.../inlong/agent/metrics/audit/AuditUtils.java | 5 ++++
.../agent/core/instance/InstanceManager.java | 30 +++++++++++++++++++---
.../plugin/sinks/filecollect/AbstractSink.java | 8 ++----
.../agent/plugin/sources/file/AbstractSource.java | 8 ++----
.../plugin/sources/reader/file/AbstractReader.java | 8 ++----
6 files changed, 49 insertions(+), 22 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index acc6444aba..06d783c953 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -32,6 +32,10 @@ import org.slf4j.LoggerFactory;
import java.util.List;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_ClUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_TOPIC;
@@ -105,6 +109,14 @@ public class InstanceProfile extends AbstractConfiguration implements Comparable
return get(TaskConstants.PREDEFINE_FIELDS, "");
}
+ public String getInlongGroupId() {
+ return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+ }
+
+ public String getInlongStreamId() {
+ return get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+ }
+
@Override
public boolean allRequiredKeyExist() {
return hasKey(TaskConstants.FILE_UPDATE_TIME);
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index eda8754b1d..faaf3400c6 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -46,6 +46,11 @@ public class AuditUtils {
public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 48;
public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 10026;
+ public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 49;
+ public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 50;
+ public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 51;
+ public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 52;
+ public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 10028;
private static boolean IS_AUDIT = true;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index e600c5b671..1ed8e86cf8 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -26,6 +26,7 @@ import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.InstanceDb;
import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
@@ -218,7 +219,7 @@ public class InstanceManager extends AbstractDaemon {
LOGGER.info("instance has expired, delete from db dataTime {} taskId {} instanceId {}",
instanceFromDb.getSourceDataTime(),
instanceFromDb.getTaskId(),
instanceFromDb.getInstanceId());
- instanceDb.deleteInstance(instanceFromDb.getTaskId(), instanceFromDb.getInstanceId());
+ deleteFromDb(instanceFromDb.getInstanceId());
iterator.remove();
}
}
@@ -347,14 +348,14 @@ public class InstanceManager extends AbstractDaemon {
profile.getInstanceId());
return;
}
- addToDb(profile);
+ addToDb(profile, true);
addToMemory(profile);
}
private void finishInstance(InstanceProfile profile) {
profile.setState(InstanceStateEnum.FINISHED);
profile.setModifyTime(AgentUtils.getCurrentTime());
- addToDb(profile);
+ addToDb(profile, false);
deleteFromMemory(profile.getInstanceId());
LOGGER.info("finished instance state {} taskId {} instanceId {}",
profile.getState(),
profile.getTaskId(), profile.getInstanceId());
@@ -366,9 +367,14 @@ public class InstanceManager extends AbstractDaemon {
}
private void deleteFromDb(String instanceId) {
+ InstanceProfile profile = instanceDb.getInstance(taskId, instanceId);
+ String inlongGroupId = profile.getInlongGroupId();
+ String inlongStreamId = profile.getInlongStreamId();
instanceDb.deleteInstance(taskId, instanceId);
LOGGER.info("delete instance from db: taskId {} instanceId {} result {}", taskId,
instanceId, instanceDb.getInstance(taskId, instanceId));
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, inlongGroupId, inlongStreamId,
+ profile.getSinkDataTime(), 1, 1);
}
private void deleteFromMemory(String instanceId) {
@@ -378,26 +384,40 @@ public class InstanceManager extends AbstractDaemon {
instanceId);
return;
}
+ String inlongGroupId = instance.getProfile().getInlongGroupId();
+ String inlongStreamId = instance.getProfile().getInlongStreamId();
instance.destroy();
instanceMap.remove(instanceId);
LOGGER.info("delete instance from memory: taskId {} instanceId {}",
taskId, instance.getInstanceId());
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM, inlongGroupId, inlongStreamId,
+ instance.getProfile().getSinkDataTime(), 1, 1);
}
- private void addToDb(InstanceProfile profile) {
+ private void addToDb(InstanceProfile profile, boolean addNew) {
LOGGER.info("add instance to db state {} instanceId {}",
profile.getState(), profile.getInstanceId());
instanceDb.storeInstance(profile);
+ if (addNew) {
+ String inlongGroupId = profile.getInlongGroupId();
+ String inlongStreamId = profile.getInlongStreamId();
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_DB, inlongGroupId, inlongStreamId,
+ profile.getSinkDataTime(), 1, 1);
+ }
}
/**
* add instance to memory, if there is a record refer to the instance id exist we need to destroy it first.
*/
private void addToMemory(InstanceProfile instanceProfile) {
+ String inlongGroupId = instanceProfile.getInlongGroupId();
+ String inlongStreamId = instanceProfile.getInlongStreamId();
Instance oldInstance =
instanceMap.get(instanceProfile.getInstanceId());
if (oldInstance != null) {
oldInstance.destroy();
instanceMap.remove(instanceProfile.getInstanceId());
LOGGER.error("old instance {} should not exist, try stop it first",
instanceProfile.getInstanceId());
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL, inlongGroupId, inlongStreamId,
+ instanceProfile.getSinkDataTime(), 1, 1);
}
LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
try {
@@ -410,6 +430,8 @@ public class InstanceManager extends AbstractDaemon {
"add instance to memory instanceId {} instanceMap size {}, runningPool instance total {}, runningPool instance active {}",
instance.getInstanceId(), instanceMap.size(),
EXECUTOR_SERVICE.getTaskCount(),
EXECUTOR_SERVICE.getActiveCount());
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, inlongGroupId, inlongStreamId,
+ instanceProfile.getSinkDataTime(), 1, 1);
} catch (Throwable t) {
LOGGER.error("add instance error {}", t.getMessage());
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java
index a92dd770e3..369f2a66d7 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java
@@ -32,11 +32,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
@@ -72,8 +68,8 @@ public abstract class AbstractSink implements Sink {
public void init(InstanceProfile profile) {
this.profile = profile;
jobInstanceId = profile.getInstanceId();
- inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
- inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+ inlongGroupId = profile.getInlongGroupId();
+ inlongStreamId = profile.getInlongStreamId();
cache = new ProxyMessageCache(this.profile, inlongGroupId,
inlongStreamId);
batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL,
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 1d085424d5..5dc79377de 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -27,10 +27,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
@@ -48,8 +44,8 @@ public abstract class AbstractSource implements Source {
@Override
public void init(InstanceProfile profile) {
- inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
- inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+ inlongGroupId = profile.getInlongGroupId();
+ inlongStreamId = profile.getInlongStreamId();
// register metric
this.dimensions = new HashMap<>();
dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java
index a8987b8eb3..611c29945b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java
@@ -34,10 +34,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
@@ -59,8 +55,8 @@ public abstract class AbstractReader implements Reader {
@Override
public void init(InstanceProfile profile) {
- inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
- inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+ inlongGroupId = profile.getInlongGroupId();
+ inlongStreamId = profile.getInlongStreamId();
this.dimensions = new HashMap<>();
dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());