This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
new d628628 [TUBEMQ-483] add agent/job/task manager for data collection
management.
d628628 is described below
commit d62862865b567d1bda8e877f593a23290ea47705
Author: yuanbo <[email protected]>
AuthorDate: Fri Dec 25 11:27:01 2020 +0800
[TUBEMQ-483] add agent/job/task manager for data collection management.
---
.../apache/tubemq/agent/common/AbstractDaemon.java | 87 ++++++++
.../tubemq/agent/common/AgentThreadFactory.java | 40 ++++
.../org/apache/tubemq/agent/common/Service.java | 35 ++++
.../org/apache/tubemq/agent/db/JobProfileDB.java | 114 ++++++++++
.../org/apache/tubemq/agent/db/KeyValueEntity.java | 19 ++
.../apache/tubemq/agent/db/TriggerProfileDB.java | 77 +++++++
.../org/apache/tubemq/agent/core/AgentMain.java | 27 +++
.../org/apache/tubemq/agent/core/AgentManager.java | 106 ++++++++++
.../java/org/apache/tubemq/agent/core/job/Job.java | 92 +++++++++
.../apache/tubemq/agent/core/job/JobManager.java | 166 +++++++++++++++
.../apache/tubemq/agent/core/job/JobWrapper.java | 122 +++++++++++
.../org/apache/tubemq/agent/core/task/Task.java | 73 +++++++
.../apache/tubemq/agent/core/task/TaskManager.java | 229 +++++++++++++++++++++
.../apache/tubemq/agent/core/task/TaskMetrics.java | 45 ++++
.../apache/tubemq/agent/core/task/TaskWrapper.java | 177 ++++++++++++++++
15 files changed, 1409 insertions(+)
diff --git
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AbstractDaemon.java
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AbstractDaemon.java
new file mode 100644
index 0000000..5982658
--- /dev/null
+++
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AbstractDaemon.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Providing work threads management, those threads run
+ * periodically until agent is stopped.
+ */
+public abstract class AbstractDaemon implements Service {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDaemon.class);
+
+    // worker thread pool, static so it is shared by every daemon instance
+    private static final ExecutorService workerServices = Executors.newCachedThreadPool();
+    // futures of the workers submitted through this daemon
+    private final List<CompletableFuture<?>> workerFutures;
+    // Written by the thread that stops the daemon and polled by worker threads,
+    // so it must be volatile for the update to become visible to the workers.
+    private volatile boolean runnable = true;
+
+    public AbstractDaemon() {
+        this.workerFutures = new ArrayList<>();
+    }
+
+    /**
+     * Whether worker threads may keep running their while loop.
+     *
+     * @return - true if threads can run
+     */
+    public boolean isRunnable() {
+        return runnable;
+    }
+
+    /**
+     * Ask running threads to stop by clearing the runnable flag.
+     * Workers are expected to poll {@link #isRunnable()} and exit on their own.
+     */
+    public void stopRunningThreads() {
+        runnable = false;
+    }
+
+    /**
+     * Submit work thread to the shared thread pool and remember its future.
+     *
+     * @param worker - work thread
+     */
+    public void submitWorker(Runnable worker) {
+        CompletableFuture<?> future = CompletableFuture.runAsync(worker, workerServices);
+        workerFutures.add(future);
+        LOGGER.info("{} running worker number is {}", this.getClass().getName(),
+            workerFutures.size());
+    }
+
+    /**
+     * Wait for every submitted worker to finish.
+     */
+    public void join() {
+        for (CompletableFuture<?> future : workerFutures) {
+            future.join();
+        }
+    }
+
+    /**
+     * Signal running threads to stop if they are still allowed to run.
+     * The shared thread pool itself is not shut down here.
+     */
+    public void waitForTerminate() {
+        // stop running threads.
+        if (isRunnable()) {
+            stopRunningThreads();
+        }
+    }
+}
diff --git
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AgentThreadFactory.java
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AgentThreadFactory.java
new file mode 100644
index 0000000..150f163
--- /dev/null
+++
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AgentThreadFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.common;
+
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread factory which names every created thread after the given thread type
+ * followed by an increasing sequence number.
+ */
+public class AgentThreadFactory implements ThreadFactory {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AgentThreadFactory.class);
+
+    // sequence number appended to the thread name, the first thread gets 1
+    private final AtomicInteger mThreadNum = new AtomicInteger(1);
+
+    // prefix of the thread name
+    private final String threadType;
+
+    public AgentThreadFactory(String threadType) {
+        this.threadType = threadType;
+    }
+
+    /**
+     * Create a thread named "{threadType}-running-thread-{n}" and log its creation.
+     *
+     * @param r - runnable executed by the new thread
+     * @return - the new, not yet started thread
+     */
+    @Override
+    public Thread newThread(Runnable r) {
+        String threadName = threadType + "-running-thread-" + mThreadNum.getAndIncrement();
+        Thread created = new Thread(r, threadName);
+        LOGGER.info("{} created", created.getName());
+        return created;
+    }
+}
\ No newline at end of file
diff --git
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/Service.java
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/Service.java
new file mode 100644
index 0000000..7a79775
--- /dev/null
+++
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/Service.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.common;
+
+/**
+ * Service lifecycle interface.
+ */
+public interface Service {
+
+ /**
+ * Start the service.
+ *
+ * @throws Exception if the implementation fails to start
+ */
+ void start() throws Exception;
+
+ /**
+ * Stop the service.
+ *
+ * @throws Exception if the implementation fails to stop
+ */
+ void stop() throws Exception;
+
+ /**
+ * Join and wait until getting signal; what the caller waits for is
+ * defined by the implementation.
+ *
+ * @throws Exception if the implementation fails while waiting
+ */
+ void join() throws Exception;
+}
diff --git
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/JobProfileDB.java
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/JobProfileDB.java
new file mode 100644
index 0000000..986802d
--- /dev/null
+++
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/JobProfileDB.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_ID;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper for job conf persistence.
+ */
+public class JobProfileDB {
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobProfileDB.class);
+
+    // every job key stored in the db starts with this prefix
+    private static final String JOB_ID_PREFIX = "job_";
+    private final DB db;
+
+    public JobProfileDB(DB db) {
+        this.db = db;
+    }
+
+    /**
+     * Get one job which is in accepted state.
+     *
+     * @return null or job conf
+     */
+    public JobProfile getAcceptedJob() {
+        return getJob(StateSearchKey.ACCEPTED);
+    }
+
+    /**
+     * Get all jobs which are in accepted state.
+     *
+     * @return - list of job profile
+     */
+    public List<JobProfile> getAcceptedJobs() {
+        return getJobs(StateSearchKey.ACCEPTED);
+    }
+
+    /**
+     * Look the job up by its key name and, when it exists, store it again
+     * with the given state.
+     *
+     * @param jobId - job key name (without prefix)
+     * @param stateSearchKey - job state
+     */
+    public void updateJobState(String jobId, StateSearchKey stateSearchKey) {
+        KeyValueEntity stored = db.get(JOB_ID_PREFIX + jobId);
+        if (stored == null) {
+            // nothing to update
+            return;
+        }
+        stored.setStateSearchKey(stateSearchKey);
+        db.put(stored);
+    }
+
+    /**
+     * Store job profile; profiles missing a required key are ignored.
+     *
+     * @param jobProfile - job profile
+     */
+    public void storeJob(JobProfile jobProfile) {
+        if (!jobProfile.allRequiredKeyExist()) {
+            return;
+        }
+        String keyName = JOB_ID_PREFIX + jobProfile.get(JOB_ID);
+        db.put(new KeyValueEntity(keyName, jobProfile.toJsonStr()));
+    }
+
+    /**
+     * Store every job profile of the list; null or empty list is a no-op.
+     *
+     * @param jobProfileList - job profiles
+     */
+    public void storeJob(List<JobProfile> jobProfileList) {
+        if (jobProfileList == null || jobProfileList.isEmpty()) {
+            return;
+        }
+        for (JobProfile profile : jobProfileList) {
+            storeJob(profile);
+        }
+    }
+
+    /**
+     * Remove the job stored under the given id.
+     *
+     * @param id - job id (without prefix)
+     */
+    public void deleteJob(String id) {
+        db.remove(JOB_ID_PREFIX + id);
+    }
+
+    /**
+     * Get job conf by state.
+     *
+     * @param stateSearchKey - state index for searching.
+     * @return job profile, or null when the search returns no entity
+     */
+    public JobProfile getJob(StateSearchKey stateSearchKey) {
+        KeyValueEntity found = db.searchOne(stateSearchKey);
+        return found == null ? null : found.getAsJobProfile();
+    }
+
+    /**
+     * Get list of job profiles by state.
+     *
+     * @param stateSearchKey - state search key.
+     * @return - list of job profile.
+     */
+    public List<JobProfile> getJobs(StateSearchKey stateSearchKey) {
+        List<KeyValueEntity> matched = db.search(stateSearchKey);
+        List<JobProfile> profiles = new ArrayList<>();
+        matched.forEach(entity -> profiles.add(entity.getAsJobProfile()));
+        return profiles;
+    }
+}
diff --git
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
index cd87f2f..c43d7aa 100644
---
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
+++
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
@@ -17,6 +17,8 @@ import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.PrimaryKey;
import com.sleepycat.persist.model.Relationship;
import com.sleepycat.persist.model.SecondaryKey;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.conf.TriggerProfile;
/**
* key value entity. key is string and value is a json
@@ -65,4 +67,21 @@ public class KeyValueEntity {
this.jsonValue = jsonValue;
return this;
}
+
+    /**
+     * Parse the stored json value into a job profile.
+     *
+     * @return job profile built from the json value
+     */
+    public JobProfile getAsJobProfile() {
+        final String json = getJsonValue();
+        return JobProfile.parseJsonStr(json);
+    }
+
+    /**
+     * Parse the stored json value into a trigger profile.
+     *
+     * @return trigger profile built from the json value
+     */
+    public TriggerProfile getAsTriggerProfile() {
+        final String json = getJsonValue();
+        return TriggerProfile.parseJsonStr(json);
+    }
}
diff --git
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/TriggerProfileDB.java
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/TriggerProfileDB.java
new file mode 100644
index 0000000..14abd1a
--- /dev/null
+++
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/TriggerProfileDB.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.tubemq.agent.conf.TriggerProfile;
+import org.apache.tubemq.agent.constants.JobConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * db interface for trigger profile.
+ */
+public class TriggerProfileDB {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TriggerProfileDB.class);
+
+    // prefix for trigger profile.
+    private static final String TRIGGER_ID_PREFIX = "trigger_";
+
+    private final DB db;
+
+    public TriggerProfileDB(DB db) {
+        this.db = db;
+    }
+
+    /**
+     * Get trigger list from db, i.e. every entity found with the trigger prefix.
+     *
+     * @return - list of trigger
+     */
+    public List<TriggerProfile> getTriggers() {
+        // potential performance issue, needs to find out the speed.
+        List<KeyValueEntity> stored = this.db.findAll(TRIGGER_ID_PREFIX);
+        List<TriggerProfile> triggers = new ArrayList<>();
+        stored.forEach(entity -> triggers.add(entity.getAsTriggerProfile()));
+        return triggers;
+    }
+
+    /**
+     * Store trigger profile; profiles missing a required key are ignored.
+     * A warning is logged when the put call hands back a non-null entity.
+     *
+     * @param trigger - trigger
+     */
+    public void storeTrigger(TriggerProfile trigger) {
+        if (!trigger.allRequiredKeyExist()) {
+            return;
+        }
+        String keyName = TRIGGER_ID_PREFIX + trigger.get(JobConstants.JOB_ID);
+        KeyValueEntity replaced = db.put(new KeyValueEntity(keyName, trigger.toJsonStr()));
+        if (replaced != null) {
+            LOGGER.warn("trigger profile {} has been replaced", replaced.getKey());
+        }
+    }
+
+    /**
+     * Delete trigger by id.
+     *
+     * @param id - trigger id (without prefix)
+     */
+    public void deleteTrigger(String id) {
+        db.remove(TRIGGER_ID_PREFIX + id);
+    }
+}
diff --git
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
index 119cc0c..d019316 100644
---
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
+++
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
* Agent entrance class
*/
public class AgentMain {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(AgentMain.class);
/**
@@ -77,6 +78,22 @@ public class AgentMain {
}
/**
+ * Stopping agent gracefully if get killed.
+ *
+ * @param manager - agent manager
+ */
+    private static void stopManagerIfKilled(AgentManager manager) {
+        // executed by the JVM on shutdown; failures are logged, never rethrown
+        Runnable gracefulStop = () -> {
+            try {
+                LOGGER.info("stopping agent gracefully");
+                manager.stop();
+            } catch (Exception ex) {
+                LOGGER.error("exception while stopping threads", ex);
+            }
+        };
+        Thread shutdownHook = new Thread(gracefulStop);
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    }
+
+ /**
* Main entrance.
*
* @param args - arguments
@@ -86,5 +103,15 @@ public class AgentMain {
CommandLine cl = initOptions(args);
assert cl != null;
initAgentConf(cl);
+ AgentManager manager = new AgentManager();
+ try {
+ manager.start();
+ stopManagerIfKilled(manager);
+ manager.join();
+ } catch (Exception ex) {
+ LOGGER.error("exception caught", ex);
+ } finally {
+ manager.stop();
+ }
}
}
diff --git
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
new file mode 100644
index 0000000..1c8bdc6
--- /dev/null
+++
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core;
+
+
+import org.apache.tubemq.agent.common.AbstractDaemon;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.job.JobManager;
+import org.apache.tubemq.agent.core.task.TaskManager;
+import org.apache.tubemq.agent.db.DB;
+import org.apache.tubemq.agent.db.JobProfileDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Agent Manager, the bridge for job manager, task manager, db e.t.c it
manages agent level
+ * operations and communicates with outside system.
+ */
+public class AgentManager extends AbstractDaemon {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AgentManager.class);
+ private static JobManager jobManager;
+ private static TaskManager taskManager;
+
+
+ private final long waitTime;
+ private final AgentConfiguration conf;
+ private final DB db;
+
+ public AgentManager() {
+ conf = AgentConfiguration.getAgentConf();
+ this.db = initDB();
+ jobManager = new JobManager(this, new JobProfileDB(db));
+ taskManager = new TaskManager(this);
+
+ this.waitTime = conf.getLong(
+ AgentConstants.THREAD_POOL_AWAIT_TIME,
AgentConstants.DEFAULT_THREAD_POOL_AWAIT_TIME);
+ }
+
+
+ /**
+ * init db by class name
+ *
+ * @return db
+ */
+ private DB initDB() {
+ try {
+ // db is a required component, so if not init correctly,
+ // throw exception and stop running.
+ return (DB) Class.forName(conf.get(
+ AgentConstants.AGENT_DB_CLASSNAME,
AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
+ .newInstance();
+ } catch (Exception ex) {
+ throw new UnsupportedClassVersionError(ex.getMessage());
+ }
+ }
+
+ public JobManager getJobManager() {
+ return jobManager;
+ }
+
+ public TaskManager getTaskManager() {
+ return taskManager;
+ }
+
+ @Override
+ public void join() {
+ super.join();
+ jobManager.join();
+ taskManager.join();
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOGGER.info("starting agent manager");
+ jobManager.start();
+ taskManager.start();
+ }
+
+ /**
+ * It should guarantee thread-safe, and can be invoked many times.
+ *
+ * @throws Exception exceptions
+ */
+ @Override
+ public void stop() throws Exception {
+ // TODO: change job state which is in running state.
+ LOGGER.info("stopping agent manager");
+ // close in order: trigger -> job -> task
+ jobManager.stop();
+ taskManager.stop();
+ this.db.close();
+ }
+}
diff --git
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/Job.java
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/Job.java
new file mode 100644
index 0000000..276cac9
--- /dev/null
+++
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/Job.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.job;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.constants.JobConstants;
+import org.apache.tubemq.agent.core.task.Task;
+import org.apache.tubemq.agent.plugin.Channel;
+import org.apache.tubemq.agent.plugin.Reader;
+import org.apache.tubemq.agent.plugin.Sink;
+import org.apache.tubemq.agent.plugin.Source;
+
+/**
+ * job meta definition, job will be split into several tasks.
+ */
+public class Job {
+
+    // profile this job was created from
+    private final JobProfile jobConf;
+    // job name
+    private String name;
+    // job description
+    private String description;
+    private String jobId;
+
+    /**
+     * Build job meta from profile; name and description fall back to their defaults.
+     *
+     * @param jobConf - job profile
+     */
+    public Job(JobProfile jobConf) {
+        this.jobConf = jobConf;
+        this.name = jobConf.get(JobConstants.JOB_NAME, JobConstants.DEFAULT_JOB_NAME);
+        this.description = jobConf.get(
+            JobConstants.JOB_DESCRIPTION, JobConstants.DEFAULT_JOB_DESCRIPTION);
+        this.jobId = jobConf.get(JobConstants.JOB_ID);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(String jobId) {
+        this.jobId = jobId;
+    }
+
+    /**
+     * Split the job into tasks: the configured source is split into readers and
+     * every reader gets its own sink and channel instance. Task ids are
+     * "{jobId}_{index}" with index starting from 0.
+     *
+     * @return - list of tasks
+     * @throws RuntimeException wrapping any failure while creating the plugins
+     */
+    public List<Task> createTasks() {
+        try {
+            Source source = (Source) instantiate(jobConf.get(JobConstants.JOB_SOURCE));
+            List<Task> tasks = new ArrayList<>();
+            int taskIndex = 0;
+            for (Reader reader : source.split(jobConf)) {
+                Sink sink = (Sink) instantiate(jobConf.get(JobConstants.JOB_SINK));
+                Channel channel = (Channel) instantiate(jobConf.get(JobConstants.JOB_CHANNEL));
+                String taskId = String.format("%s_%d", jobId, taskIndex);
+                taskIndex++;
+                tasks.add(new Task(taskId, reader, sink, channel, getJobConf()));
+            }
+            return tasks;
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Create an instance of the named class through its no-arg constructor.
+     */
+    private Object instantiate(String className) throws Exception {
+        return Class.forName(className).newInstance();
+    }
+
+    public JobProfile getJobConf() {
+        return this.jobConf;
+    }
+
+}
diff --git
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobManager.java
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobManager.java
new file mode 100644
index 0000000..c98b441
--- /dev/null
+++
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobManager.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.job;
+
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.common.AbstractDaemon;
+import org.apache.tubemq.agent.common.AgentThreadFactory;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.AgentManager;
+import org.apache.tubemq.agent.db.JobProfileDB;
+import org.apache.tubemq.agent.db.StateSearchKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JobManager maintains lots of jobs, and communicate between server and task
manager.
+ */
+public class JobManager extends AbstractDaemon {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(JobManager.class);
+
+ // key is job instance id.
+ private final ConcurrentHashMap<String, JobWrapper> jobs;
+ // jobs which are not accepted by running pool.
+ private final ConcurrentHashMap<String, Job> pendingJobs;
+ // job thread pool
+ private final ThreadPoolExecutor runningPool;
+ private final AgentManager agentManager;
+ private final int monitorInterval;
+
+ private final JobProfileDB jobConfDB;
+
+ /**
+ * init job manager
+ *
+ * @param agentManager - agent manager
+ */
+ public JobManager(AgentManager agentManager, JobProfileDB jobConfDB) {
+ this.jobConfDB = jobConfDB;
+ AgentConfiguration conf = AgentConfiguration.getAgentConf();
+ this.agentManager = agentManager;
+ // job thread pool for running
+ this.runningPool = new ThreadPoolExecutor(
+ conf.getInt(AgentConstants.JOB_RUNNING_THREAD_CORE_SIZE,
+ AgentConstants.DEFAULT_JOB_RUNNING_THREAD_CORE_SIZE),
+ conf.getInt(
+ AgentConstants.JOB_RUNNING_THREAD_MAX_SIZE,
AgentConstants.DEFAULT_JOB_RUNNING_THREAD_MAX_SIZE),
+ conf.getLong(AgentConstants.JOB_RUNNING_THREAD_KEEP_ALIVE,
+ AgentConstants.DEFAULT_JOB_RUNNING_THREAD_KEEP_ALIVE),
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(
+ conf.getInt(
+ AgentConstants.JOB_PENDING_MAX,
AgentConstants.DEFAULT_JOB_PENDING_MAX)),
+ new AgentThreadFactory("job"));
+ this.jobs = new ConcurrentHashMap<>();
+ this.pendingJobs = new ConcurrentHashMap<>();
+ this.monitorInterval = conf
+ .getInt(
+ AgentConstants.JOB_MONITOR_INTERVAL,
AgentConstants.DEFAULT_JOB_MONITOR_INTERVAL);
+
+ }
+
+ /**
+ * submit job to work thread.
+ *
+ * @param job - job
+ * @return - whether submitting job successfully.
+ */
+ private void addJob(Job job) {
+ try {
+ JobWrapper jobWrapper = new JobWrapper(agentManager, job);
+ this.runningPool.execute(jobWrapper);
+ jobs.putIfAbsent(jobWrapper.getJob().getJobId(), jobWrapper);
+ } catch (Exception rje) {
+ LOGGER.error("reject job {}", job.getJobId(), rje);
+ pendingJobs.putIfAbsent(job.getJobId(), job);
+ }
+ }
+
+ /**
+ * add job profile
+ * @param profile - job profile.
+ */
+ public void submitJobProfile(JobProfile profile) {
+ if (profile != null && profile.allRequiredKeyExist()) {
+ getJobConfDB().storeJob(profile);
+ addJob(new Job(profile));
+ }
+ }
+
+ /**
+ * start all accepted jobs.
+ */
+ private void startJobs() {
+ List<JobProfile> profileList = getJobConfDB().getAcceptedJobs();
+ for (JobProfile profile : profileList) {
+ addJob(new Job(profile));
+ }
+ }
+
+ public Runnable jobStateCheckThread() {
+ return () -> {
+ while (isRunnable()) {
+ try {
+ // check pending jobs and try to submit again.
+ for (String jobId : pendingJobs.keySet()) {
+ Job job = pendingJobs.remove(jobId);
+ if (job != null) {
+ addJob(job);
+ }
+ }
+ TimeUnit.SECONDS.sleep(monitorInterval);
+ } catch (Exception ex) {
+ LOGGER.error("error caught", ex);
+ }
+ }
+ };
+ }
+
+ /**
+ * mark job as success by job id.
+ * @param jobId - job id
+ */
+ public void markJobAsSuccess(String jobId) {
+ JobWrapper wrapper = jobs.remove(jobId);
+ if (wrapper != null) {
+ LOGGER.info("job instance {} is success", jobId);
+ // mark job as success.
+ jobConfDB.updateJobState(jobId, StateSearchKey.SUCCESS);
+ }
+ }
+
+ public JobProfileDB getJobConfDB() {
+ return jobConfDB;
+ }
+
+ @Override
+ public void start() {
+ submitWorker(jobStateCheckThread());
+ startJobs();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ waitForTerminate();
+ this.runningPool.shutdown();
+ }
+}
diff --git
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobWrapper.java
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobWrapper.java
new file mode 100644
index 0000000..cd5a327
--- /dev/null
+++
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobWrapper.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.job;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.AgentManager;
+import org.apache.tubemq.agent.core.task.Task;
+import org.apache.tubemq.agent.core.task.TaskManager;
+import org.apache.tubemq.agent.state.AbstractStateWrapper;
+import org.apache.tubemq.agent.state.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JobWrapper is used in JobManager, it defines the life cycle of
+ * running job and maintains the state of job.
+ */
+public class JobWrapper extends AbstractStateWrapper {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobWrapper.class);
+
+    private final AgentConfiguration agentConf;
+    private final TaskManager taskManager;
+    private final JobManager jobManager;
+    private final Job job;
+
+    // tasks submitted for this job, filled by submitAllTasks
+    private final List<Task> allTasks;
+
+    /**
+     * Wrap a job and put it into accepted state.
+     *
+     * @param manager - agent manager providing task manager and job manager
+     * @param job - job to run
+     */
+    public JobWrapper(AgentManager manager, Job job) {
+        super();
+        this.agentConf = AgentConfiguration.getAgentConf();
+        this.taskManager = manager.getTaskManager();
+        this.jobManager = manager.getJobManager();
+        this.job = job;
+        this.allTasks = new ArrayList<>();
+        doChangeState(State.ACCEPTED);
+    }
+
+    /**
+     * check states of all tasks, wait if one of them not finished.
+     * Changes the state to succeeded once every task is reported finished.
+     *
+     * @throws Exception if the waiting thread is interrupted or the check fails
+     */
+    private void checkAllTasksStateAndWait() throws Exception {
+        boolean isFinished;
+
+        long checkInterval = agentConf.getLong(
+            AgentConstants.JOB_FINISH_CHECK_INTERVAL,
+            AgentConstants.DEFAULT_JOB_FINISH_CHECK_INTERVAL);
+        do {
+            // check whether all tasks have finished.
+            isFinished = allTasks.stream().allMatch(task ->
+                taskManager.isTaskFinished(task.getTaskId()));
+            TimeUnit.SECONDS.sleep(checkInterval);
+        } while (!isFinished);
+        LOGGER.info("all tasks of {} has been checked", job.getJobId());
+        doChangeState(State.SUCCEEDED);
+    }
+
+    /**
+     * submit all tasks
+     */
+    private void submitAllTasks() {
+        List<Task> tasks = job.createTasks();
+        tasks.forEach(task -> {
+            allTasks.add(task);
+            taskManager.submitTask(task);
+        });
+    }
+
+    /**
+     * get job
+     *
+     * @return job
+     */
+    public Job getJob() {
+        return job;
+    }
+
+    /**
+     * cleanup job, removes every task of this job from the task manager
+     */
+    private void cleanup() {
+        allTasks.forEach(task -> taskManager.removeTask(task.getTaskId()));
+    }
+
+    /**
+     * Run the job: submit its tasks, wait until all of them are finished and
+     * clean up. Any failure moves the job into failed state.
+     */
+    @Override
+    public void run() {
+        try {
+            doChangeState(State.RUNNING);
+            submitAllTasks();
+            checkAllTasksStateAndWait();
+            cleanup();
+        } catch (Exception ex) {
+            if (ex instanceof InterruptedException) {
+                // keep the interrupt visible to the executor running this job
+                Thread.currentThread().interrupt();
+            }
+            doChangeState(State.FAILED);
+            // exception goes last so that slf4j also prints the stack trace
+            LOGGER.error("error caught: {}, message: {}",
+                job.getJobConf().toJsonStr(), ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Register state transition callbacks; only running -> succeeded does
+     * something, it reports the job to the job manager as success.
+     */
+    @Override
+    public void addCallbacks() {
+        this.addCallback(State.ACCEPTED, State.RUNNING, (before, after) -> {
+
+        }).addCallback(State.RUNNING, State.FAILED, (before, after) -> {
+
+        }).addCallback(State.RUNNING, State.SUCCEEDED, ((before, after) -> {
+            jobManager.markJobAsSuccess(job.getJobId());
+        }));
+    }
+}
diff --git
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/Task.java
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/Task.java
new file mode 100644
index 0000000..6f79f42
--- /dev/null
+++
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/Task.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.task;
+
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.plugin.Channel;
+import org.apache.tubemq.agent.plugin.Reader;
+import org.apache.tubemq.agent.plugin.Sink;
+
+
+/**
+ * Task meta definition: bundles the reader -> channel -> sink pipeline of one
+ * task together with the job configuration used to initialise it.
+ */
+public class Task {
+
+    private final String taskId;
+    private final Reader reader;
+    private final Sink sink;
+    private final Channel channel;
+    private final JobProfile jobConf;
+
+    /**
+     * Create a task from its parts.
+     *
+     * @param taskId  identifier of the task
+     * @param reader  source plugin of the pipeline
+     * @param sink    destination plugin of the pipeline
+     * @param channel buffer between reader and sink
+     * @param jobConf job configuration passed to every plugin on {@link #init()}
+     */
+    public Task(String taskId, Reader reader, Sink sink, Channel channel,
+        JobProfile jobConf) {
+        this.taskId = taskId;
+        this.jobConf = jobConf;
+        this.reader = reader;
+        this.channel = channel;
+        this.sink = sink;
+    }
+
+    /**
+     * Initialise the plugins with the job configuration, in the order
+     * channel, sink, reader.
+     */
+    public void init() {
+        channel.init(jobConf);
+        sink.init(jobConf);
+        reader.init(jobConf);
+    }
+
+    /**
+     * Destroy the plugins, in the order reader, sink, channel.
+     */
+    public void destroy() {
+        reader.destroy();
+        sink.destroy();
+        channel.destroy();
+    }
+
+    /**
+     * @return whatever the reader reports through {@code isFinished()}
+     */
+    public boolean isReadFinished() {
+        return reader.isFinished();
+    }
+
+    /**
+     * @return identifier of this task
+     */
+    public String getTaskId() {
+        return taskId;
+    }
+
+    /**
+     * @return reader of this task
+     */
+    public Reader getReader() {
+        return reader;
+    }
+
+    /**
+     * @return sink of this task
+     */
+    public Sink getSink() {
+        return sink;
+    }
+
+    /**
+     * @return channel of this task
+     */
+    public Channel getChannel() {
+        return channel;
+    }
+}
diff --git
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskManager.java
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskManager.java
new file mode 100644
index 0000000..988904c
--- /dev/null
+++
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskManager.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.task;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.common.AbstractDaemon;
+import org.apache.tubemq.agent.common.AgentThreadFactory;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.AgentManager;
+import org.apache.tubemq.agent.core.job.JobManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task manager maintains the submitted tasks and communicates with job level
+ * components. It also provides the operations the job level needs, such as
+ * submitting, killing and removing tasks.
+ */
+public class TaskManager extends AbstractDaemon {
+
+    // Log under this class, not under JobManager.
+    private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class);
+
+    // task thread pool;
+    private final ThreadPoolExecutor runningPool;
+    private final AgentManager agentManager;
+    private final TaskMetrics taskMetrics;
+    // all known tasks, keyed by task id
+    private final ConcurrentHashMap<String, TaskWrapper> tasks;
+    // failed tasks that were handed over for retry
+    private final BlockingQueue<TaskWrapper> retryTasks;
+    // seconds the monitor thread sleeps between two scans
+    private final int monitorInterval;
+    // capacity of the retry queue, only used for logging
+    private final int taskMaxCapacity;
+    // seconds to wait when offering a task to a full retry queue
+    private final int taskRetryMaxTime;
+    private final long waitTime;
+
+    /**
+     * Init task manager.
+     *
+     * @param agentManager - agent manager
+     */
+    public TaskManager(AgentManager agentManager) {
+        this.agentManager = agentManager;
+        AgentConfiguration conf = AgentConfiguration.getAgentConf();
+        // control running tasks by setting pool size.
+        this.runningPool = new ThreadPoolExecutor(
+            conf.getInt(
+                AgentConstants.TASK_RUNNING_THREAD_CORE_SIZE,
+                AgentConstants.DEFAULT_TASK_RUNNING_THREAD_CORE_SIZE),
+            conf.getInt(
+                AgentConstants.TASK_RUNNING_THREAD_MAX_SIZE,
+                AgentConstants.DEFAULT_TASK_RUNNING_THREAD_MAX_SIZE),
+            conf.getLong(AgentConstants.TASK_RUNNING_THREAD_KEEP_ALIVE,
+                AgentConstants.DEFAULT_TASK_RUNNING_THREAD_KEEP_ALIVE),
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(
+                conf.getInt(
+                    AgentConstants.TASK_PENDING_MAX,
+                    AgentConstants.DEFAULT_TASK_PENDING_MAX)),
+            new AgentThreadFactory("task"));
+        // metric for task level
+        taskMetrics = TaskMetrics.getMetrics();
+        tasks = new ConcurrentHashMap<>();
+        retryTasks = new LinkedBlockingQueue<>(
+            conf.getInt(
+                AgentConstants.TASK_RETRY_MAX_CAPACITY,
+                AgentConstants.DEFAULT_TASK_RETRY_MAX_CAPACITY));
+        monitorInterval = conf.getInt(
+            AgentConstants.TASK_MONITOR_INTERVAL,
+            AgentConstants.DEFAULT_TASK_MONITOR_INTERVAL);
+        taskRetryMaxTime = conf
+            .getInt(AgentConstants.TASK_RETRY_SUBMIT_WAIT_SECONDS,
+                AgentConstants.DEFAULT_TASK_RETRY_SUBMIT_WAIT_SECONDS);
+        taskMaxCapacity = conf.getInt(
+            AgentConstants.TASK_RETRY_MAX_CAPACITY,
+            AgentConstants.DEFAULT_TASK_RETRY_MAX_CAPACITY);
+        waitTime = conf.getLong(
+            AgentConstants.THREAD_POOL_AWAIT_TIME,
+            AgentConstants.DEFAULT_THREAD_POOL_AWAIT_TIME);
+    }
+
+    /**
+     * Get task metrics
+     *
+     * @return task metrics
+     */
+    public TaskMetrics getTaskMetrics() {
+        return taskMetrics;
+    }
+
+    /**
+     * Look up the wrapper registered for a task id.
+     *
+     * @param taskId - task id
+     * @return the wrapper, or null if no task with this id is registered
+     */
+    public TaskWrapper getTaskWrapper(String taskId) {
+        return tasks.get(taskId);
+    }
+
+    /**
+     * Register the task and hand it to the running pool. A task whose id is
+     * already registered is ignored. The call returns once the task has been
+     * queued; callers observe completion through {@link #isTaskFinished(String)}.
+     *
+     * @param task - task
+     */
+    public void submitTask(Task task) {
+        TaskWrapper registered = null;
+        try {
+            TaskWrapper taskWrapper = new TaskWrapper(agentManager, task);
+            if (tasks.putIfAbsent(task.getTaskId(), taskWrapper) == null) {
+                registered = taskWrapper;
+                // Do not call get() on the returned future: that would block the
+                // submitter until the task completes and serialise all tasks of a job.
+                this.runningPool.submit(taskWrapper);
+            }
+        } catch (Exception ex) {
+            if (registered != null) {
+                // The pool refused the task; drop the never-run wrapper so the same
+                // task id can be submitted again later.
+                tasks.remove(task.getTaskId(), registered);
+            }
+            LOGGER.warn("reject task {}", task.getTaskId(), ex);
+        }
+    }
+
+    /**
+     * Offer a task to the retry queue, waiting up to the configured number of
+     * seconds for free space. Failure to enqueue is only logged.
+     *
+     * @param wrapper - task wrapper
+     */
+    void retryTask(TaskWrapper wrapper) {
+        LOGGER.info("retry submit task {}", wrapper.getTask().getTaskId());
+        try {
+            boolean success = retryTasks.offer(wrapper, taskRetryMaxTime, TimeUnit.SECONDS);
+            if (!success) {
+                LOGGER.error("cannot submit to retry queue, max {}, current {}", taskMaxCapacity,
+                    retryTasks.size());
+            }
+        } catch (Exception ex) {
+            LOGGER.error("error while offer task", ex);
+            if (ex instanceof InterruptedException) {
+                // Keep the interrupt visible to the calling thread.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Check whether task is finished
+     *
+     * @param taskId - task id
+     * @return - true if task is finished, false otherwise or if the id is unknown
+     */
+    public boolean isTaskFinished(String taskId) {
+        TaskWrapper wrapper = tasks.get(taskId);
+        if (wrapper != null) {
+            return wrapper.isFinished();
+        }
+        return false;
+    }
+
+    /**
+     * Check if task is success
+     *
+     * @param taskId task id
+     * @return true if task is success, false otherwise or if the id is unknown
+     */
+    public boolean isTaskSuccess(String taskId) {
+        TaskWrapper wrapper = tasks.get(taskId);
+        if (wrapper != null) {
+            return wrapper.isSuccess();
+        }
+        return false;
+    }
+
+    /**
+     * Remove task by task id
+     *
+     * @param taskId - task id
+     */
+    public void removeTask(String taskId) {
+        tasks.remove(taskId);
+    }
+
+    /**
+     * kill task
+     *
+     * @param task task
+     * @return true if the task was registered and has been asked to stop,
+     *     false if no such task is registered
+     */
+    public boolean killTask(Task task) {
+        // kill running tasks.
+        TaskWrapper taskWrapper = tasks.get(task.getTaskId());
+        if (taskWrapper != null) {
+            taskWrapper.kill();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Thread for checking whether task should retry. While the daemon is
+     * runnable it scans all registered tasks every {@code monitorInterval}
+     * seconds and offers the failed ones that may still retry to the retry queue.
+     *
+     * @return - runnable thread
+     */
+    public Runnable createTaskMonitorThread() {
+        return () -> {
+            while (isRunnable()) {
+                try {
+                    for (TaskWrapper wrapper : tasks.values()) {
+                        if (wrapper.isFailed() && wrapper.shouldRetry()) {
+                            retryTask(wrapper);
+                        }
+                    }
+                    TimeUnit.SECONDS.sleep(monitorInterval);
+                } catch (Exception ex) {
+                    LOGGER.error("Exception caught", ex);
+                }
+            }
+        };
+    }
+
+    /**
+     * start service.
+     */
+    @Override
+    public void start() {
+        submitWorker(createTaskMonitorThread());
+    }
+
+    /**
+     * stop service.
+     */
+    @Override
+    public void stop() throws Exception {
+        waitForTerminate();
+        this.runningPool.shutdown();
+    }
+}
diff --git
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskMetrics.java
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskMetrics.java
new file mode 100644
index 0000000..d3b5334
--- /dev/null
+++
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskMetrics.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.task;
+
+import org.apache.tubemq.agent.metrics.Metric;
+import org.apache.tubemq.agent.metrics.Metrics;
+import org.apache.tubemq.agent.metrics.MetricsRegister;
+import org.apache.tubemq.agent.metrics.counter.CounterLong;
+import org.apache.tubemq.agent.metrics.gauge.GaugeInt;
+
+/**
+ * Metric collector for task level.
+ *
+ * A single shared instance is created when the class is loaded and is handed
+ * out through getMetrics(); the constructor is private.
+ */
+@Metrics
+public class TaskMetrics {
+
+ // the only instance of this class
+ private static final TaskMetrics METRICS = new TaskMetrics();
+
+ // gauge of running tasks; package-private so that code in this package can update it directly
+ @Metric
+ GaugeInt runningTasks;
+
+ // counter of tasks that ended up in the fatal state
+ @Metric
+ CounterLong fatalTasks;
+
+
+ private TaskMetrics() {
+ // every metric should register, otherwise not working.
+ // NOTE(review): presumably register() instantiates the @Metric fields - confirm in MetricsRegister.
+ MetricsRegister.register(this);
+ }
+
+ /**
+ * @return the shared task metrics instance
+ */
+ public static TaskMetrics getMetrics() {
+ return METRICS;
+ }
+}
diff --git
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskWrapper.java
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskWrapper.java
new file mode 100644
index 0000000..7b026d0
--- /dev/null
+++
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskWrapper.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.task;
+
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.tubemq.agent.common.AgentThreadFactory;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.AgentManager;
+import org.apache.tubemq.agent.message.EndMessage;
+import org.apache.tubemq.agent.plugin.Message;
+import org.apache.tubemq.agent.state.AbstractStateWrapper;
+import org.apache.tubemq.agent.state.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TaskWrapper is used in taskManager, it maintains the life cycle of
+ * running task: it starts the reader and writer threads of a task, tracks the
+ * task state and counts retries.
+ */
+public class TaskWrapper extends AbstractStateWrapper {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TaskWrapper.class);
+
+    private final TaskManager taskManager;
+    private final Task task;
+
+    private final AtomicInteger retryTime = new AtomicInteger(0);
+    private final int maxRetryTime;
+    // seconds a single push to / pull from the channel may wait
+    private final int pushMaxWaitTime;
+    private final int pullMaxWaitTime;
+    // runs the reader and the writer thread of this task
+    private final ExecutorService executorService;
+
+    /**
+     * Create a wrapper in state {@code State.ACCEPTED}.
+     *
+     * @param manager agent manager, used to reach the task manager
+     * @param task    task to run
+     */
+    public TaskWrapper(AgentManager manager, Task task) {
+        super();
+        this.taskManager = manager.getTaskManager();
+        this.task = task;
+        AgentConfiguration conf = AgentConfiguration.getAgentConf();
+        maxRetryTime = conf.getInt(
+            AgentConstants.TASK_MAX_RETRY_TIME,
+            AgentConstants.DEFAULT_TASK_MAX_RETRY_TIME);
+        pushMaxWaitTime = conf.getInt(
+            AgentConstants.TASK_PUSH_MAX_SECOND,
+            AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
+        pullMaxWaitTime = conf.getInt(
+            AgentConstants.TASK_PULL_MAX_SECOND,
+            AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
+        executorService = Executors.newCachedThreadPool(
+            new AgentThreadFactory("task-reader-writer"));
+        doChangeState(State.ACCEPTED);
+    }
+
+    /**
+     * Start the read thread: it moves messages from the reader into the channel
+     * until the reader is finished or the task is in an exception state, then
+     * pushes an {@link EndMessage}.
+     *
+     * @return CompletableFuture
+     */
+    private CompletableFuture<?> submitReadThread() {
+        return CompletableFuture.runAsync(() -> {
+            Message message = null;
+            while (!task.isReadFinished() && !isException()) {
+                // read the next message only after the previous one was accepted
+                // by the channel; otherwise retry the same message
+                if (message == null || task.getChannel()
+                    .push(message, pushMaxWaitTime, TimeUnit.SECONDS)) {
+                    message = task.getReader().read();
+                }
+            }
+            // write end message
+            task.getChannel().push(new EndMessage());
+        }, executorService);
+    }
+
+    /**
+     * Start the write thread: it moves messages from the channel to the sink
+     * until it sees an {@link EndMessage} or the task is in an exception state.
+     *
+     * @return CompletableFuture
+     */
+    private CompletableFuture<?> submitWriteThread() {
+        return CompletableFuture.runAsync(() -> {
+            while (!isException()) {
+                Message message = task.getChannel().pull(pullMaxWaitTime, TimeUnit.SECONDS);
+                if (message instanceof EndMessage) {
+                    break;
+                }
+                task.getSink().write(message);
+            }
+        }, executorService);
+    }
+
+    /**
+     * Initialise the task, start reader and writer and wait until BOTH are done.
+     * A failure of either side moves the task to {@code State.FAILED}.
+     */
+    private void submitThreadsAndWait() {
+        task.init();
+        CompletableFuture<?> reader = submitReadThread();
+        CompletableFuture<?> writer = submitWriteThread();
+        // Step 1: react to the first thread that completes. If it failed, flag the
+        // task at once so that the other thread leaves its loop through isException().
+        CompletableFuture.anyOf(reader, writer)
+            .exceptionally(ex -> {
+                doChangeState(State.FAILED);
+                LOGGER.error("exception caught", ex);
+                return null;
+            }).join();
+        // Step 2: wait for the remaining thread too. Returning after the first one
+        // would report success and destroy the task while the writer is still
+        // draining the channel, and would miss a later writer failure.
+        // Relies on isException() being true once the state is FAILED, which run()
+        // assumes as well.
+        CompletableFuture.allOf(reader, writer)
+            .exceptionally(ex -> {
+                if (!isException()) {
+                    doChangeState(State.FAILED);
+                    LOGGER.error("exception caught", ex);
+                }
+                return null;
+            }).join();
+    }
+
+    /**
+     * kill task
+     */
+    void kill() {
+        doChangeState(State.KILLED);
+    }
+
+    /**
+     * whether the task may still be retried, i.e. its retry count is below the
+     * configured maximum.
+     *
+     * @return - whether should retry
+     */
+    boolean shouldRetry() {
+        return retryTime.get() < maxRetryTime;
+    }
+
+    /**
+     * @return the wrapped task
+     */
+    Task getTask() {
+        return task;
+    }
+
+    /**
+     * Register the state-transition callbacks that keep the retry counter and
+     * the task metrics up to date.
+     */
+    @Override
+    public void addCallbacks() {
+        this.addCallback(State.ACCEPTED, State.RUNNING, (before, after) -> {
+            taskManager.getTaskMetrics().runningTasks.incr();
+        }).addCallback(State.RUNNING, State.FAILED, (before, after) -> {
+            retryTime.incrementAndGet();
+            if (!shouldRetry()) {
+                // out of retries: the failure is final
+                doChangeState(State.FATAL);
+                taskManager.getTaskMetrics().fatalTasks.incr();
+            }
+        }).addCallback(State.FAILED, State.FATAL, (before, after) -> {
+
+        }).addCallback(State.FAILED, State.ACCEPTED, (before, after) -> {
+            retryTime.incrementAndGet();
+        }).addCallback(State.RUNNING, State.SUCCEEDED, (before, after) -> {
+            taskManager.getTaskMetrics().runningTasks.decr();
+        });
+    }
+
+
+    /**
+     * Run the task once: mark it running, run reader and writer to completion,
+     * mark it succeeded unless it ended in an exception state, then destroy the
+     * task. An exception thrown on the way moves the task to {@code State.FAILED}.
+     */
+    @Override
+    public void run() {
+        try {
+            LOGGER.info("start to run {}", task.getTaskId());
+            doChangeState(State.RUNNING);
+            // submitThreadsAndWait() initialises the task itself; calling
+            // task.init() here as well would initialise every plugin twice.
+            submitThreadsAndWait();
+            if (!isException()) {
+                doChangeState(State.SUCCEEDED);
+            }
+            task.destroy();
+        } catch (Exception ex) {
+            LOGGER.error("error while running wrapper", ex);
+            doChangeState(State.FAILED);
+        }
+    }
+
+
+}