This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f58de13ab0 [INLONG-9721][Agent] Add a common cycle parameter to the 
task configuration (#9722)
f58de13ab0 is described below

commit f58de13ab035c1c703acdd36146a528493b34067
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Feb 22 10:05:02 2024 +0800

    [INLONG-9721][Agent] Add a common cycle parameter to the task configuration 
(#9722)
---
 .../java/org/apache/inlong/agent/constant/TaskConstants.java     | 3 ++-
 .../main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java   | 4 ++++
 .../org/apache/inlong/agent/plugin/task/file/LogFileTask.java    | 9 +++++++++
 .../task/{TestLogfileCollectTask.java => TestLogFileTask.java}   | 9 +++++----
 4 files changed, 20 insertions(+), 5 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 37dd2d5da3..eaa71f4751 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -66,7 +66,8 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_FILE_TIME_OFFSET = 
"task.fileTask.timeOffset";
     public static final String TASK_FILE_TIME_ZONE = "task.fileTask.timeZone";
     public static final String TASK_FILE_MAX_WAIT = 
"task.fileTask.file.max.wait";
-    public static final String TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
+    public static final String TASK_CYCLE_UNIT = "task.cycleUnit";
+    public static final String FILE_TASK_CYCLE_UNIT = 
"task.fileTask.cycleUnit";
     public static final String TASK_FILE_TRIGGER_TYPE = 
"task.fileTask.collectType";
     public static final String JOB_FILE_LINE_END_PATTERN = 
"job.fileTask.line.endPattern";
     public static final String JOB_FILE_CONTENT_COLLECT_TYPE = 
"job.fileTask.contentCollectType";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 9ef55522ec..2730ce8f60 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.pojo;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
 import org.apache.inlong.agent.pojo.FileTask.Line;
 import org.apache.inlong.common.constant.MQType;
@@ -411,6 +412,7 @@ public class TaskProfileDto {
         task.setVersion(dataConfig.getVersion());
         task.setState(dataConfig.getState());
         task.setPredefinedFields(dataConfig.getPredefinedFields());
+        task.setCycleUnit(CycleUnitType.REAL_TIME);
 
         // set sink type
         if (dataConfig.getDataReportType() == 
NORMAL_SEND_TO_DATAPROXY.ordinal()) {
@@ -443,6 +445,7 @@ public class TaskProfileDto {
             case FILE:
                 task.setTaskClass(DEFAULT_FILE_TASK);
                 FileTask fileTask = getFileJob(dataConfig);
+                task.setCycleUnit(fileTask.getCycleUnit());
                 task.setFileTask(fileTask);
                 task.setSource(DEFAULT_SOURCE);
                 profileDto.setTask(task);
@@ -519,6 +522,7 @@ public class TaskProfileDto {
         private String taskClass;
         private String predefinedFields;
         private Integer state;
+        private String cycleUnit;
 
         private FileTask fileTask;
         private BinlogJob binlogJob;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index 03f1baab7e..2f7b8e500f 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -134,10 +134,19 @@ public class LogFileTask extends Task {
             LOGGER.error("task profile needs all required key");
             return false;
         }
+        if (!profile.hasKey(TaskConstants.FILE_TASK_CYCLE_UNIT)) {
+            LOGGER.error("task profile needs file cycle unit");
+            return false;
+        }
         if (!profile.hasKey(TaskConstants.TASK_CYCLE_UNIT)) {
             LOGGER.error("task profile needs cycle unit");
             return false;
         }
+        if (profile.get(TaskConstants.TASK_CYCLE_UNIT)
+                .compareTo(profile.get(TaskConstants.FILE_TASK_CYCLE_UNIT)) != 
0) {
+            LOGGER.error("task profile cycle unit must be consistent");
+            return false;
+        }
         if (!profile.hasKey(TaskConstants.TASK_FILE_TIME_ZONE)) {
             LOGGER.error("task profile needs time zone");
             return false;
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
similarity index 94%
rename from 
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
rename to 
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
index ae5343256f..37fcb158a6 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogfileCollectTask.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java
@@ -52,10 +52,10 @@ import static org.awaitility.Awaitility.await;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(LogFileTask.class)
 @PowerMockIgnore({"javax.management.*"})
-public class TestLogfileCollectTask {
+public class TestLogFileTask {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestLogfileCollectTask.class);
-    private static final ClassLoader LOADER = 
TestLogfileCollectTask.class.getClassLoader();
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestLogFileTask.class);
+    private static final ClassLoader LOADER = 
TestLogFileTask.class.getClassLoader();
     private static LogFileTask task;
     private static AgentBaseTestsHelper helper;
     private static final Gson GSON = new Gson();
@@ -73,7 +73,7 @@ public class TestLogfileCollectTask {
 
     @BeforeClass
     public static void setup() {
-        helper = new 
AgentBaseTestsHelper(TestLogfileCollectTask.class.getName()).setupAgentHome();
+        helper = new 
AgentBaseTestsHelper(TestLogFileTask.class.getName()).setupAgentHome();
         Db basicDb = TaskManager.initDb("/localdb");
         resourceName = 
LOADER.getResource("testScan/20230928_1/test_1.txt").getPath();
         tempResourceName = LOADER.getResource("testScan/temp.txt").getPath();
@@ -96,6 +96,7 @@ public class TestLogfileCollectTask {
                 dataTime = invocation.getArgument(1);
                 return null;
             }).when(task, "addToEvenMap", Mockito.anyString(), 
Mockito.anyString());
+            Assert.assertTrue(task.isProfileValid(taskProfile));
             task.init(manager, taskProfile, basicDb);
             EXECUTOR_SERVICE.submit(task);
         } catch (Exception e) {

Reply via email to