http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/application.properties 
b/service/src/main/resources/application.properties
index b3f5e94..c7b57be 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -42,11 +42,11 @@ kafka.schema.registry.url = http://localhost:8081
 
 # Update job instance state at regular intervals
 jobInstance.fixedDelay.in.milliseconds = 60000
-# Expired time of job instance which is 7 days that is 604800000 milliseconds
+# Expired time of job instance which is 7 days that is 604800000 
milliseconds.Time unit only supports milliseconds
 jobInstance.expired.milliseconds = 604800000
 
 # schedule predicate job every 5 minutes and repeat 12 times at most
-#interval unit m:minute h:hour d:day,only support these three units
+#interval time unit s:second m:minute h:hour d:day,only support these four 
units
 predicate.job.interval = 5m
 predicate.job.repeat.count = 12
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/main/resources/sparkJob.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/sparkJob.properties 
b/service/src/main/resources/sparkJob.properties
index f9fd2f9..a9be693 100644
--- a/service/src/main/resources/sparkJob.properties
+++ b/service/src/main/resources/sparkJob.properties
@@ -18,13 +18,13 @@
 #
 
 # spark required
-sparkJob.file=hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/jar/griffin-measure.jar
+sparkJob.file=hdfs:///griffin/griffin-measure.jar
 sparkJob.className=org.apache.griffin.measure.Application
-sparkJob.args_1=hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/conf/env.json
+sparkJob.args_1=hdfs:///griffin/json/env.json
 sparkJob.args_3=hdfs,raw
 
 sparkJob.name=griffin
-sparkJob.queue=hdlq-gdi-sla
+sparkJob.queue=default
 
 # options
 sparkJob.numExecutors=10
@@ -33,12 +33,12 @@ sparkJob.driverMemory=2g
 sparkJob.executorMemory=2g
 
 # shouldn't config in server, but in
-sparkJob.jars = 
hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/spark-avro_2.11-2.0.1.jar;\
-  
hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/datanucleus-api-jdo-3.2.6.jar;\
-  
hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/datanucleus-core-3.2.10.jar;\
-  
hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/datanucleus-rdbms-3.2.9.jar
+sparkJob.jars = hdfs://livy/spark-avro_2.11-2.0.1.jar;\
+  hdfs://livy/datanucleus-api-jdo-3.2.6.jar;\
+  hdfs://livy/datanucleus-core-3.2.10.jar;\
+  hdfs://livy/datanucleus-rdbms-3.2.9.jar
 
-spark.yarn.dist.files = 
hdfs://apollo-phx-nn-ha/apps/hdmi-technology/b_des/griffin/livy/hive-site.xml
+spark.yarn.dist.files = hdfs://livy/hive-site.xml
 
 # livy
 # livy.uri=http://10.9.246.187:8998/batches

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/config/PropertiesConfigTest.java
----------------------------------------------------------------------
diff --git 
a/service/src/test/java/org/apache/griffin/core/config/PropertiesConfigTest.java
 
b/service/src/test/java/org/apache/griffin/core/config/PropertiesConfigTest.java
new file mode 100644
index 0000000..65a8a1d
--- /dev/null
+++ 
b/service/src/test/java/org/apache/griffin/core/config/PropertiesConfigTest.java
@@ -0,0 +1,160 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.config;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.FileNotFoundException;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+@RunWith(SpringRunner.class)
+//@TestPropertySource("classpath")
+public class PropertiesConfigTest {
+
+    @TestConfiguration
+    public static class PropertiesConf {
+
+        @Bean(name = "noLivyConf")
+        public PropertiesConfig noSparkConf() {
+            return new PropertiesConfig(null);
+        }
+
+        @Bean(name = "livyConf")
+        public PropertiesConfig sparkConf() {
+            return new PropertiesConfig("src/test/resources");
+        }
+
+        @Bean(name = "livyNotFoundConfig")
+        public PropertiesConfig sparkNotFoundConfig() {
+            return new PropertiesConfig("test");
+        }
+
+        @Bean(name = "noQuartzConf")
+        public PropertiesConfig noQuartzConf() {
+            return new PropertiesConfig(null);
+        }
+
+        @Bean(name = "quartzConf")
+        public PropertiesConfig quartzConf() {
+            return new PropertiesConfig("src/test/resources");
+        }
+
+        @Bean(name = "quartzNotFoundConfig")
+        public PropertiesConfig quartzNotFoundConfig() {
+            return new PropertiesConfig("test");
+        }
+    }
+
+    @Autowired
+    @Qualifier(value = "noLivyConf")
+    private PropertiesConfig noLivyConf;
+
+    @Autowired
+    @Qualifier(value = "livyConf")
+    private PropertiesConfig livyConf;
+
+    @Autowired
+    @Qualifier(value = "livyNotFoundConfig")
+    private PropertiesConfig livyNotFoundConfig;
+
+
+    @Autowired
+    @Qualifier(value = "noQuartzConf")
+    private PropertiesConfig noQuartzConf;
+
+    @Autowired
+    @Qualifier(value = "quartzConf")
+    private PropertiesConfig quartzConf;
+
+    @Autowired
+    @Qualifier(value = "quartzNotFoundConfig")
+    private PropertiesConfig quartzNotFoundConfig;
+
+    @Test
+    public void appConf() throws Exception {
+        Properties conf = noLivyConf.appConf();
+        assertEquals(conf.get("spring.datasource.username"),"test");
+    }
+
+    @Test
+    public void livyConfWithLocationNotNull() throws Exception {
+        Properties conf = livyConf.livyConf();
+        assertEquals(conf.get("sparkJob.name"),"test");
+    }
+
+    @Test
+    public void livyConfWithLocationNull() throws Exception {
+        Properties conf = noLivyConf.livyConf();
+        assertEquals(conf.get("sparkJob.name"),"test");
+    }
+
+    @Test
+    public void livyConfWithFileNotFoundException() throws Exception {
+        FileNotFoundException e = livyFileNotFoundException();
+        assert e != null;
+    }
+
+    @Test
+    public void quartzConfWithLocationNotNull() throws Exception {
+        Properties conf = quartzConf.quartzConf();
+        
assertEquals(conf.get("org.quartz.scheduler.instanceName"),"spring-boot-quartz-test");
+    }
+
+    @Test
+    public void quartzConfWithLocationNull() throws Exception {
+        Properties conf = noQuartzConf.quartzConf();
+        
assertEquals(conf.get("org.quartz.scheduler.instanceName"),"spring-boot-quartz-test");
+    }
+
+    @Test
+    public void quartzConfWithFileNotFoundException() throws Exception {
+        FileNotFoundException e = quartzFileNotFoundException();
+        assert e != null;
+    }
+
+    private FileNotFoundException livyFileNotFoundException() {
+        FileNotFoundException exception = null;
+        try {
+            livyNotFoundConfig.livyConf();
+        } catch (FileNotFoundException e) {
+            exception = e;
+        }
+        return exception;
+    }
+
+    private FileNotFoundException quartzFileNotFoundException() {
+        FileNotFoundException exception = null;
+        try {
+            quartzNotFoundConfig.livyConf();
+        } catch (FileNotFoundException e) {
+            exception = e;
+        }
+        return exception;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/job/JobInstanceTest.java
----------------------------------------------------------------------
diff --git 
a/service/src/test/java/org/apache/griffin/core/job/JobInstanceTest.java 
b/service/src/test/java/org/apache/griffin/core/job/JobInstanceTest.java
new file mode 100644
index 0000000..6a39685
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/job/JobInstanceTest.java
@@ -0,0 +1,186 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job;
+
+import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.job.repo.GriffinJobRepo;
+import org.apache.griffin.core.job.repo.JobScheduleRepo;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
+import org.apache.griffin.core.util.JsonUtil;
+import org.apache.griffin.core.util.PropertiesUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.quartz.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import javax.validation.constraints.AssertTrue;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.griffin.core.util.EntityHelper.*;
+import static org.junit.Assert.*;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+
+@RunWith(SpringRunner.class)
+public class JobInstanceTest {
+
+    @TestConfiguration
+    public static class jobInstanceBean{
+        @Bean
+        public JobInstance instance() {
+            return new JobInstance();
+        }
+
+        @Bean(name = "appConf")
+        public Properties sparkJobProps() {
+            String path = "application.properties";
+            return PropertiesUtil.getProperties(path, new 
ClassPathResource(path));
+        }
+
+        @Bean
+        public SchedulerFactoryBean factoryBean() {
+            return new SchedulerFactoryBean();
+        }
+    }
+
+    @Autowired
+    private JobInstance jobInstance;
+
+    @Autowired
+    @Qualifier("appConf")
+    private Properties appConfProps;
+
+    @MockBean
+    private SchedulerFactoryBean factory;
+
+    @MockBean
+    private GriffinMeasureRepo measureRepo;
+
+    @MockBean
+    private GriffinJobRepo jobRepo;
+
+    @MockBean
+    private JobScheduleRepo jobScheduleRepo;
+
+
+
+    @Test
+    public void testExecute() throws Exception {
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        Scheduler scheduler = mock(Scheduler.class);
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+        JobSchedule jobSchedule = createJobSchedule();
+        GriffinJob job = new GriffinJob(1L, "jobName", "qName", "qGroup", 
false);
+        List<Trigger> triggers = Arrays.asList(createSimpleTrigger(2, 0));
+        given(context.getJobDetail()).willReturn(jd);
+        
given(jobScheduleRepo.findOne(Matchers.anyLong())).willReturn(jobSchedule);
+        given(measureRepo.findOne(Matchers.anyLong())).willReturn(measure);
+        given(jobRepo.findOne(Matchers.anyLong())).willReturn(job);
+        given(factory.getObject()).willReturn(scheduler);
+        
given((List<Trigger>)scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(triggers);
+        
given(scheduler.checkExists(Matchers.any(TriggerKey.class))).willReturn(false);
+        given(jobRepo.save(Matchers.any(GriffinJob.class))).willReturn(job);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+        jobInstance.execute(context);
+    }
+
+    @Test
+    public void testExecuteWithRangeLessThanZero() throws Exception {
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        Scheduler scheduler = mock(Scheduler.class);
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+        JobSchedule jobSchedule = createJobSchedule("jobName",new 
SegmentRange("-1h","-1h"));
+        GriffinJob job = new GriffinJob(1L, "jobName", "qName", "qGroup", 
false);
+        List<Trigger> triggers = Arrays.asList(createSimpleTrigger(2, 0));
+        given(context.getJobDetail()).willReturn(jd);
+        
given(jobScheduleRepo.findOne(Matchers.anyLong())).willReturn(jobSchedule);
+        given(measureRepo.findOne(Matchers.anyLong())).willReturn(measure);
+        given(jobRepo.findOne(Matchers.anyLong())).willReturn(job);
+        given(factory.getObject()).willReturn(scheduler);
+        
given((List<Trigger>)scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(triggers);
+        
given(scheduler.checkExists(Matchers.any(TriggerKey.class))).willReturn(false);
+        given(jobRepo.save(Matchers.any(GriffinJob.class))).willReturn(job);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+        jobInstance.execute(context);
+    }
+
+    @Test
+    public void testExecuteWithRangeGreaterThanDataUnit() throws Exception {
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        Scheduler scheduler = mock(Scheduler.class);
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+        JobSchedule jobSchedule = createJobSchedule("jobName",new 
SegmentRange("-1h","5h"));
+        GriffinJob job = new GriffinJob(1L, "jobName", "qName", "qGroup", 
false);
+        List<Trigger> triggers = Arrays.asList(createSimpleTrigger(2, 0));
+        given(context.getJobDetail()).willReturn(jd);
+        
given(jobScheduleRepo.findOne(Matchers.anyLong())).willReturn(jobSchedule);
+        given(measureRepo.findOne(Matchers.anyLong())).willReturn(measure);
+        given(jobRepo.findOne(Matchers.anyLong())).willReturn(job);
+        given(factory.getObject()).willReturn(scheduler);
+        
given((List<Trigger>)scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(triggers);
+        
given(scheduler.checkExists(Matchers.any(TriggerKey.class))).willReturn(false);
+        given(jobRepo.save(Matchers.any(GriffinJob.class))).willReturn(job);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+        jobInstance.execute(context);
+    }
+
+    @Test
+    public void testExecuteWithPredicate() throws Exception {
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        Scheduler scheduler = mock(Scheduler.class);
+        GriffinMeasure measure = 
createGriffinMeasure("measureName",createFileExistPredicate(),createFileExistPredicate());
+        JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+        JobSchedule jobSchedule = createJobSchedule("jobName");
+        GriffinJob job = new GriffinJob(1L, "jobName", "qName", "qGroup", 
false);
+        List<Trigger> triggers = Arrays.asList(createSimpleTrigger(2, 0));
+        given(context.getJobDetail()).willReturn(jd);
+        
given(jobScheduleRepo.findOne(Matchers.anyLong())).willReturn(jobSchedule);
+        given(measureRepo.findOne(Matchers.anyLong())).willReturn(measure);
+        given(jobRepo.findOne(Matchers.anyLong())).willReturn(job);
+        given(factory.getObject()).willReturn(scheduler);
+        
given((List<Trigger>)scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(triggers);
+        
given(scheduler.checkExists(Matchers.any(TriggerKey.class))).willReturn(false);
+        given(jobRepo.save(Matchers.any(GriffinJob.class))).willReturn(job);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+        jobInstance.execute(context);
+    }
+
+    @Test
+    public void testExecuteWithNullException() throws Exception {
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        jobInstance.execute(context);
+        assertTrue(true);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java 
b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
index 988a188..d529753 100644
--- a/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
@@ -1,374 +1,560 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//
-//package org.apache.griffin.core.job;
-//
-//import org.apache.griffin.core.error.exception.GriffinException;
-//import org.apache.griffin.core.job.entity.GriffinJob;
-//import org.apache.griffin.core.job.entity.JobInstanceBean;
-//import org.apache.griffin.core.job.entity.LivySessionStates;
-//import org.apache.griffin.core.job.repo.JobInstanceRepo;
-//import org.apache.griffin.core.job.repo.JobRepo;
-//import org.apache.griffin.core.job.repo.JobScheduleRepo;
-//import org.apache.griffin.core.measure.repo.MeasureRepo;
-//import org.apache.griffin.core.util.GriffinOperationMessage;
-//import org.apache.griffin.core.util.PropertiesUtil;
-//import org.junit.Before;
-//import org.junit.Test;
-//import org.junit.runner.RunWith;
-//import org.mockito.Matchers;
-//import org.mockito.Mockito;
-//import org.mockito.internal.util.reflection.Whitebox;
-//import org.quartz.*;
-//import org.quartz.impl.JobDetailImpl;
-//import org.quartz.impl.triggers.SimpleTriggerImpl;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.boot.test.context.TestConfiguration;
-//import org.springframework.boot.test.mock.mockito.MockBean;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.core.io.ClassPathResource;
-//import org.springframework.scheduling.quartz.SchedulerFactoryBean;
-//import org.springframework.test.context.junit4.SpringRunner;
-//import org.springframework.web.client.RestTemplate;
-//
-//import java.util.*;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.mockito.BDDMockito.given;
-//import static org.mockito.Mockito.doNothing;
-//import static org.mockito.Mockito.mock;
-//import static org.quartz.TriggerBuilder.newTrigger;
-//
-//@RunWith(SpringRunner.class)
-//public class JobServiceImplTest {
-//
-//    @TestConfiguration
-//    public static class SchedulerServiceConfiguration {
-//        @Bean
-//        public JobServiceImpl service() {
-//            return new JobServiceImpl();
-//        }
-//
-//        @Bean
-//        public SchedulerFactoryBean factoryBean() {
-//            return new SchedulerFactoryBean();
-//        }
-//    }
-//
-//    @MockBean
-//    private JobScheduleRepo jobScheduleRepo;
-//
-//    @MockBean
-//    private MeasureRepo measureRepo;
-//
-//    @MockBean
-//    private JobRepo<GriffinJob> jobRepo;
-//    @MockBean
-//    private JobInstanceRepo jobInstanceRepo;
-//
-//    @MockBean
-//    private SchedulerFactoryBean factory;
-//
-//    @MockBean
-//    private Properties sparkJobProps;
-//
-//    @MockBean
-//    private RestTemplate restTemplate;
-//
-//    @Autowired
-//    private JobServiceImpl service;
-//
-//
-//    @Before
-//    public void setup() {
-//
-//    }
-//
-//    @Test
-//    public void testGetAliveJobsForNormalRun() throws SchedulerException {
-//        Scheduler scheduler = Mockito.mock(Scheduler.class);
-//        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
-//        given(factory.getObject()).willReturn(scheduler);
-//        given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
-//        JobKey jobKey = new JobKey(job.getQuartzName(), 
job.getQuartzGroup());
-//        SimpleTrigger trigger = new SimpleTriggerImpl();
-//        List<Trigger> triggers = new ArrayList<>();
-//        triggers.add(trigger);
-//        given((List<Trigger>) 
scheduler.getTriggersOfJob(jobKey)).willReturn(triggers);
-//        assertEquals(service.getAliveJobs().size(), 1);
-//    }
-//
-//    @Test
-//    public void testGetAliveJobsForNoJobsWithTriggerEmpty() throws 
SchedulerException {
-//        Scheduler scheduler = Mockito.mock(Scheduler.class);
-//        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
-//        given(factory.getObject()).willReturn(scheduler);
-//        given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
-//        JobKey jobKey = new JobKey(job.getQuartzName(), 
job.getQuartzGroup());
-//        List<Trigger> triggers = new ArrayList<>();
-//        given((List<Trigger>) 
scheduler.getTriggersOfJob(jobKey)).willReturn(triggers);
-//        assertEquals(service.getAliveJobs().size(), 0);
-//    }
-//
-//
-////    @Test
-////    public void testAddJobForSuccess() throws Exception {
-////        JobRequestBody jobRequestBody = new JobRequestBody("YYYYMMdd-HH", 
"YYYYMMdd-HH",
-////                String.valueOf(System.currentTimeMillis()), 
String.valueOf(System.currentTimeMillis()), "1000");
-////        Scheduler scheduler = Mockito.mock(Scheduler.class);
-////        given(factory.getObject()).willReturn(scheduler);
-////        
given(measureRepo.findOne(1L)).willReturn(createATestGriffinMeasure("measureName","org"));
-////        assertEquals(service.addJob("BA", "jobName", 1L, jobRequestBody), 
GriffinOperationMessage.CREATE_JOB_SUCCESS);
-////    }
-////
-////    @Test
-////    public void testAddJobForFailWithFormatError() {
-////        JobRequestBody jobRequestBody = new JobRequestBody();
-////        Scheduler scheduler = Mockito.mock(Scheduler.class);
-////        given(factory.getObject()).willReturn(scheduler);
-////        assertEquals(service.addJob("BA", "jobName", 0L, jobRequestBody), 
GriffinOperationMessage.CREATE_JOB_FAIL);
-////    }
-////
-////    @Test
-////    public void testAddJobForFailWithTriggerKeyExist() throws 
SchedulerException {
-////        String groupName = "BA";
-////        String jobName = "jobName";
-////        JobRequestBody jobRequestBody = new JobRequestBody("YYYYMMdd-HH", 
"YYYYMMdd-HH",
-////                String.valueOf(System.currentTimeMillis()), 
String.valueOf(System.currentTimeMillis()), "1000");
-////        Scheduler scheduler = Mockito.mock(Scheduler.class);
-////        given(factory.getObject()).willReturn(scheduler);
-////        given(scheduler.checkExists(TriggerKey.triggerKey(jobName, 
groupName))).willReturn(true);
-////        assertEquals(service.addJob(groupName, jobName, 0L, 
jobRequestBody), GriffinOperationMessage.CREATE_JOB_FAIL);
-////    }
-////
-////    @Test
-////    public void testAddJobForFailWithScheduleException() throws 
SchedulerException {
-////        String groupName = "BA";
-////        String jobName = "jobName";
-////        JobRequestBody jobRequestBody = new JobRequestBody("YYYYMMdd-HH", 
"YYYYMMdd-HH",
-////                String.valueOf(System.currentTimeMillis()), 
String.valueOf(System.currentTimeMillis()), "1000");
-////        Scheduler scheduler = Mockito.mock(Scheduler.class);
-////        given(factory.getObject()).willReturn(scheduler);
-////        Trigger trigger = 
newTrigger().withIdentity(TriggerKey.triggerKey(jobName, groupName)).build();
-////        
given(scheduler.scheduleJob(trigger)).willThrow(SchedulerException.class);
-////        assertEquals(service.addJob(groupName, jobName, 0L, 
jobRequestBody), GriffinOperationMessage.CREATE_JOB_FAIL);
-////    }
-//
-//    @Test
-//    public void testDeleteJobForJobIdSuccess() throws SchedulerException {
-//        Long jobId = 1L;
-////        GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", 
"quartzGroupName", "pJobName", "pGroupName", false);
-////        Scheduler scheduler = Mockito.mock(Scheduler.class);
-////        JobKey jobKey = new JobKey(job.getQuartzJobName(), 
job.getQuartzGroupName());
-////        JobKey pJobKey = new JobKey(job.getJobName(), job.getGroupName());
-////        given(factory.getObject()).willReturn(scheduler);
-////        given(scheduler.checkExists(pJobKey)).willReturn(true);
-////        given(scheduler.checkExists(jobKey)).willReturn(true);
-////        doNothing().when(scheduler).pauseJob(pJobKey);
-////        doNothing().when(scheduler).pauseJob(jobKey);
-////        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
-////        assertEquals(service.deleteJob(jobId), 
GriffinOperationMessage.DELETE_JOB_SUCCESS);
-//    }
-//
-//    @Test
-//    public void testDeleteJobForJobIdFailureWithNull() throws 
SchedulerException {
-//        Long jobId = 1L;
-//        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(null);
-//        assertEquals(service.deleteJob(jobId), 
GriffinOperationMessage.DELETE_JOB_FAIL);
-//    }
-//
-//    @Test
-//    public void testDeleteJobForJobIdFailureWithTriggerNotExist() throws 
SchedulerException {
-//        Long jobId = 1L;
-//        GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
-//        Scheduler scheduler = Mockito.mock(Scheduler.class);
-//        JobKey jobKey = new JobKey(job.getQuartzName(), 
job.getQuartzGroup());
-//        given(factory.getObject()).willReturn(scheduler);
-//        given(scheduler.checkExists(jobKey)).willReturn(false);
-//        assertEquals(service.deleteJob(jobId), 
GriffinOperationMessage.DELETE_JOB_FAIL);
-//    }
-//
-//
-//    @Test
-//    public void testDeleteJobForJobNameSuccess() throws SchedulerException {
-//        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
-//        Scheduler scheduler = Mockito.mock(Scheduler.class);
-//        JobKey jobKey = new JobKey(job.getQuartzName(), 
job.getQuartzGroup());
-////        given(jobRepo.findByJobNameAndDeleted(job.getJobName(), 
false)).willReturn(Arrays.asList(job));
-//        given(factory.getObject()).willReturn(scheduler);
-//        given(scheduler.checkExists(jobKey)).willReturn(true);
-//        doNothing().when(scheduler).pauseJob(jobKey);
-//        assertEquals(service.deleteJob(job.getJobName()), 
GriffinOperationMessage.DELETE_JOB_SUCCESS);
-//    }
-//
-//    @Test
-//    public void testDeleteJobForJobNameFailureWithNull() throws 
SchedulerException {
-//        String jobName = "jobName";
-////        given(jobRepo.findByJobNameAndDeleted(jobName, 
false)).willReturn(new ArrayList<>());
-//        assertEquals(service.deleteJob(jobName), 
GriffinOperationMessage.DELETE_JOB_FAIL);
-//    }
-//
-//    @Test
-//    public void testDeleteJobForJobNameFailureWithTriggerNotExist() throws 
SchedulerException {
-//        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
-//        Scheduler scheduler = Mockito.mock(Scheduler.class);
-//        JobKey jobKey = new JobKey(job.getQuartzName(), 
job.getQuartzGroup());
-////        given(jobRepo.findByJobNameAndDeleted(job.getJobName(), 
false)).willReturn(Arrays.asList(job));
-//        given(factory.getObject()).willReturn(scheduler);
-//        given(scheduler.checkExists(jobKey)).willReturn(false);
-//        assertEquals(service.deleteJob(job.getJobName()), 
GriffinOperationMessage.DELETE_JOB_FAIL);
-//    }
-//
-////    @Test
-////    public void testFindInstancesOfJobForSuccess() throws 
SchedulerException {
-////        Long jobId = 1L;
-////        int page = 0;
-////        int size = 2;
-////        GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
-////        JobInstanceBean jobInstance = new JobInstanceBean(1L, 
LivySessionStates.State.dead, "app_id", "app_uri", System.currentTimeMillis(), 
System.currentTimeMillis());
-////        Pageable pageRequest = new PageRequest(page, size, 
Sort.Direction.DESC, "timestamp");
-////        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
-////        given(jobInstanceRepo.findByJobId(1L, 
pageRequest)).willReturn(Arrays.asList(jobInstance));
-////        assertEquals(service.findInstancesOfJob(1L, page, size).size(), 1);
-////    }
-////
-////    @Test
-////    public void testFindInstancesOfJobForNull() throws SchedulerException {
-////        Long jobId = 1L;
-////        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(null);
-////        assertEquals(service.findInstancesOfJob(jobId, 0, 2).size(), 0);
-////    }
-////
-////    @Test
-////    public void testSyncInstancesOfJobForSuccess() {
-////        JobInstanceBean instance = createJobInstance();
-////        
given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
-////        Whitebox.setInternalState(service, "restTemplate", restTemplate);
-////        String result = 
"{\"id\":1,\"state\":\"starting\",\"appId\":123,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
-////        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willReturn(result);
-////        service.syncInstancesOfAllJobs();
-////    }
-//
-//    @Test
-//    public void testSyncInstancesOfJobForRestClientException() {
-//        JobInstanceBean instance = createJobInstance();
-//        instance.setSessionId(1234564L);
-//        String path = "/sparkJob.properties";
-//        
given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
-//        
given(sparkJobProps.getProperty("livy.uri")).willReturn(PropertiesUtil.getProperties(path,new
 ClassPathResource(path)).getProperty("livy.uri"));
-//        service.syncInstancesOfAllJobs();
-//    }
-//
-//    @Test
-//    public void testSyncInstancesOfJobForIOException() throws Exception {
-//        JobInstanceBean instance = createJobInstance();
-//        
given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
-//        Whitebox.setInternalState(service, "restTemplate", restTemplate);
-//        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willReturn("result");
-//        service.syncInstancesOfAllJobs();
-//    }
-//
-//    @Test
-//    public void testSyncInstancesOfJobForIllegalArgumentException() throws 
Exception {
-//        JobInstanceBean instance = createJobInstance();
-//        
given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
-//        Whitebox.setInternalState(service, "restTemplate", restTemplate);
-//        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willReturn("{\"state\":\"wrong\"}");
-//        service.syncInstancesOfAllJobs();
-//    }
-//
-////    @Test
-////    public void testGetHealthInfoWithHealthy() throws SchedulerException {
-////        Scheduler scheduler = Mockito.mock(Scheduler.class);
-////        GriffinJob job = new GriffinJob(1L, 1L, "jobName", 
"quartzJobName", "quartzGroupName", false);
-////        given(factory.getObject()).willReturn(scheduler);
-////        given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
-////        JobKey jobKey = new JobKey(job.getQuartzJobName(), 
job.getQuartzGroupName());
-////        SimpleTrigger trigger = new SimpleTriggerImpl();
-////        List<Trigger> triggers = new ArrayList<>();
-////        triggers.add(trigger);
-////        given((List<Trigger>) 
scheduler.getTriggersOfJob(jobKey)).willReturn(triggers);
-////
-////        Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, 
"timestamp");
-////        List<JobInstanceBean> scheduleStateList = new ArrayList<>();
-////        scheduleStateList.add(createJobInstance());
-////        given(jobInstanceRepo.findByJobId(1L, 
pageRequest)).willReturn(scheduleStateList);
-////        assertEquals(service.getHealthInfo().getHealthyJobCount(), 1);
-////
-////    }
-////
-////    @Test
-////    public void testGetHealthInfoWithUnhealthy() throws SchedulerException 
{
-////        Scheduler scheduler = Mockito.mock(Scheduler.class);
-////        GriffinJob job = new GriffinJob(1L, 1L, "jobName", 
"quartzJobName", "quartzGroupName", false);
-////        given(factory.getObject()).willReturn(scheduler);
-////        given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
-////        JobKey jobKey = new JobKey(job.getQuartzJobName(), 
job.getQuartzGroupName());
-////        SimpleTrigger trigger = new SimpleTriggerImpl();
-////        List<Trigger> triggers = new ArrayList<>();
-////        triggers.add(trigger);
-////        given((List<Trigger>) 
scheduler.getTriggersOfJob(jobKey)).willReturn(triggers);
-////
-////        Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, 
"timestamp");
-////        List<JobInstanceBean> scheduleStateList = new ArrayList<>();
-////        JobInstanceBean instance = createJobInstance();
-////        instance.setState(LivySessionStates.State.error);
-////        scheduleStateList.add(instance);
-////        given(jobInstanceRepo.findByJobId(1L, 
pageRequest)).willReturn(scheduleStateList);
-////        assertEquals(service.getHealthInfo().getHealthyJobCount(), 0);
-////    }
-//
-//    private void mockJsonDataMap(Scheduler scheduler, JobKey jobKey, Boolean 
deleted) throws SchedulerException {
-//        JobDataMap jobDataMap = mock(JobDataMap.class);
-//        JobDetailImpl jobDetail = new JobDetailImpl();
-//        jobDetail.setJobDataMap(jobDataMap);
-//        given(scheduler.getJobDetail(jobKey)).willReturn(jobDetail);
-//        
given(jobDataMap.getBooleanFromString("deleted")).willReturn(deleted);
-//    }
-//
-//    private Trigger newTriggerInstance(String name, String group, int 
internalInSeconds) {
-//        return newTrigger().withIdentity(TriggerKey.triggerKey(name, group)).
-//                withSchedule(SimpleScheduleBuilder.simpleSchedule()
-//                        .withIntervalInSeconds(internalInSeconds)
-//                        .repeatForever()).startAt(new Date()).build();
-//    }
-//
-//
-//    private GriffinException.GetJobsFailureException 
getTriggersOfJobExpectException(Scheduler scheduler, JobKey jobKey) {
-//        GriffinException.GetJobsFailureException exception = null;
-//        try {
-//            given(scheduler.getTriggersOfJob(jobKey)).willThrow(new 
GriffinException.GetJobsFailureException());
-//            service.getAliveJobs();
-//        } catch (GriffinException.GetJobsFailureException e) {
-//            exception = e;
-//        } catch (SchedulerException e) {
-//            e.printStackTrace();
-//        }
-//        return exception;
-//    }
-//
-//    private JobInstanceBean createJobInstance() {
-//        JobInstanceBean jobBean = new JobInstanceBean();
-//        jobBean.setSessionId(1L);
-//        jobBean.setState(LivySessionStates.State.starting);
-//        jobBean.setAppId("app_id");
-//        jobBean.setTms(System.currentTimeMillis());
-//        return jobBean;
-//    }
-//}
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job;
+
+import org.apache.griffin.core.error.exception.GriffinException;
+import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.job.repo.GriffinJobRepo;
+import org.apache.griffin.core.job.repo.JobInstanceRepo;
+import org.apache.griffin.core.job.repo.JobScheduleRepo;
+import org.apache.griffin.core.measure.entity.DataConnector;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
+import org.apache.griffin.core.util.GriffinOperationMessage;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.quartz.*;
+import org.quartz.impl.triggers.SimpleTriggerImpl;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Pageable;
+import org.springframework.data.domain.Sort;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.griffin.core.util.EntityHelper.*;
+import static org.apache.griffin.core.util.GriffinOperationMessage.*;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.doThrow;
+
+@RunWith(SpringRunner.class)
+public class JobServiceImplTest {
+
+    @TestConfiguration
+    public static class SchedulerServiceConfiguration {
+        @Bean
+        public JobServiceImpl service() {
+            return new JobServiceImpl();
+        }
+
+        @Bean
+        public SchedulerFactoryBean factoryBean() {
+            return new SchedulerFactoryBean();
+        }
+    }
+
+    @MockBean
+    private JobScheduleRepo jobScheduleRepo;
+
+    @MockBean
+    private GriffinMeasureRepo griffinMeasureRepo;
+
+    @MockBean
+    private GriffinJobRepo jobRepo;
+
+    @MockBean
+    private JobInstanceRepo jobInstanceRepo;
+
+    @MockBean
+    private SchedulerFactoryBean factory;
+
+    @MockBean(name = "livyConf")
+    private Properties sparkJobProps;
+
+    @MockBean
+    private RestTemplate restTemplate;
+
+    @Autowired
+    private JobServiceImpl service;
+
+
+    @Before
+    public void setup() {
+
+    }
+
+    @Test
+    public void testGetAliveJobsForSuccess() throws SchedulerException {
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+        SimpleTrigger trigger = new SimpleTriggerImpl();
+        given((List<Trigger>) 
scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(Arrays.asList(trigger));
+        assertEquals(service.getAliveJobs().size(), 1);
+    }
+
+    @Test
+    public void testGetAliveJobsForNoJobsWithTriggerEmpty() throws 
SchedulerException {
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+        given((List<Trigger>) 
scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(new 
ArrayList<>());
+        assertEquals(service.getAliveJobs().size(), 0);
+    }
+
+    @Test
+    public void testGetAliveJobsForNoJobsWithException() throws 
SchedulerException {
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+        GriffinException.GetJobsFailureException exception = 
getExceptionForGetAliveJObs(scheduler);
+        assert exception != null;
+    }
+
+
+    @Test
+    public void testAddJobForSuccess() throws Exception {
+        JobSchedule js = createJobSchedule();
+        js.setId(1L);
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), 
false)).willReturn(measure);
+        given(jobRepo.countByJobNameAndDeleted(js.getJobName(), 
false)).willReturn(0);
+        given(jobScheduleRepo.save(js)).willReturn(js);
+        given(jobRepo.save(Matchers.any(GriffinJob.class))).willReturn(job);
+        GriffinOperationMessage message = service.addJob(js);
+        assertEquals(message, CREATE_JOB_SUCCESS);
+    }
+
+    @Test
+    public void testAddJobForFailureWithMeasureNull() throws Exception {
+        JobSchedule js = createJobSchedule();
+        given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), 
false)).willReturn(null);
+        GriffinOperationMessage message = service.addJob(js);
+        assertEquals(message, CREATE_JOB_FAIL);
+    }
+
+    @Test
+    public void testAddJobForFailureWitJobNameRepeat() throws Exception {
+        JobSchedule js = createJobSchedule();
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), 
false)).willReturn(measure);
+        given(jobRepo.countByJobNameAndDeleted(js.getJobName(), 
false)).willReturn(1);
+        GriffinOperationMessage message = service.addJob(js);
+        assertEquals(message, CREATE_JOB_FAIL);
+    }
+
+    @Test
+    public void testAddJobForFailureWitJobNameNull() throws Exception {
+        JobSchedule js = createJobSchedule(null);
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), 
false)).willReturn(measure);
+        GriffinOperationMessage message = service.addJob(js);
+        assertEquals(message, CREATE_JOB_FAIL);
+    }
+
+    @Test
+    public void testAddJobForFailureWithBaselineInvalid() throws Exception {
+        JobDataSegment source = createJobDataSegment("source_name", false);
+        JobDataSegment target = createJobDataSegment("target_name", false);
+        JobSchedule js = createJobSchedule("jobName", source, target);
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), 
false)).willReturn(measure);
+        GriffinOperationMessage message = service.addJob(js);
+        assertEquals(message, CREATE_JOB_FAIL);
+    }
+
+    @Test
+    public void testAddJobForFailureWithConnectorNameInvalid() throws 
Exception {
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        JobDataSegment source = createJobDataSegment("source_connector_name", 
true);
+        JobDataSegment target = createJobDataSegment("target_name", false);
+        JobSchedule js = createJobSchedule("jobName", source, target);
+        given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), 
false)).willReturn(measure);
+        GriffinOperationMessage message = service.addJob(js);
+        assertEquals(message, CREATE_JOB_FAIL);
+    }
+
+    @Test
+    public void testAddJobForFailureWithMeasureConnectorNameRepeat() throws 
Exception {
+        JobSchedule js = createJobSchedule();
+        DataConnector dcSource = createDataConnector("connector_name", 
"default", "test_data_src", "dt=#YYYYMMdd# AND hour=#HH#");
+        DataConnector dcTarget = createDataConnector("connector_name", 
"default", "test_data_tgt", "dt=#YYYYMMdd# AND hour=#HH#");
+        GriffinMeasure measure = createGriffinMeasure("measureName", dcSource, 
dcTarget);
+        given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), 
false)).willReturn(measure);
+        GriffinOperationMessage message = service.addJob(js);
+        assertEquals(message, CREATE_JOB_FAIL);
+    }
+
+    @Test
+    public void testAddJobForFailureWithJobScheduleConnectorNameRepeat() 
throws Exception {
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        JobDataSegment source = createJobDataSegment("source_name", true);
+        JobDataSegment target = createJobDataSegment("source_name", false);
+        JobSchedule js = createJobSchedule("jobName", source, target);
+        given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), 
false)).willReturn(measure);
+        GriffinOperationMessage message = service.addJob(js);
+        assertEquals(message, CREATE_JOB_FAIL);
+    }
+
+    @Test
+    public void testAddJobForFailureWithTriggerKeyExist() throws Exception {
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        JobDataSegment source = createJobDataSegment("source_name", true);
+        JobDataSegment target = createJobDataSegment("target_name", false);
+        JobSchedule js = createJobSchedule("jobName", source, target);
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        given(griffinMeasureRepo.findByIdAndDeleted(js.getMeasureId(), 
false)).willReturn(measure);
+        
given(scheduler.checkExists(Matchers.any(TriggerKey.class))).willReturn(true);
+        GriffinOperationMessage message = service.addJob(js);
+        assertEquals(message, CREATE_JOB_FAIL);
+    }
+
+    @Test
+    public void testDeleteJobByIdForSuccessWithTriggerKeyExist() throws 
SchedulerException {
+        Long jobId = 1L;
+        GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        JobInstanceBean instance = new 
JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+        job.setJobInstances(Arrays.asList(instance));
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+        assertEquals(service.deleteJob(jobId), DELETE_JOB_SUCCESS);
+    }
+
+    @Test
+    public void testDeleteJobByIdForSuccessWithTriggerKeyNotExist() throws 
SchedulerException {
+        Long jobId = 1L;
+        GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        JobInstanceBean instance = new 
JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+        job.setJobInstances(Arrays.asList(instance));
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+        assertEquals(service.deleteJob(jobId), DELETE_JOB_SUCCESS);
+    }
+
+    @Test
+    public void testDeleteJobByIdForFailureWithNull() throws 
SchedulerException {
+        Long jobId = 1L;
+        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(null);
+        assertEquals(service.deleteJob(jobId), DELETE_JOB_FAIL);
+    }
+
+    @Test
+    public void testDeleteJobByIdForFailureWithException() throws 
SchedulerException {
+        Long jobId = 1L;
+        GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+        
doThrow(SchedulerException.class).when(scheduler).pauseJob(Matchers.any(JobKey.class));
+        assertEquals(service.deleteJob(jobId), DELETE_JOB_FAIL);
+    }
+
+
+    @Test
+    public void testDeleteJobByNameForSuccessWithTriggerKeyExist() throws 
SchedulerException {
+        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        JobInstanceBean instance = new 
JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+        job.setJobInstances(Arrays.asList(instance));
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(jobRepo.findByJobNameAndDeleted(job.getJobName(), 
false)).willReturn(Arrays.asList(job));
+        given(factory.getObject()).willReturn(scheduler);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+        assertEquals(service.deleteJob(job.getJobName()), DELETE_JOB_SUCCESS);
+    }
+
+    @Test
+    public void testDeleteJobByNameForSuccessWithTriggerKeyNotExist() throws 
SchedulerException {
+        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        JobInstanceBean instance = new 
JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+        job.setJobInstances(Arrays.asList(instance));
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(jobRepo.findByJobNameAndDeleted(job.getJobName(), 
false)).willReturn(Arrays.asList(job));
+        given(factory.getObject()).willReturn(scheduler);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+        assertEquals(service.deleteJob(job.getJobName()), DELETE_JOB_SUCCESS);
+    }
+
+    @Test
+    public void testDeleteJobByJobNameForFailureWithNull() throws 
SchedulerException {
+        String jobName = "jobName";
+        given(jobRepo.findByJobNameAndDeleted(jobName, false)).willReturn(new 
ArrayList<>());
+        assertEquals(service.deleteJob(jobName), DELETE_JOB_FAIL);
+    }
+
+    @Test
+    public void testDeleteJobByJobNameForFailureWithException() throws 
SchedulerException {
+        Long jobId = 1L;
+        GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByJobNameAndDeleted(job.getJobName(), 
false)).willReturn(Arrays.asList(job));
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+        
doThrow(SchedulerException.class).when(scheduler).pauseJob(Matchers.any(JobKey.class));
+        assertEquals(service.deleteJob(jobId), DELETE_JOB_FAIL);
+    }
+
+    @Test
+    public void testDeleteJobsRelateToMeasureForSuccessWithTriggerKeyExist() 
throws SchedulerException {
+        Long jobId = 1L;
+        Long measureId = 1L;
+        GriffinJob job = new GriffinJob(measureId, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        JobInstanceBean instance = new 
JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+        job.setJobInstances(Arrays.asList(instance));
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+        given(jobRepo.findByMeasureIdAndDeleted(measureId, 
false)).willReturn(Arrays.asList(job));
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+        assertEquals(service.deleteJobsRelateToMeasure(measureId), true);
+    }
+
+    @Test
+    public void 
testDeleteJobsRelateToMeasureForSuccessWithTriggerKeyNotExist() throws 
SchedulerException {
+        Long jobId = 1L;
+        Long measureId = 1L;
+        GriffinJob job = new GriffinJob(measureId, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        JobInstanceBean instance = new 
JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+        job.setJobInstances(Arrays.asList(instance));
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+        given(jobRepo.findByMeasureIdAndDeleted(measureId, 
false)).willReturn(Arrays.asList(job));
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+        assertEquals(service.deleteJobsRelateToMeasure(measureId), true);
+    }
+
+    @Test
+    public void testDeleteJobsRelateToMeasureForSuccessWithNull() throws 
SchedulerException {
+        Long measureId = 1L;
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByMeasureIdAndDeleted(measureId, 
false)).willReturn(null);
+        assertEquals(service.deleteJobsRelateToMeasure(measureId), true);
+    }
+
+    @Test
+    public void testDeleteJobsRelateToMeasureForFailureWithException() throws 
SchedulerException {
+        Long jobId = 1L;
+        Long measureId = 1L;
+        GriffinJob job = new GriffinJob(measureId, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        JobInstanceBean instance = new 
JobInstanceBean(LivySessionStates.State.finding, "pName", "pGroup", null, null);
+        job.setJobInstances(Arrays.asList(instance));
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+        given(jobRepo.findByMeasureIdAndDeleted(measureId, 
false)).willReturn(Arrays.asList(job));
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+        
doThrow(SchedulerException.class).when(scheduler).pauseJob(Matchers.any(JobKey.class));
+        assertEquals(service.deleteJobsRelateToMeasure(measureId), false);
+    }
+
+    @Test
+    public void testFindInstancesOfJobForSuccess() throws SchedulerException {
+        Long jobId = 1L;
+        int page = 0;
+        int size = 2;
+        GriffinJob job = new GriffinJob(1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        JobInstanceBean jobInstance = new JobInstanceBean(1L, 
LivySessionStates.State.dead, "app_id", "app_uri", null, null);
+        Pageable pageRequest = new PageRequest(page, size, 
Sort.Direction.DESC, "tms");
+        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(job);
+        given(jobInstanceRepo.findByJobId(1L, 
pageRequest)).willReturn(Arrays.asList(jobInstance));
+        assertEquals(service.findInstancesOfJob(1L, page, size).size(), 1);
+    }
+
+    @Test
+    public void testFindInstancesOfJobWithNull() throws SchedulerException {
+        Long jobId = 1L;
+        given(jobRepo.findByIdAndDeleted(jobId, false)).willReturn(null);
+        assertEquals(service.findInstancesOfJob(jobId, 0, 2).size(), 0);
+    }
+
+    @Test
+    public void testDeleteExpiredJobInstanceForSuccessWithTriggerKeyExist() 
throws SchedulerException {
+        JobInstanceBean jobInstance = new 
JobInstanceBean(LivySessionStates.State.dead, "pName", "pGroup", null, null);
+        
given(jobInstanceRepo.findByExpireTmsLessThanEqual(Matchers.any())).willReturn(Arrays.asList(jobInstance));
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+        service.deleteExpiredJobInstance();
+    }
+
+    @Test
+    public void testDeleteExpiredJobInstanceForSuccessWithTriggerKeyNotExist() 
throws SchedulerException {
+        JobInstanceBean jobInstance = new 
JobInstanceBean(LivySessionStates.State.dead, "pName", "pGroup", null, null);
+        
given(jobInstanceRepo.findByExpireTmsLessThanEqual(Matchers.any())).willReturn(Arrays.asList(jobInstance));
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(false);
+        service.deleteExpiredJobInstance();
+    }
+
+    @Test
+    public void testDeleteExpiredJobInstanceForSuccessWithNull() throws 
SchedulerException {
+        
given(jobInstanceRepo.findByExpireTmsLessThanEqual(Matchers.any())).willReturn(null);
+        service.deleteExpiredJobInstance();
+    }
+
+    @Test
+    public void testDeleteExpiredJobInstanceForFailureWithException() throws 
SchedulerException {
+        JobInstanceBean jobInstance = new 
JobInstanceBean(LivySessionStates.State.dead, "pName", "pGroup", null, null);
+        
given(jobInstanceRepo.findByExpireTmsLessThanEqual(Matchers.any())).willReturn(Arrays.asList(jobInstance));
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        given(factory.getObject()).willReturn(scheduler);
+        
given(scheduler.checkExists(Matchers.any(JobKey.class))).willReturn(true);
+        
doThrow(SchedulerException.class).when(scheduler).pauseJob(Matchers.any(JobKey.class));
+        service.deleteExpiredJobInstance();
+    }
+
+    @Test
+    public void testSyncInstancesOfJobForSuccess() {
+        JobInstanceBean instance = createJobInstance();
+        
given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
+        Whitebox.setInternalState(service, "restTemplate", restTemplate);
+        String result = 
"{\"id\":1,\"state\":\"starting\",\"appId\":123,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
+        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willReturn(result);
+        service.syncInstancesOfAllJobs();
+    }
+
+    @Test
+    public void testSyncInstancesOfJobForFailureWithRestClientException() {
+        JobInstanceBean instance = createJobInstance();
+        instance.setSessionId(1234564L);
+        
given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
+        Whitebox.setInternalState(service, "restTemplate", restTemplate);
+        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willThrow(RestClientException.class);
+        service.syncInstancesOfAllJobs();
+    }
+
+    @Test
+    public void testSyncInstancesOfJobForFailureWithIOException() throws 
Exception {
+        JobInstanceBean instance = createJobInstance();
+        
given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
+        Whitebox.setInternalState(service, "restTemplate", restTemplate);
+        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willReturn("result");
+        service.syncInstancesOfAllJobs();
+    }
+
+    @Test
+    public void testSyncInstancesOfJobForFailureWithIllegalArgumentException() 
throws Exception {
+        JobInstanceBean instance = createJobInstance();
+        
given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
+        Whitebox.setInternalState(service, "restTemplate", restTemplate);
+        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willReturn("{\"state\":\"wrong\"}");
+        service.syncInstancesOfAllJobs();
+    }
+
+    @Test
+    public void testSyncInstancesOfJobForFailureWithException() throws 
Exception {
+        JobInstanceBean instance = createJobInstance();
+        
given(jobInstanceRepo.findByActiveState()).willReturn(Arrays.asList(instance));
+        Whitebox.setInternalState(service, "restTemplate", restTemplate);
+        String result = 
"{\"id\":1,\"state\":\"starting\",\"appId\":123,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
+        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willReturn(result);
+        
doThrow(Exception.class).when(jobInstanceRepo).save(Matchers.any(JobInstanceBean.class));
+        service.syncInstancesOfAllJobs();
+    }
+
+    @Test
+    public void testGetHealthInfoWithHealthy() throws SchedulerException {
+        Long jobId = 1L;
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        GriffinJob job = new GriffinJob(jobId, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+        SimpleTrigger trigger = new SimpleTriggerImpl();
+        List<Trigger> triggers = new ArrayList<>();
+        triggers.add(trigger);
+        given((List<Trigger>) 
scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(triggers);
+
+        Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, 
"tms");
+        given(jobInstanceRepo.findByJobId(jobId, 
pageRequest)).willReturn(Arrays.asList(createJobInstance()));
+        assertEquals(service.getHealthInfo().getHealthyJobCount(), 1);
+
+    }
+
+    @Test
+    public void testGetHealthInfoWithUnhealthy() throws SchedulerException {
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+        SimpleTrigger trigger = new SimpleTriggerImpl();
+        given((List<Trigger>) 
scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willReturn(Arrays.asList(trigger));
+
+        Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, 
"tms");
+        List<JobInstanceBean> scheduleStateList = new ArrayList<>();
+        JobInstanceBean instance = createJobInstance();
+        instance.setState(LivySessionStates.State.error);
+        scheduleStateList.add(instance);
+        given(jobInstanceRepo.findByJobId(1L, 
pageRequest)).willReturn(scheduleStateList);
+        assertEquals(service.getHealthInfo().getHealthyJobCount(), 0);
+    }
+
+    @Test
+    public void testGetHealthInfoWithException() throws SchedulerException {
+        Scheduler scheduler = Mockito.mock(Scheduler.class);
+        GriffinJob job = new GriffinJob(1L, 1L, "jobName", "quartzJobName", 
"quartzGroupName", false);
+        given(factory.getObject()).willReturn(scheduler);
+        given(jobRepo.findByDeleted(false)).willReturn(Arrays.asList(job));
+        GriffinException.GetHealthInfoFailureException exception = 
getExceptionForHealthInfo(scheduler);
+        assert exception != null;
+    }
+
+
+    private GriffinException.GetHealthInfoFailureException 
getExceptionForHealthInfo(Scheduler scheduler) throws SchedulerException {
+        GriffinException.GetHealthInfoFailureException exception = null;
+        try {
+            
given(scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willThrow(SchedulerException.class);
+            service.getHealthInfo();
+        } catch (GriffinException.GetHealthInfoFailureException e) {
+            exception = e;
+        }
+        return exception;
+    }
+
+    private GriffinException.GetJobsFailureException 
getExceptionForGetAliveJObs(Scheduler scheduler) throws SchedulerException {
+        GriffinException.GetJobsFailureException exception = null;
+        try {
+            
given(scheduler.getTriggersOfJob(Matchers.any(JobKey.class))).willThrow(new 
GriffinException.GetJobsFailureException());
+            service.getAliveJobs();
+        } catch (GriffinException.GetJobsFailureException e) {
+            exception = e;
+        }
+        return exception;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
----------------------------------------------------------------------
diff --git 
a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java 
b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
index 6fe64e3..ccb641b 100644
--- a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
@@ -19,55 +19,138 @@ under the License.
 
 package org.apache.griffin.core.job;
 
+import org.apache.griffin.core.job.entity.JobInstanceBean;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+import org.apache.griffin.core.job.repo.JobInstanceRepo;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.util.JsonUtil;
+import org.apache.griffin.core.util.PropertiesUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.apache.griffin.core.util.EntityHelper.*;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 
 
-//@RunWith(SpringRunner.class)
-//public class SparkSubmitJobTest {
-
-//    @TestConfiguration
-//    public static class SchedulerServiceConfiguration {
-//        @Bean
-//        public SparkSubmitJob sparkSubmitJobBean() {
-//            return new SparkSubmitJob();
-//        }
-//
-//        @Bean
-//        public Properties sparkJobProps() {
-//            return PropertiesUtil.getProperties("/sparkJob.properties");
-//        }
-//
-//    }
-//
-//    @Autowired
-//    private SparkSubmitJob sparkSubmitJob;
-//
-//    @MockBean
-//    private MeasureRepo measureRepo;
-//
-//    @MockBean
-//    private RestTemplate restTemplate;
-//
-//    @MockBean
-//    private JobInstanceRepo jobInstanceRepo;
-//
-//    @Before
-//    public void setUp() {
-//    }
-//
-//    @Test
-//    public void testExecute() throws Exception {
-//        String result = 
"{\"id\":1,\"state\":\"starting\",\"appId\":null,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
-//        JobExecutionContext context = mock(JobExecutionContext.class);
-//        JobDetail jd = createJobDetail();
-//        given(context.getJobDetail()).willReturn(jd);
-//        
given(measureRepo.findOne(Long.valueOf(jd.getJobDataMap().getString("measureId")))).willReturn(createATestMeasure("view_item_hourly",
 "ebay"));
-//        Whitebox.setInternalState(sparkSubmitJob, "restTemplate", 
restTemplate);
-//        given(restTemplate.postForObject(Matchers.anyString(), 
Matchers.any(), Matchers.any())).willReturn(result);
-//        given(jobInstanceRepo.save(new JobInstanceBean())).willReturn(new 
JobInstanceBean());
-//        sparkSubmitJob.execute(context);
-//        assertTrue(true);
-//    }
-
-//}
+@RunWith(SpringRunner.class)
+public class SparkSubmitJobTest {
+
+    @TestConfiguration
+    public static class SchedulerServiceConfiguration {
+        @Bean
+        public SparkSubmitJob sparkSubmitJobBean() {
+            return new SparkSubmitJob();
+        }
+
+        @Bean(name = "livyConf")
+        public Properties sparkJobProps() {
+            String path = "sparkJob.properties";
+            return PropertiesUtil.getProperties(path, new 
ClassPathResource(path));
+        }
+
+    }
+
+    @Autowired
+    private SparkSubmitJob sparkSubmitJob;
+
+    @Autowired
+    @Qualifier("livyConf")
+    private Properties livyConfProps;
+
+    @MockBean
+    private RestTemplate restTemplate;
+
+    @MockBean
+    private JobInstanceRepo jobInstanceRepo;
+
+    @MockBean
+    private JobServiceImpl jobService;
+
+
+    @Before
+    public void setUp() {
+    }
+
+    @Test
+    public void testExecuteWithPredicateTriggerGreaterThanRepeat() throws 
Exception {
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        JobInstanceBean instance = createJobInstance();
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        SegmentPredicate predicate = createFileExistPredicate();
+        JobDetail jd = createJobDetail(JsonUtil.toJson(measure), 
JsonUtil.toJson(Arrays.asList(predicate)));
+        given(context.getJobDetail()).willReturn(jd);
+        given(context.getTrigger()).willReturn(createSimpleTrigger(4, 5));
+        
given(jobInstanceRepo.findByPredicateName(Matchers.anyString())).willReturn(instance);
+        sparkSubmitJob.execute(context);
+        assertTrue(true);
+    }
+
+    @Test
+    public void testExecuteWithPredicateTriggerLessThanRepeat() throws 
Exception {
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        JobInstanceBean instance = createJobInstance();
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        SegmentPredicate predicate = createFileExistPredicate();
+        JobDetail jd = createJobDetail(JsonUtil.toJson(measure), 
JsonUtil.toJson(Arrays.asList(predicate)));
+        given(context.getJobDetail()).willReturn(jd);
+        given(context.getTrigger()).willReturn(createSimpleTrigger(4, 4));
+        
given(jobInstanceRepo.findByPredicateName(Matchers.anyString())).willReturn(instance);
+        sparkSubmitJob.execute(context);
+        assertTrue(true);
+    }
+
+    @Test
+    public void testExecuteWithNoPredicateSuccess() throws Exception {
+        String result = 
"{\"id\":1,\"state\":\"starting\",\"appId\":null,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        JobInstanceBean instance = createJobInstance();
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+        given(context.getJobDetail()).willReturn(jd);
+        
given(jobInstanceRepo.findByPredicateName(Matchers.anyString())).willReturn(instance);
+        Whitebox.setInternalState(sparkSubmitJob, "restTemplate", 
restTemplate);
+        given(restTemplate.postForObject(Matchers.anyString(), Matchers.any(), 
Matchers.any())).willReturn(result);
+        given(jobService.pauseJob(Matchers.any(), 
Matchers.any())).willReturn(true);
+        sparkSubmitJob.execute(context);
+        assertTrue(true);
+    }
+
+    @Test
+    public void testExecuteWithPost2LivyException() throws Exception {
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        JobInstanceBean instance = createJobInstance();
+        GriffinMeasure measure = createGriffinMeasure("measureName");
+        JobDetail jd = createJobDetail(JsonUtil.toJson(measure), "");
+        given(context.getJobDetail()).willReturn(jd);
+        
given(jobInstanceRepo.findByPredicateName(Matchers.anyString())).willReturn(instance);
+        given(jobService.pauseJob(Matchers.any(), 
Matchers.any())).willReturn(true);
+        sparkSubmitJob.execute(context);
+        assertTrue(true);
+    }
+
+    @Test
+    public void testExecuteWithNullException() throws Exception {
+        JobExecutionContext context = mock(JobExecutionContext.class);
+        sparkSubmitJob.execute(context);
+        assertTrue(true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/job/repo/JobRepoTest.java
----------------------------------------------------------------------
diff --git 
a/service/src/test/java/org/apache/griffin/core/job/repo/JobRepoTest.java 
b/service/src/test/java/org/apache/griffin/core/job/repo/JobRepoTest.java
new file mode 100644
index 0000000..96a27e4
--- /dev/null
+++ b/service/src/test/java/org/apache/griffin/core/job/repo/JobRepoTest.java
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.AbstractJob;
+import org.apache.griffin.core.job.entity.GriffinJob;
+import org.apache.griffin.core.job.entity.VirtualJob;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
+import org.springframework.boot.test.autoconfigure.orm.jpa.TestEntityManager;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(SpringRunner.class)
+@DataJpaTest
+public class JobRepoTest {
+
+    @Autowired
+    private TestEntityManager entityManager;
+
+    @Autowired
+    private JobRepo jobRepo;
+
+    @Before
+    public void setup() throws Exception {
+        entityManager.clear();
+        entityManager.flush();
+        setEntityManager();
+    }
+
+    @Test
+    public void testCountByJobNameAndDeleted() throws Exception {
+        int count = jobRepo.countByJobNameAndDeleted("griffinJobName1", false);
+        assertEquals(count, 1);
+    }
+
+    @Test
+    public void testFindByDeleted() throws Exception {
+        List<AbstractJob> jobs = jobRepo.findByDeleted(false);
+        assertEquals(jobs.size(), 4);
+    }
+
+    @Test
+    public void findByJobNameAndDeleted() throws Exception {
+        List<AbstractJob> jobs = 
jobRepo.findByJobNameAndDeleted("griffinJobName1", false);
+        assertEquals(jobs.size(), 1);
+    }
+
+    @Test
+    public void findByMeasureIdAndDeleted() throws Exception {
+        List<AbstractJob> jobs = jobRepo.findByMeasureIdAndDeleted(1L, false);
+        assertEquals(jobs.size(), 4);
+    }
+
+    @Test
+    public void findByIdAndDeleted() throws Exception {
+        AbstractJob job = jobRepo.findByIdAndDeleted(1L, true);
+        assert job == null;
+    }
+
+    public void setEntityManager() throws Exception {
+        AbstractJob job1 = new GriffinJob(1L, "griffinJobName1", "qName1", 
"qGroup1", false);
+        AbstractJob job2 = new GriffinJob(1L, "griffinJobName2", "qName2", 
"qGroup2", false);
+        AbstractJob job3 = new VirtualJob("virtualJobName1", 1L, 
"metricName1");
+        AbstractJob job4 = new VirtualJob("virtualJobName2", 1L, 
"metricName2");
+        entityManager.persistAndFlush(job1);
+        entityManager.persistAndFlush(job2);
+        entityManager.persistAndFlush(job3);
+        entityManager.persistAndFlush(job4);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/71fcf93b/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java
----------------------------------------------------------------------
diff --git 
a/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java
 
b/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java
index 82a9ff9..5eb5a11 100644
--- 
a/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java
+++ 
b/service/src/test/java/org/apache/griffin/core/measure/MeasureControllerTest.java
@@ -33,9 +33,11 @@ import org.springframework.http.MediaType;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
 
-import static 
org.apache.griffin.core.util.EntityHelper.createATestGriffinMeasure;
+import static org.apache.griffin.core.util.EntityHelper.createGriffinMeasure;
 import static org.apache.griffin.core.util.GriffinOperationMessage.*;
 import static org.hamcrest.CoreMatchers.is;
 import static org.mockito.BDDMockito.given;
@@ -60,7 +62,7 @@ public class MeasureControllerTest {
 
     @Test
     public void testGetAllMeasures() throws Exception {
-        Measure measure = createATestGriffinMeasure("view_item_hourly", 
"test");
+        Measure measure = createGriffinMeasure("view_item_hourly");
         
given(service.getAllAliveMeasures()).willReturn(Arrays.asList(measure));
 
         mvc.perform(get(URLHelper.API_VERSION_PATH + "/measures"))
@@ -71,7 +73,7 @@ public class MeasureControllerTest {
 
     @Test
     public void testGetMeasuresById() throws Exception {
-        Measure measure = createATestGriffinMeasure("view_item_hourly", 
"test");
+        Measure measure = createGriffinMeasure("view_item_hourly");
         given(service.getMeasureById(1L)).willReturn(measure);
 
         mvc.perform(get(URLHelper.API_VERSION_PATH + "/measures/1"))
@@ -109,7 +111,7 @@ public class MeasureControllerTest {
 
     @Test
     public void testUpdateMeasureForSuccess() throws Exception {
-        Measure measure = createATestGriffinMeasure("view_item_hourly", 
"test");
+        Measure measure = createGriffinMeasure("view_item_hourly");
         String measureJson = JsonUtil.toJson(measure);
         
given(service.updateMeasure(measure)).willReturn(UPDATE_MEASURE_SUCCESS);
 
@@ -121,7 +123,7 @@ public class MeasureControllerTest {
 
     @Test
     public void testUpdateMeasureForNotFound() throws Exception {
-        Measure measure = createATestGriffinMeasure("view_item_hourly", 
"test");
+        Measure measure = createGriffinMeasure("view_item_hourly");
         String measureJson = JsonUtil.toJson(measure);
         given(service.updateMeasure(measure)).willReturn(RESOURCE_NOT_FOUND);
 
@@ -134,7 +136,7 @@ public class MeasureControllerTest {
 
     @Test
     public void testUpdateMeasureForFail() throws Exception {
-        Measure measure = createATestGriffinMeasure("view_item_hourly", 
"test");
+        Measure measure = createGriffinMeasure("view_item_hourly");
         String measureJson = JsonUtil.toJson(measure);
         given(service.updateMeasure(measure)).willReturn(UPDATE_MEASURE_FAIL);
 
@@ -148,7 +150,7 @@ public class MeasureControllerTest {
     public void testGetAllMeasuresByOwner() throws Exception {
         String owner = "test";
         List<Measure> measureList = new LinkedList<>();
-        Measure measure = createATestGriffinMeasure("view_item_hourly", owner);
+        Measure measure = createGriffinMeasure("view_item_hourly");
         measureList.add(measure);
         given(service.getAliveMeasuresByOwner(owner)).willReturn(measureList);
 
@@ -161,7 +163,7 @@ public class MeasureControllerTest {
 
     @Test
     public void testCreateNewMeasureForSuccess() throws Exception {
-        Measure measure = createATestGriffinMeasure("view_item_hourly", 
"test");
+        Measure measure = createGriffinMeasure("view_item_hourly");
         String measureJson = JsonUtil.toJson(measure);
         
given(service.createMeasure(measure)).willReturn(CREATE_MEASURE_SUCCESS);
 
@@ -173,7 +175,7 @@ public class MeasureControllerTest {
 
     @Test
     public void testCreateNewMeasureForFailWithDuplicate() throws Exception {
-        Measure measure = createATestGriffinMeasure("view_item_hourly", 
"test");
+        Measure measure = createGriffinMeasure("view_item_hourly");
         String measureJson = JsonUtil.toJson(measure);
         
given(service.createMeasure(measure)).willReturn(CREATE_MEASURE_FAIL_DUPLICATE);
 
@@ -185,7 +187,7 @@ public class MeasureControllerTest {
 
     @Test
     public void testCreateNewMeasureForFailWithSaveException() throws 
Exception {
-        Measure measure = createATestGriffinMeasure("view_item_hourly", 
"test");
+        Measure measure = createGriffinMeasure("view_item_hourly");
         String measureJson = JsonUtil.toJson(measure);
         
given(service.createMeasure(measure)).willReturn(GriffinOperationMessage.CREATE_MEASURE_FAIL);
 

Reply via email to