This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bc35ea2 [INLONG-3297][Agent] Add version control in Agent
CommandEntity
bc35ea2 is described below
commit bc35ea2dee80e5b1aa91807f7a54b4f75d4630a8
Author: Schnapps <[email protected]>
AuthorDate: Tue Mar 22 19:58:05 2022 +0800
[INLONG-3297][Agent] Add version control in Agent CommandEntity
---
.../apache/inlong/agent/constant/AgentConstants.java | 3 +++
.../java/org/apache/inlong/agent/db/CommandDb.java | 4 ++++
.../org/apache/inlong/agent/pojo/JobProfileDto.java | 2 ++
.../org/apache/inlong/agent/core/job/JobWrapper.java | 18 ++++++++++++++++++
.../org/apache/inlong/agent/core/task/TaskWrapper.java | 10 ----------
5 files changed, 27 insertions(+), 10 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 83f747b..2f42d60 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -182,4 +182,7 @@ public class AgentConstants {
public static final String AGENT_HISTORY_PATH = "agent.history.path";
public static final String DEFAULT_AGENT_HISTORY_PATH = ".history";
+ public static final String JOB_VERSION = "job.version";
+ public static final Integer DEFAULT_JOB_VERSION = 1;
+
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
index 66d0697..2d759eb 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
@@ -17,6 +17,9 @@
package org.apache.inlong.agent.db;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_VERSION;
+import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION;
+
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.db.CommandEntity;
@@ -57,6 +60,7 @@ public class CommandDb {
entity.setTaskId(Integer.parseInt(profile.getTriggerId()));
entity.setDeliveryTime(profile.getDeliveryTime());
entity.setCommandResult(success ? Constants.RESULT_SUCCESS :
Constants.RESULT_FAIL);
+ entity.setVersion(profile.getInt(JOB_VERSION, DEFAULT_JOB_VERSION));
entity.setAcked(false);
storeCommand(entity);
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index c53206f..6a75879 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -176,6 +176,7 @@ public class JobProfileDto {
job.setDeliveryTime(dataConfigs.getDeliveryTime());
job.setUuid(dataConfigs.getUuid());
job.setSink(DEFAULT_DATAPROXY_SINK);
+ job.setVersion(dataConfigs.getVersion());
TaskTypeEnum taskType =
TaskTypeEnum.getTaskType(dataConfigs.getTaskType());
switch (requireNonNull(taskType)) {
case SQL:
@@ -216,6 +217,7 @@ public class JobProfileDto {
private String retryTime;
private String deliveryTime;
private String uuid;
+ private Integer version;
private FileJob fileJob;
private BinlogJob binlogJob;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index 32868f2..b8111ad 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -17,6 +17,8 @@
package org.apache.inlong.agent.core.job;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_VERSION;
+import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_OFFSET_DELIMITER;
import java.util.ArrayList;
@@ -27,8 +29,11 @@ import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.core.task.Task;
import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.db.CommandDb;
import org.apache.inlong.agent.state.AbstractStateWrapper;
import org.apache.inlong.agent.state.State;
+import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.db.CommandEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +49,7 @@ public class JobWrapper extends AbstractStateWrapper {
private final TaskManager taskManager;
private final JobManager jobManager;
private final Job job;
+ private CommandDb db;
private final List<Task> allTasks;
@@ -54,6 +60,7 @@ public class JobWrapper extends AbstractStateWrapper {
this.jobManager = manager.getJobManager();
this.job = job;
this.allTasks = new ArrayList<>();
+ this.db = manager.getCommandDb();
doChangeState(State.ACCEPTED);
}
@@ -76,9 +83,20 @@ public class JobWrapper extends AbstractStateWrapper {
doChangeState(State.SUCCEEDED);
} else {
doChangeState(State.FAILED);
+ saveFailedCommand();
}
}
+ private void saveFailedCommand() {
+ CommandEntity entity = new CommandEntity();
+ entity.setId(job.getJobInstanceId());
+ entity.setAcked(false);
+ entity.setTaskId(Integer.valueOf(job.getJobInstanceId()));
+ entity.setCommandResult(Constants.RESULT_FAIL);
+ entity.setVersion(job.getJobConf().getInt(JOB_VERSION,
DEFAULT_JOB_VERSION));
+ db.storeCommand(entity);
+ }
+
/**
* submit all tasks
*/
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index 53eac5c..ed1740d 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -28,13 +28,10 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.AgentManager;
-import org.apache.inlong.agent.db.CommandDb;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.state.AbstractStateWrapper;
import org.apache.inlong.agent.state.State;
-import org.apache.inlong.common.constant.Constants;
-import org.apache.inlong.common.db.CommandEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +52,6 @@ public class TaskWrapper extends AbstractStateWrapper {
private final int pushMaxWaitTime;
private final int pullMaxWaitTime;
private ExecutorService executorService;
- private CommandDb db;
public TaskWrapper(AgentManager manager, Task task) {
super();
@@ -75,7 +71,6 @@ public class TaskWrapper extends AbstractStateWrapper {
new AgentThreadFactory("task-reader-writer"));
}
doChangeState(State.ACCEPTED);
- this.db = manager.getCommandDb();
}
/**
@@ -207,11 +202,6 @@ public class TaskWrapper extends AbstractStateWrapper {
submitThreadsAndWait();
if (!isException()) {
doChangeState(State.SUCCEEDED);
- } else {
- CommandEntity command = new CommandEntity();
- command.setTaskId(Integer.valueOf(task.getTaskId()));
- command.setCommandResult(Constants.RESULT_FAIL);
- db.storeCommand(command);
}
LOGGER.info("start to destroy task {}", task.getTaskId());
task.destroy();