This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f58de13ab0 [INLONG-9721][Agent] Add a common cycle parameter to the task configuration (#9722)
f58de13ab0 is described below
commit f58de13ab035c1c703acdd36146a528493b34067
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Feb 22 10:05:02 2024 +0800
[INLONG-9721][Agent] Add a common cycle parameter to the task configuration (#9722)
---
.../java/org/apache/inlong/agent/constant/TaskConstants.java | 3 ++-
.../main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java | 4 ++++
.../org/apache/inlong/agent/plugin/task/file/LogFileTask.java | 9 +++++++++
.../task/{TestLogfileCollectTask.java => TestLogFileTask.java} | 9 +++++----
4 files changed, 20 insertions(+), 5 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 37dd2d5da3..eaa71f4751 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -66,7 +66,8 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_FILE_TIME_OFFSET =
"task.fileTask.timeOffset";
public static final String TASK_FILE_TIME_ZONE = "task.fileTask.timeZone";
public static final String TASK_FILE_MAX_WAIT =
"task.fileTask.file.max.wait";
- public static final String TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
+ public static final String TASK_CYCLE_UNIT = "task.cycleUnit";
+ public static final String FILE_TASK_CYCLE_UNIT =
"task.fileTask.cycleUnit";
public static final String TASK_FILE_TRIGGER_TYPE =
"task.fileTask.collectType";
public static final String JOB_FILE_LINE_END_PATTERN =
"job.fileTask.line.endPattern";
public static final String JOB_FILE_CONTENT_COLLECT_TYPE =
"job.fileTask.contentCollectType";
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 9ef55522ec..2730ce8f60 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.pojo;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.agent.pojo.FileTask.Line;
import org.apache.inlong.common.constant.MQType;
@@ -411,6 +412,7 @@ public class TaskProfileDto {
task.setVersion(dataConfig.getVersion());
task.setState(dataConfig.getState());
task.setPredefinedFields(dataConfig.getPredefinedFields());
+ task.setCycleUnit(CycleUnitType.REAL_TIME);
// set sink type
if (dataConfig.getDataReportType() ==
NORMAL_SEND_TO_DATAPROXY.ordinal()) {
@@ -443,6 +445,7 @@ public class TaskProfileDto {
case FILE:
task.setTaskClass(DEFAULT_FILE_TASK);
FileTask fileTask = getFileJob(dataConfig);
+ task.setCycleUnit(fileTask.getCycleUnit());
task.setFileTask(fileTask);
task.setSource(DEFAULT_SOURCE);
profileDto.setTask(task);
@@ -519,6 +522,7 @@ public class TaskProfileDto {
private String taskClass;
private String predefinedFields;
private Integer state;
+ private String cycleUnit;
private FileTask fileTask;
private BinlogJob binlogJob;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index 03f1baab7e..2f7b8e500f 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -134,10 +134,19 @@ public class LogFileTask extends Task {
LOGGER.error("task profile needs all required key");
return false;
}
+ if (!profile.hasKey(TaskConstants.FILE_TASK_CYCLE_UNIT)) {
+ LOGGER.error("task profile needs file cycle unit");
+ return false;
+ }
if (!profile.hasKey(TaskConstants.TASK_CYCLE_UNIT)) {
LOGGER.error("task profile needs cycle unit");
return false;
}
+ if (profile.get(TaskConstants.TASK_CYCLE_UNIT)
+ .compareTo(profile.get(TaskConstants.FILE_TASK_CYCLE_UNIT)) !=
0) {
+ LOGGER.error("task profile cycle unit must be consistent");
+ return false;
+ }
if (!profile.hasKey(TaskConstants.TASK_FILE_TIME_ZONE)) {
LOGGER.error("task profile needs time zone");
return false;
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
similarity index 94%
rename from
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
rename to
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index ae5343256f..37fcb158a6 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -52,10 +52,10 @@ import static org.awaitility.Awaitility.await;
@RunWith(PowerMockRunner.class)
@PrepareForTest(LogFileTask.class)
@PowerMockIgnore({"javax.management.*"})
-public class TestLogfileCollectTask {
+public class TestLogFileTask {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TestLogfileCollectTask.class);
- private static final ClassLoader LOADER =
TestLogfileCollectTask.class.getClassLoader();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TestLogFileTask.class);
+ private static final ClassLoader LOADER =
TestLogFileTask.class.getClassLoader();
private static LogFileTask task;
private static AgentBaseTestsHelper helper;
private static final Gson GSON = new Gson();
@@ -73,7 +73,7 @@ public class TestLogfileCollectTask {
@BeforeClass
public static void setup() {
- helper = new
AgentBaseTestsHelper(TestLogfileCollectTask.class.getName()).setupAgentHome();
+ helper = new
AgentBaseTestsHelper(TestLogFileTask.class.getName()).setupAgentHome();
Db basicDb = TaskManager.initDb("/localdb");
resourceName =
LOADER.getResource("testScan/20230928_1/test_1.txt").getPath();
tempResourceName = LOADER.getResource("testScan/temp.txt").getPath();
@@ -96,6 +96,7 @@ public class TestLogfileCollectTask {
dataTime = invocation.getArgument(1);
return null;
}).when(task, "addToEvenMap", Mockito.anyString(),
Mockito.anyString());
+ Assert.assertTrue(task.isProfileValid(taskProfile));
task.init(manager, taskProfile, basicDb);
EXECUTOR_SERVICE.submit(task);
} catch (Exception e) {