This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f62b2e87a1 [INLONG-9364][Agent] Remove expired instance from db (#9365)
f62b2e87a1 is described below
commit f62b2e87a12d5c7c2d75417715d3edb422230dab
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Nov 30 11:16:23 2023 +0800
[INLONG-9364][Agent] Remove expired instance from db (#9365)
---
.../apache/inlong/agent/utils/DateTransUtils.java | 40 ++++++++++++
.../agent/core/instance/InstanceManager.java | 74 ++++++++++++++++++----
.../inlong/agent/core/task/file/TaskManager.java | 22 +++++--
.../agent/core/instance/TestInstanceManager.java | 6 +-
.../agent/plugin/task/filecollect/FileScanner.java | 4 +-
.../task/filecollect/LogFileCollectTask.java | 6 +-
.../agent/plugin/utils/file/NewDateUtils.java | 44 +------------
.../inlong/agent/plugin/utils/TestUtils.java | 14 ++--
.../apache/inlong/common/enums/TaskStateEnum.java | 4 +-
9 files changed, 139 insertions(+), 75 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
index 55182c7dd8..fe6257d64e 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
@@ -111,4 +111,44 @@ public class DateTransUtils {
return retTime;
}
+ /**
+ * Calculate offset time based on offset
+ * The current offset will only be offset forward, or it can be offset
backward to be compatible with the previous
+ * calculation method (subtraction).
+ * When it is offset backward, it returns negative;
+ * When offset forward, return positive
+ *
+ * @param timeOffset offset,such as -1d,-4h,-10m;
+ * @return
+ */
+ public static long calcOffset(String timeOffset) {
+ if (timeOffset.length() == 0) {
+ return 0;
+ }
+ String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
+ int startIndex;
+ int symbol;
+ if (timeOffset.charAt(0) == '-') {
+ symbol = -1;
+ startIndex = 1;
+ } else {
+ symbol = 1;
+ startIndex = 0;
+ }
+
+ String strOffset = timeOffset.substring(startIndex,
timeOffset.length() - 1);
+ if (strOffset.length() == 0) {
+ return 0;
+ }
+ int offsetTime = Integer.parseInt(strOffset);
+ if ("d".equalsIgnoreCase(offsetUnit)) {
+ return offsetTime * 24 * 3600 * 1000 * symbol;
+ } else if ("h".equalsIgnoreCase(offsetUnit)) {
+ return offsetTime * 3600 * 1000 * symbol;
+ } else if ("m".equalsIgnoreCase(offsetUnit)) {
+ return offsetTime * 60 * 1000 * symbol;
+ }
+ return 0;
+ }
+
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index a80ce8b53b..260e5a477f 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -21,16 +21,22 @@ import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.InstanceDb;
+import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.apache.inlong.common.enums.TaskStateEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +44,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* handle the instance created by task, including add, delete, update etc.
@@ -47,11 +54,14 @@ public class InstanceManager extends AbstractDaemon {
private static final Logger LOGGER =
LoggerFactory.getLogger(InstanceManager.class);
private static final int ACTION_QUEUE_CAPACITY = 100;
+ public static final int CLEAN_INSTANCE_ONCE_LIMIT = 10;
public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
- public static final int CORE_THREAD_PRINT_TIME = 10000;
- private long lastPrintTime = 0;
- // task in db
+ public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 10000;
+ private long lastCleanTime = 0;
+ public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "-3";
+ // instance in db
private final InstanceDb instanceDb;
+ TaskProfileDb taskProfileDb;
// task in memory
private final ConcurrentHashMap<String, Instance> instanceMap;
// instance profile queue.
@@ -105,9 +115,10 @@ public class InstanceManager extends AbstractDaemon {
/**
* Init task manager.
*/
- public InstanceManager(String taskId, int instanceLimit, Db basicDb) {
+ public InstanceManager(String taskId, int instanceLimit, Db basicDb,
TaskProfileDb taskProfileDb) {
this.taskId = taskId;
instanceDb = new InstanceDb(basicDb);
+ this.taskProfileDb = taskProfileDb;
this.agentConf = AgentConfiguration.getAgentConf();
instanceMap = new ConcurrentHashMap<>();
this.instanceLimit = instanceLimit;
@@ -145,11 +156,11 @@ public class InstanceManager extends AbstractDaemon {
while (isRunnable()) {
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
- printInstanceDetail();
+ cleanDbInstance();
dealWithActionQueue(actionQueue);
keepPaceWithDb();
} catch (Throwable ex) {
- LOGGER.error("coreThread {}", ex.getMessage());
+ LOGGER.error("coreThread {}", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
}
runAtLeastOneTime = true;
@@ -158,9 +169,10 @@ public class InstanceManager extends AbstractDaemon {
};
}
- private void printInstanceDetail() {
- if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
+ private void cleanDbInstance() {
+ if (AgentUtils.getCurrentTime() - lastCleanTime >
INSTANCE_DB_CLEAN_INTERVAL_MS) {
List<InstanceProfile> instances = instanceDb.getInstances(taskId);
+ doCleanDbInstance(instances);
InstancePrintStat stat = new InstancePrintStat();
for (int i = 0; i < instances.size(); i++) {
InstanceProfile instance = instances.get(i);
@@ -169,7 +181,45 @@ public class InstanceManager extends AbstractDaemon {
LOGGER.info(
"instanceManager running! taskId {} mem {} db total {} {}
action count {}",
taskId, instanceMap.size(), instances.size(), stat,
actionQueue.size());
- lastPrintTime = AgentUtils.getCurrentTime();
+ lastCleanTime = AgentUtils.getCurrentTime();
+ }
+ }
+
+ private void doCleanDbInstance(List<InstanceProfile> instances) {
+ AtomicInteger cleanCount = new AtomicInteger();
+ Iterator<InstanceProfile> iterator = instances.iterator();
+ while (iterator.hasNext()) {
+ if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) {
+ return;
+ }
+ InstanceProfile instanceFromDb = iterator.next();
+ if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) {
+ return;
+ }
+ TaskProfile taskFromDb = taskProfileDb.getTask(taskId);
+ if (taskFromDb != null) {
+ if
(taskFromDb.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+ return;
+ }
+ if (taskFromDb.isRetry()) {
+ if (taskFromDb.getState() != TaskStateEnum.RETRY_FINISH) {
+ return;
+ }
+ } else {
+ if (instanceFromDb.getState() !=
InstanceStateEnum.FINISHED) {
+ return;
+ }
+ }
+ }
+ long expireTime =
DateTransUtils.calcOffset(DB_INSTANCE_EXPIRE_CYCLE_COUNT +
taskFromDb.getCycleUnit());
+ if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() >
expireTime) {
+ cleanCount.getAndIncrement();
+ LOGGER.info("instance has expired, delete from db dataTime {}
taskId {} instanceId {}",
+ instanceFromDb.getSourceDataTime(),
instanceFromDb.getTaskId(),
+ instanceFromDb.getInstanceId());
+ instanceDb.deleteInstance(instanceFromDb.getTaskId(),
instanceFromDb.getInstanceId());
+ iterator.remove();
+ }
}
}
@@ -181,10 +231,10 @@ public class InstanceManager extends AbstractDaemon {
private void traverseDbTasksToMemory() {
instanceDb.getInstances(taskId).forEach((profileFromDb) -> {
InstanceStateEnum dbState = profileFromDb.getState();
- Instance task = instanceMap.get(profileFromDb.getInstanceId());
+ Instance instance = instanceMap.get(profileFromDb.getInstanceId());
switch (dbState) {
case DEFAULT: {
- if (task == null) {
+ if (instance == null) {
LOGGER.info("traverseDbTasksToMemory add instance to
mem taskId {} instanceId {}",
profileFromDb.getTaskId(),
profileFromDb.getInstanceId());
addToMemory(profileFromDb);
@@ -193,7 +243,7 @@ public class InstanceManager extends AbstractDaemon {
}
case FINISHED:
DELETE: {
- if (task != null) {
+ if (instance != null) {
LOGGER.info("traverseDbTasksToMemory delete
instance from mem taskId {} instanceId {}",
profileFromDb.getTaskId(),
profileFromDb.getInstanceId());
deleteFromMemory(profileFromDb.getInstanceId());
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 4d8bc9fea6..7027b798f8 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -63,6 +63,8 @@ public class TaskManager extends AbstractDaemon {
private final Db taskBasicDb;
// instance basic db
private final Db instanceBasicDb;
+ // offset basic db
+ private final Db offsetBasicDb;
// task in db
private final TaskProfileDb taskDb;
// task in memory
@@ -100,7 +102,7 @@ public class TaskManager extends AbstractDaemon {
frozenCount++;
break;
}
- case FINISH: {
+ case RETRY_FINISH: {
finishedCount++;
break;
}
@@ -122,11 +124,13 @@ public class TaskManager extends AbstractDaemon {
*/
public TaskManager() {
this.agentConf = AgentConfiguration.getAgentConf();
- this.taskBasicDb = initDb(
+ taskBasicDb = initDb(
agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_TASK));
- this.instanceBasicDb = initDb(
- agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
taskDb = new TaskProfileDb(taskBasicDb);
+ instanceBasicDb = initDb(
+ agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
+ offsetBasicDb =
+ initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH,
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
this.runningPool = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
@@ -139,6 +143,10 @@ public class TaskManager extends AbstractDaemon {
actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
}
+ public TaskProfileDb getTaskDb() {
+ return taskDb;
+ }
+
/**
* init db by class name
*
@@ -284,7 +292,7 @@ public class TaskManager extends AbstractDaemon {
if (managerState == dbState) {
return;
}
- if (dbState == TaskStateEnum.FINISH) {
+ if (dbState == TaskStateEnum.RETRY_FINISH) {
LOGGER.info("traverseManagerTasksToDb task {} dbState {}
retry {}, do nothing",
taskFromDb.getTaskId(), dbState,
taskFromDb.isRetry());
@@ -335,7 +343,7 @@ public class TaskManager extends AbstractDaemon {
deleteFromMemory(profileFromDb.getTaskId());
}
} else {
- if (dbState != TaskStateEnum.FINISH) {
+ if (dbState != TaskStateEnum.RETRY_FINISH) {
LOGGER.error("task {} invalid state {}",
profileFromDb.getTaskId(), dbState);
}
}
@@ -394,7 +402,7 @@ public class TaskManager extends AbstractDaemon {
}
private void finishTask(TaskProfile taskProfile) {
- taskProfile.setState(TaskStateEnum.FINISH);
+ taskProfile.setState(TaskStateEnum.RETRY_FINISH);
updateToDb(taskProfile);
deleteFromMemory(taskProfile.getTaskId());
}
diff --git
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 558bed0204..262565022e 100755
---
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -19,9 +19,11 @@ package org.apache.inlong.agent.core.instance;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentBaseTestsHelper;
import org.apache.inlong.agent.core.task.file.TaskManager;
import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
@@ -53,7 +55,9 @@ public class TestInstanceManager {
String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
Db basicDb = TaskManager.initDb("/localdb");
taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L,
TaskStateEnum.RUNNING, "GMT+6:00");
- manager = new InstanceManager("1", 2, basicDb);
+ Db taskBasicDb =
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
+ TaskProfileDb taskDb = new TaskProfileDb(taskBasicDb);
+ manager = new InstanceManager("1", 2, basicDb, taskDb);
manager.CORE_THREAD_SLEEP_TIME_MS = 100;
manager.start();
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
index fc989b3bcf..9985214873 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -61,8 +61,8 @@ public class FileScanner {
long startTime,
long endTime, boolean isRetry) {
if (!isRetry) {
- startTime += NewDateUtils.calcOffset(timeOffset);
- endTime += NewDateUtils.calcOffset(timeOffset);
+ startTime += DateTransUtils.calcOffset(timeOffset);
+ endTime += DateTransUtils.calcOffset(timeOffset);
}
String strStartTime =
DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit);
String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime,
cycleUnit);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 355bf3909f..86ce040210 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -34,6 +34,7 @@ import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
import org.apache.inlong.agent.state.State;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.agent.utils.file.FileUtils;
import org.slf4j.Logger;
@@ -70,6 +71,7 @@ public class LogFileCollectTask extends Task {
public static final String DEFAULT_FILE_INSTANCE =
"org.apache.inlong.agent.plugin.instance.FileInstance";
private static final Logger LOGGER =
LoggerFactory.getLogger(LogFileCollectTask.class);
+ public static final String SCAN_CYCLE_RANCE = "-2";
private TaskProfile taskProfile;
private Db basicDb;
private TaskManager taskManager;
@@ -117,7 +119,7 @@ public class LogFileCollectTask extends Task {
isRealTime = true;
}
instanceManager = new InstanceManager(taskProfile.getTaskId(),
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
- basicDb);
+ basicDb, taskManager.getTaskDb());
try {
instanceManager.start();
} catch (Exception e) {
@@ -318,7 +320,7 @@ public class LogFileCollectTask extends Task {
if (!retry) {
long currentTime = System.currentTimeMillis();
// only scan two cycle, like two hours or two days
- long offset = NewDateUtils.calcOffset("-2" +
taskProfile.getCycleUnit());
+ long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE +
taskProfile.getCycleUnit());
startScanTime = currentTime + offset;
endScanTime = currentTime;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index 706167788f..c38eb57be8 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -223,54 +223,14 @@ public class NewDateUtils {
}
if (timeOffset.startsWith("-")) {
- timeInterval -= calcOffset(timeOffset);
+ timeInterval -= DateTransUtils.calcOffset(timeOffset);
} else {
- timeInterval += calcOffset(timeOffset);
+ timeInterval += DateTransUtils.calcOffset(timeOffset);
}
return isValidCreationTime(dataTime, timeInterval);
}
- /**
- * Calculate offset time based on offset
- * The current offset will only be offset forward, or it can be offset
backward to be compatible with the previous
- * calculation method (subtraction).
- * When it is offset backward, it returns negative;
- * When offset forward, return positive
- *
- * @param timeOffset offset,such as -1d,-4h,-10m;
- * @return
- */
- public static long calcOffset(String timeOffset) {
- if (timeOffset.length() == 0) {
- return 0;
- }
- String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
- int startIndex;
- int symbol;
- if (timeOffset.charAt(0) == '-') {
- symbol = -1;
- startIndex = 1;
- } else {
- symbol = 1;
- startIndex = 0;
- }
-
- String strOffset = timeOffset.substring(startIndex,
timeOffset.length() - 1);
- if (strOffset.length() == 0) {
- return 0;
- }
- int offsetTime = Integer.parseInt(strOffset);
- if ("d".equalsIgnoreCase(offsetUnit)) {
- return offsetTime * 24 * 3600 * 1000 * symbol;
- } else if ("h".equalsIgnoreCase(offsetUnit)) {
- return offsetTime * 3600 * 1000 * symbol;
- } else if ("m".equalsIgnoreCase(offsetUnit)) {
- return offsetTime * 60 * 1000 * symbol;
- }
- return 0;
- }
-
/*
* Check whether the data time is between curTime - interval and curTime +
interval.
*/
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index 46860ae20a..ea575e613d 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -17,7 +17,7 @@
package org.apache.inlong.agent.plugin.utils;
-import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.commons.io.FileUtils;
@@ -46,12 +46,12 @@ public class TestUtils {
@Test
public void testCalcOffset() {
- Assert.assertTrue(NewDateUtils.calcOffset("-1h") == -3600 * 1000);
- Assert.assertTrue(NewDateUtils.calcOffset("1D") == 24 * 3600 * 1000);
- Assert.assertTrue(NewDateUtils.calcOffset("0") == 0);
- Assert.assertTrue(NewDateUtils.calcOffset("1") == 0);
- Assert.assertTrue(NewDateUtils.calcOffset("10") == 0);
- Assert.assertTrue(NewDateUtils.calcOffset("") == 0);
+ Assert.assertTrue(DateTransUtils.calcOffset("-1h") == -3600 * 1000);
+ Assert.assertTrue(DateTransUtils.calcOffset("1D") == 24 * 3600 * 1000);
+ Assert.assertTrue(DateTransUtils.calcOffset("0") == 0);
+ Assert.assertTrue(DateTransUtils.calcOffset("1") == 0);
+ Assert.assertTrue(DateTransUtils.calcOffset("10") == 0);
+ Assert.assertTrue(DateTransUtils.calcOffset("") == 0);
}
public static String getTestTriggerProfile() {
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
index 6401fa8ffb..583604b32b 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
@@ -25,7 +25,7 @@ public enum TaskStateEnum {
NEW(0),
RUNNING(1),
FROZEN(2),
- FINISH(3);
+ RETRY_FINISH(3);
private final int state;
@@ -42,7 +42,7 @@ public enum TaskStateEnum {
case 2:
return FROZEN;
case 3:
- return FINISH;
+ return RETRY_FINISH;
default:
throw new RuntimeException("Unsupported task state " + state);
}