This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bc35ea2  [INLONG-3297][Agent] Add version control in Agent CommandEntity
bc35ea2 is described below

commit bc35ea2dee80e5b1aa91807f7a54b4f75d4630a8
Author: Schnapps <[email protected]>
AuthorDate: Tue Mar 22 19:58:05 2022 +0800

    [INLONG-3297][Agent] Add version control in Agent CommandEntity
---
 .../apache/inlong/agent/constant/AgentConstants.java   |  3 +++
 .../java/org/apache/inlong/agent/db/CommandDb.java     |  4 ++++
 .../org/apache/inlong/agent/pojo/JobProfileDto.java    |  2 ++
 .../org/apache/inlong/agent/core/job/JobWrapper.java   | 18 ++++++++++++++++++
 .../org/apache/inlong/agent/core/task/TaskWrapper.java | 10 ----------
 5 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 83f747b..2f42d60 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -182,4 +182,7 @@ public class AgentConstants {
     public static final String AGENT_HISTORY_PATH = "agent.history.path";
     public static final String DEFAULT_AGENT_HISTORY_PATH = ".history";
 
+    public static final String JOB_VERSION = "job.version";
+    public static final Integer DEFAULT_JOB_VERSION = 1;
+
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
index 66d0697..2d759eb 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.agent.db;
 
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_VERSION;
+import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION;
+
 import org.apache.inlong.agent.conf.TriggerProfile;
 import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.db.CommandEntity;
@@ -57,6 +60,7 @@ public class CommandDb {
         entity.setTaskId(Integer.parseInt(profile.getTriggerId()));
         entity.setDeliveryTime(profile.getDeliveryTime());
         entity.setCommandResult(success ? Constants.RESULT_SUCCESS : Constants.RESULT_FAIL);
+        entity.setVersion(profile.getInt(JOB_VERSION, DEFAULT_JOB_VERSION));
         entity.setAcked(false);
         storeCommand(entity);
     }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index c53206f..6a75879 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -176,6 +176,7 @@ public class JobProfileDto {
         job.setDeliveryTime(dataConfigs.getDeliveryTime());
         job.setUuid(dataConfigs.getUuid());
         job.setSink(DEFAULT_DATAPROXY_SINK);
+        job.setVersion(dataConfigs.getVersion());
         TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfigs.getTaskType());
         switch (requireNonNull(taskType)) {
             case SQL:
@@ -216,6 +217,7 @@ public class JobProfileDto {
         private String retryTime;
         private String deliveryTime;
         private String uuid;
+        private Integer version;
 
         private FileJob fileJob;
         private BinlogJob binlogJob;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index 32868f2..b8111ad 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.agent.core.job;
 
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_VERSION;
+import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_OFFSET_DELIMITER;
 
 import java.util.ArrayList;
@@ -27,8 +29,11 @@ import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.core.task.Task;
 import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.db.CommandDb;
 import org.apache.inlong.agent.state.AbstractStateWrapper;
 import org.apache.inlong.agent.state.State;
+import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.db.CommandEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +49,7 @@ public class JobWrapper extends AbstractStateWrapper {
     private final TaskManager taskManager;
     private final JobManager jobManager;
     private final Job job;
+    private CommandDb db;
 
     private final List<Task> allTasks;
 
@@ -54,6 +60,7 @@ public class JobWrapper extends AbstractStateWrapper {
         this.jobManager = manager.getJobManager();
         this.job = job;
         this.allTasks = new ArrayList<>();
+        this.db = manager.getCommandDb();
         doChangeState(State.ACCEPTED);
     }
 
@@ -76,9 +83,20 @@ public class JobWrapper extends AbstractStateWrapper {
             doChangeState(State.SUCCEEDED);
         } else {
             doChangeState(State.FAILED);
+            saveFailedCommand();
         }
     }
 
+    private void saveFailedCommand() {
+        CommandEntity entity = new CommandEntity();
+        entity.setId(job.getJobInstanceId());
+        entity.setAcked(false);
+        entity.setTaskId(Integer.valueOf(job.getJobInstanceId()));
+        entity.setCommandResult(Constants.RESULT_FAIL);
+        entity.setVersion(job.getJobConf().getInt(JOB_VERSION, DEFAULT_JOB_VERSION));
+        db.storeCommand(entity);
+    }
+
     /**
      * submit all tasks
      */
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index 53eac5c..ed1740d 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -28,13 +28,10 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.core.AgentManager;
-import org.apache.inlong.agent.db.CommandDb;
 import org.apache.inlong.agent.message.EndMessage;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.state.AbstractStateWrapper;
 import org.apache.inlong.agent.state.State;
-import org.apache.inlong.common.constant.Constants;
-import org.apache.inlong.common.db.CommandEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +52,6 @@ public class TaskWrapper extends AbstractStateWrapper {
     private final int pushMaxWaitTime;
     private final int pullMaxWaitTime;
     private ExecutorService executorService;
-    private CommandDb db;
 
     public TaskWrapper(AgentManager manager, Task task) {
         super();
@@ -75,7 +71,6 @@ public class TaskWrapper extends AbstractStateWrapper {
                     new AgentThreadFactory("task-reader-writer"));
         }
         doChangeState(State.ACCEPTED);
-        this.db = manager.getCommandDb();
     }
 
     /**
@@ -207,11 +202,6 @@ public class TaskWrapper extends AbstractStateWrapper {
             submitThreadsAndWait();
             if (!isException()) {
                 doChangeState(State.SUCCEEDED);
-            } else {
-                CommandEntity command = new CommandEntity();
-                command.setTaskId(Integer.valueOf(task.getTaskId()));
-                command.setCommandResult(Constants.RESULT_FAIL);
-                db.storeCommand(command);
             }
             LOGGER.info("start to destroy task {}", task.getTaskId());
             task.destroy();

Reply via email to