This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a891202393 [INLONG-9214][Agent] Limit max file count to collect once (#9216)
a891202393 is described below
commit a8912023936b5742ef6113f975ca0fa1eb6e5a83
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Nov 3 17:40:13 2023 +0800
[INLONG-9214][Agent] Limit max file count to collect once (#9216)
---
.../apache/inlong/agent/core/instance/InstanceManager.java | 11 +++++++----
.../inlong/agent/core/instance/TestInstanceManager.java | 2 +-
.../inlong/agent/plugin/task/filecollect/FileScanner.java | 14 ++++++--------
.../agent/plugin/task/filecollect/LogFileCollectTask.java | 5 +++--
.../org/apache/inlong/agent/plugin/utils/PluginUtils.java | 3 +--
5 files changed, 18 insertions(+), 17 deletions(-)
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index e209535db8..de96b93bc1 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -21,7 +21,6 @@ import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.InstanceDb;
import org.apache.inlong.agent.plugin.Instance;
@@ -62,7 +61,7 @@ public class InstanceManager extends AbstractDaemon {
new SynchronousQueue<>(),
new AgentThreadFactory("instance-manager"));
- private final int taskMaxLimit;
+ private final int instanceLimit;
private final AgentConfiguration agentConf;
private final String taskId;
private volatile boolean runAtLeastOneTime = false;
@@ -71,12 +70,12 @@ public class InstanceManager extends AbstractDaemon {
/**
* Init task manager.
*/
- public InstanceManager(String taskId, Db basicDb) {
+ public InstanceManager(String taskId, int instanceLimit, Db basicDb) {
this.taskId = taskId;
instanceDb = new InstanceDb(basicDb);
this.agentConf = AgentConfiguration.getAgentConf();
instanceMap = new ConcurrentHashMap<>();
- taskMaxLimit = agentConf.getInt(AgentConstants.JOB_NUMBER_LIMIT,
AgentConstants.DEFAULT_JOB_NUMBER_LIMIT);
+ this.instanceLimit = instanceLimit;
actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
}
@@ -236,6 +235,10 @@ public class InstanceManager extends AbstractDaemon {
}
private void addInstance(InstanceProfile profile) {
+ if (instanceMap.size() >= instanceLimit) {
+ LOGGER.error("instanceMap size {} over limit {}",
instanceMap.size(), instanceLimit);
+ return;
+ }
LOGGER.info("add instance taskId {} instanceId {}", taskId,
profile.getInstanceId());
if (!shouldAddAgain(profile.getInstanceId(),
profile.getFileUpdateTime())) {
LOGGER.info("shouldAddAgain returns false skip taskId {}
instanceId {}", taskId, profile.getInstanceId());
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index e858743cb3..b1107bb2ab 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -50,7 +50,7 @@ public class TestInstanceManager {
String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
Db basicDb = TaskManager.initDb("/localdb");
taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L,
TaskStateEnum.RUNNING);
- manager = new InstanceManager("1", basicDb);
+ manager = new InstanceManager("1", 2, basicDb);
manager.start();
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
index bfb1a7a80a..efa87e22dc 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -18,7 +18,6 @@
package org.apache.inlong.agent.plugin.task.filecollect;
import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
import org.apache.inlong.agent.plugin.utils.file.Files;
@@ -35,6 +34,8 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM;
+
/*
* This class is mainly used for scanning log file that we want to read. We
use this class at
* inlong_agent recover process, the do and redo tasks and the current log
file access when we deploy a
@@ -69,14 +70,12 @@ public class FileScanner {
logger.info("task {} this scan time is between {} and {}.",
new Object[]{conf.getTaskId(), startTime, endTime});
- return scanTaskBetweenTimes(conf, originPattern, startTime, endTime);
+ return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern,
startTime, endTime);
}
/* Scan log files and create tasks between two times. */
- public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf,
String originPattern, String startTime,
+ public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit,
String originPattern, String startTime,
String endTime) {
- String cycleUnit = conf.getCycleUnit();
- int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM);
List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime,
cycleUnit);
List<BasicFileInfo> infos = new ArrayList<BasicFileInfo>();
for (Long time : dateRegion) {
@@ -87,7 +86,7 @@ public class FileScanner {
String firstDir = allPaths.get(0);
String secondDir = allPaths.get(0) + File.separator +
allPaths.get(1);
ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir,
secondDir, filename, 3,
- maxFileNum);
+ DEFAULT_FILE_MAX_NUM);
for (String file : fileList) {
// TODO the time is not YYYYMMDDHH
String dataTime = NewDateUtils.millSecConvertToTimeStr(time,
cycleUnit);
@@ -100,8 +99,7 @@ public class FileScanner {
return infos;
}
- public static ArrayList<String> scanFile(TaskProfile conf, String
originPattern, long dataTime) {
- int maxFileNum = conf.getInt(TaskConstants.FILE_MAX_NUM);
+ public static ArrayList<String> scanFile(int maxFileNum, String
originPattern, long dataTime) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(dataTime);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 54170a5290..1629b89595 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -113,7 +113,8 @@ public class LogFileCollectTask extends Task {
retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
originPatterns =
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
.collect(Collectors.toSet());
- instanceManager = new InstanceManager(taskProfile.getTaskId(),
basicDb);
+ instanceManager = new InstanceManager(taskProfile.getTaskId(),
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
+ basicDb);
try {
instanceManager.start();
} catch (Exception e) {
@@ -409,7 +410,7 @@ public class LogFileCollectTask extends Task {
continue;
}
if (Files.isDirectory(child)) {
- LOGGER.warn("The find creation event is triggered by a
directory: " + child
+ LOGGER.info("The find creation event is triggered by a
directory: " + child
.getFileName());
entity.registerRecursively(child);
continue;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 74f7f238db..e08313c10b 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -55,7 +55,6 @@ import static
org.apache.inlong.agent.constant.KubernetesConstants.HTTPS;
import static
org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST;
import static
org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT;
import static
org.apache.inlong.agent.constant.TaskConstants.FILE_DIR_FILTER_PATTERNS;
-import static org.apache.inlong.agent.constant.TaskConstants.FILE_MAX_NUM;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_RETRY_TIME;
import static
org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_TIME_OFFSET;
@@ -97,7 +96,7 @@ public class PluginUtils {
Set<PathPattern> pathPatterns =
PathPattern.buildPathPattern(dirPatterns,
jobConf.get(TASK_FILE_TIME_OFFSET, null));
updateRetryTime(jobConf, pathPatterns);
- int maxFileNum = jobConf.getInt(FILE_MAX_NUM, DEFAULT_FILE_MAX_NUM);
+ int maxFileNum = DEFAULT_FILE_MAX_NUM;
LOGGER.info("dir pattern {}, max file num {}", dirPatterns,
maxFileNum);
Collection<File> allFiles = new ArrayList<>();
pathPatterns.forEach(pathPattern ->
allFiles.addAll(pathPattern.walkSuitableFiles(maxFileNum)));