add function of streaming job and job state management

1. add streaming job function
2. add job state management function
3. update part of the unit tests
4. fix a few bugs

Author: ahutsunshine <[email protected]>

Closes #285 from ahutsunshine/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/6be53303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/6be53303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/6be53303

Branch: refs/heads/master
Commit: 6be53303523ef67513354070b15981ee645941ac
Parents: 32f080d
Author: ahutsunshine <[email protected]>
Authored: Thu May 31 15:12:40 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Thu May 31 15:12:40 2018 +0800

----------------------------------------------------------------------
 .../core/config/EclipseLinkJpaConfig.java       |    2 -
 .../griffin/core/config/PropertiesConfig.java   |   15 +-
 .../core/exception/GriffinExceptionMessage.java |   13 +-
 .../griffin/core/job/BatchJobOperatorImpl.java  |  310 +++++
 .../griffin/core/job/FileExistPredicator.java   |    8 +-
 .../apache/griffin/core/job/JobController.java  |   24 +-
 .../apache/griffin/core/job/JobInstance.java    |  130 +-
 .../apache/griffin/core/job/JobOperator.java    |   38 +
 .../org/apache/griffin/core/job/JobService.java |   18 +-
 .../apache/griffin/core/job/JobServiceImpl.java |  678 +++++-----
 .../apache/griffin/core/job/SparkSubmitJob.java |   81 +-
 .../core/job/StreamingJobOperatorImpl.java      |  265 ++++
 .../griffin/core/job/entity/AbstractJob.java    |   52 +-
 .../griffin/core/job/entity/BatchJob.java       |   43 +
 .../griffin/core/job/entity/GriffinJob.java     |   86 --
 .../griffin/core/job/entity/JobDataBean.java    |   24 +-
 .../core/job/entity/JobInstanceBean.java        |   66 +-
 .../griffin/core/job/entity/JobSchedule.java    |   29 +-
 .../griffin/core/job/entity/JobState.java       |   65 +
 .../core/job/entity/LivySessionStates.java      |  117 +-
 .../core/job/entity/SegmentPredicate.java       |   11 +-
 .../griffin/core/job/entity/StreamingJob.java   |   36 +
 .../griffin/core/job/entity/VirtualJob.java     |    2 +
 .../griffin/core/job/repo/BatchJobRepo.java     |   25 +
 .../griffin/core/job/repo/GriffinJobRepo.java   |   25 -
 .../griffin/core/job/repo/JobInstanceRepo.java  |   10 +-
 .../griffin/core/job/repo/StreamingJobRepo.java |    6 +
 .../measure/ExternalMeasureOperationImpl.java   |   80 --
 .../measure/ExternalMeasureOperatorImpl.java    |   81 ++
 .../measure/GriffinMeasureOperationImpl.java    |   66 -
 .../measure/GriffinMeasureOperatorImpl.java     |   68 +
 .../griffin/core/measure/MeasureController.java |   14 +-
 .../griffin/core/measure/MeasureOperation.java  |   33 -
 .../griffin/core/measure/MeasureOperator.java   |   34 +
 .../griffin/core/measure/MeasureService.java    |    7 +-
 .../core/measure/MeasureServiceImpl.java        |   34 +-
 .../core/measure/entity/DataConnector.java      |   51 +-
 .../griffin/core/measure/entity/DataSource.java |   54 +-
 .../griffin/core/measure/entity/DqType.java     |   32 +
 .../core/measure/entity/EvaluateRule.java       |    2 +-
 .../core/measure/entity/ExternalMeasure.java    |    3 +
 .../core/measure/entity/GriffinMeasure.java     |   19 +-
 .../griffin/core/measure/entity/Measure.java    |   15 +-
 .../griffin/core/measure/entity/Rule.java       |   17 +-
 .../measure/entity/StreamingPreProcess.java     |  111 ++
 .../core/metastore/hive/HiveMetaStoreProxy.java |    2 -
 .../hive/HiveMetaStoreServiceImpl.java          |    1 -
 .../griffin/core/metric/model/Metric.java       |   11 +-
 .../org/apache/griffin/core/util/FileUtil.java  |   59 +
 .../org/apache/griffin/core/util/JsonUtil.java  |   18 +
 .../org/apache/griffin/core/util/TimeUtil.java  |    2 +-
 .../apache/griffin/core/util/YarnNetUtil.java   |   70 +
 .../src/main/resources/application.properties   |    3 +
 service/src/main/resources/env/env_batch.json   |   59 +
 .../src/main/resources/env/env_streaming.json   |   60 +
 service/src/main/resources/sparkJob.properties  |    2 +-
 .../griffin/core/job/JobControllerTest.java     |    9 +-
 .../core/job/JobInstanceBeanRepoTest.java       |   41 +-
 .../griffin/core/job/JobInstanceTest.java       |   56 +-
 .../griffin/core/job/JobServiceImplTest.java    | 1205 +++++++++---------
 .../griffin/core/job/SparkSubmitJobTest.java    |  101 +-
 .../core/job/repo/JobInstanceRepoTest.java      |   50 +-
 .../griffin/core/job/repo/JobRepoTest.java      |    6 +-
 .../ExternalMeasureOperationImplTest.java       |  102 --
 .../ExternalMeasureOperatorImplTest.java        |  102 ++
 .../GriffinMeasureOperationImplTest.java        |  109 --
 .../measure/GriffinMeasureOperatorImplTest.java |  109 ++
 .../core/measure/MeasureControllerTest.java     |   10 +-
 .../core/measure/MeasureOrgServiceImplTest.java |    2 +-
 .../core/measure/MeasureServiceImplTest.java    |   33 +-
 .../hive/HiveMetaStoreServiceImplTest.java      |    2 +-
 .../core/metric/MetricControllerTest.java       |    3 +-
 .../apache/griffin/core/util/EntityHelper.java  |   16 +-
 .../apache/griffin/core/util/JsonUtilTest.java  |    5 +-
 74 files changed, 3275 insertions(+), 1873 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
 
b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
index 29ff5cc..85a5b25 100644
--- 
a/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
+++ 
b/service/src/main/java/org/apache/griffin/core/config/EclipseLinkJpaConfig.java
@@ -52,8 +52,6 @@ public class EclipseLinkJpaConfig extends 
JpaBaseConfiguration {
         Map<String, Object> map = new HashMap<>();
         map.put(PersistenceUnitProperties.WEAVING, "false");
         map.put(PersistenceUnitProperties.DDL_GENERATION, 
"create-or-extend-tables");
-//        map.put("eclipselink.logging.level", "FINEST");
-//        map.put("eclipselink.logging.parameters", "true");
         return map;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java 
b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
index 0f27513..8106ae2 100644
--- a/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
+++ b/service/src/main/java/org/apache/griffin/core/config/PropertiesConfig.java
@@ -19,6 +19,7 @@ under the License.
 
 package org.apache.griffin.core.config;
 
+import org.apache.griffin.core.util.FileUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
@@ -26,7 +27,9 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.io.ClassPathResource;
 
+import javax.annotation.PostConstruct;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.Properties;
 
 import static org.apache.griffin.core.util.PropertiesUtil.getConf;
@@ -40,10 +43,20 @@ public class PropertiesConfig {
     private String location;
 
     public PropertiesConfig(@Value("${external.config.location}") String 
location) {
-        LOGGER.info("external.config.location : {}", location);
+        LOGGER.info("external.config.location : {}", location != null ? 
location : "null");
         this.location = location;
     }
 
+//    @PostConstruct
+//    public void init() throws IOException {
+//        String batchName = "env_batch.json";
+//        String batchPath = "env/" + batchName;
+//        String streamingName = "env_streaming.json";
+//        String streamingPath = "env/" + streamingName;
+//        FileUtil.readBatchEnv(batchPath, batchName);
+//        FileUtil.readStreamingEnv(streamingPath, streamingName);
+//    }
+
 
     @Bean(name = "appConf")
     public Properties appConf() {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
 
b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
index 7b9c06c..14d987d 100644
--- 
a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
+++ 
b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
@@ -32,13 +32,22 @@ public enum GriffinExceptionMessage {
     INVALID_METRIC_VALUE_FORMAT(40008, "Metric value format is invalid"),
     INVALID_MEASURE_ID(40009, "Property 'measure.id' is invalid"),
     INVALID_CRON_EXPRESSION(40010, "Property 'cron.expression' is invalid"),
+    MEASURE_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such measure 
type."),
+    JOB_TYPE_DOES_NOT_SUPPORT(40011, "We don't support such job type."),
+    STREAMING_JOB_IS_RUNNING(40012, "There is no need to start again as job is 
RUNNING."),
+    STREAMING_JOB_IS_STOPPED(40012, "There is no need to stop again as job is 
STOPPED."),
+    JOB_IS_NOT_SCHEDULED(40013, "The job isn't scheduled."),
+    JOB_IS_NOT_IN_PAUSED_STATUS(40014, "The job isn't in paused status."),
+    JOB_IS_IN_PAUSED_STATUS(40015, "The job is already in paused status."),
 
     //404, "Not Found"
     MEASURE_ID_DOES_NOT_EXIST(40401, "Measure id does not exist"),
     JOB_ID_DOES_NOT_EXIST(40402, "Job id does not exist"),
     JOB_NAME_DOES_NOT_EXIST(40403, "Job name does not exist"),
-    ORGANIZATION_NAME_DOES_NOT_EXIST(40404, "Organization name does not 
exist"),
-    HDFS_FILE_NOT_EXIST(40405, "Hadoop data file not exist"),
+    NO_SUCH_JOB_ACTION(40404, "No such job action"),
+    JOB_KEY_DOES_NOT_EXIST(40405, "Job key which consists of group and name 
does not exist."),
+    ORGANIZATION_NAME_DOES_NOT_EXIST(40406, "Organization name does not 
exist"),
+    HDFS_FILE_NOT_EXIST(40407, "Hadoop data file not exist"),
 
     //409, "Conflict"
     MEASURE_NAME_ALREADY_EXIST(40901, "Measure name already exists"),

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java 
b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
new file mode 100644
index 0000000..e8d0cd9
--- /dev/null
+++ 
b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -0,0 +1,310 @@
+package org.apache.griffin.core.job;
+
+import org.apache.griffin.core.exception.GriffinException;
+import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.job.repo.BatchJobRepo;
+import org.apache.griffin.core.job.repo.JobInstanceRepo;
+import org.apache.griffin.core.measure.entity.DataSource;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.quartz.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
+
+import java.util.*;
+
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.*;
+import static 
org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
+import static org.quartz.CronExpression.isValidExpression;
+import static org.quartz.JobKey.jobKey;
+import static org.quartz.Trigger.TriggerState;
+import static org.quartz.Trigger.TriggerState.*;
+import static org.quartz.TriggerKey.triggerKey;
+
+@Service
+public class BatchJobOperatorImpl implements JobOperator {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BatchJobOperatorImpl.class);
+
+    @Autowired
+    private SchedulerFactoryBean factory;
+    @Autowired
+    private JobInstanceRepo instanceRepo;
+    @Autowired
+    private BatchJobRepo batchJobRepo;
+    @Autowired
+    private JobServiceImpl jobService;
+
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public JobSchedule add(JobSchedule js, GriffinMeasure measure) throws 
Exception {
+        validateParams(js, measure);
+        String qName = jobService.getQuartzName(js);
+        String qGroup = jobService.getQuartzGroup();
+        TriggerKey triggerKey = jobService.getTriggerKeyIfValid(qName, qGroup);
+        BatchJob batchJob = new BatchJob(js.getMeasureId(), js.getJobName(), 
qName, qGroup, false);
+        batchJob.setJobSchedule(js);
+        batchJob = batchJobRepo.save(batchJob);
+        jobService.addJob(triggerKey, js, batchJob, BATCH);
+        JobSchedule jobSchedule = batchJob.getJobSchedule();
+        jobSchedule.setId(batchJob.getId());
+        return jobSchedule;
+    }
+
+    /**
+     * all states: BLOCKED  COMPLETE ERROR NONE  NORMAL   PAUSED
+     * to start states: PAUSED
+     * to stop states: BLOCKED   NORMAL
+     *
+     * @param job streaming job
+     */
+    @Override
+    public void start(AbstractJob job) {
+        String name = job.getName();
+        String group = job.getGroup();
+        TriggerState state = getTriggerState(name, group);
+        if (state == null) {
+            throw new 
GriffinException.BadRequestException(JOB_IS_NOT_SCHEDULED);
+        }
+        /* If job is not in paused state,we can't start it as it may be 
RUNNING. */
+        if (state != PAUSED) {
+            throw new 
GriffinException.BadRequestException(JOB_IS_NOT_IN_PAUSED_STATUS);
+        }
+        JobKey jobKey = jobKey(name, group);
+        try {
+            factory.getScheduler().resumeJob(jobKey);
+        } catch (SchedulerException e) {
+            throw new GriffinException.ServiceException("Failed to start 
job.", e);
+        }
+    }
+
+    @Override
+    public void stop(AbstractJob job) {
+        pauseJob((BatchJob) job, false);
+    }
+
+    @Override
+    @Transactional
+    public void delete(AbstractJob job) {
+        pauseJob((BatchJob) job, true);
+    }
+
+
+    @Override
+    public JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws 
SchedulerException {
+        List<? extends Trigger> triggers = 
jobService.getTriggers(job.getName(), job.getGroup());
+        if (!CollectionUtils.isEmpty(triggers)) {
+            jobHealth.setJobCount(jobHealth.getJobCount() + 1);
+            if (jobService.isJobHealthy(job.getId())) {
+                jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 
1);
+            }
+        }
+        return jobHealth;
+    }
+
+    @Override
+    public JobState getState(AbstractJob job, JobDataBean bean, String action) 
throws SchedulerException {
+        JobState jobState = new JobState();
+        Scheduler scheduler = factory.getScheduler();
+        TriggerKey triggerKey = triggerKey(job.getName(), job.getGroup());
+        TriggerState triggerState = scheduler.getTriggerState(triggerKey);
+        jobState.setState(triggerState.toString());
+        jobState.setToStart(getStartStatus(triggerState));
+        jobState.setToStop(getStopStatus(triggerState));
+        return jobState;
+    }
+
+    /**
+     * only PAUSED state of job can be started
+     * @param state job state
+     * @return true: job can be started, false: job is running which cannot be 
started
+     */
+    private boolean getStartStatus(TriggerState state) {
+        return state == PAUSED;
+    }
+
+    /**
+     * only NORMAL or  BLOCKED state of job can be started
+     * @param state job state
+     * @return true: job can be stopped, false: job is running which cannot be 
stopped
+     */
+    private boolean getStopStatus(TriggerState state) {
+        return state == NORMAL || state == BLOCKED;
+    }
+
+
+    private TriggerState getTriggerState(String name, String group) {
+        try {
+            List<? extends Trigger> triggers = jobService.getTriggers(name, 
group);
+            if (CollectionUtils.isEmpty(triggers)) {
+                return null;
+            }
+            TriggerKey key = triggers.get(0).getKey();
+            return factory.getScheduler().getTriggerState(key);
+        } catch (SchedulerException e) {
+            LOGGER.error("Failed to delete job", e);
+            throw new GriffinException.ServiceException("Failed to delete 
job", e);
+        }
+
+    }
+
+
+    /**
+     * @param job    griffin job
+     * @param delete if job needs to be deleted,set isNeedDelete 
true,otherwise it just will be paused.
+     */
+    private void pauseJob(BatchJob job, boolean delete) {
+        try {
+            pauseJob(job.getGroup(), job.getName());
+            pausePredicateJob(job);
+            job.setDeleted(delete);
+            batchJobRepo.save(job);
+        } catch (Exception e) {
+            LOGGER.error("Job schedule happens exception.", e);
+            throw new GriffinException.ServiceException("Job schedule happens 
exception.", e);
+        }
+    }
+
+    private void pausePredicateJob(BatchJob job) throws SchedulerException {
+        List<JobInstanceBean> instances = 
instanceRepo.findByJobId(job.getId());
+        for (JobInstanceBean instance : instances) {
+            if (!instance.isPredicateDeleted()) {
+                deleteJob(instance.getPredicateGroup(), 
instance.getPredicateName());
+                instance.setPredicateDeleted(true);
+                if 
(instance.getState().equals(LivySessionStates.State.FINDING)) {
+                    instance.setState(LivySessionStates.State.NOT_FOUND);
+                }
+            }
+        }
+    }
+
+    public void deleteJob(String group, String name) throws SchedulerException 
{
+        Scheduler scheduler = factory.getScheduler();
+        JobKey jobKey = new JobKey(name, group);
+        if (!scheduler.checkExists(jobKey)) {
+            LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), 
jobKey.getName());
+            throw new 
GriffinException.NotFoundException(JOB_KEY_DOES_NOT_EXIST);
+        }
+        scheduler.deleteJob(jobKey);
+
+    }
+
+    private void pauseJob(String group, String name) throws SchedulerException 
{
+        if (StringUtils.isEmpty(group) || StringUtils.isEmpty(name)) {
+            return;
+        }
+        Scheduler scheduler = factory.getScheduler();
+        JobKey jobKey = new JobKey(name, group);
+        if (!scheduler.checkExists(jobKey)) {
+            LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), 
jobKey.getName());
+            throw new 
GriffinException.NotFoundException(JOB_KEY_DOES_NOT_EXIST);
+        }
+        scheduler.pauseJob(jobKey);
+    }
+
+    public boolean pauseJobInstances(List<JobInstanceBean> instances) {
+        if (CollectionUtils.isEmpty(instances)) {
+            return true;
+        }
+        List<JobInstanceBean> deletedInstances = new ArrayList<>();
+        boolean pauseStatus = true;
+        for (JobInstanceBean instance : instances) {
+            boolean status = pauseJobInstance(instance, deletedInstances);
+            pauseStatus = pauseStatus && status;
+        }
+        instanceRepo.save(deletedInstances);
+        return pauseStatus;
+    }
+
+    private boolean pauseJobInstance(JobInstanceBean instance, 
List<JobInstanceBean> deletedInstances) {
+        boolean status = true;
+        String pGroup = instance.getPredicateGroup();
+        String pName = instance.getPredicateName();
+        try {
+            if (!instance.isPredicateDeleted()) {
+                deleteJob(pGroup, pName);
+                instance.setPredicateDeleted(true);
+                deletedInstances.add(instance);
+            }
+        } catch (SchedulerException e) {
+            LOGGER.error("Failed to pause predicate job({},{}).", pGroup, 
pName);
+            status = false;
+        }
+        return status;
+    }
+
+    private void validateParams(JobSchedule js, GriffinMeasure measure) {
+        if (!jobService.isValidJobName(js.getJobName())) {
+            throw new GriffinException.BadRequestException(INVALID_JOB_NAME);
+        }
+        if (!isValidCronExpression(js.getCronExpression())) {
+            throw new 
GriffinException.BadRequestException(INVALID_CRON_EXPRESSION);
+        }
+        if (!isValidBaseLine(js.getSegments())) {
+            throw new 
GriffinException.BadRequestException(MISSING_BASELINE_CONFIG);
+        }
+        List<String> names = getConnectorNames(measure);
+        if (!isValidConnectorNames(js.getSegments(), names)) {
+            throw new 
GriffinException.BadRequestException(INVALID_CONNECTOR_NAME);
+        }
+    }
+
+    private boolean isValidCronExpression(String cronExpression) {
+        if (StringUtils.isEmpty(cronExpression)) {
+            LOGGER.warn("Cron Expression is empty.");
+            return false;
+        }
+        if (!isValidExpression(cronExpression)) {
+            LOGGER.warn("Cron Expression is invalid.");
+            return false;
+        }
+        return true;
+    }
+
+    private boolean isValidBaseLine(List<JobDataSegment> segments) {
+        assert segments != null;
+        for (JobDataSegment jds : segments) {
+            if (jds.isBaseline()) {
+                return true;
+            }
+        }
+        LOGGER.warn("Please set segment timestamp baseline in as.baseline 
field.");
+        return false;
+    }
+
+    private boolean isValidConnectorNames(List<JobDataSegment> segments, 
List<String> names) {
+        assert segments != null;
+        Set<String> sets = new HashSet<>();
+        for (JobDataSegment segment : segments) {
+            String dcName = segment.getDataConnectorName();
+            sets.add(dcName);
+            boolean exist = names.stream().anyMatch(name -> 
name.equals(dcName));
+            if (!exist) {
+                LOGGER.warn("Param {} is a illegal string. Please input one of 
strings in {}.", dcName, names);
+                return false;
+            }
+        }
+        if (sets.size() < segments.size()) {
+            LOGGER.warn("Connector names in job data segment cannot 
duplicate.");
+            return false;
+        }
+        return true;
+    }
+
+    private List<String> getConnectorNames(GriffinMeasure measure) {
+        Set<String> sets = new HashSet<>();
+        List<DataSource> sources = measure.getDataSources();
+        for (DataSource source : sources) {
+            source.getConnectors().forEach(dc -> sets.add(dc.getName()));
+        }
+        if (sets.size() < sources.size()) {
+            LOGGER.warn("Connector names cannot be repeated.");
+            return Collections.emptyList();
+        }
+        return new ArrayList<>(sets);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java 
b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
index 7ee8642..19733be 100644
--- a/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/FileExistPredicator.java
@@ -46,12 +46,12 @@ public class FileExistPredicator implements Predicator {
 
     @Override
     public boolean predicate() throws IOException {
-        Map<String, String> config = predicate.getConfigMap();
+        Map<String, Object> config = predicate.getConfigMap();
         String[] paths = null;
         String rootPath = null;
-        if (config != null && !StringUtils.isEmpty(config.get(PREDICT_PATH))) {
-            paths = config.get(PREDICT_PATH).split(PATH_CONNECTOR_CHARACTER);
-            rootPath = config.get(PREDICT_ROOT_PATH);
+        if (config != null && !StringUtils.isEmpty((String) 
config.get(PREDICT_PATH))) {
+            paths = 
((String)config.get(PREDICT_PATH)).split(PATH_CONNECTOR_CHARACTER);
+            rootPath = (String) config.get(PREDICT_ROOT_PATH);
         }
         if (ArrayUtils.isEmpty(paths) || StringUtils.isEmpty(rootPath)) {
             LOGGER.error("Predicate path is null.Please check predicates 
config root.path and path.");

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/JobController.java 
b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index 81b7fb7..5f9b319 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -24,6 +24,7 @@ import org.apache.griffin.core.job.entity.JobHealth;
 import org.apache.griffin.core.job.entity.JobInstanceBean;
 import org.apache.griffin.core.job.entity.JobSchedule;
 import org.apache.griffin.core.util.FSUtil;
+import org.quartz.SchedulerException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.io.InputStreamResource;
 import org.springframework.core.io.Resource;
@@ -43,30 +44,41 @@ public class JobController {
     private JobService jobService;
 
     @RequestMapping(value = "/jobs", method = RequestMethod.GET)
-    public List<JobDataBean> getJobs() {
-        return jobService.getAliveJobs();
+    public List<JobDataBean> getJobs(@RequestParam(value = "type", 
defaultValue = "") String type) {
+        return jobService.getAliveJobs(type);
     }
 
-    @RequestMapping(value = "/jobs/config/{jobName}")
-    public JobSchedule getJobSchedule(@PathVariable("jobName") String jobName) 
{
+    @RequestMapping(value = "/jobs/config", method = RequestMethod.GET)
+    public JobSchedule getJobSchedule(@RequestParam("jobName") String jobName) 
{
         return jobService.getJobSchedule(jobName);
     }
 
+    @RequestMapping(value = "/jobs/config/{jobId}")
+    public JobSchedule getJobSchedule(@PathVariable("jobId") Long jobId) {
+        return jobService.getJobSchedule(jobId);
+    }
+
     @RequestMapping(value = "/jobs", method = RequestMethod.POST)
     @ResponseStatus(HttpStatus.CREATED)
     public JobSchedule addJob(@RequestBody JobSchedule jobSchedule) throws 
Exception {
         return jobService.addJob(jobSchedule);
     }
 
+    @RequestMapping(value = "/jobs/{id}", method = RequestMethod.PUT)
+    @ResponseStatus(HttpStatus.OK)
+    public JobDataBean onActions(@PathVariable("id") Long jobId, @RequestParam 
String action) throws Exception {
+        return jobService.onAction(jobId,action);
+    }
+
     @RequestMapping(value = "/jobs", method = RequestMethod.DELETE)
     @ResponseStatus(HttpStatus.NO_CONTENT)
-    public void deleteJob(@RequestParam("jobName") String jobName) {
+    public void deleteJob(@RequestParam("jobName") String jobName) throws 
SchedulerException {
         jobService.deleteJob(jobName);
     }
 
     @RequestMapping(value = "/jobs/{id}", method = RequestMethod.DELETE)
     @ResponseStatus(HttpStatus.NO_CONTENT)
-    public void deleteJob(@PathVariable("id") Long id) {
+    public void deleteJob(@PathVariable("id") Long id) throws 
SchedulerException {
         jobService.deleteJob(id);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java 
b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index 00becda..047e581 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -19,17 +19,16 @@ under the License.
 
 package org.apache.griffin.core.job;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.exception.GriffinException;
 import org.apache.griffin.core.job.entity.*;
-import org.apache.griffin.core.job.repo.GriffinJobRepo;
 import org.apache.griffin.core.job.repo.JobInstanceRepo;
-import org.apache.griffin.core.job.repo.JobScheduleRepo;
+import org.apache.griffin.core.job.repo.JobRepo;
 import org.apache.griffin.core.measure.entity.DataConnector;
 import org.apache.griffin.core.measure.entity.DataSource;
 import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
 import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
-import org.apache.griffin.core.util.JsonUtil;
 import org.apache.griffin.core.util.TimeUtil;
 import org.quartz.*;
 import org.slf4j.Logger;
@@ -37,12 +36,18 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.io.IOException;
 import java.util.*;
 
+import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST;
 import static org.apache.griffin.core.job.JobServiceImpl.GRIFFIN_JOB_ID;
-import static org.apache.griffin.core.job.JobServiceImpl.JOB_SCHEDULE_ID;
+import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.FINDING;
+import static 
org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
+import static 
org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING;
+import static org.apache.griffin.core.util.JsonUtil.toEntity;
+import static org.apache.griffin.core.util.JsonUtil.toJson;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.JobKey.jobKey;
 import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
@@ -64,9 +69,7 @@ public class JobInstance implements Job {
     @Autowired
     private GriffinMeasureRepo measureRepo;
     @Autowired
-    private GriffinJobRepo jobRepo;
-    @Autowired
-    private JobScheduleRepo jobScheduleRepo;
+    private JobRepo<AbstractJob> jobRepo;
     @Autowired
     private JobInstanceRepo instanceRepo;
     @Autowired
@@ -75,12 +78,13 @@ public class JobInstance implements Job {
 
     private JobSchedule jobSchedule;
     private GriffinMeasure measure;
-    private GriffinJob griffinJob;
+    private AbstractJob job;
     private List<SegmentPredicate> mPredicates;
     private Long jobStartTime;
 
 
     @Override
+    @Transactional
     public void execute(JobExecutionContext context) {
         try {
             initParam(context);
@@ -94,14 +98,12 @@ public class JobInstance implements Job {
     private void initParam(JobExecutionContext context) throws 
SchedulerException {
         mPredicates = new ArrayList<>();
         JobDetail jobDetail = context.getJobDetail();
-        Long jobScheduleId = 
jobDetail.getJobDataMap().getLong(JOB_SCHEDULE_ID);
-        Long griffinJobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID);
-        jobSchedule = jobScheduleRepo.findOne(jobScheduleId);
+        Long jobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID);
+        job = jobRepo.findOne(jobId);
+        jobSchedule = job.getJobSchedule();
         Long measureId = jobSchedule.getMeasureId();
-        griffinJob = jobRepo.findOne(griffinJobId);
         measure = measureRepo.findOne(measureId);
         setJobStartTime(jobDetail);
-
     }
 
     @SuppressWarnings("unchecked")
@@ -180,7 +182,7 @@ public class JobInstance implements Job {
      * @param dc       data connector
      * @param sampleTs collection of data split start timestamp
      */
-    private void setConnectorPredicates(DataConnector dc, Long[] sampleTs){
+    private void setConnectorPredicates(DataConnector dc, Long[] sampleTs) {
         List<SegmentPredicate> predicates = dc.getPredicates();
         for (SegmentPredicate predicate : predicates) {
             genConfMap(predicate.getConfigMap(), sampleTs, 
dc.getDataTimeZone());
@@ -202,21 +204,25 @@ public class JobInstance implements Job {
      * @return all config data combine,like {"where": "year=2017 AND month=11 
AND dt=15 AND hour=09,year=2017 AND month=11 AND dt=15 AND hour=10"}
      * or like {"path": 
"/year=2017/month=11/dt=15/hour=09/_DONE,/year=2017/month=11/dt=15/hour=10/_DONE"}
      */
-    private void genConfMap(Map<String, String> conf, Long[] sampleTs, String 
timezone) {
+
+    private void genConfMap(Map<String, Object> conf, Long[] sampleTs, String 
timezone) {
         if (conf == null) {
             LOGGER.warn("Predicate config is null.");
             return;
         }
-        for (Map.Entry<String, String> entry : conf.entrySet()) {
-            String value = entry.getValue();
-            Set<String> set = new HashSet<>();
-            if (StringUtils.isEmpty(value)) {
-                continue;
+        for (Map.Entry<String, Object> entry : conf.entrySet()) {
+            // in case entry value is a json object instead of a string
+            if (entry.getValue() instanceof String) {
+                String value = (String) entry.getValue();
+                Set<String> set = new HashSet<>();
+                if (StringUtils.isEmpty(value)) {
+                    continue;
+                }
+                for (Long timestamp : sampleTs) {
+                    set.add(TimeUtil.format(value, timestamp, 
getTimeZone(timezone)));
+                }
+                conf.put(entry.getKey(), StringUtils.join(set, 
PATH_CONNECTOR_CHARACTER));
             }
-            for (Long timestamp : sampleTs) {
-                set.add(TimeUtil.format(value, timestamp, 
getTimeZone(timezone)));
-            }
-            conf.put(entry.getKey(), StringUtils.join(set, 
PATH_CONNECTOR_CHARACTER));
         }
     }
 
@@ -228,50 +234,46 @@ public class JobInstance implements Job {
     }
 
     @SuppressWarnings("unchecked")
-    private boolean createJobInstance(Map<String, Object> confMap) throws 
Exception {
+    private void createJobInstance(Map<String, Object> confMap) throws 
Exception {
         Map<String, Object> config = (Map<String, Object>) 
confMap.get("checkdonefile.schedule");
         Long interval = TimeUtil.str2Long((String) config.get("interval"));
         Integer repeat = Integer.valueOf(config.get("repeat").toString());
         String groupName = "PG";
-        String jobName = griffinJob.getJobName() + "_predicate_" + 
System.currentTimeMillis();
-        Scheduler scheduler = factory.getScheduler();
-        TriggerKey triggerKey = triggerKey(jobName, groupName);
-        return !(scheduler.checkExists(triggerKey)
-                || !saveJobInstance(jobName, groupName)
-                || !createJobInstance(triggerKey, interval, repeat, jobName));
+        String jobName = job.getJobName() + "_predicate_" + 
System.currentTimeMillis();
+        TriggerKey tk = triggerKey(jobName, groupName);
+        if (factory.getScheduler().checkExists(tk)) {
+            throw new 
GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST);
+        }
+        saveJobInstance(jobName, groupName);
+        createJobInstance(tk, interval, repeat, jobName);
     }
 
-    private boolean saveJobInstance(String pName, String pGroup) {
+    private void saveJobInstance(String pName, String pGroup) {
+        ProcessType type = measure.getProcessType() == BATCH ? BATCH : 
STREAMING;
         Long tms = System.currentTimeMillis();
         Long expireTms = 
Long.valueOf(appConfProps.getProperty("jobInstance.expired.milliseconds")) + 
tms;
-        JobInstanceBean instance = new 
JobInstanceBean(LivySessionStates.State.finding, pName, pGroup, tms, expireTms);
-        instance.setGriffinJob(griffinJob);
+        JobInstanceBean instance = new JobInstanceBean(FINDING, pName, pGroup, 
tms, expireTms, type);
+        instance.setJob(job);
         instanceRepo.save(instance);
-        return true;
     }
 
-    private boolean createJobInstance(TriggerKey triggerKey, Long interval, 
Integer repeatCount, String pJobName) throws Exception {
-        JobDetail jobDetail = addJobDetail(triggerKey, pJobName);
-        factory.getScheduler().scheduleJob(newTriggerInstance(triggerKey, 
jobDetail, interval, repeatCount));
-        return true;
+
+    private void createJobInstance(TriggerKey tk, Long interval, Integer 
repeatCount, String pJobName) throws Exception {
+        JobDetail jobDetail = addJobDetail(tk, pJobName);
+        Trigger trigger = genTriggerInstance(tk, jobDetail, interval, 
repeatCount);
+        factory.getScheduler().scheduleJob(trigger);
     }
 
 
-    private Trigger newTriggerInstance(TriggerKey triggerKey, JobDetail jd, 
Long interval, Integer repeatCount) {
-        return newTrigger()
-                .withIdentity(triggerKey)
-                .forJob(jd)
-                .startNow()
-                .withSchedule(simpleSchedule()
-                        .withIntervalInMilliseconds(interval)
-                        .withRepeatCount(repeatCount)
-                )
+    private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, Long 
interval, Integer repeatCount) {
+        return newTrigger().withIdentity(tk).forJob(jd).startNow()
+                
.withSchedule(simpleSchedule().withIntervalInMilliseconds(interval).withRepeatCount(repeatCount))
                 .build();
     }
 
-    private JobDetail addJobDetail(TriggerKey triggerKey, String pJobName) 
throws SchedulerException, JsonProcessingException {
+    private JobDetail addJobDetail(TriggerKey tk, String pJobName) throws 
SchedulerException, IOException {
         Scheduler scheduler = factory.getScheduler();
-        JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup());
+        JobKey jobKey = jobKey(tk.getName(), tk.getGroup());
         JobDetail jobDetail;
         Boolean isJobKeyExist = scheduler.checkExists(jobKey);
         if (isJobKeyExist) {
@@ -287,12 +289,30 @@ public class JobInstance implements Job {
         return jobDetail;
     }
 
-    private void setJobDataMap(JobDetail jobDetail, String pJobName) throws 
JsonProcessingException {
+    private void setJobDataMap(JobDetail jobDetail, String pJobName) throws 
IOException {
         JobDataMap dataMap = jobDetail.getJobDataMap();
-        dataMap.put(MEASURE_KEY, JsonUtil.toJson(measure));
-        dataMap.put(PREDICATES_KEY, JsonUtil.toJson(mPredicates));
-        dataMap.put(JOB_NAME, griffinJob.getJobName());
+        preProcessMeasure();
+        String result =toJson(measure);
+        dataMap.put(MEASURE_KEY, result);
+        dataMap.put(PREDICATES_KEY, toJson(mPredicates));
+        dataMap.put(JOB_NAME, job.getJobName());
         dataMap.put(PREDICATE_JOB_NAME, pJobName);
     }
 
+    private void preProcessMeasure() throws IOException {
+        for (DataSource source : measure.getDataSources()) {
+            Map cacheMap = source.getCacheMap();
+            //to skip batch job
+            if (cacheMap == null) {
+                return;
+            }
+            String cache = toJson(cacheMap);
+            cache = cache.replaceAll("\\$\\{JOB_NAME}", job.getJobName());
+            cache = cache.replaceAll("\\$\\{SOURCE_NAME}", source.getName());
+            cache = cache.replaceAll("\\$\\{TARGET_NAME}", source.getName());
+            cacheMap = toEntity(cache,Map.class);
+            source.setCacheMap(cacheMap);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java 
b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
new file mode 100644
index 0000000..40f1c9e
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job;
+
+import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.quartz.SchedulerException;
+
+public interface JobOperator {
+    JobSchedule add(JobSchedule js, GriffinMeasure measure) throws Exception;
+
+    void start(AbstractJob job) throws Exception;
+
+    void stop(AbstractJob job) throws SchedulerException;
+
+    void delete(AbstractJob job) throws SchedulerException;
+
+    JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws 
SchedulerException;
+
+    JobState getState(AbstractJob job, JobDataBean bean, String action) throws 
SchedulerException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/JobService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java 
b/service/src/main/java/org/apache/griffin/core/job/JobService.java
index a238311..a2c4762 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobService.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java
@@ -19,27 +19,27 @@ under the License.
 
 package org.apache.griffin.core.job;
 
-import org.apache.griffin.core.job.entity.JobDataBean;
-import org.apache.griffin.core.job.entity.JobHealth;
-import org.apache.griffin.core.job.entity.JobInstanceBean;
-import org.apache.griffin.core.job.entity.JobSchedule;
+import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
 import org.quartz.SchedulerException;
 
 import java.util.List;
 
 public interface JobService {
 
-    List<JobDataBean> getAliveJobs();
+    List<JobDataBean> getAliveJobs(String type);
 
     JobSchedule getJobSchedule(String jobName);
 
-    JobSchedule addJob(JobSchedule jobSchedule) throws Exception;
+    JobSchedule getJobSchedule(Long jobId);
 
-    void pauseJob(String group, String name) throws SchedulerException;
+    JobSchedule addJob(JobSchedule js) throws Exception;
 
-    void deleteJob(Long jobId);
+    JobDataBean onAction(Long jobId,String action) throws Exception;
 
-    void deleteJob(String jobName);
+    void deleteJob(Long jobId) throws SchedulerException;
+
+    void deleteJob(String jobName) throws SchedulerException;
 
     List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size);
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java 
b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index f42bc5c..dce6ae4 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -23,13 +23,15 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.commons.lang.StringUtils;
 import org.apache.griffin.core.exception.GriffinException;
 import org.apache.griffin.core.job.entity.*;
-import org.apache.griffin.core.job.repo.GriffinJobRepo;
+import org.apache.griffin.core.job.repo.BatchJobRepo;
 import org.apache.griffin.core.job.repo.JobInstanceRepo;
-import org.apache.griffin.core.job.repo.JobScheduleRepo;
-import org.apache.griffin.core.measure.entity.DataSource;
+import org.apache.griffin.core.job.repo.JobRepo;
+import org.apache.griffin.core.job.repo.StreamingJobRepo;
 import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
 import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
 import org.apache.griffin.core.util.JsonUtil;
+import org.apache.griffin.core.util.YarnNetUtil;
 import org.quartz.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,51 +43,57 @@ import org.springframework.data.domain.Sort;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.CollectionUtils;
-import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.HttpClientErrorException;
+import org.springframework.web.client.ResourceAccessException;
 import org.springframework.web.client.RestTemplate;
 
-import java.io.IOException;
 import java.util.*;
 
 import static java.util.TimeZone.getTimeZone;
 import static org.apache.griffin.core.exception.GriffinExceptionMessage.*;
-import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.starting;
-import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.not_started;
-import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.recovering;
-import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.running;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.idle;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.busy;
-import static org.quartz.CronExpression.isValidExpression;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.*;
+import static org.apache.griffin.core.job.entity.LivySessionStates.isActive;
+import static 
org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
+import static 
org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING;
 import static org.quartz.CronScheduleBuilder.cronSchedule;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.JobKey.jobKey;
+import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
 import static org.quartz.TriggerBuilder.newTrigger;
 import static org.quartz.TriggerKey.triggerKey;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State;
 
 @Service
 public class JobServiceImpl implements JobService {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JobServiceImpl.class);
-    public static final String JOB_SCHEDULE_ID = "jobScheduleId";
     public static final String GRIFFIN_JOB_ID = "griffinJobId";
     private static final int MAX_PAGE_SIZE = 1024;
     private static final int DEFAULT_PAGE_SIZE = 10;
+    static final String START = "start";
+    static final String STOP = "stop";
+    private static final String BATCH_TYPE = "batch";
+    private static final String STREAMING_TYPE = "streaming";
 
     @Autowired
     private SchedulerFactoryBean factory;
     @Autowired
-    private JobInstanceRepo jobInstanceRepo;
+    private JobInstanceRepo instanceRepo;
     @Autowired
     @Qualifier("livyConf")
     private Properties livyConf;
     @Autowired
     private GriffinMeasureRepo measureRepo;
     @Autowired
-    private GriffinJobRepo jobRepo;
+    private BatchJobRepo batchJobRepo;
+    @Autowired
+    private StreamingJobRepo streamingJobRepo;
+    @Autowired
+    private JobRepo<AbstractJob> jobRepo;
+    @Autowired
+    private BatchJobOperatorImpl batchJobOp;
     @Autowired
-    private JobScheduleRepo jobScheduleRepo;
+    private StreamingJobOperatorImpl streamingJobOp;
 
     private RestTemplate restTemplate;
 
@@ -94,332 +102,342 @@ public class JobServiceImpl implements JobService {
     }
 
     @Override
-    public List<JobDataBean> getAliveJobs() {
+    public List<JobDataBean> getAliveJobs(String type) {
+        List<? extends AbstractJob> jobs;
+        if (BATCH_TYPE.equals(type)) {
+            jobs = batchJobRepo.findByDeleted(false);
+        } else if (STREAMING_TYPE.equals(type)) {
+            jobs = streamingJobRepo.findByDeleted(false);
+        } else {
+            jobs = jobRepo.findByDeleted(false);
+        }
+        return getJobDataBeans(jobs);
+    }
+
+    private List<JobDataBean> getJobDataBeans(List<? extends AbstractJob> 
jobs) {
         List<JobDataBean> dataList = new ArrayList<>();
         try {
-            List<GriffinJob> jobs = jobRepo.findByDeleted(false);
-            for (GriffinJob job : jobs) {
-                JobDataBean jobData = genJobData(jobKey(job.getQuartzName(), 
job.getQuartzGroup()), job);
+            for (AbstractJob job : jobs) {
+                JobDataBean jobData = genJobDataBean(job);
                 if (jobData != null) {
                     dataList.add(jobData);
                 }
             }
         } catch (SchedulerException e) {
-            LOGGER.error("Failed to get running jobs.", e);
-            throw new GriffinException.ServiceException("Failed to get running 
jobs.", e);
+            LOGGER.error("Failed to get RUNNING jobs.", e);
+            throw new GriffinException.ServiceException("Failed to get RUNNING 
jobs.", e);
         }
         return dataList;
     }
 
-    @SuppressWarnings("unchecked")
-    private JobDataBean genJobData(JobKey jobKey, GriffinJob job) throws 
SchedulerException {
-        Scheduler scheduler = factory.getScheduler();
-        List<Trigger> triggers = (List<Trigger>) 
scheduler.getTriggersOfJob(jobKey);
-        if (CollectionUtils.isEmpty(triggers)) {
-            return null;
-        }
-        JobDataBean jobData = new JobDataBean();
-        Trigger trigger = triggers.get(0);
-        setTriggerTime(trigger, jobData);
-        jobData.setJobId(job.getId());
-        jobData.setJobName(job.getJobName());
-        jobData.setMeasureId(job.getMeasureId());
-        jobData.setTriggerState(scheduler.getTriggerState(trigger.getKey()));
-        jobData.setCronExpression(getCronExpression(triggers));
-        return jobData;
-    }
-
-    private String getCronExpression(List<Trigger> triggers) {
-        for (Trigger trigger : triggers) {
-            if (trigger instanceof CronTrigger) {
-                return ((CronTrigger) trigger).getCronExpression();
-            }
-        }
-        return null;
-    }
-
-    private void setTriggerTime(Trigger trigger, JobDataBean jobBean) {
-        Date nextFireTime = trigger.getNextFireTime();
-        Date previousFireTime = trigger.getPreviousFireTime();
-        jobBean.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() 
: -1);
-        jobBean.setPreviousFireTime(previousFireTime != null ? 
previousFireTime.getTime() : -1);
-    }
-
     @Override
     public JobSchedule getJobSchedule(String jobName) {
-        List<GriffinJob> jobs = jobRepo.findByJobNameAndDeleted(jobName, 
false);
+        List<AbstractJob> jobs = jobRepo.findByJobNameAndDeleted(jobName, 
false);
         if (jobs.size() == 0) {
             LOGGER.warn("Job name {} does not exist.", jobName);
             throw new 
GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST);
         }
-        return jobs.get(0).getJobSchedule();
+        AbstractJob job = jobs.get(0);
+        return getJobSchedule(job);
     }
 
     @Override
-    @Transactional(rollbackFor = Exception.class)
-    public JobSchedule addJob(JobSchedule js) throws Exception {
-        Long measureId = js.getMeasureId();
-        GriffinMeasure measure = getMeasureIfValid(measureId);
-        validateJobScheduleParams(js, measure);
-        String qName = getQuartzName(js);
-        String qGroup = getQuartzGroup();
-        TriggerKey triggerKey = triggerKey(qName, qGroup);
-        if (factory.getScheduler().checkExists(triggerKey)) {
-            throw new 
GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST);
+    public JobSchedule getJobSchedule(Long jobId) {
+        AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
+        if (job == null) {
+            LOGGER.warn("Job id {} does not exist.", jobId);
+            throw new 
GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST);
         }
-        GriffinJob job = new GriffinJob(measure.getId(), js.getJobName(), 
qName, qGroup, js,false);
-        job = jobRepo.save(job);
-        addJob(triggerKey, js, job);
-        return job.getJobSchedule();
+        return getJobSchedule(job);
     }
 
-    private void addJob(TriggerKey triggerKey, JobSchedule js, GriffinJob job) 
throws Exception {
-        JobDetail jobDetail = addJobDetail(triggerKey, js, job);
-        factory.getScheduler().scheduleJob(genTriggerInstance(triggerKey, 
jobDetail, js));
+    private JobSchedule getJobSchedule(AbstractJob job) {
+        JobSchedule jobSchedule = job.getJobSchedule();
+        jobSchedule.setId(job.getId());
+        return jobSchedule;
     }
 
-    private String getQuartzName(JobSchedule js) {
-        return js.getJobName() + "_" + System.currentTimeMillis();
+    @Override
+    public JobSchedule addJob(JobSchedule js) throws Exception {
+        Long measureId = js.getMeasureId();
+        GriffinMeasure measure = getMeasureIfValid(measureId);
+        JobOperator op = getJobOperator(measure.getProcessType());
+        return op.add(js, measure);
     }
 
-    private String getQuartzGroup() {
-        return "BA";
+    /**
+     * @param jobId  job id
+     * @param action job operation: start job, stop job
+     */
+    @Override
+    public JobDataBean onAction(Long jobId, String action) throws Exception {
+        AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
+        validateJobExist(job);
+        JobOperator op = getJobOperator(job);
+        doAction(action, job, op);
+        return genJobDataBean(job,action);
     }
 
-    private void validateJobScheduleParams(JobSchedule js, GriffinMeasure 
measure) {
-        if (!isValidJobName(js.getJobName())) {
-            throw new GriffinException.BadRequestException(INVALID_JOB_NAME);
-        }
-        if (!isValidCronExpression(js.getCronExpression())) {
-            throw new 
GriffinException.BadRequestException(INVALID_CRON_EXPRESSION);
-        }
-        if (!isValidBaseLine(js.getSegments())) {
-            throw new 
GriffinException.BadRequestException(MISSING_BASELINE_CONFIG);
-        }
-        List<String> names = getConnectorNames(measure);
-        if (!isValidConnectorNames(js.getSegments(), names)) {
-            throw new 
GriffinException.BadRequestException(INVALID_CONNECTOR_NAME);
+    private void doAction(String action, AbstractJob job, JobOperator op) 
throws Exception {
+        switch (action) {
+            case START:
+                op.start(job);
+                break;
+            case STOP:
+                op.stop(job);
+                break;
+            default:
+                throw new 
GriffinException.NotFoundException(NO_SUCH_JOB_ACTION);
         }
     }
 
-    private boolean isValidJobName(String jobName) {
-        if (StringUtils.isEmpty(jobName)) {
-            LOGGER.warn("Job name cannot be empty.");
-            return false;
+
+    /**
+     * logically delete
+     * 1. pause these jobs
+     * 2. set these jobs as deleted status
+     *
+     * @param jobId griffin job id
+     */
+    @Override
+    public void deleteJob(Long jobId) throws SchedulerException {
+        AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
+        validateJobExist(job);
+        JobOperator op = getJobOperator(job);
+        op.delete(job);
+    }
+
+    /**
+     * logically delete
+     *
+     * @param name griffin job name which may not be unique.
+     */
+    @Override
+    public void deleteJob(String name) throws SchedulerException {
+        List<AbstractJob> jobs = jobRepo.findByJobNameAndDeleted(name, false);
+        if (CollectionUtils.isEmpty(jobs)) {
+            LOGGER.warn("There is no job with '{}' name.", name);
+            throw new 
GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST);
         }
-        int size = jobRepo.countByJobNameAndDeleted(jobName, false);
-        if (size > 0) {
-            LOGGER.warn("Job name already exits.");
-            return false;
+        for (AbstractJob job : jobs) {
+            JobOperator op = getJobOperator(job);
+            op.delete(job);
         }
-        return true;
     }
 
-    private boolean isValidCronExpression(String cronExpression) {
-        if (StringUtils.isEmpty(cronExpression)) {
-            LOGGER.warn("Cron Expression is empty.");
-            return false;
-        }
-        if (!isValidExpression(cronExpression)) {
-            LOGGER.warn("Cron Expression is invalid.");
-            return false;
+    @Override
+    public List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int 
size) {
+        AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
+        if (job == null) {
+            LOGGER.warn("Job id {} does not exist.", jobId);
+            throw new 
GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST);
         }
-        return true;
+        size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size;
+        size = size <= 0 ? DEFAULT_PAGE_SIZE : size;
+        Pageable pageable = new PageRequest(page, size, Sort.Direction.DESC, 
"tms");
+        List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, 
pageable);
+        return updateState(instances);
     }
 
-    private boolean isValidBaseLine(List<JobDataSegment> segments) {
-        for (JobDataSegment jds : segments) {
-            if (jds.isBaseline()) {
-                return true;
+    private List<JobInstanceBean> updateState(List<JobInstanceBean> instances) 
{
+        for (JobInstanceBean instance : instances) {
+            State state = instance.getState();
+            if (state.equals(UNKNOWN) || isActive(state)) {
+                syncInstancesOfJob(instance);
             }
         }
-        LOGGER.warn("Please set segment timestamp baseline in as.baseline 
field.");
-        return false;
+        return instances;
     }
 
-    private boolean isValidConnectorNames(List<JobDataSegment> segments, 
List<String> names) {
-        Set<String> sets = new HashSet<>();
-        for (JobDataSegment segment : segments) {
-            String dcName = segment.getDataConnectorName();
-            sets.add(dcName);
-            boolean exist = names.stream().anyMatch(name -> 
name.equals(dcName));
-            if (!exist) {
-                LOGGER.warn("Param {} is a illegal string. Please input one of 
strings in {}.", dcName, names);
-                return false;
+    /**
+     * a job is regard as healthy job when its latest instance is in healthy 
state.
+     *
+     * @return job healthy statistics
+     */
+    @Override
+    public JobHealth getHealthInfo() {
+        JobHealth jobHealth = new JobHealth();
+        List<AbstractJob> jobs = jobRepo.findByDeleted(false);
+        for (AbstractJob job : jobs) {
+            JobOperator op = getJobOperator(job);
+            try {
+                jobHealth = op.getHealth(jobHealth, job);
+            } catch (SchedulerException e) {
+                LOGGER.error("Job schedule exception. {}", e.getMessage());
+                throw new GriffinException.ServiceException("Fail to Get 
HealthInfo", e);
             }
+
         }
-        if (sets.size() < segments.size()) {
-            LOGGER.warn("Connector names in job data segment cannot 
duplicate.");
-            return false;
-        }
-        return true;
+        return jobHealth;
     }
 
-    private List<String> getConnectorNames(GriffinMeasure measure) {
-        Set<String> sets = new HashSet<>();
-        List<DataSource> sources = measure.getDataSources();
-        for (DataSource source : sources) {
-            source.getConnectors().forEach(dc -> sets.add(dc.getName()));
-        }
-        if (sets.size() < sources.size()) {
-            LOGGER.warn("Connector names cannot be repeated.");
-            return Collections.emptyList();
+    @Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}")
+    public void deleteExpiredJobInstance() {
+        Long timeMills = System.currentTimeMillis();
+        List<JobInstanceBean> instances = 
instanceRepo.findByExpireTmsLessThanEqual(timeMills);
+        if (!batchJobOp.pauseJobInstances(instances)) {
+            LOGGER.error("Pause job failure.");
+            return;
         }
-        return new ArrayList<>(sets);
+        int count = instanceRepo.deleteByExpireTimestamp(timeMills);
+        LOGGER.info("Delete {} expired job instances.", count);
     }
 
-    private GriffinMeasure getMeasureIfValid(Long measureId) {
-        GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId, 
false);
-        if (measure == null) {
-            LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't exist 
or is external measure type.", measureId);
-            throw new GriffinException.BadRequestException(INVALID_MEASURE_ID);
+    private void validateJobExist(AbstractJob job) {
+        if (job == null) {
+            LOGGER.warn("Griffin job does not exist.");
+            throw new 
GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST);
         }
-        return measure;
     }
 
-    private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jd, 
JobSchedule js) {
-        return newTrigger()
-                .withIdentity(triggerKey)
-                .forJob(jd)
-                .withSchedule(cronSchedule(js.getCronExpression())
-                        .inTimeZone(getTimeZone(js.getTimeZone()))
-                )
-                .build();
+    private JobOperator getJobOperator(AbstractJob job) {
+        if (job instanceof BatchJob) {
+            return batchJobOp;
+        } else if (job instanceof StreamingJob) {
+            return streamingJobOp;
+        }
+        throw new 
GriffinException.BadRequestException(JOB_TYPE_DOES_NOT_SUPPORT);
     }
 
-    private JobDetail addJobDetail(TriggerKey triggerKey, JobSchedule js, 
GriffinJob job) throws SchedulerException {
-        Scheduler scheduler = factory.getScheduler();
-        JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup());
-        JobDetail jobDetail;
-        Boolean isJobKeyExist = scheduler.checkExists(jobKey);
-        if (isJobKeyExist) {
-            jobDetail = scheduler.getJobDetail(jobKey);
-        } else {
-            jobDetail = 
newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build();
+    private JobOperator getJobOperator(ProcessType type) {
+        if (type == BATCH) {
+            return batchJobOp;
+        } else if (type == STREAMING) {
+            return streamingJobOp;
         }
-        setJobDataMap(jobDetail, js, job);
-        scheduler.addJob(jobDetail, isJobKeyExist);
-        return jobDetail;
+        throw new 
GriffinException.BadRequestException(MEASURE_TYPE_DOES_NOT_SUPPORT);
     }
 
+    TriggerKey getTriggerKeyIfValid(String qName, String qGroup) throws 
SchedulerException {
+        TriggerKey triggerKey = triggerKey(qName, qGroup);
+        if (factory.getScheduler().checkExists(triggerKey)) {
+            throw new 
GriffinException.ConflictException(QUARTZ_JOB_ALREADY_EXIST);
+        }
+        return triggerKey;
+    }
 
-    private void setJobDataMap(JobDetail jd, JobSchedule js, GriffinJob job) {
-        JobDataMap jobDataMap = jd.getJobDataMap();
-        jobDataMap.put(JOB_SCHEDULE_ID, js.getId().toString());
-        jobDataMap.put(GRIFFIN_JOB_ID, job.getId().toString());
+    List<? extends Trigger> getTriggers(String name, String group) throws 
SchedulerException {
+        JobKey jobKey = new JobKey(name, group);
+        Scheduler scheduler = factory.getScheduler();
+        return scheduler.getTriggersOfJob(jobKey);
     }
 
-    private boolean pauseJob(List<JobInstanceBean> instances) {
-        if (CollectionUtils.isEmpty(instances)) {
-            return true;
+    private JobDataBean genJobDataBean(AbstractJob job, String action) throws 
SchedulerException {
+        if (job.getName() == null || job.getGroup() == null) {
+            return null;
         }
-        List<JobInstanceBean> deletedInstances = new ArrayList<>();
-        boolean pauseStatus = true;
-        for (JobInstanceBean instance : instances) {
-            boolean status = pauseJob(instance, deletedInstances);
-            pauseStatus = pauseStatus && status;
+        JobDataBean jobData = new JobDataBean();
+        List<? extends Trigger> triggers = getTriggers(job.getName(), 
job.getGroup());
+        /* If the trigger list is empty, in Griffin it means the job is not
scheduled, or is completed — in which case its trigger state is NONE. */
+        if (CollectionUtils.isEmpty(triggers) && job instanceof BatchJob) {
+            return null;
         }
-        jobInstanceRepo.save(deletedInstances);
-        return pauseStatus;
+        setTriggerTime(triggers, jobData);
+        JobOperator op = getJobOperator(job);
+        JobState state = op.getState(job, jobData, action);
+        jobData.setJobState(state);
+        jobData.setJobId(job.getId());
+        jobData.setJobName(job.getJobName());
+        jobData.setMeasureId(job.getMeasureId());
+        jobData.setCronExpression(getCronExpression(triggers));
+        jobData.setProcessType(job instanceof BatchJob ? BATCH : STREAMING);
+        return jobData;
     }
 
-    private boolean pauseJob(JobInstanceBean instance, List<JobInstanceBean> 
deletedInstances) {
-        boolean status = true;
-        try {
-            pauseJob(instance.getPredicateGroup(), 
instance.getPredicateName());
-            instance.setDeleted(true);
-            deletedInstances.add(instance);
-        } catch (SchedulerException e) {
-            LOGGER.error("Failed to pause predicate job({},{}).", 
instance.getId(), instance.getPredicateName());
-            status = false;
-        }
-        return status;
+    private JobDataBean genJobDataBean(AbstractJob job) throws 
SchedulerException {
+        return genJobDataBean(job,null);
     }
 
-    @Override
-    public void pauseJob(String group, String name) throws SchedulerException {
-        Scheduler scheduler = factory.getScheduler();
-        JobKey jobKey = new JobKey(name, group);
-        if (!scheduler.checkExists(jobKey)) {
-            LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), 
jobKey.getName());
+    private void setTriggerTime(List<? extends Trigger> triggers, JobDataBean 
jobBean) {
+        if (CollectionUtils.isEmpty(triggers)) {
             return;
         }
-        scheduler.pauseJob(jobKey);
-    }
-
-    private void setJobDeleted(GriffinJob job) {
-        job.setDeleted(true);
-        jobRepo.save(job);
+        Trigger trigger = triggers.get(0);
+        Date nextFireTime = trigger.getNextFireTime();
+        Date previousFireTime = trigger.getPreviousFireTime();
+        jobBean.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() 
: -1);
+        jobBean.setPreviousFireTime(previousFireTime != null ? 
previousFireTime.getTime() : -1);
     }
 
-    private void deletePredicateJob(GriffinJob job) throws SchedulerException {
-        List<JobInstanceBean> instances = 
jobInstanceRepo.findByJobId(job.getId());
-        for (JobInstanceBean instance : instances) {
-            if (!instance.isDeleted()) {
-                deleteJob(instance.getPredicateGroup(), 
instance.getPredicateName());
-                instance.setDeleted(true);
-                if 
(instance.getState().equals(LivySessionStates.State.finding)) {
-                    instance.setState(LivySessionStates.State.not_found);
-                }
+    private String getCronExpression(List<? extends Trigger> triggers) {
+        for (Trigger trigger : triggers) {
+            if (trigger instanceof CronTrigger) {
+                return ((CronTrigger) trigger).getCronExpression();
             }
         }
+        return null;
     }
 
-    /**
-     * logically delete
-     * 1. pause these jobs
-     * 2. set these jobs as deleted status
-     *
-     * @param jobId griffin job id
-     */
-    @Override
-    public void deleteJob(Long jobId) {
-        GriffinJob job = jobRepo.findByIdAndDeleted(jobId, false);
-        if (job == null) {
-            LOGGER.warn("Griffin job does not exist.");
-            throw new 
GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST);
-        }
-        deleteJob(job);
+    void addJob(TriggerKey tk, JobSchedule js, AbstractJob job, ProcessType 
type) throws Exception {
+        JobDetail jobDetail = addJobDetail(tk, job);
+        Trigger trigger = genTriggerInstance(tk, jobDetail, js, type);
+        factory.getScheduler().scheduleJob(trigger);
     }
 
-    /**
-     * logically delete
-     *
-     * @param name griffin job name which may not be unique.
-     */
-    @Override
-    public void deleteJob(String name) {
-        List<GriffinJob> jobs = jobRepo.findByJobNameAndDeleted(name, false);
-        if (CollectionUtils.isEmpty(jobs)) {
-            LOGGER.warn("There is no job with '{}' name.", name);
-            throw new 
GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST);
+    String getQuartzName(JobSchedule js) {
+        return js.getJobName() + "_" + System.currentTimeMillis();
+    }
+
+    String getQuartzGroup() {
+        return "BA";
+    }
+
+    boolean isValidJobName(String jobName) {
+        if (StringUtils.isEmpty(jobName)) {
+            LOGGER.warn("Job name cannot be empty.");
+            return false;
         }
-        for (GriffinJob job : jobs) {
-            deleteJob(job);
+        int size = jobRepo.countByJobNameAndDeleted(jobName, false);
+        if (size > 0) {
+            LOGGER.warn("Job name already exits.");
+            return false;
         }
+        return true;
     }
 
-    private void deleteJob(GriffinJob job) {
-        try {
-            pauseJob(job.getQuartzGroup(), job.getQuartzName());
-            deletePredicateJob(job);
-            setJobDeleted(job);
-        } catch (Exception e) {
-            LOGGER.error("Failed to delete job", e);
-            throw new GriffinException.ServiceException("Failed to delete 
job", e);
+
+    private GriffinMeasure getMeasureIfValid(Long measureId) {
+        GriffinMeasure measure = measureRepo.findByIdAndDeleted(measureId, 
false);
+        if (measure == null) {
+            LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't exist 
or is external measure type.", measureId);
+            throw new GriffinException.BadRequestException(INVALID_MEASURE_ID);
         }
+        return measure;
+    }
+
+    private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, 
JobSchedule js, ProcessType type) {
+        TriggerBuilder builder = newTrigger().withIdentity(tk).forJob(jd);
+        if (type == BATCH) {
+            TimeZone timeZone = getTimeZone(js.getTimeZone());
+            return 
builder.withSchedule(cronSchedule(js.getCronExpression()).inTimeZone(timeZone)).build();
+        } else if (type == STREAMING) {
+            return 
builder.startNow().withSchedule(simpleSchedule().withRepeatCount(0)).build();
+        }
+        throw new 
GriffinException.BadRequestException(JOB_TYPE_DOES_NOT_SUPPORT);
+
     }
 
-    private void deleteJob(String group, String name) throws 
SchedulerException {
+    private JobDetail addJobDetail(TriggerKey triggerKey, AbstractJob job) 
throws SchedulerException {
         Scheduler scheduler = factory.getScheduler();
-        JobKey jobKey = new JobKey(name, group);
-        if (!scheduler.checkExists(jobKey)) {
-            LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), 
jobKey.getName());
-            return;
+        JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup());
+        JobDetail jobDetail;
+        Boolean isJobKeyExist = scheduler.checkExists(jobKey);
+        if (isJobKeyExist) {
+            jobDetail = scheduler.getJobDetail(jobKey);
+        } else {
+            jobDetail = 
newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build();
         }
-        scheduler.deleteJob(jobKey);
+        setJobDataMap(jobDetail, job);
+        scheduler.addJob(jobDetail, isJobKeyExist);
+        return jobDetail;
     }
 
+    private void setJobDataMap(JobDetail jd, AbstractJob job) {
+        JobDataMap jobDataMap = jd.getJobDataMap();
+        jobDataMap.put(GRIFFIN_JOB_ID, job.getId().toString());
+    }
+
+
     /**
      * deleteJobsRelateToMeasure
      * 1. search jobs related to measure
@@ -427,143 +445,91 @@ public class JobServiceImpl implements JobService {
      *
      * @param measureId measure id
      */
-    public void deleteJobsRelateToMeasure(Long measureId) {
-        List<GriffinJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, 
false);
+    public void deleteJobsRelateToMeasure(Long measureId) throws 
SchedulerException {
+        List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, 
false);
         if (CollectionUtils.isEmpty(jobs)) {
             LOGGER.info("Measure id {} has no related jobs.", measureId);
             return;
         }
-        for (GriffinJob job : jobs) {
-            deleteJob(job);
-        }
-    }
-
-    @Override
-    public List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int 
size) {
-        AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
-        if (job == null) {
-            LOGGER.warn("Job id {} does not exist.", jobId);
-            throw new 
GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST);
-        }
-        size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size;
-        size = size <= 0 ? DEFAULT_PAGE_SIZE : size;
-        Pageable pageable = new PageRequest(page, size, Sort.Direction.DESC, 
"tms");
-        return jobInstanceRepo.findByJobId(jobId, pageable);
-    }
-
-    @Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}")
-    public void deleteExpiredJobInstance() {
-        Long timeMills = System.currentTimeMillis();
-        List<JobInstanceBean> instances = 
jobInstanceRepo.findByExpireTmsLessThanEqual(timeMills);
-        if (!pauseJob(instances)) {
-            LOGGER.error("Pause job failure.");
-            return;
+        for (AbstractJob job : jobs) {
+            JobOperator op = getJobOperator(job);
+            op.delete(job);
         }
-        jobInstanceRepo.deleteByExpireTimestamp(timeMills);
-        LOGGER.info("Delete expired job instances success.");
     }
 
     @Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}")
     public void syncInstancesOfAllJobs() {
-        State[] states = {starting, not_started, recovering, idle, running, 
busy};
-        List<JobInstanceBean> beans = 
jobInstanceRepo.findByActiveState(states);
-        if (!CollectionUtils.isEmpty(beans)) {
-            for (JobInstanceBean jobInstance : beans) {
-                syncInstancesOfJob(jobInstance);
-            }
+        LivySessionStates.State[] states = {STARTING, NOT_STARTED, RECOVERING, 
IDLE, RUNNING, BUSY};
+        List<JobInstanceBean> beans = instanceRepo.findByActiveState(states);
+        for (JobInstanceBean jobInstance : beans) {
+            syncInstancesOfJob(jobInstance);
         }
     }
 
     /**
      * call livy to update part of job instance table data associated with 
group and jobName in mysql.
      *
-     * @param jobInstance job instance livy info
+     * @param instance job instance livy info
      */
-    private void syncInstancesOfJob(JobInstanceBean jobInstance) {
-        String uri = livyConf.getProperty("livy.uri") + "/" + 
jobInstance.getSessionId();
+    private void syncInstancesOfJob(JobInstanceBean instance) {
+        if (instance.getSessionId() == null) {
+            return;
+        }
+        String uri = livyConf.getProperty("livy.uri") + "/" + 
instance.getSessionId();
         TypeReference<HashMap<String, Object>> type = new 
TypeReference<HashMap<String, Object>>() {
         };
         try {
             String resultStr = restTemplate.getForObject(uri, String.class);
             HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr, 
type);
-            setJobInstanceIdAndUri(jobInstance, resultMap);
-        } catch (RestClientException e) {
-            LOGGER.warn("Spark session {} has overdue, set state as unknown!\n 
{}", jobInstance.getSessionId(), e.getMessage());
-            setJobInstanceUnknownStatus(jobInstance);
-        } catch (IOException e) {
-            LOGGER.error("Job instance json converts to map failed. {}", 
e.getMessage());
-        } catch (IllegalArgumentException e) {
-            LOGGER.error("Livy status is illegal. {}", e.getMessage());
+            setJobInstanceIdAndUri(instance, resultMap);
+        } catch (ResourceAccessException e) {
+            LOGGER.error("Your url may be wrong. Please check {}.\n {}", uri, 
e.getMessage());
+        } catch (HttpClientErrorException e) {
+            LOGGER.warn("sessionId({}) appId({}) {}.", 
instance.getSessionId(), instance.getAppId(), e.getMessage());
+            setStateByYarn(instance, e);
         } catch (Exception e) {
-            LOGGER.error("Sync job instances failure. {}", e.getMessage());
-        }
-    }
-
-    private void setJobInstanceIdAndUri(JobInstanceBean instance, 
HashMap<String, Object> resultMap) {
-        if (resultMap != null && resultMap.size() != 0 && 
resultMap.get("state") != null) {
-            
instance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString()));
-            if (resultMap.get("appId") != null) {
-                String appId = String.valueOf(resultMap.get("appId"));
-                String appUri = livyConf.getProperty("spark.uri") + 
"/cluster/app/" + appId;
-                instance.setAppId(appId);
-                instance.setAppUri(appUri);
-            }
-            jobInstanceRepo.save(instance);
-
+            LOGGER.error(e.getMessage());
         }
 
     }
 
-    private void setJobInstanceUnknownStatus(JobInstanceBean jobInstance) {
-        //if server cannot get session from Livy, set State as unknown.
-        jobInstance.setState(LivySessionStates.State.unknown);
-        jobInstanceRepo.save(jobInstance);
-    }
-
-    /**
-     * a job is regard as healthy job when its latest instance is in healthy 
state.
-     *
-     * @return job healthy statistics
-     */
-    @Override
-    public JobHealth getHealthInfo() {
-        JobHealth jobHealth = new JobHealth();
-        List<GriffinJob> jobs = jobRepo.findByDeleted(false);
-        for (GriffinJob job : jobs) {
-            jobHealth = getHealthInfo(jobHealth, job);
+    private void setStateByYarn(JobInstanceBean instance, 
HttpClientErrorException e) {
+        int code = e.getStatusCode().value();
+        boolean match = (code == 400 || code == 404) && instance.getAppId() != 
null;
+        // This means the URL is correct, but the request param is wrong or the
Livy session may be overdue.
+        if (match) {
+            setStateByYarn(instance);
         }
-        return jobHealth;
     }
 
-    private JobHealth getHealthInfo(JobHealth jobHealth, GriffinJob job) {
-        List<Trigger> triggers = getTriggers(job);
-        if (!CollectionUtils.isEmpty(triggers)) {
-            jobHealth.setJobCount(jobHealth.getJobCount() + 1);
-            if (isJobHealthy(job.getId())) {
-                jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 
1);
+    private void setStateByYarn(JobInstanceBean instance) {
+        LOGGER.warn("Spark session {} may be overdue! Now we use yarn to 
update state.", instance.getSessionId());
+        String yarnUrl = livyConf.getProperty("spark.uri");
+        boolean success = YarnNetUtil.update(yarnUrl, instance);
+        if (!success) {
+            if (instance.getState().equals(UNKNOWN)) {
+                return;
             }
+            instance.setState(UNKNOWN);
         }
-        return jobHealth;
+        instanceRepo.save(instance);
     }
 
-    @SuppressWarnings("unchecked")
-    private List<Trigger> getTriggers(GriffinJob job) {
-        JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup());
-        List<Trigger> triggers;
-        try {
-            triggers = (List<Trigger>) 
factory.getScheduler().getTriggersOfJob(jobKey);
-        } catch (SchedulerException e) {
-            LOGGER.error("Job schedule exception. {}", e.getMessage());
-            throw new GriffinException.ServiceException("Fail to Get 
HealthInfo", e);
+
+    private void setJobInstanceIdAndUri(JobInstanceBean instance, 
HashMap<String, Object> resultMap) {
+        if (resultMap != null) {
+            Object state = resultMap.get("state");
+            Object appId = resultMap.get("appId");
+            instance.setState(state == null ? null : 
LivySessionStates.State.valueOf(state.toString().toUpperCase()));
+            instance.setAppId(appId == null ? null : appId.toString());
+            instance.setAppUri(appId == null ? null : 
livyConf.getProperty("spark.uri") + "/cluster/app/" + appId);
+            instanceRepo.save(instance);
         }
-        return triggers;
     }
 
-    private Boolean isJobHealthy(Long jobId) {
+    public Boolean isJobHealthy(Long jobId) {
         Pageable pageable = new PageRequest(0, 1, Sort.Direction.DESC, "tms");
-        List<JobInstanceBean> instances = jobInstanceRepo.findByJobId(jobId, 
pageable);
+        List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, 
pageable);
         return !CollectionUtils.isEmpty(instances) && 
LivySessionStates.isHealthy(instances.get(0).getState());
     }
-
-
 }

Reply via email to