This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 31a6f8bcbc [INLONG-10535][Agent] Support minute level tasks (#10536)
31a6f8bcbc is described below
commit 31a6f8bcbcd61b9571136ffbd18475a47430c807
Author: justinwwhuang <[email protected]>
AuthorDate: Sat Jun 29 16:15:23 2024 +0800
[INLONG-10535][Agent] Support minute level tasks (#10536)
Co-authored-by: AloysZhang <[email protected]>
---
.../inlong/agent/constant/CycleUnitType.java | 5 +-
.../apache/inlong/agent/utils/DateTransUtils.java | 37 ++----
.../agent/plugin/fetcher/ManagerFetcher.java | 34 +++---
.../inlong/agent/plugin/task/file/FileScanner.java | 19 +--
.../inlong/agent/plugin/task/file/LogFileTask.java | 4 +-
.../inlong/agent/plugin/task/file/WatchEntity.java | 4 +-
.../agent/plugin/utils/file/FilePathUtil.java | 8 +-
.../agent/plugin/utils/file/NewDateUtils.java | 128 ++++-----------------
.../inlong/agent/plugin/AgentBaseTestsHelper.java | 3 +-
.../inlong/agent/plugin/task/TestLogFileTask.java | 101 +++++++++-------
.../inlong/agent/plugin/utils/TestUtils.java | 97 +++++++++++-----
.../resources/testScan/202309281030_1/test_1.txt | 3 +
.../resources/testScan/2023092810_1/test_1.txt | 3 +
.../resources/testScan/202309301059_1/test_1.txt | 3 +
.../src/test/resources/testScan/20241030/23/59.txt | 3 +
15 files changed, 203 insertions(+), 249 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java
index d12e825b85..4572464fbf 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java
@@ -19,7 +19,8 @@ package org.apache.inlong.agent.constant;
public class CycleUnitType {
- public static final String DAY = "D";
- public static final String HOUR = "h";
+ public static final String DAY = "d";
+ public static final String HOUR = "H";
+ public static final String MINUTE = "m";
public static final String REAL_TIME = "R";
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
index fe6257d64e..cd63d74318 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
@@ -17,6 +17,8 @@
package org.apache.inlong.agent.utils;
+import org.apache.inlong.agent.constant.CycleUnitType;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,15 +47,11 @@ public class DateTransUtils {
throws ParseException {
long retTime = 0;
SimpleDateFormat df = null;
- if (cycleUnit.equals("Y") && time.length() == 4) {
- df = new SimpleDateFormat("yyyy");
- } else if (cycleUnit.equals("M") && time.length() == 6) {
- df = new SimpleDateFormat("yyyyMM");
- } else if (cycleUnit.equals("D") && time.length() == 8) {
+ if (cycleUnit.equalsIgnoreCase(CycleUnitType.DAY) && time.length() ==
8) {
df = new SimpleDateFormat("yyyyMMdd");
- } else if (cycleUnit.equalsIgnoreCase("h") && time.length() == 10) {
+ } else if (cycleUnit.equalsIgnoreCase(CycleUnitType.HOUR) &&
time.length() == 10) {
df = new SimpleDateFormat("yyyyMMddHH");
- } else if (cycleUnit.contains("m") && time.length() == 12) {
+ } else if (cycleUnit.equals(CycleUnitType.MINUTE) && time.length() ==
12) {
df = new SimpleDateFormat("yyyyMMddHHmm");
} else {
logger.error("time {}, cycleUnit {} can't parse!", time,
cycleUnit);
@@ -77,15 +75,11 @@ public class DateTransUtils {
Date dateTime = calendarInstance.getTime();
SimpleDateFormat df = null;
- if ("Y".equalsIgnoreCase(cycleUnit)) {
- df = new SimpleDateFormat("yyyy");
- } else if ("M".equals(cycleUnit)) {
- df = new SimpleDateFormat("yyyyMM");
- } else if ("D".equalsIgnoreCase(cycleUnit)) {
+ if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMdd");
- } else if ("h".equalsIgnoreCase(cycleUnit)) {
+ } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMddHH");
- } else if (cycleUnit.contains("m")) {
+ } else if (CycleUnitType.MINUTE.equals(cycleUnit)) {
df = new SimpleDateFormat("yyyyMMddHHmm");
} else {
logger.error("cycleUnit {} can't parse!", cycleUnit);
@@ -93,21 +87,6 @@ public class DateTransUtils {
}
df.setTimeZone(tz);
retTime = df.format(dateTime);
-
- if (cycleUnit.contains("m")) {
- int cycleNum = Integer.parseInt(cycleUnit.substring(0,
- cycleUnit.length() - 1));
- int mmTime = Integer.parseInt(retTime.substring(
- retTime.length() - 2, retTime.length()));
- String realMMTime = "";
- if (cycleNum * (mmTime / cycleNum) <= 0) {
- realMMTime = "0" + cycleNum * (mmTime / cycleNum);
- } else {
- realMMTime = "" + cycleNum * (mmTime / cycleNum);
- }
- retTime = retTime.substring(0, retTime.length() - 2) + realMMTime;
- }
-
return retTime;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 812e4c2513..d31126860e 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.ProfileFetcher;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -217,7 +218,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
String endStr = "2023-07-22 00:00:00";
Long start = 0L;
Long end = 0L;
- String normalPattern = testDir + "YYYY/YYYYMMDD_2.log_[0-9]+";
+ String normalPattern = testDir + "YYYY/YYYYMMDDhhmm_2.log_[0-9]+";
String retryPattern = testDir + "YYYY/YYYYMMDD_1.log_[0-9]+";
try {
Date parse = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").parse(startStr);
@@ -227,28 +228,31 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
} catch (ParseException e) {
e.printStackTrace();
}
- configs.add(getTestDataConfig(normalTaskId, normalPattern, false,
start, end, state));
- configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start,
end, state));
+ configs.add(getTestDataConfig(normalTaskId, normalPattern, false,
start, end, CycleUnitType.MINUTE, state));
+ configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start,
end, CycleUnitType.DAY, state));
return TaskResult.builder().dataConfigs(configs).build();
}
private DataConfig getTestDataConfig(int taskId, String pattern, boolean
retry, Long startTime, Long endTime,
- int state) {
+ String cycleUnit, int state) {
DataConfig dataConfig = new DataConfig();
- dataConfig.setInlongGroupId("testGroupId"); // 老字段 groupId
- dataConfig.setInlongStreamId("testStreamId"); // 老字段 streamId
- dataConfig.setDataReportType(1); // 老字段 reportType
- dataConfig.setTaskType(3); // 老字段 任务类型,3 代表文件采集
- dataConfig.setTaskId(taskId); // 老字段 任务 id
- dataConfig.setState(state); // 新增! 任务状态 1 正常 2 暂停
+ dataConfig.setInlongGroupId("devcloud_group_id");
+ dataConfig.setInlongStreamId("devcloud_stream_id");
+ dataConfig.setDataReportType(0);
+ dataConfig.setTaskType(3);
+ dataConfig.setTaskId(taskId);
+ dataConfig.setState(state);
+ dataConfig.setTimeZone("GMT+8:00");
FileTaskConfig fileTaskConfig = new FileTaskConfig();
- fileTaskConfig.setPattern(pattern);// 正则
- fileTaskConfig.setTimeOffset("0d"); // 老字段 时间偏移 "-1d" 采一天前的 "-2h" 采 2
小时前的
- fileTaskConfig.setMaxFileCount(100); // 最大文件数
- fileTaskConfig.setCycleUnit("D"); // 新增! 任务周期 "D" 天 "h" 小时
- fileTaskConfig.setRetry(retry); // 新增! 是否补录,如果是补录任务则为 true
+ fileTaskConfig.setPattern(pattern);
+ fileTaskConfig.setTimeOffset("0d");
+ fileTaskConfig.setMaxFileCount(100);
+ fileTaskConfig.setCycleUnit(cycleUnit);
+ fileTaskConfig.setRetry(retry);
fileTaskConfig.setStartTime(startTime);
fileTaskConfig.setEndTime(endTime);
+ fileTaskConfig.setDataContentStyle("CSV");
+ fileTaskConfig.setDataSeparator("|");
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
return dataConfig;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
index 58328540d4..e37b6deb89 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java
@@ -94,11 +94,11 @@ public class FileScanner {
for (Long time : dateRegion) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
- String filename = NewDateUtils.replaceDateExpression(calendar,
originPattern);
- ArrayList<String> allPaths = FilePathUtil.cutDirectory(filename);
+ String fileName = NewDateUtils.replaceDateExpression(calendar,
originPattern);
+ ArrayList<String> allPaths =
FilePathUtil.cutDirectoryByWildcard(fileName);
String firstDir = allPaths.get(0);
String secondDir = allPaths.get(0) + File.separator +
allPaths.get(1);
- ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir,
secondDir, filename, 3,
+ ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir,
secondDir, fileName, 3,
DEFAULT_FILE_MAX_NUM);
for (String file : fileList) {
// TODO the time is not YYYYMMDDHH
@@ -111,17 +111,6 @@ public class FileScanner {
return infos;
}
- public static ArrayList<String> scanFile(int maxFileNum, String
originPattern, long dataTime) {
- Calendar calendar = Calendar.getInstance();
- calendar.setTimeInMillis(dataTime);
-
- String filename = NewDateUtils.replaceDateExpression(calendar,
originPattern);
- ArrayList<String> allPaths = FilePathUtil.cutDirectory(filename);
- String firstDir = allPaths.get(0);
- String secondDir = allPaths.get(0) + File.separator + allPaths.get(1);
- return getUpdatedOrNewFiles(firstDir, secondDir, filename, 3,
maxFileNum);
- }
-
private static ArrayList<String> getUpdatedOrNewFiles(String firstDir,
String secondDir,
String fileName, long depth, int maxFileNum) {
ArrayList<String> ret = new ArrayList<String>();
@@ -151,7 +140,7 @@ public class FileScanner {
int maxFileNum) {
ArrayList<String> ret = new ArrayList<String>();
ArrayList<String> directories = FilePathUtil
- .getDirectoryLayers(logFileName);
+ .cutDirectoryByWildcardAndDateExpression(logFileName);
String parentDir = directories.get(0) + File.separator
+ directories.get(1);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index 44495b6059..44cff75f2f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -182,7 +182,7 @@ public class LogFileTask extends AbstractTask {
}
public void addPathPattern(String originPattern) {
- ArrayList<String> directories =
FilePathUtil.getDirectoryLayers(originPattern);
+ ArrayList<String> directories =
FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
String basicStaticPath = directories.get(0);
LOGGER.info("dataName {} watchPath {}", new Object[]{originPattern,
basicStaticPath});
/* Remember the failed watcher creations. */
@@ -530,7 +530,7 @@ public class LogFileTask extends AbstractTask {
PathDateExpression dateExpression = entity.getDateExpression();
if (dateExpression.getLongestDatePattern().length() != 0) {
String dataTime = getDataTimeFromFileName(newFileName,
entity.getOriginPattern(), dateExpression);
- LOGGER.info("file {} ,fileTime {}", newFileName, dataTime);
+ LOGGER.info("file {}, fileTime {}", newFileName, dataTime);
if (!NewDateUtils.isValidCreationTime(dataTime,
entity.getCycleUnit(),
taskProfile.getTimeOffset())) {
return false;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
index cac872182c..a62a602784 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java
@@ -65,11 +65,11 @@ public class WatchEntity {
String cycleUnit) {
this.watchService = watchService;
this.originPattern = originPattern;
- ArrayList<String> directoryLayers =
FilePathUtil.getDirectoryLayers(originPattern);
+ ArrayList<String> directoryLayers =
FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern);
this.basicStaticPath = directoryLayers.get(0);
this.regexPattern =
NewDateUtils.replaceDateExpressionWithRegex(originPattern);
pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE |
Pattern.DOTALL | Pattern.MULTILINE);
- ArrayList<String> directories =
FilePathUtil.cutDirectory(originPattern);
+ ArrayList<String> directories =
FilePathUtil.cutDirectoryByWildcard(originPattern);
this.originPatternWithoutFileName = directories.get(0);
this.patternWithoutFileName = Pattern
.compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName),
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java
index 2b7b98dfab..912b035642 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java
@@ -29,7 +29,7 @@ public class FilePathUtil {
private static final String DAY = "DD";
private static final String HOUR = "hh";
- public static ArrayList<String> cutDirectory(String directory) {
+ public static ArrayList<String> cutDirectoryByWildcard(String directory) {
String baseDirectory;
String regixDirecotry;
String fileName;
@@ -81,7 +81,7 @@ public class FilePathUtil {
return ret;
}
- public static ArrayList<String> getDirectoryLayers(String directory) {
+ public static ArrayList<String>
cutDirectoryByWildcardAndDateExpression(String directory) {
String baseDirectory;
String regixDirectory;
String fileName;
@@ -158,8 +158,8 @@ public class FilePathUtil {
}
public static boolean isSameDir(String fileName1, String fileName2) {
- ArrayList<String> ret1 = FilePathUtil.cutDirectory(fileName1);
- ArrayList<String> ret2 = FilePathUtil.cutDirectory(fileName2);
+ ArrayList<String> ret1 =
FilePathUtil.cutDirectoryByWildcard(fileName1);
+ ArrayList<String> ret2 =
FilePathUtil.cutDirectoryByWildcard(fileName2);
return ret1.get(0).equals(ret2.get(0));
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index 529e348fc5..563a168885 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -17,6 +17,7 @@
package org.apache.inlong.agent.plugin.utils.file;
+import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.utils.DateTransUtils;
import hirondelle.date4j.DateTime;
@@ -30,7 +31,6 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
-import java.util.StringTokenizer;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -170,34 +170,34 @@ public class NewDateUtils {
calendar.set(Calendar.MINUTE, minTime);
/* Calculate the offset. */
- if ("D".equalsIgnoreCase(offsetUnit)) {
+ if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) {
calendar.add(Calendar.DAY_OF_YEAR, offsetNumber);
}
- if ("H".equalsIgnoreCase(offsetUnit)) {
+ if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) {
calendar.add(Calendar.HOUR_OF_DAY, offsetNumber);
}
} else if (cycleUnit.length() == 1) {
- if ("D".equalsIgnoreCase(cycleUnit)) {
+ if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
- } else if ("h".equalsIgnoreCase(cycleUnit)) {
+ } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
}
}
/* Calculate the offset. */
- if ("D".equalsIgnoreCase(offsetUnit)) {
+ if (CycleUnitType.DAY.equalsIgnoreCase(offsetUnit)) {
calendar.add(Calendar.DAY_OF_YEAR, offsetNumber);
}
- if ("h".equalsIgnoreCase(offsetUnit)) {
+ if (CycleUnitType.HOUR.equalsIgnoreCase(offsetUnit)) {
calendar.add(Calendar.HOUR_OF_DAY, offsetNumber);
}
- if ("m".equals(offsetUnit)) {
+ if (CycleUnitType.MINUTE.equals(offsetUnit)) {
calendar.add(Calendar.MINUTE, offsetNumber);
}
@@ -207,15 +207,11 @@ public class NewDateUtils {
public static boolean isValidCreationTime(String dataTime, String
cycleUnit,
String timeOffset) {
long timeInterval = 0;
- if ("Y".equalsIgnoreCase(cycleUnit)) {
+ if (CycleUnitType.DAY.equalsIgnoreCase(cycleUnit)) {
timeInterval = DAY_TIMEOUT_INTERVAL;
- } else if ("M".equals(cycleUnit)) {
+ } else if (CycleUnitType.HOUR.equalsIgnoreCase(cycleUnit)) {
timeInterval = HOUR_TIMEOUT_INTERVAL;
- } else if ("D".equalsIgnoreCase(cycleUnit)) {
- timeInterval = DAY_TIMEOUT_INTERVAL;
- } else if ("h".equalsIgnoreCase(cycleUnit)) {
- timeInterval = HOUR_TIMEOUT_INTERVAL;
- } else if (cycleUnit.contains("m")) {
+ } else if (cycleUnit.endsWith(CycleUnitType.MINUTE)) {
timeInterval = HOUR_TIMEOUT_INTERVAL;
} else {
logger.error("cycleUnit {} can't parse!", cycleUnit);
@@ -265,23 +261,20 @@ public class NewDateUtils {
return matcher.find();
}
- public static String getDateTime(String fileName, String dataName,
- PathDateExpression dateExpression) {
+ public static String getDateTime(String fileName, String dataName,
PathDateExpression dateExpression) {
String dataTime = null;
if (isBraceContain(dataName)) {
String fullRegx = replaceDateExpressionWithRegex(dataName,
"dataTime");
- Pattern fullPatt = Pattern.compile(fullRegx);
- Matcher matcher = fullPatt.matcher(fileName);
+ Pattern fullPattern = Pattern.compile(fullRegx);
+ Matcher matcher = fullPattern.matcher(fileName);
if (matcher.find()) {
dataTime = matcher.group("dataTime");
}
} else {
dataTime = getDateTime(fileName, dateExpression);
}
-
return dataTime;
-
}
public static String getDateTime(String fileName, PathDateExpression
dateExpression) {
@@ -423,8 +416,7 @@ public class NewDateUtils {
return sb.toString();
}
- public static String replaceDateExpression(Calendar dateTime,
- String dataPath) {
+ public static String replaceDateExpression(Calendar dateTime, String
dataPath) {
if (dataPath == null) {
return null;
}
@@ -439,7 +431,6 @@ public class NewDateUtils {
// find longest DATEPATTERN
ArrayList<MatchPoint> mp = extractAllTimeRegex(dataPath);
-
if (mp == null || mp.size() == 0) {
return dataPath;
}
@@ -471,35 +462,7 @@ public class NewDateUtils {
return sb.toString();
}
- public static String replaceDateExpression1(Calendar dateTime,
- String logFileName) {
- if (dateTime == null || logFileName == null) {
- return null;
- }
-
- String year = String.valueOf(dateTime.get(Calendar.YEAR));
- String month = String.valueOf(dateTime.get(Calendar.MONTH) + 1);
- String day = String.valueOf(dateTime.get(Calendar.DAY_OF_MONTH));
- String hour = String.valueOf(dateTime.get(Calendar.HOUR_OF_DAY));
- String minute = String.valueOf(dateTime.get(Calendar.MINUTE));
-
- int hhIndex = logFileName.indexOf("hh");
- int mmIndex = logFileName.indexOf("mm");
-
- logFileName = logFileName.replaceAll("YYYY", year);
- logFileName = logFileName.replaceAll("MM", externDate(month));
- logFileName = logFileName.replaceAll("DD", externDate(day));
- logFileName = logFileName.replaceAll("hh", externDate(hour));
-
- if (hhIndex != -1 && mmIndex != -1 && mmIndex >= hhIndex + 2
- && mmIndex < hhIndex + 4) {
- logFileName = logFileName.replaceAll("mm", externDate(minute));
- }
-
- return logFileName;
- }
-
- private static String externDate(String time) {
+ public static String externDate(String time) {
if (time.length() == 1) {
return "0" + time;
} else {
@@ -507,51 +470,12 @@ public class NewDateUtils {
}
}
- public static String parseCycleUnit(String scheduleTime) {
- String cycleUnit = "D";
-
- StringTokenizer st = new StringTokenizer(scheduleTime, " ");
-
- if (st.countTokens() <= 0) {
- return "D";
- }
-
- int index = 0;
- while (st.hasMoreElements()) {
- String currentString = st.nextToken();
- if (currentString.contains("/")) {
- if (index == 1) {
- cycleUnit = "10m";
- } else if (index == 2) {
- cycleUnit = "h";
- }
- break;
- }
-
- if (currentString.equals("*")) {
- if (index == 3) {
- cycleUnit = "D";
- }
- break;
- }
-
- index++;
- }
-
- logger.info("ScheduleTime: " + scheduleTime + ", cycleUnit: "
- + cycleUnit);
-
- return cycleUnit;
- }
-
public static List<Long> getDateRegion(long startTime, long endTime,
String cycleUnit) {
List<Long> ret = new ArrayList<Long>();
DateTime dtStart = DateTime.forInstant(startTime,
TimeZone.getDefault());
DateTime dtEnd = DateTime.forInstant(endTime, TimeZone.getDefault());
- if (cycleUnit.equals("M")) {
- dtEnd = dtEnd.getEndOfMonth();
- } else if (cycleUnit.equals("D")) {
+ if (cycleUnit.equals(CycleUnitType.DAY)) {
dtEnd = dtEnd.getEndOfDay();
}
@@ -561,22 +485,12 @@ public class NewDateUtils {
int hour = 0;
int minute = 0;
int second = 0;
- if (cycleUnit.equalsIgnoreCase("Y")) {
- year = 1;
- } else if (cycleUnit.equals("M")) {
- month = 1;
- } else if (cycleUnit.equalsIgnoreCase("D")) {
+ if (cycleUnit.equalsIgnoreCase(CycleUnitType.DAY)) {
day = 1;
- } else if (cycleUnit.equalsIgnoreCase("h")) {
+ } else if (cycleUnit.equalsIgnoreCase(CycleUnitType.HOUR)) {
hour = 1;
- } else if (cycleUnit.equals("10m")) {
- minute = 10;
- } else if (cycleUnit.equals("15m")) {
- minute = 15;
- } else if (cycleUnit.equals("30m")) {
- minute = 30;
- } else if (cycleUnit.equalsIgnoreCase("s")) {
- second = 1;
+ } else if (cycleUnit.equals(CycleUnitType.MINUTE)) {
+ minute = 1;
} else {
logger.error("cycleUnit {} is error: ", cycleUnit);
return ret;
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index dd9c320576..10e4532be2 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -108,8 +108,7 @@ public class AgentBaseTestsHelper {
fileTaskConfig.setEndTime(endTime);
// mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf
fileTaskConfig.setDataContentStyle("mix");
- // 124 is the ASCII code of '|'
- fileTaskConfig.setDataSeparator("124");
+ fileTaskConfig.setDataSeparator("|");
dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
return dataConfig;
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index 3a87eac388..cf8128a0be 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -19,13 +19,13 @@ package org.apache.inlong.agent.plugin.task;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.core.task.TaskManager;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.task.file.LogFileTask;
import org.apache.inlong.common.enums.TaskStateEnum;
-import com.google.gson.Gson;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -41,7 +41,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
+import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -55,15 +58,9 @@ public class TestLogFileTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(TestLogFileTask.class);
private static final ClassLoader LOADER =
TestLogFileTask.class.getClassLoader();
- private static LogFileTask task;
private static AgentBaseTestsHelper helper;
- private static final Gson GSON = new Gson();
private static TaskManager manager;
- private static MockInstanceManager instanceManager = new
MockInstanceManager();
- private static String tempResourceName;
- private static String resourceName;
- private static String fileName;
- private static String dataTime;
+ private static String resourceParentPath;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
1L, TimeUnit.SECONDS,
@@ -71,52 +68,76 @@ public class TestLogFileTask {
new AgentThreadFactory("TestLogfileCollectTask"));
@BeforeClass
- public static void setup() {
+ public static void setup() throws Exception {
helper = new
AgentBaseTestsHelper(TestLogFileTask.class.getName()).setupAgentHome();
- resourceName =
LOADER.getResource("testScan/20230928_1/test_1.txt").getPath();
- tempResourceName = LOADER.getResource("testScan/temp.txt").getPath();
- File f = new File(tempResourceName);
- String pattern = f.getParent() + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt";
- TaskProfile taskProfile = helper.getTaskProfile(1, pattern, true, 0L,
0L, TaskStateEnum.RUNNING, "D",
+ resourceParentPath = new
File(LOADER.getResource("testScan/temp.txt").getPath()).getParent();
+ manager = new TaskManager();
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ helper.teardownAgentHome();
+ }
+
+ @Test
+ public void testScan() throws Exception {
+ doTest(1, Arrays.asList("testScan/20230928_1/test_1.txt"),
+ resourceParentPath + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt",
CycleUnitType.DAY, Arrays.asList("20230928"),
+ "2023-09-28 00:00:00", "2023-09-30 23:00:00");
+ doTest(2, Arrays.asList("testScan/2023092810_1/test_1.txt"),
+ resourceParentPath + "/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt",
+ CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023-09-28
00:00:00", "2023-09-30 23:00:00");
+ doTest(3, Arrays.asList("testScan/202309281030_1/test_1.txt",
"testScan/202309301059_1/test_1.txt"),
+ resourceParentPath + "/YYYYMMDDhhmm_[0-9]+/test_[0-9]+.txt",
+ CycleUnitType.MINUTE, Arrays.asList("202309281030",
"202309301059"), "2023-09-28 00:00:00",
+ "2023-09-30 23:00:00");
+ doTest(4, Arrays.asList("testScan/20241030/23/59.txt"),
+ resourceParentPath + "/YYYYMMDD/hh/mm.txt",
+ CycleUnitType.MINUTE, Arrays.asList("202410302359"),
"2024-10-30 00:00:00", "2024-10-31 00:00:00");
+ }
+
+ private void doTest(int taskId, List<String> resources, String pattern,
String cycle, List<String> srcDataTimes,
+ String startTime, String endTime)
+ throws Exception {
+ List<String> resourceName = new ArrayList<>();
+ for (int i = 0; i < resources.size(); i++) {
+ resourceName.add(LOADER.getResource(resources.get(i)).getPath());
+ }
+ TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, true,
0L, 0L, TaskStateEnum.RUNNING, cycle,
"GMT+8:00");
+ LogFileTask dayTask = null;
+ final List<String> fileName = new ArrayList();
+ final List<String> dataTime = new ArrayList();
try {
- String startStr = "2023-09-20 00:00:00";
- String endStr = "2023-09-30 00:00:00";
- Date parse = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").parse(startStr);
+
+ Date parse = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").parse(startTime);
long start = parse.getTime();
- parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endStr);
+ parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endTime);
long end = parse.getTime();
taskProfile.setLong(TaskConstants.TASK_START_TIME, start);
taskProfile.setLong(TaskConstants.TASK_END_TIME, end);
- manager = new TaskManager();
- task = PowerMockito.spy(new LogFileTask());
+ dayTask = PowerMockito.spy(new LogFileTask());
PowerMockito.doAnswer(invocation -> {
- fileName = invocation.getArgument(0);
- dataTime = invocation.getArgument(1);
+ fileName.add(invocation.getArgument(0));
+ dataTime.add(invocation.getArgument(1));
return null;
- }).when(task, "addToEvenMap", Mockito.anyString(),
Mockito.anyString());
- Assert.assertTrue(task.isProfileValid(taskProfile));
+ }).when(dayTask, "addToEvenMap", Mockito.anyString(),
Mockito.anyString());
+ Assert.assertTrue(dayTask.isProfileValid(taskProfile));
manager.getTaskStore().storeTask(taskProfile);
- task.init(manager, taskProfile, manager.getInstanceBasicStore());
- EXECUTOR_SERVICE.submit(task);
+ dayTask.init(manager, taskProfile,
manager.getInstanceBasicStore());
+ EXECUTOR_SERVICE.submit(dayTask);
} catch (Exception e) {
LOGGER.error("source init error {}", e);
Assert.assertTrue("source init error", false);
}
- }
-
- @AfterClass
- public static void teardown() throws Exception {
- task.destroy();
- helper.teardownAgentHome();
- }
-
- @Test
- public void testTaskManager() throws Exception {
- await().atMost(2, TimeUnit.SECONDS).until(() -> fileName != null &&
dataTime != null);
- Assert.assertTrue(fileName.compareTo(resourceName) == 0);
- Assert.assertTrue(dataTime.compareTo("20230928") == 0);
- PowerMockito.verifyPrivate(task, Mockito.times(1))
+ await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> fileName.size() == resources.size() &&
dataTime.size() == resources.size());
+ for (int i = 0; i < fileName.size(); i++) {
+ Assert.assertEquals(0,
fileName.get(i).compareTo(resourceName.get(i)));
+ Assert.assertEquals(0,
dataTime.get(i).compareTo(srcDataTimes.get(i)));
+ }
+ PowerMockito.verifyPrivate(dayTask, Mockito.times(resources.size()))
.invoke("addToEvenMap", Mockito.anyString(),
Mockito.anyString());
+ dayTask.destroy();
}
}
\ No newline at end of file
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index ea575e613d..cf1b7c8f95 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -17,6 +17,8 @@
package org.apache.inlong.agent.plugin.utils;
+import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
+import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.metric.MetricRegister;
@@ -34,7 +36,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
+import java.text.ParseException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
import java.util.List;
import static org.mockito.ArgumentMatchers.any;
@@ -54,37 +59,67 @@ public class TestUtils {
Assert.assertTrue(DateTransUtils.calcOffset("") == 0);
}
- public static String getTestTriggerProfile() {
- return "{\n"
- + " \"job\": {\n"
- + " \"fileJob\": {\n"
- + " \"additionStr\": \"m=15&file=test\",\n"
- + " \"trigger\":
\"org.apache.inlong.agent.plugin.trigger.DirectoryTrigger\",\n"
- + " \"dir\": {\n"
- + " \"path\": \"\",\n"
- + " \"patterns\": \"/AgentBaseTestsHelper/"
- +
"org.apache.tubemq.inlong.plugin.fetcher.TestTdmFetcher/test.dat\"\n"
- + " },\n"
- + " \"thread\" : {\n"
- + "\"running\": {\n"
- + "\"core\": \"4\"\n"
- + "}\n"
- + "} \n"
- + " },\n"
- + " \"id\": 1,\n"
- + " \"op\": 0,\n"
- + " \"ip\": \"127.0.0.1\",\n"
- + " \"groupId\": \"groupId\",\n"
- + " \"streamId\": \"streamId\",\n"
- + " \"name\": \"fileAgentTest\",\n"
- + " \"source\":
\"org.apache.inlong.agent.plugin.sources.TextFileSource\",\n"
- + " \"sink\":
\"org.apache.inlong.agent.plugin.sinks.MockSink\",\n"
- + " \"channel\":
\"org.apache.inlong.agent.plugin.channel.MemoryChannel\",\n"
- + " \"standalone\": true,\n"
- + " \"deliveryTime\": \"1231313\",\n"
- + " \"splitter\": \"&\"\n"
- + " }\n"
- + " }";
+ /**
+ * Checks date-expression replacement and directory cutting against fixed expected strings.
+ */
+ @Test
+ public void testPattern() throws ParseException {
+ /*
+ * Condition: YYYY(?:.MM|MM)?(?:.DD|DD)?(?:.hh|hh)?(?:.mm|mm)?(?:.ss|ss)?
+ * The date expression as a whole must meet this condition in order to match: it needs to
+ * start with YYYY, and month, day, hour and minute can only be separated by one character.
+ */
+ testReplaceDateExpression("/YYYYMMDDhhmm.log", "/202406251007.log");
+ testReplaceDateExpression("/YYYY.log", "/2024.log");
+ testReplaceDateExpression("/YYYYhhmm.log", "/20241007.log");
+ testReplaceDateExpression("/YYYY/YYYYMMDDhhmm.log",
"/2024/202406251007.log");
+ testReplaceDateExpression("/YYYY/MMDD/hhmm.log",
"/2024/0625/1007.log");
+ testReplaceDateExpression("/data/YYYYMMDD.hh/mm.log_[0-9]+",
"/data/20240625.10/07.log_[0-9]+");
+ // error cases: parts that do not meet the condition above are expected to stay unreplaced
+ testReplaceDateExpression("/YYY.log", "/YYY.log");
+ testReplaceDateExpression("/MMDDhhmm.log", "/MMDDhhmm.log");
+ testReplaceDateExpression("/MMDD/hhmm.log", "/MMDD/hhmm.log");
+ testReplaceDateExpression("/data/YYYYMMDD..hh/mm.log_[0-9]+",
"/data/20240625..hh/mm.log_[0-9]+");
+
+ /*
+ * 1. Cut off the file name. 2. Cut the directory part that contains a wildcard (empty if none).
+ */
+ testCutDirectoryByWildcard("/data/123/YYYYMMDDhhmm.log",
+ Arrays.asList("/data/123", "", "YYYYMMDDhhmm.log"));
+ testCutDirectoryByWildcard("/data/YYYYMMDDhhmm/test.log",
+ Arrays.asList("/data/YYYYMMDDhhmm", "", "test.log"));
+ testCutDirectoryByWildcard("/data/YYYYMMDDhhmm*/test.log",
+ Arrays.asList("/data", "YYYYMMDDhhmm*", "test.log"));
+
testCutDirectoryByWildcard("/data/log_minute/minute_YYYYMMDDhh*/mm.log_[0-9]+",
+ Arrays.asList("/data/log_minute", "minute_YYYYMMDDhh*",
"mm.log_[0-9]+"));
+ testCutDirectoryByWildcard("/data/123+/YYYYMMDDhhmm.log",
+ Arrays.asList("/data", "123+", "YYYYMMDDhhmm.log"));
+
+ /*
+ * 1. Cut off the file name. 2. Cut the directory part that contains a wildcard or a date expression.
+ */
+
testCutDirectoryByWildcardAndDateExpression("/data/YYYYMM/YYYaaMM/YYYYMMDDhhmm.log",
+ Arrays.asList("/data", "YYYYMM/YYYaaMM", "YYYYMMDDhhmm.log"));
+ testCutDirectoryByWildcardAndDateExpression("/data/YYYY/test.log",
+ Arrays.asList("/data", "YYYY", "test.log"));
+ testCutDirectoryByWildcardAndDateExpression("/data/123*/MMDD/test.log",
+ Arrays.asList("/data", "123*/MMDD", "test.log"));
+
testCutDirectoryByWildcardAndDateExpression("/data/YYYYMMDD/123*/test.log",
+ Arrays.asList("/data", "YYYYMMDD/123*", "test.log"));
+ }
+
+ /**
+ * Replaces the date expression in {@code src} using the fixed data time 202406251007 and
+ * checks that the result equals {@code dst}.
+ *
+ * @param src path pattern that may contain a date expression
+ * @param dst expected string after replacement
+ * @throws ParseException presumably when the fixed time string cannot be parsed; confirm against DateTransUtils
+ */
+ private void testReplaceDateExpression(String src, String dst) throws ParseException {
+ Calendar calendar = Calendar.getInstance();
+ long dataTime = DateTransUtils.timeStrConvertToMillSec("202406251007", "m");
+ calendar.setTimeInMillis(dataTime);
+ // JUnit's contract is assertEquals(expected, actual); dst goes first so failure messages read correctly.
+ Assert.assertEquals(dst, NewDateUtils.replaceDateExpression(calendar, src));
+ }
+
+ /**
+ * Cuts {@code src} with {@code FilePathUtil.cutDirectoryByWildcard} and checks the returned list.
+ *
+ * @param src path pattern to cut
+ * @param dst expected list, compared in order
+ */
+ private void testCutDirectoryByWildcard(String src, List<String> dst) {
+ ArrayList<String> directories = FilePathUtil.cutDirectoryByWildcard(src);
+ // JUnit's contract is assertEquals(expected, actual); dst goes first so failure messages read correctly.
+ Assert.assertEquals(dst, directories);
+ }
+
+ /**
+ * Cuts {@code src} with {@code FilePathUtil.cutDirectoryByWildcardAndDateExpression} and checks
+ * the returned list.
+ *
+ * @param src path pattern to cut
+ * @param dst expected list, compared in order
+ */
+ private void testCutDirectoryByWildcardAndDateExpression(String src, List<String> dst) {
+ ArrayList<String> directoryLayers = FilePathUtil.cutDirectoryByWildcardAndDateExpression(src);
+ // JUnit's contract is assertEquals(expected, actual); dst goes first so failure messages read correctly.
+ Assert.assertEquals(dst, directoryLayers);
}
public static void createHugeFiles(String fileName, String rootDir, String
record) throws Exception {
diff --git
a/inlong-agent/agent-plugins/src/test/resources/testScan/202309281030_1/test_1.txt
b/inlong-agent/agent-plugins/src/test/resources/testScan/202309281030_1/test_1.txt
new file mode 100644
index 0000000000..780b09709f
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/resources/testScan/202309281030_1/test_1.txt
@@ -0,0 +1,3 @@
+hello line-end-symbol aa
+world line-end-symbol
+agent line-end-symbol
diff --git
a/inlong-agent/agent-plugins/src/test/resources/testScan/2023092810_1/test_1.txt
b/inlong-agent/agent-plugins/src/test/resources/testScan/2023092810_1/test_1.txt
new file mode 100644
index 0000000000..780b09709f
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/resources/testScan/2023092810_1/test_1.txt
@@ -0,0 +1,3 @@
+hello line-end-symbol aa
+world line-end-symbol
+agent line-end-symbol
diff --git
a/inlong-agent/agent-plugins/src/test/resources/testScan/202309301059_1/test_1.txt
b/inlong-agent/agent-plugins/src/test/resources/testScan/202309301059_1/test_1.txt
new file mode 100644
index 0000000000..780b09709f
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/resources/testScan/202309301059_1/test_1.txt
@@ -0,0 +1,3 @@
+hello line-end-symbol aa
+world line-end-symbol
+agent line-end-symbol
diff --git
a/inlong-agent/agent-plugins/src/test/resources/testScan/20241030/23/59.txt
b/inlong-agent/agent-plugins/src/test/resources/testScan/20241030/23/59.txt
new file mode 100644
index 0000000000..780b09709f
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/testScan/20241030/23/59.txt
@@ -0,0 +1,3 @@
+hello line-end-symbol aa
+world line-end-symbol
+agent line-end-symbol