This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8a97c353dd [INLONG-9335][Agent] Bring cycle parameters when creating an instance (#9336)
8a97c353dd is described below
commit 8a97c353ddcabf70e319d9e006293dda383d15ae
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Nov 27 14:43:23 2023 +0800
[INLONG-9335][Agent] Bring cycle parameters when creating an instance (#9336)
* [INLONG-9335][Agent] Bring cycle parameters when creating an instance
* [INLONG-9335][Agent] Bring cycle parameters when creating an instance
---
.../org/apache/inlong/agent/conf/TaskProfile.java | 7 ++-
.../inlong/agent/constant/CycleUnitType.java | 25 ++++++++
.../inlong/agent/constant/FileTriggerType.java | 66 ---------------------
.../apache/inlong/agent/utils/DateTransUtils.java | 2 +-
.../agent/core/instance/TestInstanceManager.java | 6 +-
.../agent/plugin/task/filecollect/FileScanner.java | 20 +++----
.../task/filecollect/LogFileCollectTask.java | 68 +++++++++++++++-------
.../inlong/agent/plugin/utils/file/DateUtils.java | 6 +-
.../agent/plugin/utils/file/NewDateUtils.java | 3 +
.../sinks/filecollect/TestSenderManager.java | 2 +-
.../agent/plugin/sources/TestLogFileSource.java | 4 +-
.../agent/plugin/sources/TestMqttConnect.java | 3 +-
.../inlong/agent/plugin/utils/TestUtils.java | 1 +
13 files changed, 100 insertions(+), 113 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index cbad21e499..de863a7aa0 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -62,7 +62,7 @@ public class TaskProfile extends AbstractConfiguration {
}
public String getTimeOffset() {
- return get(TaskConstants.TASK_FILE_TIME_OFFSET);
+ return get(TaskConstants.TASK_FILE_TIME_OFFSET, "");
}
public String getTimeZone() {
@@ -118,7 +118,8 @@ public class TaskProfile extends AbstractConfiguration {
return GSON.toJson(getConfigStorage());
}
- public InstanceProfile createInstanceProfile(String instanceClass, String
fileName, String dataTime,
+ public InstanceProfile createInstanceProfile(String instanceClass, String
fileName, String cycleUnit,
+ String dataTime,
long fileUpdateTime) {
InstanceProfile instanceProfile =
InstanceProfile.parseJsonStr(toJsonStr());
instanceProfile.setInstanceClass(instanceClass);
@@ -126,7 +127,7 @@ public class TaskProfile extends AbstractConfiguration {
instanceProfile.setSourceDataTime(dataTime);
Long sinkDataTime = 0L;
try {
- sinkDataTime = DateTransUtils.timeStrConvertToMillSec(dataTime,
getCycleUnit(),
+ sinkDataTime = DateTransUtils.timeStrConvertToMillSec(dataTime,
cycleUnit,
TimeZone.getTimeZone(getTimeZone()));
} catch (ParseException e) {
logger.error("createInstanceProfile ParseException error: ", e);
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java
new file mode 100644
index 0000000000..d12e825b85
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CycleUnitType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.constant;
+
+public class CycleUnitType {
+
+ public static final String DAY = "D";
+ public static final String HOUR = "h";
+ public static final String REAL_TIME = "R";
+}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileTriggerType.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileTriggerType.java
deleted file mode 100644
index adc7109fa5..0000000000
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FileTriggerType.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.constant;
-
-/**
- * Collection type of file data
- */
-public class FileTriggerType {
-
- /**
- * Increment only collect newly created files content.
- *
- * <p>Here is an example. Collect task submit at '2022-01-01 23:00:00'
with pattern '/bin/*.sh'.
- * <blockquote><pre>
- * .
- * └── [2022-01-01 20:49:42] bin
- * ├── [2022-01-01 20:10:00] managerctl
- * ├── [2022-01-01 21:10:00] restart.sh
- * ├── [2022-01-01 22:10:00] shutdown.sh
- * └── [2022-01-01 23:49:00] startup.sh
- * </pre></blockquote>
- *
- * <p>It Finally collect file is:
- * <blockquote><pre>
- * ./bin/startup.sh
- * </pre></blockquote>
- */
- public static final String INCREMENT = "INCREMENT";
-
- /**
- * FULL collect existing files, as well as newly created files.
- *
- * <p>Here is an example. Collect task submit at '2022-01-01 23:00:00'
with pattern '/bin/*.sh'.
- * <blockquote><pre>
- * .
- * └── [2022-01-01 20:49:42] bin
- * ├── [2022-01-01 20:10:00] managerctl
- * ├── [2022-01-01 21:10:00] restart.sh
- * ├── [2022-01-01 22:10:00] shutdown.sh
- * └── [2022-01-01 23:49:00] startup.sh
- * </pre></blockquote>
- *
- * <p>It Finally collect file is:
- * <blockquote><pre>
- * ./bin/startup.sh
- * ./bin/shutdown.sh
- * ./bin/startup.sh
- * </pre></blockquote>
- */
- public static final String FULL = "FULL";
-}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
index 2aa08742e8..55182c7dd8 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
@@ -56,7 +56,7 @@ public class DateTransUtils {
} else if (cycleUnit.contains("m") && time.length() == 12) {
df = new SimpleDateFormat("yyyyMMddHHmm");
} else {
- logger.error("time {},cycleUnit {} can't parse!", time, cycleUnit);
+ logger.error("time {}, cycleUnit {} can't parse!", time,
cycleUnit);
throw new ParseException(time, 0);
}
try {
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 910a32a46a..558bed0204 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -68,7 +68,8 @@ public class TestInstanceManager {
public void testInstanceManager() {
long timeBefore = AgentUtils.getCurrentTime();
InstanceProfile profile =
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
- helper.getTestRootDir() + "/2023092710_1.txt", "2023092710",
AgentUtils.getCurrentTime());
+ helper.getTestRootDir() + "/2023092710_1.txt",
taskProfile.getCycleUnit(), "2023092710",
+ AgentUtils.getCurrentTime());
String sinkDataTime = String.valueOf(profile.getSinkDataTime());
try {
String add2TimeZone = String.valueOf(
@@ -98,7 +99,8 @@ public class TestInstanceManager {
// test continue
profile =
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
- helper.getTestRootDir() + "/2023092710_1.txt", "2023092710",
AgentUtils.getCurrentTime());
+ helper.getTestRootDir() + "/2023092710_1.txt",
taskProfile.getCycleUnit(), "2023092710",
+ AgentUtils.getCurrentTime());
action = new InstanceAction();
action.setActionType(ActionType.ADD);
action.setProfile(profile);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
index 8f7d2d9d80..fc989b3bcf 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -17,7 +17,6 @@
package org.apache.inlong.agent.plugin.task.filecollect;
-import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
import org.apache.inlong.agent.plugin.utils.file.Files;
@@ -58,19 +57,19 @@ public class FileScanner {
private static final Logger logger =
LoggerFactory.getLogger(FileScanner.class);
- public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf,
String originPattern, long startTime,
+ public static List<BasicFileInfo> scanTaskBetweenTimes(String
originPattern, String cycleUnit, String timeOffset,
+ long startTime,
long endTime, boolean isRetry) {
- String cycleUnit = conf.getCycleUnit();
if (!isRetry) {
- startTime += NewDateUtils.calcOffset(conf.getTimeOffset());
- endTime += NewDateUtils.calcOffset(conf.getTimeOffset());
+ startTime += NewDateUtils.calcOffset(timeOffset);
+ endTime += NewDateUtils.calcOffset(timeOffset);
}
String strStartTime =
DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit);
String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime,
cycleUnit);
- logger.info("task {} this scan time is between {} and {}.",
- new Object[]{conf.getTaskId(), strStartTime, strEndTime});
+ logger.info("{} scan time is between {} and {}",
+ new Object[]{originPattern, strStartTime, strEndTime});
- return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern,
strStartTime, strEndTime);
+ return scanTaskBetweenTimes(cycleUnit, originPattern, strStartTime,
strEndTime);
}
/* Scan log files and create tasks between two times. */
@@ -91,8 +90,7 @@ public class FileScanner {
// TODO the time is not YYYYMMDDHH
String dataTime = DateTransUtils.millSecConvertToTimeStr(time,
cycleUnit);
BasicFileInfo info = new BasicFileInfo(file, dataTime);
- logger.info("scan new task fileName {} ,dataTime {}", file,
- DateTransUtils.millSecConvertToTimeStr(time,
cycleUnit));
+ logger.info("scan new task fileName {} ,dataTime {}", file,
dataTime);
infos.add(info);
}
}
@@ -114,11 +112,9 @@ public class FileScanner {
String fileName, long depth, int maxFileNum) {
ArrayList<String> ret = new ArrayList<String>();
ArrayList<File> readyFiles = new ArrayList<File>();
-
if (!new File(firstDir).isDirectory()) {
return ret;
}
-
for (File pathname : Files.find(firstDir).yieldFilesAndDirectories()
.recursive().withDepth((int) depth).withDirNameRegex(secondDir)
.withFileNameRegex(fileName)) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index dc183881c1..9dc7d26c11 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.task.filecollect;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.core.instance.ActionType;
import org.apache.inlong.agent.core.instance.InstanceAction;
@@ -85,6 +86,7 @@ public class LogFileCollectTask extends Task {
private boolean retry;
private long startTime;
private long endTime;
+ private boolean isRealTime = false;
private boolean initOK = false;
private Set<String> originPatterns;
private long lastScanTime = 0;
@@ -96,7 +98,7 @@ public class LogFileCollectTask extends Task {
@Override
public void init(Object srcManager, TaskProfile taskProfile, Db basicDb)
throws IOException {
if (!isProfileValid(taskProfile)) {
- LOGGER.error("task profile invalid {}", taskProfile);
+ LOGGER.error("task profile invalid {}", taskProfile.toJsonStr());
return;
}
taskManager = (TaskManager) srcManager;
@@ -115,12 +117,15 @@ public class LogFileCollectTask extends Task {
retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false);
originPatterns =
Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(","))
.collect(Collectors.toSet());
+ if
(taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+ isRealTime = true;
+ }
instanceManager = new InstanceManager(taskProfile.getTaskId(),
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
basicDb);
try {
instanceManager.start();
} catch (Exception e) {
- LOGGER.error("start instance manager error {}", e.getMessage());
+ LOGGER.error("start instance manager error: ", e);
}
}
@@ -131,12 +136,16 @@ public class LogFileCollectTask extends Task {
}
boolean ret =
profile.hasKey(TaskConstants.FILE_DIR_FILTER_PATTERNS)
- && profile.hasKey(TaskConstants.TASK_FILE_TIME_OFFSET)
&& profile.hasKey(TaskConstants.FILE_MAX_NUM);
if (!ret) {
LOGGER.error("task profile needs file keys");
return false;
}
+ if
(profile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) != 0 &&
!profile.hasKey(
+ TaskConstants.TASK_FILE_TIME_OFFSET)) {
+ LOGGER.error("task profile needs time offset");
+ return false;
+ }
if (profile.getBoolean(TaskConstants.TASK_RETRY, false)) {
long startTime = profile.getLong(TaskConstants.TASK_START_TIME, 0);
long endTime = profile.getLong(TaskConstants.TASK_END_TIME, 0);
@@ -176,8 +185,11 @@ public class LogFileCollectTask extends Task {
* linux regular expression, we have to replace * to ., and
replace . with \\. .
*/
WatchService watchService =
FileSystems.getDefault().newWatchService();
- WatchEntity entity = new WatchEntity(watchService, originPattern,
taskProfile.getCycleUnit(),
- taskProfile.getTimeOffset());
+ String timeOffset = "";
+ if (!isRealTime) {
+ timeOffset = taskProfile.getTimeOffset();
+ }
+ WatchEntity entity = new WatchEntity(watchService, originPattern,
taskProfile.getCycleUnit(), timeOffset);
entity.registerRecursively();
watchers.put(originPattern, entity);
watchFailedDirs.remove(originPattern);
@@ -187,6 +199,8 @@ public class LogFileCollectTask extends Task {
} else {
LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString(), e);
}
+ } catch (Exception e) {
+ LOGGER.error("addPathPattern:", e);
}
}
@@ -311,7 +325,13 @@ public class LogFileCollectTask extends Task {
startScanTime = currentTime + offset;
endScanTime = currentTime;
}
- return FileScanner.scanTaskBetweenTimes(taskProfile, originPattern,
startScanTime, endScanTime, retry);
+ if (isRealTime) {
+ return FileScanner.scanTaskBetweenTimes(originPattern,
CycleUnitType.HOUR, taskProfile.getTimeOffset(),
+ startScanTime, endScanTime, retry);
+ } else {
+ return FileScanner.scanTaskBetweenTimes(originPattern,
taskProfile.getCycleUnit(),
+ taskProfile.getTimeOffset(), startScanTime, endScanTime,
retry);
+ }
}
private void runForWatching() {
@@ -336,17 +356,7 @@ public class LogFileCollectTask extends Task {
if (sameDataTimeEvents.isEmpty()) {
continue;
}
- /*
- * Calculate whether the event needs to be processed at the
current time based on its data time, business
- * cycle, and offset
- */
- String dataTime = entry.getKey();
- String shouldStartTime =
- NewDateUtils.getShouldStartTime(dataTime,
taskProfile.getCycleUnit(), taskProfile.getTimeOffset());
- String currentTime = getCurrentTime();
- if (currentTime.compareTo(shouldStartTime) >= 0) {
- LOGGER.info("submit now taskId {}, dataTime {}, currentTime
{}, shouldStartTime {}",
- new Object[]{getTaskId(), dataTime, currentTime,
shouldStartTime});
+ if (isRealTime || shouldStartNow(entry.getKey())) {
/* These codes will sort the FileCreationEvents by create
time. */
Set<InstanceProfile> sortedEvents = new
TreeSet<>(sameDataTimeEvents.values());
/* Check the file end with event creation time in asc order. */
@@ -360,15 +370,23 @@ public class LogFileCollectTask extends Task {
}
sameDataTimeEvents.remove(fileName);
}
- } else {
- LOGGER.info("submit later taskId {}, dataTime {}, currentTime
{}, shouldStartTime {}",
- new Object[]{getTaskId(), dataTime, currentTime,
shouldStartTime});
}
}
}
+ /*
+ * Calculate whether the event needs to be processed at the current time
based on its data time, business cycle, and
+ * offset
+ */
+ private boolean shouldStartNow(String dataTime) {
+ String shouldStartTime =
+ NewDateUtils.getShouldStartTime(dataTime,
taskProfile.getCycleUnit(), taskProfile.getTimeOffset());
+ String currentTime = getCurrentTime();
+ return currentTime.compareTo(shouldStartTime) >= 0;
+ }
+
private void removeTimeoutEven(Map<String, Map<String, InstanceProfile>>
eventMap, boolean isRetry) {
- if (isRetry) {
+ if (isRetry || isRealTime) {
return;
}
for (Map.Entry<String, Map<String, InstanceProfile>> entry :
eventMap.entrySet()) {
@@ -484,8 +502,14 @@ public class LogFileCollectTask extends Task {
LOGGER.error("should not happen! may be {} has been deleted and
add again", fileName);
return;
}
+ String cycleUnit = "";
+ if (isRealTime) {
+ cycleUnit = CycleUnitType.HOUR;
+ } else {
+ cycleUnit = taskProfile.getCycleUnit();
+ }
InstanceProfile instanceProfile =
taskProfile.createInstanceProfile(DEFAULT_FILE_INSTANCE,
- fileName, dataTime, fileUpdateTime);
+ fileName, cycleUnit, dataTime, fileUpdateTime);
sameDataTimeEvents.put(fileName, instanceProfile);
LOGGER.info("add to eventMap taskId {} dataTime {} fileName {}",
taskProfile.getTaskId(), dataTime, fileName);
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
index 2280b2db5a..ae57acbca8 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java
@@ -78,9 +78,6 @@ public class DateUtils {
ret = oneMatch;
}
}
- if (ret.isEmpty()) {
- throw new IllegalArgumentException("time pattern " + " not find in
" + src);
- }
return ret;
}
@@ -91,6 +88,9 @@ public class DateUtils {
}
String longestPattern = extractLongestTimeRegex(src);
+ if (longestPattern.isEmpty()) {
+ return new PathDateExpression(longestPattern,
NonRegexPatternPosition.NONE);
+ }
String regexSign = "\\^$*+?{(|[)]";
String range = "+?*{";
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index 62207acc81..706167788f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -242,6 +242,9 @@ public class NewDateUtils {
* @return
*/
public static long calcOffset(String timeOffset) {
+ if (timeOffset.length() == 0) {
+ return 0;
+ }
String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
int startIndex;
int symbol;
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 0fae339400..4e8ce6b413 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -72,7 +72,7 @@ public class TestSenderManager {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING);
profile = taskProfile.createInstanceProfile("", fileName,
- "20230927", AgentUtils.getCurrentTime());
+ taskProfile.getCycleUnit(), "20230927",
AgentUtils.getCurrentTime());
}
@AfterClass
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index b8e6d60fef..3397f9f58a 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -61,7 +61,7 @@ public class TestLogFileSource {
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L,
0L, TaskStateEnum.RUNNING);
instanceProfile = taskProfile.createInstanceProfile("",
- fileName, "20230928", AgentUtils.getCurrentTime());
+ fileName, taskProfile.getCycleUnit(), "20230928",
AgentUtils.getCurrentTime());
}
private LogFileSource getSource() {
@@ -110,7 +110,7 @@ public class TestLogFileSource {
msg = source.read();
cnt++;
}
- await().atMost(6, TimeUnit.SECONDS).until(() -> source.sourceFinish());
+ await().atMost(30, TimeUnit.SECONDS).until(() ->
source.sourceFinish());
source.destroy();
Assert.assertTrue(cnt == 3);
Assert.assertTrue(srcLen == readLen);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
index 877f360ca8..89aa196ac2 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
@@ -61,7 +61,8 @@ public class TestMqttConnect {
@Override
public void run() {
- reader.init(jobProfile.createInstanceProfile("", "", "",
AgentUtils.getCurrentTime()));
+ reader.init(jobProfile.createInstanceProfile("", "",
jobProfile.getCycleUnit(), "",
+ AgentUtils.getCurrentTime()));
while (!reader.isFinished()) {
Message message = reader.read();
if (Objects.nonNull(message)) {
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index 81d2433349..46860ae20a 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -51,6 +51,7 @@ public class TestUtils {
Assert.assertTrue(NewDateUtils.calcOffset("0") == 0);
Assert.assertTrue(NewDateUtils.calcOffset("1") == 0);
Assert.assertTrue(NewDateUtils.calcOffset("10") == 0);
+ Assert.assertTrue(NewDateUtils.calcOffset("") == 0);
}
public static String getTestTriggerProfile() {