http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java 
b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index e66c7f3..6e04122 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -19,21 +19,22 @@ under the License.
 
 package org.apache.griffin.core.job;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.griffin.core.job.entity.JobInstanceBean;
 import org.apache.griffin.core.job.entity.LivyConf;
-import org.apache.griffin.core.job.entity.LivySessionStates;
 import org.apache.griffin.core.job.entity.SegmentPredicate;
 import org.apache.griffin.core.job.factory.PredicatorFactory;
 import org.apache.griffin.core.job.repo.JobInstanceRepo;
 import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
+import org.apache.griffin.core.util.FileUtil;
 import org.apache.griffin.core.util.JsonUtil;
 import org.quartz.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 import org.springframework.web.client.RestTemplate;
@@ -42,9 +43,14 @@ import java.io.IOException;
 import java.util.*;
 
 import static org.apache.griffin.core.job.JobInstance.*;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.FOUND;
+import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_FOUND;
+import static 
org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
 
 @PersistJobDataAfterExecution
 @DisallowConcurrentExecution
+@Component
 public class SparkSubmitJob implements Job {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkSubmitJob.class);
     private static final String SPARK_JOB_JARS_SPLIT = ";";
@@ -55,7 +61,7 @@ public class SparkSubmitJob implements Job {
     @Qualifier("livyConf")
     private Properties livyConfProps;
     @Autowired
-    private JobServiceImpl jobService;
+    private BatchJobOperatorImpl batchJobOp;
 
     private GriffinMeasure measure;
     private String livyUri;
@@ -76,7 +82,7 @@ public class SparkSubmitJob implements Job {
             }
             saveJobInstance(jd);
         } catch (Exception e) {
-            LOGGER.error("Post spark task error.", e);
+            LOGGER.error("Post spark task ERROR.", e);
         }
     }
 
@@ -85,7 +91,7 @@ public class SparkSubmitJob implements Job {
         int repeatCount = simpleTrigger.getRepeatCount();
         int fireCount = simpleTrigger.getTimesTriggered();
         if (fireCount > repeatCount) {
-            saveJobInstance(null, LivySessionStates.State.not_found);
+            saveJobInstance(null, NOT_FOUND);
         }
     }
 
@@ -95,7 +101,7 @@ public class SparkSubmitJob implements Job {
             result = restTemplate.postForObject(livyUri, livyConf, 
String.class);
             LOGGER.info(result);
         } catch (Exception e) {
-            LOGGER.error("Post to livy error. {}", e.getMessage());
+            LOGGER.error("Post to livy ERROR. {}", e.getMessage());
             result = null;
         }
         return result;
@@ -121,11 +127,12 @@ public class SparkSubmitJob implements Job {
 
     private void initParam(JobDetail jd) throws IOException {
         mPredicates = new ArrayList<>();
-        livyUri = livyConfProps.getProperty("livy.uri");
         jobInstance = 
jobInstanceRepo.findByPredicateName(jd.getJobDataMap().getString(PREDICATE_JOB_NAME));
         measure = JsonUtil.toEntity(jd.getJobDataMap().getString(MEASURE_KEY), 
GriffinMeasure.class);
+        livyUri = livyConfProps.getProperty("livy.uri");
         setPredicates(jd.getJobDataMap().getString(PREDICATES_KEY));
-        setMeasureInstanceName(measure, jd);
+        // in order to keep metric name unique, we set job name as measure 
name at present
+        measure.setName(jd.getJobDataMap().getString(JOB_NAME));
     }
 
     @SuppressWarnings("unchecked")
@@ -135,26 +142,26 @@ public class SparkSubmitJob implements Job {
         }
         List<Map<String, Object>> maps = JsonUtil.toEntity(json, new 
TypeReference<List<Map>>() {
         });
-        assert maps != null;
         for (Map<String, Object> map : maps) {
             SegmentPredicate sp = new SegmentPredicate();
             sp.setType((String) map.get("type"));
-            sp.setConfigMap((Map<String, String>) map.get("config"));
+            sp.setConfigMap((Map<String, Object>) map.get("config"));
             mPredicates.add(sp);
         }
     }
 
-    private void setMeasureInstanceName(GriffinMeasure measure, JobDetail jd) {
-        // in order to keep metric name unique, we set job name as measure 
name at present
-        measure.setName(jd.getJobDataMap().getString(JOB_NAME));
-    }
-
     private String escapeCharacter(String str, String regex) {
         String escapeCh = "\\" + regex;
         return str.replaceAll(regex, escapeCh);
     }
 
-    private void setLivyConf() throws JsonProcessingException {
+    /**
+     * Loads the environment template that matches the measure's process type
+     * and substitutes the measure name for the <code>${JOB_NAME}</code>
+     * placeholder.
+     *
+     * @return environment JSON with every placeholder occurrence replaced
+     * @throws IOException declared for the template read done by FileUtil
+     */
+    private String genEnv() throws IOException {
+        ProcessType type = measure.getProcessType();
+        String env = type == BATCH
+                ? FileUtil.readEnv("env/env_batch.json")
+                : FileUtil.readEnv("env/env_streaming.json");
+        // Literal replacement on purpose: String.replaceAll would interpret
+        // '$' and '\' inside the measure name as group references/escapes and
+        // either throw or corrupt the generated environment.
+        return env.replace("${JOB_NAME}", measure.getName());
+    }
+
+    private void setLivyConf() throws IOException {
         setLivyParams();
         setLivyArgs();
         setLivyJars();
@@ -173,15 +180,15 @@ public class SparkSubmitJob implements Job {
         livyConf.setFiles(new ArrayList<>());
     }
 
-    private void setLivyArgs() throws JsonProcessingException {
+    private void setLivyArgs() throws IOException {
         List<String> args = new ArrayList<>();
-        args.add(livyConfProps.getProperty("sparkJob.args_1"));
+        args.add(genEnv());
         String measureJson = JsonUtil.toJsonWithFormat(measure);
         // to fix livy bug: character ` will be ignored by livy
         String finalMeasureJson = escapeCharacter(measureJson, "\\`");
         LOGGER.info(finalMeasureJson);
         args.add(finalMeasureJson);
-        args.add(livyConfProps.getProperty("sparkJob.args_3"));
+        args.add("raw,raw");
         livyConf.setArgs(args);
     }
 
@@ -202,16 +209,14 @@ public class SparkSubmitJob implements Job {
 
     private void saveJobInstance(JobDetail jd) throws SchedulerException, 
IOException {
         String result = post2Livy();
-        if (result != null) {
-            String group = jd.getKey().getGroup();
-            String name = jd.getKey().getName();
-            jobService.pauseJob(group, name);
-            LOGGER.info("Delete predicate job({},{}) success.", group, name);
-        }
-        saveJobInstance(result, LivySessionStates.State.found);
+        String group = jd.getKey().getGroup();
+        String name = jd.getKey().getName();
+        batchJobOp.deleteJob(group, name);
+        LOGGER.info("Delete predicate job({},{}) SUCCESS.", group, name);
+        saveJobInstance(result, FOUND);
     }
 
-    private void saveJobInstance(String result, LivySessionStates.State state) 
throws IOException {
+    private void saveJobInstance(String result,State state) throws IOException 
{
         TypeReference<HashMap<String, Object>> type = new 
TypeReference<HashMap<String, Object>>() {
         };
         Map<String, Object> resultMap = null;
@@ -222,20 +227,16 @@ public class SparkSubmitJob implements Job {
         jobInstanceRepo.save(jobInstance);
     }
 
-    private void setJobInstance(Map<String, Object> resultMap, 
LivySessionStates.State state) {
+    private void setJobInstance(Map<String, Object> resultMap, State state) {
         jobInstance.setState(state);
-        jobInstance.setDeleted(true);
-        if (resultMap == null) {
-            return;
-        }
-        if (resultMap.get("state") != null) {
-            
jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString()));
-        }
-        if (resultMap.get("id") != null) {
-            
jobInstance.setSessionId(Long.parseLong(resultMap.get("id").toString()));
-        }
-        if (resultMap.get("appId") != null) {
-            jobInstance.setAppId(resultMap.get("appId").toString());
+        jobInstance.setPredicateDeleted(true);
+        if (resultMap != null) {
+            Object status = resultMap.get("state");
+            Object id = resultMap.get("id");
+            Object appId = resultMap.get("appId");
+            jobInstance.setState(status == null ? null : 
State.valueOf(status.toString().toUpperCase()));
+            jobInstance.setSessionId(id == null ? null : 
Long.parseLong(id.toString()));
+            jobInstance.setAppId(appId == null ? null : appId.toString());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
 
b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
new file mode 100644
index 0000000..55c64ab
--- /dev/null
+++ 
b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
@@ -0,0 +1,265 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job;
+
+import org.apache.griffin.core.exception.GriffinException;
+import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.job.repo.JobInstanceRepo;
+import org.apache.griffin.core.job.repo.StreamingJobRepo;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.util.YarnNetUtil;
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.CollectionUtils;
+import org.springframework.web.client.ResourceAccessException;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_JOB_NAME;
+import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.STREAMING_JOB_IS_RUNNING;
+import static org.apache.griffin.core.job.JobServiceImpl.START;
+import static org.apache.griffin.core.job.JobServiceImpl.STOP;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State;
+import static 
org.apache.griffin.core.job.entity.LivySessionStates.State.STOPPED;
+import static 
org.apache.griffin.core.job.entity.LivySessionStates.convert2QuartzState;
+import static 
org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING;
+import static org.quartz.TriggerKey.triggerKey;
+
+@Service
+public class StreamingJobOperatorImpl implements JobOperator {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamingJobOperatorImpl.class);
+    @Autowired
+    private StreamingJobRepo streamingJobRepo;
+    @Autowired
+    @Qualifier("livyConf")
+    private Properties livyConfProps;
+    @Autowired
+    private JobServiceImpl jobService;
+    @Autowired
+    private JobInstanceRepo instanceRepo;
+    @Autowired
+    private SchedulerFactoryBean factory;
+
+    private String livyUri;
+    private RestTemplate restTemplate;
+
+    /**
+     * Post-construction hook: creates the REST client and reads the
+     * "livy.uri" entry from the injected Livy properties.
+     */
+    @PostConstruct
+    public void init() {
+        this.restTemplate = new RestTemplate();
+        this.livyUri = this.livyConfProps.getProperty("livy.uri");
+    }
+
+    /**
+     * Persists a new streaming job for the given schedule and registers it
+     * through the job service.
+     *
+     * @param js      schedule describing the job to create
+     * @param measure measure the job belongs to (not read by this method)
+     * @return the schedule attached to the saved job, with its id set to the
+     *         saved job's id
+     * @throws Exception if validation, persistence or registration fails
+     */
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public JobSchedule add(JobSchedule js, GriffinMeasure measure)
+            throws Exception {
+        validateParams(js);
+        String quartzName = jobService.getQuartzName(js);
+        String quartzGroup = jobService.getQuartzGroup();
+        TriggerKey key =
+                jobService.getTriggerKeyIfValid(quartzName, quartzGroup);
+
+        StreamingJob unsaved = new StreamingJob(js.getMeasureId(),
+                js.getJobName(), quartzName, quartzGroup, false);
+        unsaved.setJobSchedule(js);
+        StreamingJob saved = streamingJobRepo.save(unsaved);
+
+        jobService.addJob(key, js, saved, STREAMING);
+
+        JobSchedule savedSchedule = saved.getJobSchedule();
+        savedSchedule.setId(saved.getId());
+        return savedSchedule;
+    }
+
+    /**
+     * Starts a streaming job: checks that it is not already running, saves
+     * it and registers it again through the job service.
+     * <p>
+     * active states: NOT_STARTED, STARTING, RECOVERING, IDLE, RUNNING, BUSY<br>
+     * inactive states: SHUTTING_DOWN, ERROR, DEAD, SUCCESS
+     *
+     * @param job streaming job
+     * @throws Exception if the job is still running or registration fails
+     */
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void start(AbstractJob job) throws Exception {
+        StreamingJob requested = (StreamingJob) job;
+        verifyJobState(requested);
+        StreamingJob saved = streamingJobRepo.save(requested);
+        JobSchedule schedule = saved.getJobSchedule();
+        TriggerKey key = triggerKey(jobService.getQuartzName(schedule),
+                jobService.getQuartzGroup());
+        jobService.addJob(key, schedule, saved, STREAMING);
+    }
+
+    /**
+     * Rejects a start request while the job is still scheduled or while one
+     * of its non-deleted instances is in a state that cannot be started.
+     * Every non-deleted instance that passes the check is flagged as deleted.
+     *
+     * @param job streaming job about to be started
+     * @throws SchedulerException if the triggers cannot be read
+     */
+    private void verifyJobState(StreamingJob job) throws SchedulerException {
+        /* First check: a job that is still scheduled has at least one
+           trigger, so a non-empty trigger list means it is running. */
+        List<? extends Trigger> scheduled =
+                jobService.getTriggers(job.getName(), job.getGroup());
+        if (!CollectionUtils.isEmpty(scheduled)) {
+            throw new GriffinException.BadRequestException(
+                    STREAMING_JOB_IS_RUNNING);
+        }
+        /* Second check: none of the live job instances may be running. */
+        for (JobInstanceBean instance : instanceRepo.findByJobId(job.getId())) {
+            if (instance.isDeleted()) {
+                continue;
+            }
+            boolean startable =
+                    getStartStatus(convert2QuartzState(instance.getState()));
+            if (!startable) {
+                throw new GriffinException.BadRequestException(
+                        STREAMING_JOB_IS_RUNNING);
+            }
+            instance.setDeleted(true);
+        }
+    }
+
+
+    /**
+     * Stops the running instances of a streaming job without marking the job
+     * itself as deleted.
+     *
+     * @param job streaming job to stop
+     * @throws SchedulerException if the scheduler cannot pause the job
+     */
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void stop(AbstractJob job) throws SchedulerException {
+        stop((StreamingJob) job, false);
+    }
+
+    /**
+     * Stops the streaming job's instances and marks the job as deleted.
+     * <p>
+     * Runs in a transaction, exactly like {@link #stop(AbstractJob)}: the
+     * shared helper changes instance state in memory and only saves the job
+     * explicitly, so both entry points need the same transactional boundary.
+     *
+     * @param job streaming job to delete
+     * @throws SchedulerException if the scheduler cannot pause the job
+     */
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public void delete(AbstractJob job) throws SchedulerException {
+        StreamingJob streamingJob = (StreamingJob) job;
+        stop(streamingJob, true);
+    }
+
+
+    /**
+     * Adds this job to the running health tally.
+     *
+     * @param jobHealth accumulator; its job count is always incremented and
+     *                  its healthy count only when the job service reports
+     *                  the job as healthy
+     * @param job       job to account for
+     * @return the same accumulator instance
+     */
+    @Override
+    public JobHealth getHealth(JobHealth jobHealth, AbstractJob job) {
+        jobHealth.setJobCount(jobHealth.getJobCount() + 1);
+        boolean healthy = jobService.isJobHealthy(job.getId());
+        if (!healthy) {
+            return jobHealth;
+        }
+        jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1);
+        return jobHealth;
+    }
+
+    /**
+     * Derives the job state from the first non-deleted instance that has a
+     * state, then falls back to an action-based default.
+     *
+     * @param job    job whose instances are inspected
+     * @param bean   job data bean (not read by this method)
+     * @param action requested action, used for the fallback state
+     * @return the computed job state
+     */
+    @Override
+    public JobState getState(AbstractJob job, JobDataBean bean,
+                             String action) {
+        JobState result = new JobState();
+        for (JobInstanceBean instance : instanceRepo.findByJobId(job.getId())) {
+            State livyState = instance.getState();
+            if (instance.isDeleted() || livyState == null) {
+                continue;
+            }
+            String quartzState = convert2QuartzState(livyState);
+            result.setState(quartzState);
+            result.setToStart(getStartStatus(quartzState));
+            result.setToStop(getStopStatus(quartzState));
+            // only the first usable instance decides the state
+            break;
+        }
+        setStateIfNull(action, result);
+        return result;
+    }
+
+    /**
+     * Applies a default state derived from the requested action.
+     * <p>
+     * With no state and a START action the job becomes "NORMAL" and
+     * stoppable. Otherwise, with no state or with a STOP action, it becomes
+     * "NONE" and startable. Note that a STOP action overrides a state that
+     * is already set.
+     *
+     * @param action   requested action
+     * @param jobState state holder to update in place
+     */
+    private void setStateIfNull(String action, JobState jobState) {
+        boolean stateMissing = jobState.getState() == null;
+        if (stateMissing && START.equals(action)) {
+            jobState.setState("NORMAL");
+            jobState.setToStop(true);
+            return;
+        }
+        if (stateMissing || STOP.equals(action)) {
+            jobState.setState("NONE");
+            jobState.setToStart(true);
+        }
+    }
+
+    /**
+     * Tells whether a job in the given state may be started; a job in
+     * NORMAL or BLOCKED state may not.
+     *
+     * @param state job state
+     * @return true: job can be started, false: job is running and cannot be
+     *         started
+     */
+    private boolean getStartStatus(String state) {
+        boolean running = "NORMAL".equals(state) || "BLOCKED".equals(state);
+        return !running;
+    }
+
+    /**
+     * Tells whether a job in the given state may be stopped; a job in
+     * COMPLETE or ERROR state may not.
+     *
+     * @param state job state
+     * @return true: job can be stopped, false: job is in COMPLETE or ERROR
+     *         state and cannot be stopped
+     */
+    private boolean getStopStatus(String state) {
+        boolean finished = "COMPLETE".equals(state) || "ERROR".equals(state);
+        return !finished;
+    }
+
+    /**
+     * Asks Livy to delete the session behind the given job instance.
+     * <p>
+     * Nothing is sent when the instance has no session id. A
+     * {@link ResourceAccessException} is only logged; any other
+     * {@link RestClientException} is logged and followed by a
+     * {@code YarnNetUtil.delete} call with the instance's application id.
+     *
+     * @param instance job instance whose session should be removed
+     */
+    private void deleteByLivy(JobInstanceBean instance) {
+        Long sessionId = instance.getSessionId();
+        if (sessionId == null) {
+            LOGGER.warn("Session id of instance({},{}) is null.",
+                    instance.getPredicateGroup(), instance.getPredicateName());
+            return;
+        }
+        String sessionUrl = livyUri + "/" + sessionId;
+        try {
+            restTemplate.delete(sessionUrl);
+            LOGGER.info("Job instance({}) has been deleted. {}",
+                    sessionId, sessionUrl);
+        } catch (ResourceAccessException e) {
+            LOGGER.error("Your url may be wrong. Please check {}.\n {}",
+                    livyUri, e.getMessage());
+        } catch (RestClientException e) {
+            String appId = instance.getAppId();
+            LOGGER.warn("sessionId({}) appId({}) {}.",
+                    sessionId, appId, e.getMessage());
+            // NOTE(review): the Livy URI is what gets passed to YarnNetUtil
+            // here - confirm that a YARN endpoint is not expected instead.
+            YarnNetUtil.delete(livyUri, instance.getAppId());
+        }
+    }
+
+
+    /**
+     * Pauses the job, stops each of its non-deleted instances and saves the
+     * job with the requested deleted flag.
+     *
+     * @param job    streaming job
+     * @param delete true: delete job, false: only stop instance, but not
+     *               delete job
+     * @throws SchedulerException if the scheduler cannot pause the job
+     */
+    private void stop(StreamingJob job, boolean delete)
+            throws SchedulerException {
+        pauseJob(job);
+        /* Instances are handled after the pause to cover a streaming job
+           that was submitted before the pause or while pausing. */
+        for (JobInstanceBean instance : instanceRepo.findByJobId(job.getId())) {
+            if (instance.isDeleted()) {
+                continue;
+            }
+            String quartzState = convert2QuartzState(instance.getState());
+            boolean stoppable = getStopStatus(quartzState);
+            if (stoppable) {
+                deleteByLivy(instance);
+            }
+            instance.setState(STOPPED);
+            instance.setDeleted(true);
+        }
+        job.setDeleted(delete);
+        streamingJobRepo.save(job);
+    }
+
+    /**
+     * Pauses the Quartz job behind the streaming job, but only when the job
+     * service reports at least one trigger for it.
+     *
+     * @param job streaming job to pause
+     * @throws SchedulerException if the scheduler rejects the pause request
+     */
+    private void pauseJob(StreamingJob job) throws SchedulerException {
+        String quartzName = job.getName();
+        String quartzGroup = job.getGroup();
+        List<? extends Trigger> scheduled =
+                jobService.getTriggers(quartzName, quartzGroup);
+        if (CollectionUtils.isEmpty(scheduled)) {
+            return;
+        }
+        JobKey key = JobKey.jobKey(quartzName, quartzGroup);
+        factory.getScheduler().pauseJob(key);
+    }
+
+
+    /**
+     * Rejects a schedule whose job name the job service considers invalid.
+     *
+     * @param js schedule to validate
+     */
+    private void validateParams(JobSchedule js) {
+        boolean nameAccepted = jobService.isValidJobName(js.getJobName());
+        if (nameAccepted) {
+            return;
+        }
+        throw new GriffinException.BadRequestException(INVALID_JOB_NAME);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
index 7839315..0a7388b 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
@@ -19,6 +19,8 @@ under the License.
 
 package org.apache.griffin.core.job.entity;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
 
 import javax.persistence.*;
@@ -36,14 +38,27 @@ public abstract class AbstractJob extends 
AbstractAuditableEntity {
 
     protected String metricName;
 
+    @Column(name = "quartz_job_name")
+    private String name;
+
+    @Column(name = "quartz_group_name")
+    private String group;
+
+    @JsonIgnore
     protected boolean deleted = false;
 
+    @OneToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
+    @JoinColumn(name = "job_schedule_id")
+    private JobSchedule jobSchedule;
+
     AbstractJob() {
     }
 
-    AbstractJob(Long measureId, String jobName, boolean deleted) {
+    AbstractJob(Long measureId, String jobName, String name, String group, 
boolean deleted) {
         this.measureId = measureId;
         this.jobName = jobName;
+        this.name = name;
+        this.group = group;
         this.deleted = deleted;
     }
 
@@ -53,26 +68,32 @@ public abstract class AbstractJob extends 
AbstractAuditableEntity {
         this.metricName = metricName;
     }
 
+    @JsonProperty("job.name")
     public String getJobName() {
         return jobName;
     }
 
+    @JsonProperty("job.name")
     public void setJobName(String jobName) {
         this.jobName = jobName;
     }
 
+    @JsonProperty("metric.name")
     public String getMetricName() {
         return metricName;
     }
 
+    @JsonProperty("metric.name")
     public void setMetricName(String metricName) {
         this.metricName = metricName;
     }
 
+    @JsonProperty("measure.id")
     public Long getMeasureId() {
         return measureId;
     }
 
+    @JsonProperty("measure.id")
     public void setMeasureId(Long measureId) {
         this.measureId = measureId;
     }
@@ -85,4 +106,33 @@ public abstract class AbstractJob extends 
AbstractAuditableEntity {
         this.deleted = deleted;
     }
 
+    @JsonProperty("job.config")
+    public JobSchedule getJobSchedule() {
+        return jobSchedule;
+    }
+
+    @JsonProperty("job.config")
+    public void setJobSchedule(JobSchedule jobSchedule) {
+        this.jobSchedule = jobSchedule;
+    }
+
+    @JsonProperty("quartz.name")
+    public String getName() {
+        return name;
+    }
+
+    @JsonProperty("quartz.name")
+    public void setName(String quartzName) {
+        this.name = quartzName;
+    }
+
+    @JsonProperty("quartz.group")
+    public String getGroup() {
+        return group;
+    }
+
+    @JsonProperty("quartz.group")
+    public void setGroup(String quartzGroup) {
+        this.group = quartzGroup;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java
new file mode 100644
index 0000000..54f938d
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java
@@ -0,0 +1,43 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import javax.persistence.DiscriminatorValue;
+import javax.persistence.Entity;
+
+/**
+ * Batch variant of {@link AbstractJob}, persisted with the discriminator
+ * value {@code griffinBatchJob}.
+ */
+@Entity
+@DiscriminatorValue("griffinBatchJob")
+public class BatchJob extends AbstractJob {
+
+    /** Creates an empty batch job. */
+    public BatchJob() {
+    }
+
+    /**
+     * Creates a batch job whose metric name is the same as its job name.
+     *
+     * @param measureId id of the measure the job runs
+     * @param jobName   job name, also used as metric name
+     * @param name      quartz job name
+     * @param group     quartz group name
+     * @param deleted   deleted flag
+     */
+    public BatchJob(Long measureId, String jobName, String name, String group,
+                    boolean deleted) {
+        super(measureId, jobName, name, group, deleted);
+        metricName = jobName;
+    }
+
+    /**
+     * Same as the five-argument constructor, with an explicit job id.
+     *
+     * @param jobId      id assigned to the job
+     * @param measureId  id of the measure the job runs
+     * @param jobName    job name, also used as metric name
+     * @param qJobName   quartz job name
+     * @param qGroupName quartz group name
+     * @param deleted    deleted flag
+     */
+    public BatchJob(Long jobId, Long measureId, String jobName,
+                    String qJobName, String qGroupName, boolean deleted) {
+        this(measureId, jobName, qJobName, qGroupName, deleted);
+        setId(jobId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
deleted file mode 100644
index fe9553c..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.job.entity;
-
-import javax.persistence.*;
-import java.util.ArrayList;
-import java.util.List;
-
-@Entity
-@DiscriminatorValue("griffin_job")
-public class GriffinJob extends AbstractJob {
-
-    @Column(name = "quartz_job_name")
-    private String quartzName;
-
-    @Column(name = "quartz_group_name")
-    private String quartzGroup;
-
-
-    @OneToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
-    @JoinColumn(name = "job_schedule_id")
-    private JobSchedule jobSchedule;
-
-    public String getQuartzName() {
-        return quartzName;
-    }
-
-    public void setQuartzName(String quartzName) {
-        this.quartzName = quartzName;
-    }
-
-    public String getQuartzGroup() {
-        return quartzGroup;
-    }
-
-    public void setQuartzGroup(String quartzGroup) {
-        this.quartzGroup = quartzGroup;
-    }
-
-    public JobSchedule getJobSchedule() {
-        return jobSchedule;
-    }
-
-    public void setJobSchedule(JobSchedule jobSchedule) {
-        this.jobSchedule = jobSchedule;
-    }
-
-
-    public GriffinJob() {
-        super();
-    }
-
-    public GriffinJob(Long measureId, String jobName, String quartzName, 
String quartzGroup, JobSchedule schedule,boolean deleted) {
-        this(measureId, jobName, quartzName, quartzGroup, deleted);
-        this.jobSchedule = schedule;
-    }
-
-    public GriffinJob(Long measureId, String jobName, String quartzName, 
String quartzGroup, boolean deleted) {
-        super(measureId, jobName, deleted);
-        this.metricName = jobName;
-        this.quartzName = quartzName;
-        this.quartzGroup = quartzGroup;
-    }
-
-    public GriffinJob(Long jobId, Long measureId, String jobName, String 
qJobName, String qGroupName, boolean deleted) {
-        this(measureId, jobName, qJobName, qGroupName, deleted);
-        setId(jobId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
index b27ab91..d97dc52 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
@@ -20,7 +20,8 @@ under the License.
 package org.apache.griffin.core.job.entity;
 
 
-import org.quartz.Trigger;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
 
 public class JobDataBean {
 
@@ -30,14 +31,19 @@ public class JobDataBean {
 
     private Long measureId;
 
-    private Trigger.TriggerState triggerState;
+    private JobState jobState;
 
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private Long nextFireTime;
 
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private Long previousFireTime;
 
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private String cronExpression;
 
+    private ProcessType processType;
+
     public Long getJobId() {
         return jobId;
     }
@@ -62,12 +68,12 @@ public class JobDataBean {
         this.measureId = measureId;
     }
 
-    public Trigger.TriggerState getTriggerState() {
-        return triggerState;
+    public JobState getJobState() {
+        return jobState;
     }
 
-    public void setTriggerState(Trigger.TriggerState triggerState) {
-        this.triggerState = triggerState;
+    public void setJobState(JobState jobState) {
+        this.jobState = jobState;
     }
 
     public Long getNextFireTime() {
@@ -94,5 +100,11 @@ public class JobDataBean {
         this.cronExpression = cronExpression;
     }
 
+    public ProcessType getProcessType() {
+        return processType;
+    }
 
+    public void setProcessType(ProcessType processType) {
+        this.processType = processType;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
index 89e0e8a..32dba65 100644
--- 
a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
+++ 
b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
@@ -19,12 +19,17 @@ under the License.
 
 package org.apache.griffin.core.job.entity;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.griffin.core.job.entity.LivySessionStates.State;
 import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+import  org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
 
 import javax.persistence.*;
 
+import static 
org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
+
 @Entity
 public class JobInstanceBean extends AbstractAuditableEntity {
 
@@ -35,9 +40,15 @@ public class JobInstanceBean extends AbstractAuditableEntity 
{
     @Enumerated(EnumType.STRING)
     private State state;
 
+    @Enumerated(EnumType.STRING)
+    private ProcessType type = BATCH;
+
+    /** The application id of this session **/
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private String appId;
 
     @Column(length = 2 * 1024)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private String appUri;
 
     @Column(name = "timestamp")
@@ -47,17 +58,32 @@ public class JobInstanceBean extends 
AbstractAuditableEntity {
     private Long expireTms;
 
     @Column(name = "predicate_group_name")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private String predicateGroup;
 
     @Column(name = "predicate_job_name")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private String predicateName;
 
     @Column(name = "predicate_job_deleted")
+    @JsonIgnore
+    private boolean predicateDeleted = false;
+
+    @JsonIgnore
     private boolean deleted = false;
 
     @ManyToOne
     @JoinColumn(name = "job_id",nullable = false)
-    private GriffinJob griffinJob;
+    @JsonIgnore
+    private AbstractJob job;
+
+    /**
+     * @return the job this instance is joined to through the job_id column
+     */
+    public AbstractJob getJob() {
+        return job;
+    }
+
+    /**
+     * @param job the job this instance belongs to
+     */
+    public void setJob(AbstractJob job) {
+        this.job = job;
+    }
 
     public Long getSessionId() {
         return sessionId;
@@ -75,6 +101,14 @@ public class JobInstanceBean extends 
AbstractAuditableEntity {
         this.state = state;
     }
 
+    /**
+     * @return the process type of this instance; BATCH unless changed
+     */
+    public ProcessType getType() {
+        return type;
+    }
+
+    /**
+     * @param type the process type of this instance
+     */
+    public void setType(ProcessType type) {
+        this.type = type;
+    }
+
     public String getAppId() {
         return appId;
     }
@@ -127,6 +161,14 @@ public class JobInstanceBean extends 
AbstractAuditableEntity {
         this.predicateName = predicateName;
     }
 
+    /**
+     * @return the flag stored in the predicate_job_deleted column; false by default
+     */
+    public boolean isPredicateDeleted() {
+        return predicateDeleted;
+    }
+
+    /**
+     * @param predicateDeleted new value of the predicate-deleted flag
+     */
+    public void setPredicateDeleted(boolean predicateDeleted) {
+        this.predicateDeleted = predicateDeleted;
+    }
+
     public boolean isDeleted() {
         return deleted;
     }
@@ -135,15 +177,13 @@ public class JobInstanceBean extends 
AbstractAuditableEntity {
         this.deleted = deleted;
     }
 
-    public GriffinJob getGriffinJob() {
-        return griffinJob;
-    }
-
-    public void setGriffinJob(GriffinJob griffinJob) {
-        this.griffinJob = griffinJob;
+    public JobInstanceBean() {
     }
 
-    public JobInstanceBean() {
+    public JobInstanceBean(State state, Long tms, Long expireTms) {
+        this.state = state;
+        this.tms = tms;
+        this.expireTms = expireTms;
     }
 
     public JobInstanceBean(State state, String pName, String pGroup, Long tms, 
Long expireTms) {
@@ -154,6 +194,16 @@ public class JobInstanceBean extends 
AbstractAuditableEntity {
         this.expireTms = expireTms;
     }
 
+    /**
+     * Creates an instance through the five-argument constructor and then
+     * attaches the owning job.
+     *
+     * @param state     initial state
+     * @param pName     predicate job name
+     * @param pGroup    predicate group name
+     * @param tms       timestamp
+     * @param expireTms expiry timestamp
+     * @param job       job this instance belongs to
+     */
+    public JobInstanceBean(State state,
+                           String pName,
+                           String pGroup,
+                           Long tms,
+                           Long expireTms,
+                           AbstractJob job) {
+        this(state, pName, pGroup, tms, expireTms);
+        this.job = job;
+    }
+
+    /**
+     * Creates an instance through the five-argument constructor and then
+     * overrides the default process type.
+     *
+     * @param state     initial state
+     * @param pName     predicate job name
+     * @param pGroup    predicate group name
+     * @param tms       timestamp
+     * @param expireTms expiry timestamp
+     * @param type      process type to use instead of the BATCH default
+     */
+    public JobInstanceBean(State state,
+                           String pName,
+                           String pGroup,
+                           Long tms,
+                           Long expireTms,
+                           ProcessType type) {
+        this(state, pName, pGroup, tms, expireTms);
+        this.type = type;
+    }
+
     public JobInstanceBean(Long sessionId, State state, String appId, String 
appUri, Long timestamp, Long expireTms) {
         this.sessionId = sessionId;
         this.state = state;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
index c71b6d8..f85af34 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
@@ -20,6 +20,7 @@ under the License.
 package org.apache.griffin.core.job.entity;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -27,7 +28,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
 import org.apache.griffin.core.util.JsonUtil;
 import org.apache.griffin.core.util.PropertiesUtil;
-import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.io.ClassPathResource;
@@ -48,14 +48,17 @@ public class JobSchedule extends AbstractAuditableEntity {
     @NotNull
     private String jobName;
 
-    @NotNull
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private String cronExpression;
 
+    @Transient
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private JobState jobState;
+
     @NotNull
     private String timeZone;
 
     @JsonIgnore
-//    @Access(AccessType.PROPERTY)
     private String predicateConfig;
 
     @Transient
@@ -97,13 +100,17 @@ public class JobSchedule extends AbstractAuditableEntity {
 
     @JsonProperty("cron.expression")
     public void setCronExpression(String cronExpression) {
-        if (StringUtils.isEmpty(cronExpression) || 
!isCronExpressionValid(cronExpression)) {
-            LOGGER.warn("Cron expression is invalid.Please check your cron 
expression.");
-            throw new IllegalArgumentException();
-        }
         this.cronExpression = cronExpression;
     }
 
+    /**
+     * @return the job state held in the @Transient jobState field; null if unset
+     */
+    public JobState getJobState() {
+        return jobState;
+    }
+
+    /**
+     * @param jobState the job state to hold in the @Transient jobState field
+     */
+    public void setJobState(JobState jobState) {
+        this.jobState = jobState;
+    }
+
     @JsonProperty("cron.time.zone")
     public String getTimeZone() {
         return timeZone;
@@ -174,14 +181,6 @@ public class JobSchedule extends AbstractAuditableEntity {
         return scheduleConf;
     }
 
-    private boolean isCronExpressionValid(String cronExpression) {
-        if (!CronExpression.isValidExpression(cronExpression)) {
-            LOGGER.warn("Cron expression {} is invalid.", cronExpression);
-            return false;
-        }
-        return true;
-    }
-
     public JobSchedule() throws JsonProcessingException {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java
new file mode 100644
index 0000000..8219af2
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+/**
+ * Holds a job's scheduler state together with two flags telling whether the
+ * job can currently be started or stopped, so that start/stop handling does
+ * not have to re-derive them.
+ */
+public class JobState {
+
+    /** Job scheduler state. */
+    private String state;
+
+    /** Whether the job can be started; false until set. */
+    private boolean toStart;
+
+    /** Whether the job can be stopped; false until set. */
+    private boolean toStop;
+
+    /**
+     * @return the scheduler state, or null if none has been set
+     */
+    public String getState() {
+        return this.state;
+    }
+
+    /**
+     * @param state the scheduler state to store
+     */
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    /**
+     * @return true if the job can be started
+     */
+    public boolean isToStart() {
+        return this.toStart;
+    }
+
+    /**
+     * @param toStart whether the job can be started
+     */
+    public void setToStart(boolean toStart) {
+        this.toStart = toStart;
+    }
+
+    /**
+     * @return true if the job can be stopped
+     */
+    public boolean isToStop() {
+        return this.toStop;
+    }
+
+    /**
+     * @param toStop whether the job can be stopped
+     */
+    public void setToStop(boolean toStop) {
+        this.toStop = toStop;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
 
b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
index 01e5070..51e06b0 100644
--- 
a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
+++ 
b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
@@ -20,28 +20,35 @@ under the License.
 package org.apache.griffin.core.job.entity;
 
 import com.cloudera.livy.sessions.SessionState;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.quartz.Trigger;
+
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.*;
+import static org.quartz.Trigger.TriggerState;
 
 public class LivySessionStates {
 
     /**
-     * unknown is used to represent the state that server get null from Livy.
+     * UNKNOWN is used to represent the state where the server gets null from Livy.
      * the other state is just same as com.cloudera.livy.sessions.SessionState.
      */
     public enum State {
-        not_started,
-        starting,
-        recovering,
-        idle,
-        running,
-        busy,
-        shutting_down,
-        error,
-        dead,
-        success,
-        unknown,
-        finding,
-        not_found,
-        found
+        NOT_STARTED,
+        STARTING,
+        RECOVERING,
+        IDLE,
+        RUNNING,
+        BUSY,
+        SHUTTING_DOWN,
+        ERROR,
+        DEAD,
+        SUCCESS,
+        UNKNOWN,
+        STOPPED,
+        FINDING,
+        NOT_FOUND,
+        FOUND
     }
 
     private static SessionState toSessionState(State state) {
@@ -49,43 +56,93 @@ public class LivySessionStates {
             return null;
         }
         switch (state) {
-            case not_started:
+            case NOT_STARTED:
                 return new SessionState.NotStarted();
-            case starting:
+            case STARTING:
                 return new SessionState.Starting();
-            case recovering:
+            case RECOVERING:
                 return new SessionState.Recovering();
-            case idle:
+            case IDLE:
                 return new SessionState.Idle();
-            case running:
+            case RUNNING:
                 return new SessionState.Running();
-            case busy:
+            case BUSY:
                 return new SessionState.Busy();
-            case shutting_down:
+            case SHUTTING_DOWN:
                 return new SessionState.ShuttingDown();
-            case error:
+            case ERROR:
                 return new SessionState.Error(System.nanoTime());
-            case dead:
+            case DEAD:
                 return new SessionState.Dead(System.nanoTime());
-            case success:
+            case SUCCESS:
                 return new SessionState.Success(System.nanoTime());
             default:
                 return null;
         }
     }
 
+    /**
+     * Maps a JSON status object to a {@link State}.
+     * <p>
+     * The "state" member is tried first. When it yields no mapping (it is
+     * absent, or its value is "FINISHED"), the "finalStatus" member is used
+     * instead.
+     *
+     * @param object JSON object carrying "state" and/or "finalStatus"; may be null
+     * @return the mapped state, never null; UNKNOWN when {@code object} is
+     *         null or neither member yields a mapping
+     */
+    public static State toLivyState(JsonObject object) {
+        if (object == null) {
+            return UNKNOWN;
+        }
+        State mapped = parseState(object.get("state"));
+        if (mapped == null) {
+            mapped = parseState(object.get("finalStatus"));
+        }
+        // Without this fallback a null leaked out whenever neither member
+        // produced a mapping (e.g. "FINISHED" with no finalStatus).
+        return mapped != null ? mapped : UNKNOWN;
+    }
+
+    /**
+     * Translates a textual state value into a {@link State}.
+     *
+     * @param state JSON element holding the state name; may be null or JSON null
+     * @return the matching state; null when the element is absent, is JSON
+     *         null, or is "FINISHED" (the value alone does not decide the
+     *         outcome); UNKNOWN for any unrecognized value
+     */
+    private static State parseState(JsonElement state) {
+        // getAsString() on a JSON null throws UnsupportedOperationException,
+        // so an explicit null is treated the same as a missing member.
+        if (state == null || state.isJsonNull()) {
+            return null;
+        }
+        switch (state.getAsString()) {
+            case "NEW":
+            case "NEW_SAVING":
+            case "SUBMITTED":
+                return NOT_STARTED;
+            case "ACCEPTED":
+                return STARTING;
+            case "RUNNING":
+                return RUNNING;
+            case "SUCCEEDED":
+                return SUCCESS;
+            case "FAILED":
+                return DEAD;
+            case "KILLED":
+                return SHUTTING_DOWN;
+            case "FINISHED":
+                return null;
+            default:
+                return UNKNOWN;
+        }
+    }
+
     public static boolean isActive(State state) {
-        if (State.unknown.equals(state) || State.finding.equals(state) || 
State.not_found.equals(state) || State.found.equals(state)) {
-            // set unknown isActive() as false.
+        if (UNKNOWN.equals(state) || STOPPED.equals(state) || 
NOT_FOUND.equals(state) || FOUND.equals(state)) {
+            // set UNKNOWN isActive() as false.
             return false;
+        } else if (FINDING.equals(state)) {
+            return true;
         }
         SessionState sessionState = toSessionState(state);
         return sessionState != null && sessionState.isActive();
     }
 
+    /**
+     * Converts a session state into one of the state-name strings
+     * "COMPLETE", "ERROR" or "NORMAL".
+     * <p>
+     * NOTE(review): FINDING has no case in toSessionState and therefore ends
+     * up as "ERROR" here, while isActive reports FINDING as active. Confirm
+     * this is intended.
+     *
+     * @param state session state to convert; may be null
+     * @return "COMPLETE" for STOPPED or SUCCESS; "ERROR" for UNKNOWN,
+     *         NOT_FOUND, FOUND and every state whose SessionState equivalent
+     *         is missing or inactive; "NORMAL" otherwise
+     */
+    public static String convert2QuartzState(State state) {
+        if (state == STOPPED || state == SUCCESS) {
+            return "COMPLETE";
+        }
+        if (state == UNKNOWN || state == NOT_FOUND || state == FOUND) {
+            return "ERROR";
+        }
+        SessionState livyState = toSessionState(state);
+        boolean active = livyState != null && livyState.isActive();
+        return active ? "NORMAL" : "ERROR";
+    }
+
     public static boolean isHealthy(State state) {
-        return !(State.error.equals(state) || State.dead.equals(state) ||
-                State.shutting_down.equals(state) || 
State.finding.equals(state) ||
-                State.not_found.equals(state) || State.found.equals(state));
+        return !(State.ERROR.equals(state) || State.DEAD.equals(state) ||
+                State.SHUTTING_DOWN.equals(state) || 
State.FINDING.equals(state) ||
+                State.NOT_FOUND.equals(state) || State.FOUND.equals(state));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
 
b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
index dafba89..b3deb31 100644
--- 
a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
+++ 
b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
@@ -38,11 +38,10 @@ public class SegmentPredicate extends 
AbstractAuditableEntity {
     private String type;
 
     @JsonIgnore
-//    @Access(AccessType.PROPERTY)
     private String config;
 
     @Transient
-    private Map<String, String> configMap;
+    private Map<String, Object> configMap;
 
     public String getType() {
         return type;
@@ -53,20 +52,20 @@ public class SegmentPredicate extends 
AbstractAuditableEntity {
     }
 
     @JsonProperty("config")
-    public Map<String, String> getConfigMap() {
+    public Map<String, Object> getConfigMap() {
         return configMap;
     }
 
     @JsonProperty("config")
-    public void setConfigMap(Map<String, String> configMap) {
+    public void setConfigMap(Map<String, Object> configMap) {
         this.configMap = configMap;
     }
 
-    public String getConfig() {
+    private String getConfig() {
         return config;
     }
 
-    public void setConfig(String config) {
+    private void setConfig(String config) {
         this.config = config;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java
new file mode 100644
index 0000000..70a4acc
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import javax.persistence.DiscriminatorValue;
+import javax.persistence.Entity;
+
+/**
+ * Job entity stored with the discriminator value "griffinStreamingJob".
+ */
+@Entity
+@DiscriminatorValue("griffinStreamingJob")
+public class StreamingJob extends AbstractJob {
+
+    /** No-argument constructor. */
+    public StreamingJob() {
+    }
+
+    /**
+     * Delegates to the {@link AbstractJob} constructor and then uses the job
+     * name as the metric name.
+     *
+     * @param measureId id of the measure this job runs
+     * @param jobName   job name; also becomes the metric name
+     * @param name      passed through to the superclass constructor
+     * @param group     passed through to the superclass constructor
+     * @param deleted   passed through to the superclass constructor
+     */
+    public StreamingJob(Long measureId,
+                        String jobName,
+                        String name,
+                        String group,
+                        boolean deleted) {
+        super(measureId, jobName, name, group, deleted);
+        this.metricName = jobName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java 
b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
index ad98603..ed2c88d 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
@@ -19,9 +19,11 @@ under the License.
 
 package org.apache.griffin.core.job.entity;
 
+import javax.persistence.DiscriminatorValue;
 import javax.persistence.Entity;
 
 @Entity
+@DiscriminatorValue("virtualJob")
 public class VirtualJob extends AbstractJob {
 
     public VirtualJob() {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/repo/BatchJobRepo.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/repo/BatchJobRepo.java 
b/service/src/main/java/org/apache/griffin/core/job/repo/BatchJobRepo.java
new file mode 100644
index 0000000..665887e
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/BatchJobRepo.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.BatchJob;
+
+/**
+ * Repository for {@link BatchJob} entities. It declares no members of its
+ * own; everything comes from {@code JobRepo<BatchJob>}.
+ */
+public interface BatchJobRepo extends JobRepo<BatchJob> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java 
b/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java
deleted file mode 100644
index aaaa77d..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.job.repo;
-
-import org.apache.griffin.core.job.entity.GriffinJob;
-
-public interface GriffinJobRepo extends JobRepo<GriffinJob> {
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java 
b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
index d55d27d..868a321 100644
--- 
a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
+++ 
b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
@@ -31,23 +31,19 @@ import static 
org.apache.griffin.core.job.entity.LivySessionStates.State;
 
 public interface JobInstanceRepo extends CrudRepository<JobInstanceBean, Long> 
{
 
-//    @Query("select DISTINCT s from JobInstanceBean s " +
-//            "where s.state in ('starting', 'not_started', 'recovering', 
'idle', 'running', 'busy')")
-//    List<JobInstanceBean> findByActiveState();
-
     JobInstanceBean findByPredicateName(String name);
 
-    @Query("select s from JobInstanceBean s where s.griffinJob.id = ?1")
+    @Query("select s from JobInstanceBean s where s.job.id = ?1")
     List<JobInstanceBean> findByJobId(Long jobId, Pageable pageable);
 
-    @Query("select s from JobInstanceBean s where s.griffinJob.id = ?1")
+    @Query("select s from JobInstanceBean s where s.job.id = ?1")
     List<JobInstanceBean> findByJobId(Long jobId);
 
     List<JobInstanceBean> findByExpireTmsLessThanEqual(Long expireTms);
 
     @Transactional(rollbackFor = Exception.class)
     @Modifying
-    @Query("delete from JobInstanceBean j where j.expireTms <= ?1")
+    @Query("delete from JobInstanceBean j where j.expireTms <= ?1 and 
j.deleted = false ")
     int deleteByExpireTimestamp(Long expireTms);
 
     @Query("select DISTINCT s from JobInstanceBean s " +

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/job/repo/StreamingJobRepo.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/repo/StreamingJobRepo.java 
b/service/src/main/java/org/apache/griffin/core/job/repo/StreamingJobRepo.java
new file mode 100644
index 0000000..a6b198b
--- /dev/null
+++ 
b/service/src/main/java/org/apache/griffin/core/job/repo/StreamingJobRepo.java
@@ -0,0 +1,6 @@
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.StreamingJob;
+
+/**
+ * Repository for {@link StreamingJob} entities. It declares no members of
+ * its own; everything comes from {@code JobRepo<StreamingJob>}.
+ */
+public interface StreamingJobRepo extends JobRepo<StreamingJob> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
 
b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
deleted file mode 100644
index bb76c09..0000000
--- 
a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.measure;
-
-import org.apache.griffin.core.job.entity.VirtualJob;
-import org.apache.griffin.core.job.repo.VirtualJobRepo;
-import org.apache.griffin.core.measure.entity.ExternalMeasure;
-import org.apache.griffin.core.measure.entity.Measure;
-import org.apache.griffin.core.measure.repo.ExternalMeasureRepo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-
-import static org.apache.griffin.core.util.MeasureUtil.validateMeasure;
-
-@Component("externalOperation")
-public class ExternalMeasureOperationImpl implements MeasureOperation {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ExternalMeasureOperationImpl.class);
-
-    @Autowired
-    private ExternalMeasureRepo measureRepo;
-    @Autowired
-    private VirtualJobRepo jobRepo;
-
-    @Override
-    @Transactional
-    public Measure create(Measure measure) {
-        ExternalMeasure em = (ExternalMeasure) measure;
-        validateMeasure(em);
-        em.setVirtualJob(new VirtualJob());
-        em = measureRepo.save(em);
-        VirtualJob vj = genVirtualJob(em, em.getVirtualJob());
-        jobRepo.save(vj);
-        return em;
-    }
-
-    @Override
-    public void update(Measure measure) {
-        ExternalMeasure latestMeasure = (ExternalMeasure) measure;
-        validateMeasure(latestMeasure);
-        ExternalMeasure originMeasure = 
measureRepo.findOne(latestMeasure.getId());
-        VirtualJob vj = genVirtualJob(latestMeasure, 
originMeasure.getVirtualJob());
-        latestMeasure.setVirtualJob(vj);
-        measureRepo.save(latestMeasure);
-    }
-
-    @Override
-    public void delete(Measure measure) {
-        ExternalMeasure em = (ExternalMeasure) measure;
-        em.setDeleted(true);
-        em.getVirtualJob().setDeleted(true);
-        measureRepo.save(em);
-    }
-
-    private VirtualJob genVirtualJob(ExternalMeasure em, VirtualJob vj) {
-        vj.setMeasureId(em.getId());
-        vj.setJobName(em.getName());
-        vj.setMetricName(em.getMetricName());
-        return vj;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java
 
b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java
new file mode 100644
index 0000000..1a5068c
--- /dev/null
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperatorImpl.java
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure;
+
+import org.apache.griffin.core.job.entity.VirtualJob;
+import org.apache.griffin.core.job.repo.VirtualJobRepo;
+import org.apache.griffin.core.measure.entity.ExternalMeasure;
+import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.measure.repo.ExternalMeasureRepo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import static org.apache.griffin.core.util.MeasureUtil.validateMeasure;
+
+/**
+ * {@link MeasureOperator} implementation for {@link ExternalMeasure}s,
+ * registered as the "externalOperation" component. Each external measure is
+ * paired with a {@link VirtualJob} that mirrors its id, name and metric name.
+ */
+@Component("externalOperation")
+public class ExternalMeasureOperatorImpl implements MeasureOperator {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(ExternalMeasureOperatorImpl.class);
+
+    @Autowired
+    private ExternalMeasureRepo measureRepo;
+    @Autowired
+    private VirtualJobRepo jobRepo;
+
+    /**
+     * Validates and saves the measure with a new virtual job attached, then
+     * fills that job from the saved measure and saves the job as well.
+     *
+     * @param measure measure to create; cast to {@link ExternalMeasure}
+     * @return the measure as returned by the repository save
+     */
+    @Override
+    @Transactional
+    public Measure create(Measure measure) {
+        ExternalMeasure external = (ExternalMeasure) measure;
+        validateMeasure(external);
+        external.setVirtualJob(new VirtualJob());
+        ExternalMeasure saved = measureRepo.save(external);
+        // The job is filled after the save because it copies the measure id.
+        jobRepo.save(genVirtualJob(saved, saved.getVirtualJob()));
+        return saved;
+    }
+
+    /**
+     * Validates the measure, re-uses the virtual job of the stored measure
+     * with the same id (refreshed from the new values), and saves the result.
+     *
+     * @param measure measure carrying the new values; cast to {@link ExternalMeasure}
+     * @return the measure as returned by the repository save
+     */
+    @Override
+    public Measure update(Measure measure) {
+        ExternalMeasure latest = (ExternalMeasure) measure;
+        validateMeasure(latest);
+        ExternalMeasure stored = measureRepo.findOne(latest.getId());
+        latest.setVirtualJob(genVirtualJob(latest, stored.getVirtualJob()));
+        return measureRepo.save(latest);
+    }
+
+    /**
+     * Marks the measure and its virtual job as deleted and saves the measure.
+     *
+     * @param measure measure to delete; cast to {@link ExternalMeasure}
+     */
+    @Override
+    public void delete(Measure measure) {
+        ExternalMeasure external = (ExternalMeasure) measure;
+        external.setDeleted(true);
+        external.getVirtualJob().setDeleted(true);
+        measureRepo.save(external);
+    }
+
+    /**
+     * Copies the measure's id, name and metric name onto the given job.
+     *
+     * @param em source measure
+     * @param vj job to update in place
+     * @return the same job instance that was passed in
+     */
+    private VirtualJob genVirtualJob(ExternalMeasure em, VirtualJob vj) {
+        vj.setMeasureId(em.getId());
+        vj.setJobName(em.getName());
+        vj.setMetricName(em.getMetricName());
+        return vj;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
 
b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
deleted file mode 100644
index c15ff23..0000000
--- 
a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.measure;
-
-import org.apache.griffin.core.job.JobServiceImpl;
-import org.apache.griffin.core.measure.entity.Measure;
-import org.apache.griffin.core.measure.repo.MeasureRepo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import static org.apache.griffin.core.util.MeasureUtil.validateMeasure;
-
-@Component("griffinOperation")
-public class GriffinMeasureOperationImpl implements MeasureOperation {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GriffinMeasureOperationImpl.class);
-
-    private final MeasureRepo<Measure> measureRepo;
-
-    private final JobServiceImpl jobService;
-
-    @Autowired
-    public GriffinMeasureOperationImpl(MeasureRepo<Measure> measureRepo, 
JobServiceImpl jobService) {
-        this.measureRepo = measureRepo;
-        this.jobService = jobService;
-    }
-
-
-    @Override
-    public Measure create(Measure measure) {
-        validateMeasure(measure);
-        return measureRepo.save(measure);
-    }
-
-    @Override
-    public void update(Measure measure) {
-        validateMeasure(measure);
-        measure.setDeleted(false);
-        measureRepo.save(measure);
-    }
-
-    @Override
-    public void delete(Measure measure) {
-        jobService.deleteJobsRelateToMeasure(measure.getId());
-        measure.setDeleted(true);
-        measureRepo.save(measure);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperatorImpl.java
 
b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperatorImpl.java
new file mode 100644
index 0000000..9938e0e
--- /dev/null
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperatorImpl.java
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure;
+
+import org.apache.griffin.core.job.JobServiceImpl;
+import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.measure.repo.MeasureRepo;
+import org.quartz.SchedulerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import static org.apache.griffin.core.util.MeasureUtil.validateMeasure;
+
+/**
+ * {@link MeasureOperator} registered under the bean name "griffinOperation".
+ * Persistence goes through {@link MeasureRepo}; deleting a measure also
+ * calls {@link JobServiceImpl} for the jobs tied to that measure.
+ */
+@Component("griffinOperation")
+public class GriffinMeasureOperatorImpl implements MeasureOperator {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(GriffinMeasureOperatorImpl.class);
+
+    private final MeasureRepo<Measure> measureRepo;
+
+    private final JobServiceImpl jobService;
+
+    /**
+     * @param measureRepo repository used to save measures
+     * @param jobService  service called when a measure is deleted
+     */
+    @Autowired
+    public GriffinMeasureOperatorImpl(MeasureRepo<Measure> measureRepo,
+                                      JobServiceImpl jobService) {
+        this.jobService = jobService;
+        this.measureRepo = measureRepo;
+    }
+
+    /**
+     * Validates the measure and saves it.
+     *
+     * @param measure measure to create
+     * @return the instance returned by the repository save
+     */
+    @Override
+    public Measure create(Measure measure) {
+        validateMeasure(measure);
+        Measure saved = measureRepo.save(measure);
+        return saved;
+    }
+
+    /**
+     * Validates the measure, clears its deleted flag and saves it.
+     *
+     * @param measure measure carrying the new values
+     * @return the instance returned by the repository save
+     */
+    @Override
+    public Measure update(Measure measure) {
+        validateMeasure(measure);
+        measure.setDeleted(false);
+        return measureRepo.save(measure);
+    }
+
+    /**
+     * Calls the job service for the measure's id, then flags the measure as
+     * deleted and saves it.
+     *
+     * @param measure measure to flag as deleted
+     * @throws SchedulerException propagated from the job service call
+     */
+    @Override
+    public void delete(Measure measure) throws SchedulerException {
+        // Jobs are handled first; an exception here leaves the measure untouched.
+        jobService.deleteJobsRelateToMeasure(measure.getId());
+        measure.setDeleted(true);
+        measureRepo.save(measure);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
index d7917e4..98d2211 100644
--- 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
@@ -20,10 +20,12 @@ under the License.
 package org.apache.griffin.core.measure;
 
 import org.apache.griffin.core.measure.entity.Measure;
+import org.quartz.SchedulerException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.*;
 
+import javax.validation.Valid;
 import java.util.List;
 
 @RestController
@@ -44,24 +46,24 @@ public class MeasureController {
 
     @RequestMapping(value = "/measures/{id}", method = RequestMethod.DELETE)
     @ResponseStatus(HttpStatus.NO_CONTENT)
-    public void deleteMeasureById(@PathVariable("id") Long id) {
+    public void deleteMeasureById(@PathVariable("id") Long id) throws 
SchedulerException {
         measureService.deleteMeasureById(id);
     }
 
     @RequestMapping(value = "/measures", method = RequestMethod.DELETE)
     @ResponseStatus(HttpStatus.NO_CONTENT)
-    public void deleteMeasures() {
+    public void deleteMeasures() throws SchedulerException {
         measureService.deleteMeasures();
     }
 
     @RequestMapping(value = "/measures", method = RequestMethod.PUT)
-    @ResponseStatus(HttpStatus.NO_CONTENT)
-    public void updateMeasure(@RequestBody Measure measure) {
-        measureService.updateMeasure(measure);
+    @ResponseStatus(HttpStatus.OK)
+    public Measure updateMeasure(@RequestBody Measure measure) {
+        return measureService.updateMeasure(measure);
     }
 
     @RequestMapping(value = "/measures/owner/{owner}", method = 
RequestMethod.GET)
-    public List<Measure> getAliveMeasuresByOwner(@PathVariable("owner") String 
owner) {
+    public List<Measure> getAliveMeasuresByOwner(@PathVariable("owner")  
@Valid String owner) {
         return measureService.getAliveMeasuresByOwner(owner);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
deleted file mode 100644
index 1ebc06c..0000000
--- 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.measure;
-
-
-import org.apache.griffin.core.measure.entity.Measure;
-
-public interface MeasureOperation {
-
-    Measure create(Measure measure);
-
-    void update(Measure measure);
-
-    void delete(Measure measure);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/MeasureOperator.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperator.java 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperator.java
new file mode 100644
index 0000000..42a361a
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperator.java
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure;
+
+
+import org.apache.griffin.core.measure.entity.Measure;
+import org.quartz.SchedulerException;
+
+/**
+ * Create, update and delete operations for a {@link Measure}.
+ */
+public interface MeasureOperator {
+
+    /**
+     * Creates the given measure.
+     *
+     * @param measure measure to create
+     * @return the resulting measure
+     */
+    Measure create(Measure measure);
+
+    /**
+     * Updates the given measure.
+     *
+     * @param measure measure carrying the new values
+     * @return the resulting measure
+     */
+    Measure update(Measure measure);
+
+    /**
+     * Deletes the given measure.
+     *
+     * @param measure measure to delete
+     * @throws SchedulerException if an implementation's scheduler-related
+     *                            work fails
+     */
+    void delete(Measure measure) throws SchedulerException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
index 37942bf..d422ce6 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
@@ -21,6 +21,7 @@ package org.apache.griffin.core.measure;
 
 
 import org.apache.griffin.core.measure.entity.Measure;
+import org.quartz.SchedulerException;
 
 import java.util.List;
 
@@ -30,11 +31,11 @@ public interface MeasureService {
 
     Measure getMeasureById(long id);
 
-    void deleteMeasureById(Long id);
+    void deleteMeasureById(Long id) throws SchedulerException;
 
-    void deleteMeasures();
+    void deleteMeasures() throws SchedulerException;
 
-    void updateMeasure(Measure measure);
+    Measure updateMeasure(Measure measure);
 
     List<Measure> getAliveMeasuresByOwner(String owner);
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
index bd24be3..a983f01 100644
--- 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
@@ -21,11 +21,13 @@ package org.apache.griffin.core.measure;
 
 
 import org.apache.griffin.core.exception.GriffinException;
+import org.apache.griffin.core.measure.entity.ExternalMeasure;
 import org.apache.griffin.core.measure.entity.GriffinMeasure;
 import org.apache.griffin.core.measure.entity.Measure;
 import org.apache.griffin.core.measure.repo.ExternalMeasureRepo;
 import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
 import org.apache.griffin.core.measure.repo.MeasureRepo;
+import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -40,6 +42,8 @@ import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.*;
 @Service
 public class MeasureServiceImpl implements MeasureService {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MeasureServiceImpl.class);
+    private static final String GRIFFIN = "griffin";
+    private static final String EXTERNAL = "external";
 
     @Autowired
     private MeasureRepo<Measure> measureRepo;
@@ -49,16 +53,16 @@ public class MeasureServiceImpl implements MeasureService {
     private ExternalMeasureRepo externalMeasureRepo;
     @Autowired
     @Qualifier("griffinOperation")
-    private MeasureOperation griffinOp;
+    private MeasureOperator griffinOp;
     @Autowired
     @Qualifier("externalOperation")
-    private MeasureOperation externalOp;
+    private MeasureOperator externalOp;
 
     @Override
     public List<? extends Measure> getAllAliveMeasures(String type) {
-        if (type.equals("griffin")) {
+        if (type.equals(GRIFFIN)) {
             return griffinMeasureRepo.findByDeleted(false);
-        } else if (type.equals("external")) {
+        } else if (type.equals(EXTERNAL)) {
             return externalMeasureRepo.findByDeleted(false);
         }
         return measureRepo.findByDeleted(false);
@@ -85,12 +89,12 @@ public class MeasureServiceImpl implements MeasureService {
             LOGGER.warn("Failed to create new measure {}, it already exists.", 
measure.getName());
             throw new 
GriffinException.ConflictException(MEASURE_NAME_ALREADY_EXIST);
         }
-        MeasureOperation op = getOperation(measure);
+        MeasureOperator op = getOperation(measure);
         return op.create(measure);
     }
 
     @Override
-    public void updateMeasure(Measure measure) {
+    public Measure updateMeasure(Measure measure) {
         Measure m = measureRepo.findByIdAndDeleted(measure.getId(), false);
         if (m == null) {
             throw new 
GriffinException.NotFoundException(MEASURE_ID_DOES_NOT_EXIST);
@@ -99,34 +103,36 @@ public class MeasureServiceImpl implements MeasureService {
             LOGGER.warn("Can't update measure to different type.");
             throw new 
GriffinException.BadRequestException(MEASURE_TYPE_DOES_NOT_MATCH);
         }
-        MeasureOperation op = getOperation(measure);
-        op.update(measure);
+        MeasureOperator op = getOperation(measure);
+        return op.update(measure);
     }
 
     @Override
-    public void deleteMeasureById(Long measureId) {
+    public void deleteMeasureById(Long measureId) throws SchedulerException {
         Measure measure = measureRepo.findByIdAndDeleted(measureId, false);
         if (measure == null) {
             throw new 
GriffinException.NotFoundException(MEASURE_ID_DOES_NOT_EXIST);
         }
-        MeasureOperation op = getOperation(measure);
+        MeasureOperator op = getOperation(measure);
         op.delete(measure);
     }
 
     @Override
-    public void deleteMeasures() {
+    public void deleteMeasures() throws SchedulerException {
         List<Measure> measures = measureRepo.findByDeleted(false);
         for (Measure m : measures) {
-            MeasureOperation op = getOperation(m);
+            MeasureOperator op = getOperation(m);
             op.delete(m);
         }
     }
 
-    private MeasureOperation getOperation(Measure measure) {
+    private MeasureOperator getOperation(Measure measure) {
         if (measure instanceof GriffinMeasure) {
             return griffinOp;
+        } else if (measure instanceof ExternalMeasure) {
+            return externalOp;
         }
-        return externalOp;
+        throw new 
GriffinException.BadRequestException(MEASURE_TYPE_DOES_NOT_SUPPORT);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6be53303/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
 
b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
index 532cb48..916df33 100644
--- 
a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
@@ -28,6 +28,7 @@ import org.apache.griffin.core.job.entity.SegmentPredicate;
 import org.apache.griffin.core.util.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 
 import javax.persistence.*;
@@ -43,10 +44,20 @@ public class DataConnector extends AbstractAuditableEntity {
 
     private final static Logger LOGGER = 
LoggerFactory.getLogger(DataConnector.class);
 
+    public enum DataType {
+        /**
+         * The three data source types that are currently supported.
+         */
+        HIVE,
+        KAFKA,
+        AVRO
+    }
+
     @NotNull
     private String name;
 
-    private String type;
+    @Enumerated(EnumType.STRING)
+    private DataType type;
 
     private String version;
 
@@ -61,16 +72,20 @@ public class DataConnector extends AbstractAuditableEntity {
     private String defaultDataUnit = "365000d";
 
     @JsonIgnore
-//    @Access(AccessType.PROPERTY)
     private String config;
 
     @Transient
-    private Map<String, String> configMap;
+    private Map<String, Object> configMap;
 
     @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, 
CascadeType.REMOVE, CascadeType.MERGE})
     @JoinColumn(name = "data_connector_id")
     private List<SegmentPredicate> predicates = new ArrayList<>();
 
+    @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, 
CascadeType.REMOVE, CascadeType.MERGE})
+    @JoinColumn(name = "pre_process_id")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private List<StreamingPreProcess> preProcess;
+
     public List<SegmentPredicate> getPredicates() {
         return predicates;
     }
@@ -79,21 +94,31 @@ public class DataConnector extends AbstractAuditableEntity {
         this.predicates = predicates;
     }
 
+    @JsonProperty("pre.proc")
+    public List<StreamingPreProcess> getPreProcess() {
+        return CollectionUtils.isEmpty(preProcess) ? null : preProcess;
+    }
+
+    @JsonProperty("pre.proc")
+    public void setPreProcess(List<StreamingPreProcess> preProcess) {
+        this.preProcess = preProcess;
+    }
+
     @JsonProperty("config")
-    public Map<String, String> getConfigMap() {
+    public Map<String, Object> getConfigMap() {
         return configMap;
     }
 
     @JsonProperty("config")
-    public void setConfigMap(Map<String, String> configMap) {
+    public void setConfigMap(Map<String, Object> configMap) {
         this.configMap = configMap;
     }
 
-    public void setConfig(String config) {
+    private void setConfig(String config) {
         this.config = config;
     }
 
-    public String getConfig() {
+    private String getConfig() {
         return config;
     }
 
@@ -137,11 +162,11 @@ public class DataConnector extends 
AbstractAuditableEntity {
         this.name = name;
     }
 
-    public String getType() {
+    public DataType getType() {
         return type;
     }
 
-    public void setType(String type) {
+    public void setType(DataType type) {
         this.type = type;
     }
 
@@ -163,7 +188,7 @@ public class DataConnector extends AbstractAuditableEntity {
 
     @PostLoad
     public void load() throws IOException {
-        if (!org.springframework.util.StringUtils.isEmpty(config)) {
+        if (!StringUtils.isEmpty(config)) {
             this.configMap = JsonUtil.toEntity(config, new 
TypeReference<Map<String, Object>>() {
             });
         }
@@ -173,16 +198,16 @@ public class DataConnector extends 
AbstractAuditableEntity {
     public DataConnector() {
     }
 
-    public DataConnector(String name, String type, String version, String 
config) throws IOException {
+    public DataConnector(String name, DataType type, String version, String 
config) throws IOException {
         this.name = name;
         this.type = type;
         this.version = version;
         this.config = config;
-        this.configMap = JsonUtil.toEntity(config, new 
TypeReference<Map<String, String>>() {
+        this.configMap = JsonUtil.toEntity(config, new 
TypeReference<Map<String, Object>>() {
         });
     }
 
-    public DataConnector(String name, String dataUnit, Map<String,String> 
configMap, List<SegmentPredicate> predicates) {
+    public DataConnector(String name, String dataUnit, Map configMap, 
List<SegmentPredicate> predicates) {
         this.name = name;
         this.dataUnit = dataUnit;
         this.configMap = configMap;

Reply via email to