This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
     new d628628  [TUBEMQ-483] add agent/job/task manager for data collection 
management.
d628628 is described below

commit d62862865b567d1bda8e877f593a23290ea47705
Author: yuanbo <[email protected]>
AuthorDate: Fri Dec 25 11:27:01 2020 +0800

    [TUBEMQ-483] add agent/job/task manager for data collection management.
---
 .../apache/tubemq/agent/common/AbstractDaemon.java |  87 ++++++++
 .../tubemq/agent/common/AgentThreadFactory.java    |  40 ++++
 .../org/apache/tubemq/agent/common/Service.java    |  35 ++++
 .../org/apache/tubemq/agent/db/JobProfileDB.java   | 114 ++++++++++
 .../org/apache/tubemq/agent/db/KeyValueEntity.java |  19 ++
 .../apache/tubemq/agent/db/TriggerProfileDB.java   |  77 +++++++
 .../org/apache/tubemq/agent/core/AgentMain.java    |  27 +++
 .../org/apache/tubemq/agent/core/AgentManager.java | 106 ++++++++++
 .../java/org/apache/tubemq/agent/core/job/Job.java |  92 +++++++++
 .../apache/tubemq/agent/core/job/JobManager.java   | 166 +++++++++++++++
 .../apache/tubemq/agent/core/job/JobWrapper.java   | 122 +++++++++++
 .../org/apache/tubemq/agent/core/task/Task.java    |  73 +++++++
 .../apache/tubemq/agent/core/task/TaskManager.java | 229 +++++++++++++++++++++
 .../apache/tubemq/agent/core/task/TaskMetrics.java |  45 ++++
 .../apache/tubemq/agent/core/task/TaskWrapper.java | 177 ++++++++++++++++
 15 files changed, 1409 insertions(+)

diff --git 
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AbstractDaemon.java
 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AbstractDaemon.java
new file mode 100644
index 0000000..5982658
--- /dev/null
+++ 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AbstractDaemon.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Providing work threads management, those threads run
+ * periodically until agent is stopped.
+ */
+public abstract class AbstractDaemon implements Service {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractDaemon.class);
+
+    // worker thread pool, share it
+    private static final ExecutorService workerServices = 
Executors.newCachedThreadPool();
+    private final List<CompletableFuture<?>> workerFutures;
+    private boolean runnable = true;
+
+    public AbstractDaemon() {
+        this.workerFutures = new ArrayList<>();
+    }
+
+    /**
+     * Whether threads can in running state with while loop.
+     *
+     * @return - true if threads can run
+     */
+    public boolean isRunnable() {
+        return runnable;
+    }
+
+    /**
+     * Stop running threads.
+     */
+    public void stopRunningThreads() {
+        runnable = false;
+    }
+
+    /**
+     * Submit work thread to thread pool.
+     *
+     * @param worker - work thread
+     */
+    public void submitWorker(Runnable worker) {
+        CompletableFuture<?> future = CompletableFuture.runAsync(worker, 
workerServices);
+        workerFutures.add(future);
+        LOGGER.info("{} running worker number is {}", 
this.getClass().getName(),
+                workerFutures.size());
+    }
+
+    /**
+     * Wait for threads finish.
+     */
+    public void join() {
+        for (CompletableFuture<?> future : workerFutures) {
+            future.join();
+        }
+    }
+
+    /**
+     * Stop thread pool and running threads if they're in the running state.
+     */
+    public void waitForTerminate() {
+        // stop running threads.
+        if (isRunnable()) {
+            stopRunningThreads();
+        }
+    }
+}
diff --git 
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AgentThreadFactory.java
 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AgentThreadFactory.java
new file mode 100644
index 0000000..150f163
--- /dev/null
+++ 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/AgentThreadFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.common;
+
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AgentThreadFactory implements ThreadFactory {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentThreadFactory.class);
+
+    private final AtomicInteger mThreadNum = new AtomicInteger(1);
+
+    private final String threadType;
+
+    public AgentThreadFactory(String threadType) {
+        this.threadType = threadType;
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(r, threadType + "-running-thread-" + 
mThreadNum.getAndIncrement());
+        LOGGER.info("{} created", t.getName());
+        return t;
+    }
+}
\ No newline at end of file
diff --git 
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/Service.java
 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/Service.java
new file mode 100644
index 0000000..7a79775
--- /dev/null
+++ 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/common/Service.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.common;
+
/**
 * Lifecycle contract implemented by agent components: start, wait, and stop.
 */
public interface Service {

    /**
     * Start the service.
     *
     * @throws Exception if the service fails to start
     */
    void start() throws Exception;

    /**
     * Stop the service.
     *
     * @throws Exception if the service fails to stop cleanly
     */
    void stop() throws Exception;

    /**
     * Block the caller, joining the service's work until it signals completion.
     *
     * @throws Exception if waiting fails
     */
    void join() throws Exception;
}
diff --git 
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/JobProfileDB.java
 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/JobProfileDB.java
new file mode 100644
index 0000000..986802d
--- /dev/null
+++ 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/JobProfileDB.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_ID;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper for job conf persistence.
+ */
+public class JobProfileDB {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JobProfileDB.class);
+
+    private static final String JOB_ID_PREFIX = "job_";
+    private final DB db;
+
+    public JobProfileDB(DB db) {
+        this.db = db;
+    }
+
+    /**
+     * get job which in accepted state
+     * @return null or job conf
+     */
+    public JobProfile getAcceptedJob() {
+        return getJob(StateSearchKey.ACCEPTED);
+    }
+
+    public List<JobProfile> getAcceptedJobs() {
+        return getJobs(StateSearchKey.ACCEPTED);
+    }
+
+    /**
+     * update job state and search it by key name
+     * @param jobId - job key name
+     * @param stateSearchKey - job state
+     */
+    public void updateJobState(String jobId, StateSearchKey stateSearchKey) {
+        KeyValueEntity entity = db.get(JOB_ID_PREFIX + jobId);
+        if (entity != null) {
+            entity.setStateSearchKey(stateSearchKey);
+            db.put(entity);
+        }
+    }
+
+    /**
+     * store job profile
+     * @param jobProfile - job profile
+     */
+    public void storeJob(JobProfile jobProfile) {
+
+        if (jobProfile.allRequiredKeyExist()) {
+            String keyName = JOB_ID_PREFIX + jobProfile.get(JOB_ID);
+            KeyValueEntity entity = new KeyValueEntity(keyName, 
jobProfile.toJsonStr());
+            db.put(entity);
+        }
+    }
+
+    public void storeJob(List<JobProfile> jobProfileList) {
+        if (jobProfileList != null && jobProfileList.size() > 0) {
+            for (JobProfile jobProfile : jobProfileList) {
+                storeJob(jobProfile);
+            }
+        }
+    }
+
+    public void deleteJob(String id) {
+        String keyName = JOB_ID_PREFIX + id;
+        db.remove(keyName);
+    }
+
+    /**
+     * get job conf by state
+     * @param stateSearchKey - state index for searching.
+     * @return
+     */
+    public JobProfile getJob(StateSearchKey stateSearchKey) {
+        KeyValueEntity entity = db.searchOne(stateSearchKey);
+        if (entity != null) {
+            return entity.getAsJobProfile();
+        }
+        return null;
+    }
+
+    /**
+     * get list of job profiles.
+     * @param stateSearchKey - state search key.
+     * @return - list of job profile.
+     */
+    public List<JobProfile> getJobs(StateSearchKey stateSearchKey) {
+        List<KeyValueEntity> entityList = db.search(stateSearchKey);
+        List<JobProfile> profileList = new ArrayList<>();
+        for (KeyValueEntity entity : entityList) {
+            profileList.add(entity.getAsJobProfile());
+        }
+        return profileList;
+    }
+}
diff --git 
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
index cd87f2f..c43d7aa 100644
--- 
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
+++ 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/KeyValueEntity.java
@@ -17,6 +17,8 @@ import com.sleepycat.persist.model.Entity;
 import com.sleepycat.persist.model.PrimaryKey;
 import com.sleepycat.persist.model.Relationship;
 import com.sleepycat.persist.model.SecondaryKey;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.conf.TriggerProfile;
 
 /**
  * key value entity. key is string and value is a json
@@ -65,4 +67,21 @@ public class KeyValueEntity {
         this.jsonValue = jsonValue;
         return this;
     }
+
+    /**
+     * convert keyValue to job profile
+     * @return JobConfiguration
+     */
+    public JobProfile getAsJobProfile() {
+        // convert jsonValue to jobConfiguration
+        return JobProfile.parseJsonStr(getJsonValue());
+    }
+
+    /**
+     * convert keyValue to trigger profile
+     * @return
+     */
+    public TriggerProfile getAsTriggerProfile() {
+        return TriggerProfile.parseJsonStr(getJsonValue());
+    }
 }
diff --git 
a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/TriggerProfileDB.java
 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/TriggerProfileDB.java
new file mode 100644
index 0000000..14abd1a
--- /dev/null
+++ 
b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/db/TriggerProfileDB.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.db;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.tubemq.agent.conf.TriggerProfile;
+import org.apache.tubemq.agent.constants.JobConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * db interface for trigger profile.
+ */
+public class TriggerProfileDB {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TriggerProfileDB.class);
+
+    // prefix for trigger profile.
+    private static final String TRIGGER_ID_PREFIX = "trigger_";
+
+    private final DB db;
+
+    public TriggerProfileDB(DB db) {
+        this.db = db;
+    }
+
+    /**
+     * get trigger list from db.
+     * @return - list of trigger
+     */
+    public List<TriggerProfile> getTriggers() {
+        // potential performance issue, needs to find out the speed.
+        List<KeyValueEntity> result = this.db.findAll(TRIGGER_ID_PREFIX);
+        List<TriggerProfile> triggerList = new ArrayList<>();
+        for (KeyValueEntity entity : result) {
+            triggerList.add(entity.getAsTriggerProfile());
+        }
+        return triggerList;
+    }
+
+    /**
+     * store trigger profile.
+     * @param trigger - trigger
+     */
+    public void storeTrigger(TriggerProfile trigger) {
+        if (trigger.allRequiredKeyExist()) {
+            String keyName = TRIGGER_ID_PREFIX + 
trigger.get(JobConstants.JOB_ID);
+            KeyValueEntity entity = new KeyValueEntity(keyName, 
trigger.toJsonStr());
+            KeyValueEntity oldEntity = db.put(entity);
+            if (oldEntity != null) {
+                LOGGER.warn("trigger profile {} has been replaced", 
oldEntity.getKey());
+            }
+        }
+    }
+
+    /**
+     * delete trigger by id.
+     * @param id
+     */
+    public void deleteTrigger(String id) {
+        String triggerKey = TRIGGER_ID_PREFIX + id;
+        db.remove(triggerKey);
+    }
+}
diff --git 
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
 
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
index 119cc0c..d019316 100644
--- 
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
+++ 
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
  * Agent entrance class
  */
 public class AgentMain {
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentMain.class);
 
     /**
@@ -77,6 +78,22 @@ public class AgentMain {
     }
 
     /**
+     * Stopping agent gracefully if get killed.
+     *
+     * @param manager - agent manager
+     */
+    private static void stopManagerIfKilled(AgentManager manager) {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+                LOGGER.info("stopping agent gracefully");
+                manager.stop();
+            } catch (Exception ex) {
+                LOGGER.error("exception while stopping threads", ex);
+            }
+        }));
+    }
+
+    /**
      * Main entrance.
      *
      * @param args - arguments
@@ -86,5 +103,15 @@ public class AgentMain {
         CommandLine cl = initOptions(args);
         assert cl != null;
         initAgentConf(cl);
+        AgentManager manager = new AgentManager();
+        try {
+            manager.start();
+            stopManagerIfKilled(manager);
+            manager.join();
+        } catch (Exception ex) {
+            LOGGER.error("exception caught", ex);
+        } finally {
+            manager.stop();
+        }
     }
 }
diff --git 
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
 
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
new file mode 100644
index 0000000..1c8bdc6
--- /dev/null
+++ 
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core;
+
+
+import org.apache.tubemq.agent.common.AbstractDaemon;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.job.JobManager;
+import org.apache.tubemq.agent.core.task.TaskManager;
+import org.apache.tubemq.agent.db.DB;
+import org.apache.tubemq.agent.db.JobProfileDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Agent Manager, the bridge for job manager, task manager, db e.t.c it 
manages agent level
+ * operations and communicates with outside system.
+ */
+public class AgentManager extends AbstractDaemon {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentManager.class);
+    private static JobManager jobManager;
+    private static TaskManager taskManager;
+
+
+    private final long waitTime;
+    private final AgentConfiguration conf;
+    private final DB db;
+
+    public AgentManager() {
+        conf = AgentConfiguration.getAgentConf();
+        this.db = initDB();
+        jobManager = new JobManager(this, new JobProfileDB(db));
+        taskManager = new TaskManager(this);
+
+        this.waitTime = conf.getLong(
+            AgentConstants.THREAD_POOL_AWAIT_TIME, 
AgentConstants.DEFAULT_THREAD_POOL_AWAIT_TIME);
+    }
+
+
+    /**
+     * init db by class name
+     *
+     * @return db
+     */
+    private DB initDB() {
+        try {
+            // db is a required component, so if not init correctly,
+            // throw exception and stop running.
+            return (DB) Class.forName(conf.get(
+                AgentConstants.AGENT_DB_CLASSNAME, 
AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
+                .newInstance();
+        } catch (Exception ex) {
+            throw new UnsupportedClassVersionError(ex.getMessage());
+        }
+    }
+
+    public JobManager getJobManager() {
+        return jobManager;
+    }
+
+    public TaskManager getTaskManager() {
+        return taskManager;
+    }
+
+    @Override
+    public void join() {
+        super.join();
+        jobManager.join();
+        taskManager.join();
+    }
+
+    @Override
+    public void start() throws Exception {
+        LOGGER.info("starting agent manager");
+        jobManager.start();
+        taskManager.start();
+    }
+
+    /**
+     * It should guarantee thread-safe, and can be invoked many times.
+     *
+     * @throws Exception exceptions
+     */
+    @Override
+    public void stop() throws Exception {
+        // TODO: change job state which is in running state.
+        LOGGER.info("stopping agent manager");
+        // close in order: trigger -> job -> task
+        jobManager.stop();
+        taskManager.stop();
+        this.db.close();
+    }
+}
diff --git 
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/Job.java
 
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/Job.java
new file mode 100644
index 0000000..276cac9
--- /dev/null
+++ 
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/Job.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.job;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.constants.JobConstants;
+import org.apache.tubemq.agent.core.task.Task;
+import org.apache.tubemq.agent.plugin.Channel;
+import org.apache.tubemq.agent.plugin.Reader;
+import org.apache.tubemq.agent.plugin.Sink;
+import org.apache.tubemq.agent.plugin.Source;
+
+/**
+ * job meta definition, job will be split into several tasks.
+ */
+public class Job {
+
+    private final JobProfile jobConf;
+    // job name
+    private String name;
+    // job description
+    private String description;
+    private String jobId;
+
+    public Job(JobProfile jobConf) {
+        this.jobConf = jobConf;
+        this.name = jobConf.get(JobConstants.JOB_NAME, 
JobConstants.DEFAULT_JOB_NAME);
+        this.description = jobConf.get(
+            JobConstants.JOB_DESCRIPTION, 
JobConstants.DEFAULT_JOB_DESCRIPTION);
+        this.jobId = jobConf.get(JobConstants.JOB_ID);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(String jobId) {
+        this.jobId = jobId;
+    }
+
+    public List<Task> createTasks() {
+        List<Task> taskList = new ArrayList<>();
+        int index = 0;
+        try {
+            Source source = (Source) 
Class.forName(jobConf.get(JobConstants.JOB_SOURCE)).newInstance();
+            for (Reader reader : source.split(jobConf)) {
+                Sink writer = (Sink) 
Class.forName(jobConf.get(JobConstants.JOB_SINK)).newInstance();
+                Channel channel = (Channel) 
Class.forName(jobConf.get(JobConstants.JOB_CHANNEL)).newInstance();
+                String taskId = String.format("%s_%d", jobId, index++);
+                taskList.add(new Task(taskId, reader, writer, channel, 
getJobConf()));
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+        return taskList;
+    }
+
+    public JobProfile getJobConf() {
+        return this.jobConf;
+    }
+
+}
diff --git 
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobManager.java
 
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobManager.java
new file mode 100644
index 0000000..c98b441
--- /dev/null
+++ 
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobManager.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.job;
+
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.common.AbstractDaemon;
+import org.apache.tubemq.agent.common.AgentThreadFactory;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.AgentManager;
+import org.apache.tubemq.agent.db.JobProfileDB;
+import org.apache.tubemq.agent.db.StateSearchKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JobManager maintains lots of jobs, and communicate between server and task 
manager.
+ */
+public class JobManager extends AbstractDaemon {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JobManager.class);
+
+    // key is job instance id.
+    private final ConcurrentHashMap<String, JobWrapper> jobs;
+    // jobs which are not accepted by running pool.
+    private final ConcurrentHashMap<String, Job> pendingJobs;
+    // job thread pool
+    private final ThreadPoolExecutor runningPool;
+    private final AgentManager agentManager;
+    private final int monitorInterval;
+
+    private final JobProfileDB jobConfDB;
+
+    /**
+     * init job manager
+     *
+     * @param agentManager - agent manager
+     */
+    public JobManager(AgentManager agentManager, JobProfileDB jobConfDB) {
+        this.jobConfDB = jobConfDB;
+        AgentConfiguration conf = AgentConfiguration.getAgentConf();
+        this.agentManager = agentManager;
+        // job thread pool for running
+        this.runningPool = new ThreadPoolExecutor(
+                conf.getInt(AgentConstants.JOB_RUNNING_THREAD_CORE_SIZE,
+                        AgentConstants.DEFAULT_JOB_RUNNING_THREAD_CORE_SIZE),
+                conf.getInt(
+                    AgentConstants.JOB_RUNNING_THREAD_MAX_SIZE, 
AgentConstants.DEFAULT_JOB_RUNNING_THREAD_MAX_SIZE),
+                conf.getLong(AgentConstants.JOB_RUNNING_THREAD_KEEP_ALIVE,
+                        AgentConstants.DEFAULT_JOB_RUNNING_THREAD_KEEP_ALIVE),
+                TimeUnit.SECONDS,
+                new ArrayBlockingQueue<>(
+                        conf.getInt(
+                            AgentConstants.JOB_PENDING_MAX, 
AgentConstants.DEFAULT_JOB_PENDING_MAX)),
+                new AgentThreadFactory("job"));
+        this.jobs = new ConcurrentHashMap<>();
+        this.pendingJobs = new ConcurrentHashMap<>();
+        this.monitorInterval = conf
+                .getInt(
+                    AgentConstants.JOB_MONITOR_INTERVAL, 
AgentConstants.DEFAULT_JOB_MONITOR_INTERVAL);
+
+    }
+
+    /**
+     * submit job to work thread.
+     *
+     * @param job - job
+     * @return - whether submitting job successfully.
+     */
+    private void addJob(Job job) {
+        try {
+            JobWrapper jobWrapper = new JobWrapper(agentManager, job);
+            this.runningPool.execute(jobWrapper);
+            jobs.putIfAbsent(jobWrapper.getJob().getJobId(), jobWrapper);
+        } catch (Exception rje) {
+            LOGGER.error("reject job {}", job.getJobId(), rje);
+            pendingJobs.putIfAbsent(job.getJobId(), job);
+        }
+    }
+
+    /**
+     * add job profile
+     * @param profile - job profile.
+     */
+    public void submitJobProfile(JobProfile profile) {
+        if (profile != null && profile.allRequiredKeyExist()) {
+            getJobConfDB().storeJob(profile);
+            addJob(new Job(profile));
+        }
+    }
+
+    /**
+     * start all accepted jobs.
+     */
+    private void startJobs() {
+        List<JobProfile> profileList = getJobConfDB().getAcceptedJobs();
+        for (JobProfile profile : profileList) {
+            addJob(new Job(profile));
+        }
+    }
+
+    public Runnable jobStateCheckThread() {
+        return () -> {
+            while (isRunnable()) {
+                try {
+                    // check pending jobs and try to submit again.
+                    for (String jobId : pendingJobs.keySet()) {
+                        Job job = pendingJobs.remove(jobId);
+                        if (job != null) {
+                            addJob(job);
+                        }
+                    }
+                    TimeUnit.SECONDS.sleep(monitorInterval);
+                } catch (Exception ex) {
+                    LOGGER.error("error caught", ex);
+                }
+            }
+        };
+    }
+
+    /**
+     * mark job as success by job id.
+     * @param jobId - job id
+     */
+    public void markJobAsSuccess(String jobId) {
+        JobWrapper wrapper = jobs.remove(jobId);
+        if (wrapper != null) {
+            LOGGER.info("job instance {} is success", jobId);
+            // mark job as success.
+            jobConfDB.updateJobState(jobId, StateSearchKey.SUCCESS);
+        }
+    }
+
+    public JobProfileDB getJobConfDB() {
+        return jobConfDB;
+    }
+
+    @Override
+    public void start() {
+        submitWorker(jobStateCheckThread());
+        startJobs();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        waitForTerminate();
+        this.runningPool.shutdown();
+    }
+}
diff --git 
a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobWrapper.java
 
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobWrapper.java
new file mode 100644
index 0000000..cd5a327
--- /dev/null
+++ 
b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/job/JobWrapper.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.job;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.AgentManager;
+import org.apache.tubemq.agent.core.task.Task;
+import org.apache.tubemq.agent.core.task.TaskManager;
+import org.apache.tubemq.agent.state.AbstractStateWrapper;
+import org.apache.tubemq.agent.state.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JobWrapper is used in JobManager, it defines the life cycle of
+ * running job and maintains the state of job.
+ *
+ * <p>A run submits every task created by the job to the {@link TaskManager}, polls until
+ * all of them report finished, switches the job to {@code SUCCEEDED} (whose callback
+ * notifies the {@link JobManager}) and finally unregisters the tasks from the task
+ * manager. Unregistering also happens when the run fails.
+ */
+public class JobWrapper extends AbstractStateWrapper {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobWrapper.class);
+
+    private final AgentConfiguration agentConf;
+    private final TaskManager taskManager;
+    private final JobManager jobManager;
+    private final Job job;
+
+    // tasks submitted by this job; only used from methods invoked by run()
+    private final List<Task> allTasks;
+
+    public JobWrapper(AgentManager manager, Job job) {
+        super();
+        this.agentConf = AgentConfiguration.getAgentConf();
+        this.taskManager = manager.getTaskManager();
+        this.jobManager = manager.getJobManager();
+        this.job = job;
+        this.allTasks = new ArrayList<>();
+        doChangeState(State.ACCEPTED);
+    }
+
+    /**
+     * Block until every submitted task reports finished, then mark the job SUCCEEDED.
+     *
+     * <p>Task states are polled through {@link TaskManager#isTaskFinished(String)} every
+     * {@code JOB_FINISH_CHECK_INTERVAL} seconds. No sleep happens once all tasks are done,
+     * so success is not delayed by an extra interval.
+     *
+     * @throws Exception if the waiting thread is interrupted
+     */
+    private void checkAllTasksStateAndWait() throws Exception {
+        long checkInterval = agentConf.getLong(
+                AgentConstants.JOB_FINISH_CHECK_INTERVAL,
+                AgentConstants.DEFAULT_JOB_FINISH_CHECK_INTERVAL);
+        // check whether all tasks have finished; sleep only while some are still pending.
+        while (!allTasks.stream().allMatch(task -> taskManager.isTaskFinished(task.getTaskId()))) {
+            TimeUnit.SECONDS.sleep(checkInterval);
+        }
+        LOGGER.info("all tasks of {} has been checked", job.getJobId());
+        doChangeState(State.SUCCEEDED);
+    }
+
+    /**
+     * Create the job's tasks, remember them and submit each one to the task manager.
+     */
+    private void submitAllTasks() {
+        List<Task> tasks = job.createTasks();
+        tasks.forEach(task -> {
+            allTasks.add(task);
+            taskManager.submitTask(task);
+        });
+    }
+
+    /**
+     * get job
+     * @return job
+     */
+    public Job getJob() {
+        return job;
+    }
+
+    /**
+     * Unregister every task of this job from the task manager.
+     */
+    private void cleanup() {
+        allTasks.forEach(task -> taskManager.removeTask(task.getTaskId()));
+    }
+
+    /**
+     * Run the job: submit its tasks, wait for them and release them afterwards.
+     * Any failure moves the job to {@code FAILED} and is logged with its stack trace.
+     */
+    @Override
+    public void run() {
+        try {
+            doChangeState(State.RUNNING);
+            submitAllTasks();
+            checkAllTasksStateAndWait();
+        } catch (InterruptedException ex) {
+            // preserve the interrupt for the pool thread running this wrapper
+            Thread.currentThread().interrupt();
+            doChangeState(State.FAILED);
+            LOGGER.error("error caught: {}, message: {}",
+                    job.getJobConf().toJsonStr(), ex.getMessage(), ex);
+        } catch (Exception ex) {
+            doChangeState(State.FAILED);
+            LOGGER.error("error caught: {}, message: {}",
+                    job.getJobConf().toJsonStr(), ex.getMessage(), ex);
+        } finally {
+            // release task registrations on success and on failure alike
+            cleanup();
+        }
+    }
+
+    @Override
+    public void addCallbacks() {
+        this.addCallback(State.ACCEPTED, State.RUNNING, (before, after) -> {
+
+        }).addCallback(State.RUNNING, State.FAILED, (before, after) -> {
+
+        }).addCallback(State.RUNNING, State.SUCCEEDED, ((before, after) -> {
+            jobManager.markJobAsSuccess(job.getJobId());
+        }));
+    }
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/Task.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/Task.java
new file mode 100644
index 0000000..6f79f42
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/Task.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.task;
+
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.plugin.Channel;
+import org.apache.tubemq.agent.plugin.Reader;
+import org.apache.tubemq.agent.plugin.Sink;
+
+
+/**
+ * Task meta definition which contains reader -> channel -> sink and job config information.
+ *
+ * <p>{@link #init()} initialises the plugins with the job profile in the order channel,
+ * sink, reader; {@link #destroy()} releases them in the order reader, sink, channel.
+ */
+public class Task {
+
+    private final String taskId;
+    private final Reader reader;
+    private final Sink sink;
+    private final Channel channel;
+    private final JobProfile jobConf;
+
+    /**
+     * Create a task.
+     *
+     * @param taskId - task id
+     * @param reader - plugin producing messages
+     * @param sink - plugin consuming messages
+     * @param channel - plugin sitting between reader and sink
+     * @param jobConf - job profile handed to every plugin on init
+     */
+    public Task(String taskId, Reader reader, Sink sink, Channel channel,
+            JobProfile jobConf) {
+        this.taskId = taskId;
+        this.reader = reader;
+        this.sink = sink;
+        this.channel = channel;
+        this.jobConf = jobConf;
+    }
+
+    public String getTaskId() {
+        return this.taskId;
+    }
+
+    public Reader getReader() {
+        return this.reader;
+    }
+
+    public Sink getSink() {
+        return this.sink;
+    }
+
+    public Channel getChannel() {
+        return this.channel;
+    }
+
+    /**
+     * Whether the reader reports that it has nothing more to read.
+     *
+     * @return reader's finished flag
+     */
+    public boolean isReadFinished() {
+        return this.reader.isFinished();
+    }
+
+    /**
+     * Initialise channel, sink and reader (in that order) with the job profile.
+     */
+    public void init() {
+        final JobProfile profile = this.jobConf;
+        channel.init(profile);
+        sink.init(profile);
+        reader.init(profile);
+    }
+
+    /**
+     * Destroy reader, sink and channel (in that order).
+     */
+    public void destroy() {
+        reader.destroy();
+        sink.destroy();
+        channel.destroy();
+    }
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskManager.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskManager.java
new file mode 100644
index 0000000..988904c
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskManager.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.task;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.common.AbstractDaemon;
+import org.apache.tubemq.agent.common.AgentThreadFactory;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.AgentManager;
+import org.apache.tubemq.agent.core.job.JobManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task manager maintains lots of tasks and communicates with job level components.
+ * It also provides functions to execute commands from job level like killing/submitting tasks.
+ *
+ * <p>Tasks run asynchronously on a bounded thread pool. A monitor thread periodically puts
+ * failed tasks that still have retries left into the {@code retryTasks} queue.
+ */
+public class TaskManager extends AbstractDaemon {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class);
+
+    // back-off between attempts while the running pool rejects a submission
+    private static final long SUBMIT_RETRY_INTERVAL_MS = 100L;
+
+    // task thread pool;
+    private final ThreadPoolExecutor runningPool;
+    private final AgentManager agentManager;
+    private final TaskMetrics taskMetrics;
+    private final ConcurrentHashMap<String, TaskWrapper> tasks;
+    private final BlockingQueue<TaskWrapper> retryTasks;
+    private final int monitorInterval;
+    private final int taskMaxCapacity;
+    private final int taskRetryMaxTime;
+    private final long waitTime;
+
+    /**
+     * Init task manager.
+     *
+     * @param agentManager - agent manager
+     */
+    public TaskManager(AgentManager agentManager) {
+        this.agentManager = agentManager;
+        AgentConfiguration conf = AgentConfiguration.getAgentConf();
+        // control running tasks by setting pool size.
+        this.runningPool = new ThreadPoolExecutor(
+                conf.getInt(
+                        AgentConstants.TASK_RUNNING_THREAD_CORE_SIZE,
+                        AgentConstants.DEFAULT_TASK_RUNNING_THREAD_CORE_SIZE),
+                conf.getInt(
+                        AgentConstants.TASK_RUNNING_THREAD_MAX_SIZE,
+                        AgentConstants.DEFAULT_TASK_RUNNING_THREAD_MAX_SIZE),
+                conf.getLong(AgentConstants.TASK_RUNNING_THREAD_KEEP_ALIVE,
+                        AgentConstants.DEFAULT_TASK_RUNNING_THREAD_KEEP_ALIVE),
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(
+                        conf.getInt(
+                                AgentConstants.TASK_PENDING_MAX,
+                                AgentConstants.DEFAULT_TASK_PENDING_MAX)),
+                new AgentThreadFactory("task"));
+        // metric for task level
+        taskMetrics = TaskMetrics.getMetrics();
+        tasks = new ConcurrentHashMap<>();
+        retryTasks = new LinkedBlockingQueue<>(
+                conf.getInt(
+                        AgentConstants.TASK_RETRY_MAX_CAPACITY,
+                        AgentConstants.DEFAULT_TASK_RETRY_MAX_CAPACITY));
+        monitorInterval = conf.getInt(
+                AgentConstants.TASK_MONITOR_INTERVAL,
+                AgentConstants.DEFAULT_TASK_MONITOR_INTERVAL);
+        taskRetryMaxTime = conf.getInt(
+                AgentConstants.TASK_RETRY_SUBMIT_WAIT_SECONDS,
+                AgentConstants.DEFAULT_TASK_RETRY_SUBMIT_WAIT_SECONDS);
+        taskMaxCapacity = conf.getInt(
+                AgentConstants.TASK_RETRY_MAX_CAPACITY,
+                AgentConstants.DEFAULT_TASK_RETRY_MAX_CAPACITY);
+        waitTime = conf.getLong(
+                AgentConstants.THREAD_POOL_AWAIT_TIME,
+                AgentConstants.DEFAULT_THREAD_POOL_AWAIT_TIME);
+    }
+
+    /**
+     * Get task metrics
+     *
+     * @return task metrics
+     */
+    public TaskMetrics getTaskMetrics() {
+        return taskMetrics;
+    }
+
+    public TaskWrapper getTaskWrapper(String taskId) {
+        return tasks.get(taskId);
+    }
+
+    /**
+     * Submit a task for asynchronous execution; wait if the task queue is full.
+     *
+     * <p>The task is registered under its id. A second submission for an id that is
+     * already registered is ignored. The wrapper is handed to the running pool without
+     * waiting for it to complete; callers observe progress with {@link #isTaskFinished(String)}.
+     * While the pool rejects the task (queue full and all threads busy) the call backs off
+     * and retries. A task that cannot be submitted at all (pool shut down, interrupted)
+     * is unregistered again and the failure is logged.
+     *
+     * @param task - task
+     */
+    public void submitTask(Task task) {
+        String taskId = task.getTaskId();
+        TaskWrapper registered = null;
+        try {
+            TaskWrapper taskWrapper = new TaskWrapper(agentManager, task);
+            if (tasks.putIfAbsent(taskId, taskWrapper) != null) {
+                // a task with the same id is already managed, ignore the duplicate.
+                return;
+            }
+            registered = taskWrapper;
+            submitWhenPoolAccepts(taskWrapper);
+        } catch (Exception ex) {
+            if (ex instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            if (registered != null) {
+                // never started: unregister it so the same id can be submitted again.
+                tasks.remove(taskId, registered);
+            }
+            LOGGER.warn("reject task {}", taskId, ex);
+        }
+    }
+
+    /**
+     * Submit the wrapper to the running pool, retrying while the pool is saturated.
+     *
+     * @param taskWrapper - wrapper to run
+     * @throws InterruptedException if interrupted while backing off
+     * @throws RejectedExecutionException if the pool has been shut down
+     */
+    private void submitWhenPoolAccepts(TaskWrapper taskWrapper) throws InterruptedException {
+        while (true) {
+            try {
+                runningPool.submit(taskWrapper);
+                return;
+            } catch (RejectedExecutionException ex) {
+                if (runningPool.isShutdown()) {
+                    throw ex;
+                }
+                // default abort policy: pending queue full and max threads busy, wait for room.
+                TimeUnit.MILLISECONDS.sleep(SUBMIT_RETRY_INTERVAL_MS);
+            }
+        }
+    }
+
+    /**
+     * retry task.
+     *
+     * <p>Offers the wrapper to the retry queue, waiting up to {@code taskRetryMaxTime}
+     * seconds for space, and logs when the queue stays full.
+     *
+     * @param wrapper - task wrapper
+     */
+    void retryTask(TaskWrapper wrapper) {
+        LOGGER.info("retry submit task {}", wrapper.getTask().getTaskId());
+        try {
+            boolean success = retryTasks.offer(wrapper, taskRetryMaxTime, TimeUnit.SECONDS);
+            if (!success) {
+                LOGGER.error("cannot submit to retry queue, max {}, current {}", taskMaxCapacity,
+                        retryTasks.size());
+            }
+        } catch (InterruptedException ex) {
+            // restore the flag so the calling loop can notice the interrupt
+            Thread.currentThread().interrupt();
+            LOGGER.error("error while offer task", ex);
+        } catch (Exception ex) {
+            LOGGER.error("error while offer task", ex);
+        }
+    }
+
+    /**
+     * Check whether task is finished
+     *
+     * @param taskId - task id
+     * @return - true if task is finished otherwise false (also false for unknown ids)
+     */
+    public boolean isTaskFinished(String taskId) {
+        TaskWrapper wrapper = tasks.get(taskId);
+        return wrapper != null && wrapper.isFinished();
+    }
+
+    /**
+     * Check if task is success
+     *
+     * @param taskId task id
+     * @return true if task is success otherwise false (also false for unknown ids)
+     */
+    public boolean isTaskSuccess(String taskId) {
+        TaskWrapper wrapper = tasks.get(taskId);
+        return wrapper != null && wrapper.isSuccess();
+    }
+
+    /**
+     * Remove task by task id
+     *
+     * @param taskId - task id
+     */
+    public void removeTask(String taskId) {
+        tasks.remove(taskId);
+    }
+
+    /**
+     * kill task
+     *
+     * @param task task
+     * @return true if a task with this id is managed and was asked to stop, false otherwise
+     */
+    public boolean killTask(Task task) {
+        // kill running tasks.
+        TaskWrapper taskWrapper = tasks.get(task.getTaskId());
+        if (taskWrapper == null) {
+            return false;
+        }
+        taskWrapper.kill();
+        return true;
+    }
+
+    /**
+     * Thread for checking whether task should retry.
+     *
+     * <p>Every {@code monitorInterval} seconds, each failed task that still has retries
+     * left and is not already waiting in the retry queue is offered to it. The loop exits
+     * when the daemon stops or its thread is interrupted.
+     *
+     * @return - runnable thread
+     */
+    public Runnable createTaskMonitorThread() {
+        return () -> {
+            while (isRunnable()) {
+                try {
+                    for (TaskWrapper wrapper : tasks.values()) {
+                        // queue a failed task only once instead of on every pass
+                        if (wrapper.isFailed() && wrapper.shouldRetry()
+                                && !retryTasks.contains(wrapper)) {
+                            retryTask(wrapper);
+                        }
+                    }
+                    TimeUnit.SECONDS.sleep(monitorInterval);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    LOGGER.info("task monitor thread interrupted, exit");
+                    break;
+                } catch (Exception ex) {
+                    LOGGER.error("Exception caught", ex);
+                }
+            }
+        };
+    }
+
+    /**
+     * start service.
+     */
+    @Override
+    public void start() {
+        submitWorker(createTaskMonitorThread());
+    }
+
+    /**
+     * stop service.
+     */
+    @Override
+    public void stop() throws Exception {
+        waitForTerminate();
+        this.runningPool.shutdown();
+    }
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskMetrics.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskMetrics.java
new file mode 100644
index 0000000..d3b5334
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskMetrics.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.task;
+
+import org.apache.tubemq.agent.metrics.Metric;
+import org.apache.tubemq.agent.metrics.Metrics;
+import org.apache.tubemq.agent.metrics.MetricsRegister;
+import org.apache.tubemq.agent.metrics.counter.CounterLong;
+import org.apache.tubemq.agent.metrics.gauge.GaugeInt;
+
+/**
+ * Metric collector for task level.
+ *
+ * <p>A single shared instance exists; it registers itself with {@link MetricsRegister}
+ * when created.
+ */
+@Metrics
+public class TaskMetrics {
+
+    private static final TaskMetrics METRICS = new TaskMetrics();
+
+    // gauge of running tasks, updated from TaskWrapper's state callbacks
+    @Metric
+    GaugeInt runningTasks;
+
+    // incremented by TaskWrapper when a task is moved to FATAL
+    @Metric
+    CounterLong fatalTasks;
+
+    /**
+     * Get the shared task metrics.
+     *
+     * @return the singleton instance
+     */
+    public static TaskMetrics getMetrics() {
+        return METRICS;
+    }
+
+    private TaskMetrics() {
+        // every metric should register, otherwise not working.
+        MetricsRegister.register(this);
+    }
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskWrapper.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskWrapper.java
new file mode 100644
index 0000000..7b026d0
--- /dev/null
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/task/TaskWrapper.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.core.task;
+
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.tubemq.agent.common.AgentThreadFactory;
+import org.apache.tubemq.agent.conf.AgentConfiguration;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.core.AgentManager;
+import org.apache.tubemq.agent.message.EndMessage;
+import org.apache.tubemq.agent.plugin.Message;
+import org.apache.tubemq.agent.state.AbstractStateWrapper;
+import org.apache.tubemq.agent.state.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TaskWrapper is used in taskManager, it maintains the life cycle of
+ * running task.
+ */
+public class TaskWrapper extends AbstractStateWrapper {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskWrapper.class);
+
+    private final TaskManager taskManager;
+    private final Task task;
+
+    private final AtomicInteger retryTime = new AtomicInteger(0);
+    private final int maxRetryTime;
+    private final int pushMaxWaitTime;
+    private final int pullMaxWaitTime;
+    private ExecutorService executorService;
+
+    public TaskWrapper(AgentManager manager, Task task) {
+        super();
+        this.taskManager = manager.getTaskManager();
+        this.task = task;
+        AgentConfiguration conf = AgentConfiguration.getAgentConf();
+        maxRetryTime = conf.getInt(
+            AgentConstants.TASK_MAX_RETRY_TIME, 
AgentConstants.DEFAULT_TASK_MAX_RETRY_TIME);
+        pushMaxWaitTime = conf.getInt(
+            AgentConstants.TASK_PUSH_MAX_SECOND, 
AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
+        pullMaxWaitTime = conf.getInt(
+            AgentConstants.TASK_PULL_MAX_SECOND, 
AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
+        if (executorService == null) {
+            executorService = Executors.newCachedThreadPool(
+                    new AgentThreadFactory("task-reader-writer"));
+        }
+        doChangeState(State.ACCEPTED);
+    }
+
+    /**
+     * submit read thread
+     *
+     * @return CompletableFuture
+     */
+    private CompletableFuture<?> submitReadThread() {
+        return CompletableFuture.runAsync(() -> {
+            Message message = null;
+            while (!task.isReadFinished() && !isException()) {
+                if (message == null || task.getChannel()
+                        .push(message, pushMaxWaitTime, TimeUnit.SECONDS)) {
+                    message = task.getReader().read();
+                }
+            }
+            // write end message
+            task.getChannel().push(new EndMessage());
+        }, executorService);
+    }
+
+    /**
+     * submit write thread
+     *
+     * @return CompletableFuture
+     */
+    private CompletableFuture<?> submitWriteThread() {
+        return CompletableFuture.runAsync(() -> {
+            while (!isException()) {
+                Message message = task.getChannel().pull(pullMaxWaitTime, 
TimeUnit.SECONDS);
+                if (message instanceof EndMessage) {
+                    break;
+                }
+                task.getSink().write(message);
+            }
+        }, executorService);
+    }
+
+    /**
+     * submit reader/writer
+     */
+    private void submitThreadsAndWait() {
+        task.init();
+        CompletableFuture<?> reader = submitReadThread();
+        CompletableFuture<?> writer = submitWriteThread();
+        CompletableFuture.anyOf(reader, writer)
+                .exceptionally(ex -> {
+                    doChangeState(State.FAILED);
+                    LOGGER.error("exception caught", ex);
+                    return null;
+                }).join();
+    }
+
+    /**
+     * kill task
+     */
+    void kill() {
+        doChangeState(State.KILLED);
+    }
+
+    /**
+     * whether task retry times exceed max retry time.
+     *
+     * @return - whether should retry
+     */
+    boolean shouldRetry() {
+        return retryTime.get() < maxRetryTime;
+    }
+
+    Task getTask() {
+        return task;
+    }
+
+    @Override
+    public void addCallbacks() {
+        this.addCallback(State.ACCEPTED, State.RUNNING, (before, after) -> {
+            taskManager.getTaskMetrics().runningTasks.incr();
+        }).addCallback(State.RUNNING, State.FAILED, (before, after) -> {
+            retryTime.incrementAndGet();
+            if (!shouldRetry()) {
+                doChangeState(State.FATAL);
+                taskManager.getTaskMetrics().fatalTasks.incr();
+            }
+        }).addCallback(State.FAILED, State.FATAL, (before, after) -> {
+
+        }).addCallback(State.FAILED, State.ACCEPTED, (before, after) -> {
+            retryTime.incrementAndGet();
+        }).addCallback(State.RUNNING, State.SUCCEEDED, (before, after) -> {
+            taskManager.getTaskMetrics().runningTasks.decr();
+        });
+    }
+
+
+    @Override
+    public void run() {
+        try {
+            LOGGER.info("start to run {}", task.getTaskId());
+            task.init();
+            doChangeState(State.RUNNING);
+            submitThreadsAndWait();
+            if (!isException()) {
+                doChangeState(State.SUCCEEDED);
+            }
+            task.destroy();
+        } catch (Exception ex) {
+            LOGGER.error("error while running wrapper", ex);
+            doChangeState(State.FAILED);
+        }
+    }
+
+
+}

Reply via email to