Repository: incubator-griffin
Updated Branches:
  refs/heads/master 6a309cd08 -> 0b100a16c


Fix handling of non-existent measure id, fix job health, and update job service unit tests

Author: ahutsunshine <[email protected]>

Closes #167 from ahutsunshine/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/0b100a16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/0b100a16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/0b100a16

Branch: refs/heads/master
Commit: 0b100a16c61fc26c44ea59b2d5b22cc85ad911f0
Parents: 6a309cd
Author: ahutsunshine <[email protected]>
Authored: Fri Nov 3 17:59:25 2017 +0800
Committer: Lionel Liu <[email protected]>
Committed: Fri Nov 3 17:59:25 2017 +0800

----------------------------------------------------------------------
 griffin-doc/postman/griffin.json                |   2 +-
 .../core/config/jobConfig/SparkJobConfig.java   |   1 -
 .../apache/griffin/core/job/JobServiceImpl.java |  62 ++++++++---
 .../core/job/entity/LivySessionStates.java      |   2 +-
 .../griffin/core/job/JobServiceImplTest.java    | 107 ++++++++++++++-----
 .../griffin/core/job/SparkSubmitJobTest.java    |   8 +-
 6 files changed, 133 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/griffin-doc/postman/griffin.json
----------------------------------------------------------------------
diff --git a/griffin-doc/postman/griffin.json b/griffin-doc/postman/griffin.json
index f401f09..c692125 100644
--- a/griffin-doc/postman/griffin.json
+++ b/griffin-doc/postman/griffin.json
@@ -1968,7 +1968,7 @@
                        "helperAttributes": {},
                        "time": 1509333184841,
                        "name": "Add job",
-                       "description": "`POST /api/v1/jobs`\n\n#### Request 
Header\nkey | value\n--- | ---\nContent-Type | application/json\n\n#### Request 
Parameters\nname | description | type | example value\n--- | --- | --- | 
---\ngroup | job group name | String | BA\njobName | job name | String | 
measure-BA-0-1508466621000 \nmeasureId | measure id | Long | 4\n\n#### Request 
Body\nname | description | type | example value\n--- | --- | --- | 
---\njobRequestBody | custom class composed of job key parameters | 
JobRequestBody | 
`{\"sourcePattern\":\"YYYYMMdd-HH\",\"targetPattern\":\"YYYYMMdd-HH\",\"jobStartTime\":1508428800000,\"interval\":36000,\"groupName\":\"BA\"}`\n\n\n####
 Response Body Sample\n```\n{\n  \"code\": 205,\n  \"description\": \"Create 
Job Succeed\"\n}\n```\nIt may return failed messages.Such as,\n\n```\n{\n  
\"code\": 405,\n  \"description\": \"Create Job Failed\"\n}\n```\n\nThe reason 
for failure may be that trigger key already exists.You should rename group and 
job name to make trigge
 r key unique.",
+                       "description": "`POST /api/v1/jobs`\n\n#### Request 
Header\nkey | value\n--- | ---\nContent-Type | application/json\n\n#### Request 
Parameters\nname | description | type | example value\n--- | --- | --- | 
---\ngroup | job group name | String | BA\njobName | job name | String | 
measure-BA-0-1508466621000 \nmeasureId | measure id | Long | 4\n\n#### Request 
Body\nname | description | type | example value\n--- | --- | --- | 
---\njobRequestBody | custom class composed of job key parameters | 
JobRequestBody | 
`{\"sourcePattern\":\"YYYYMMdd-HH\",\"targetPattern\":\"YYYYMMdd-HH\",\"jobStartTime\":1508428800000,\"interval\":36000,\"groupName\":\"BA\"}`\n\n\n####
 Response Body Sample\n```\n{\n  \"code\": 205,\n  \"description\": \"Create 
Job Succeed\"\n}\n```\nIt may return failed messages.Such as,\n\n```\n{\n  
\"code\": 405,\n  \"description\": \"Create Job Failed\"\n}\n```\n\nThe reason 
for failure may be that trigger key already exists or the measure id associated 
with job may not exist
 . Firstly,You should check group and job name to make trigger key unique. 
Secondly,you should check whether your measure id exists.",
                        "collectionId": "689bb3f2-1c6a-b45e-5409-4df1ef07554c",
                        "responses": [
                                {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java
 
b/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java
index e089872..ffaef70 100644
--- 
a/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java
+++ 
b/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java
@@ -19,7 +19,6 @@ under the License.
 
 package org.apache.griffin.core.config.jobConfig;
 
-import org.apache.griffin.core.util.JsonUtil;
 import org.apache.griffin.core.util.PropertiesUtil;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java 
b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 6e76658..425368c 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -66,9 +66,14 @@ public class JobServiceImpl implements JobService {
     private JobInstanceRepo jobInstanceRepo;
     @Autowired
     private Properties sparkJobProps;
+    @Autowired
+    private MeasureRepo measureRepo;
+
+    private RestTemplate restTemplate;
 
 
     public JobServiceImpl() {
+        restTemplate = new RestTemplate();
     }
 
     @Override
@@ -145,6 +150,11 @@ public class JobServiceImpl implements JobService {
                 return CREATE_JOB_FAIL;
             }
 
+            if (!isMeasureIdAvailable(measureId)) {
+                LOGGER.error("The measure id {} does't exist.", measureId);
+                return CREATE_JOB_FAIL;
+            }
+
             JobDetail jobDetail = addJobDetail(scheduler, groupName, jobName, 
measureId, jobRequestBody);
             scheduler.scheduleJob(newTriggerInstance(triggerKey, jobDetail, 
interval, jobStartTime));
             return GriffinOperationMessage.CREATE_JOB_SUCCESS;
@@ -157,6 +167,14 @@ public class JobServiceImpl implements JobService {
         }
     }
 
+    private Boolean isMeasureIdAvailable(long measureId) {
+        Measure measure = measureRepo.findOne(measureId);
+        if (measure != null && !measure.getDeleted()) {
+            return true;
+        }
+        return false;
+    }
+
     private JobDetail addJobDetail(Scheduler scheduler, String groupName, 
String jobName, Long measureId, JobRequestBody jobRequestBody) throws 
SchedulerException {
         JobKey jobKey = jobKey(jobName, groupName);
         JobDetail jobDetail;
@@ -262,8 +280,7 @@ public class JobServiceImpl implements JobService {
      * 2. deleteJob
      *
      * @param measure measure data quality between source and target dataset
-     * @throws SchedulerException  quartz throws if schedule has problem
-     *
+     * @throws SchedulerException quartz throws if schedule has problem
      */
     public void deleteJobsRelateToMeasure(Measure measure) throws 
SchedulerException {
         Scheduler scheduler = factory.getObject();
@@ -320,7 +337,6 @@ public class JobServiceImpl implements JobService {
     }
 
     private void setJobInstanceInfo(JobInstance jobInstance, String uri, 
String group, String jobName) {
-        RestTemplate restTemplate = new RestTemplate();
         TypeReference<HashMap<String, Object>> type = new 
TypeReference<HashMap<String, Object>>() {
         };
         try {
@@ -366,18 +382,12 @@ public class JobServiceImpl implements JobService {
         int jobCount = 0;
         int notHealthyCount = 0;
         try {
-            for (JobKey jobKey : 
scheduler.getJobKeys(GroupMatcher.anyGroup())) {
-                jobCount++;
-                String jobName = jobKey.getName();
-                String jobGroup = jobKey.getGroup();
-                Pageable pageRequest = new PageRequest(0, 1, 
Sort.Direction.DESC, "timestamp");
-                JobInstance latestJobInstance;
-                List<JobInstance> jobInstances = 
jobInstanceRepo.findByGroupNameAndJobName(jobGroup, jobName, pageRequest);
-                if (jobInstances != null && jobInstances.size() > 0) {
-                    latestJobInstance = jobInstances.get(0);
-                    if 
(!LivySessionStates.isHeathy(latestJobInstance.getState())) {
-                        notHealthyCount++;
-                    }
+            Set<JobKey> jobKeys = 
scheduler.getJobKeys(GroupMatcher.anyGroup());
+            for (JobKey jobKey : jobKeys) {
+                List<Trigger> triggers = (List<Trigger>) 
scheduler.getTriggersOfJob(jobKey);
+                if (triggers != null && triggers.size() != 0 && 
!isJobDeleted(scheduler, jobKey)) {
+                    jobCount++;
+                    notHealthyCount = getJobNotHealthyCount(notHealthyCount, 
jobKey);
                 }
             }
         } catch (SchedulerException e) {
@@ -387,13 +397,33 @@ public class JobServiceImpl implements JobService {
         return new JobHealth(jobCount - notHealthyCount, jobCount);
     }
 
+    private int getJobNotHealthyCount(int notHealthyCount, JobKey jobKey) {
+        if (!isJobHealthy(jobKey)) {
+            notHealthyCount++;
+        }
+        return notHealthyCount;
+    }
+
+    private Boolean isJobHealthy(JobKey jobKey) {
+        Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, 
"timestamp");
+        JobInstance latestJobInstance;
+        List<JobInstance> jobInstances = 
jobInstanceRepo.findByGroupNameAndJobName(jobKey.getGroup(), jobKey.getName(), 
pageRequest);
+        if (jobInstances != null && jobInstances.size() > 0) {
+            latestJobInstance = jobInstances.get(0);
+            if (LivySessionStates.isHealthy(latestJobInstance.getState())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Override
     public Map<String, List<Map<String, Serializable>>> 
getJobDetailsGroupByMeasureId() {
         Map<String, List<Map<String, Serializable>>> jobDetailsMap = new 
HashMap<>();
         List<Map<String, Serializable>> jobInfoList = getAliveJobs();
         for (Map<String, Serializable> jobInfo : jobInfoList) {
             String measureId = (String) jobInfo.get("measureId");
-            List<Map<String, Serializable>> jobs = 
jobDetailsMap.getOrDefault(measureId, new ArrayList<Map<String, 
Serializable>>());
+            List<Map<String, Serializable>> jobs = 
jobDetailsMap.getOrDefault(measureId, new ArrayList<>());
             jobs.add(jobInfo);
             jobDetailsMap.put(measureId, jobs);
         }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
 
b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
index 5839fb5..773bd98 100644
--- 
a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
+++ 
b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
@@ -81,7 +81,7 @@ public class LivySessionStates {
         }
     }
 
-    public static boolean isHeathy(State state) {
+    public static boolean isHealthy(State state) {
         if (State.error.equals(state) || State.dead.equals(state) || 
State.shutting_down.equals(state)) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java 
b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
index a838933..ef9b34b 100644
--- a/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java
@@ -24,14 +24,20 @@ import org.apache.griffin.core.job.entity.JobInstance;
 import org.apache.griffin.core.job.entity.JobRequestBody;
 import org.apache.griffin.core.job.entity.LivySessionStates;
 import org.apache.griffin.core.job.repo.JobInstanceRepo;
+import org.apache.griffin.core.measure.repo.MeasureRepo;
 import org.apache.griffin.core.util.GriffinOperationMessage;
+import org.apache.griffin.core.util.PropertiesUtil;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
 import org.quartz.*;
 import org.quartz.impl.JobDetailImpl;
 import org.quartz.impl.matchers.GroupMatcher;
+import org.quartz.impl.triggers.SimpleTriggerImpl;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.test.mock.mockito.MockBean;
@@ -41,12 +47,11 @@ import org.springframework.data.domain.Pageable;
 import org.springframework.data.domain.Sort;
 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
 import org.springframework.test.context.junit4.SpringRunner;
-import org.springframework.web.client.HttpClientErrorException;
-import org.springframework.web.client.RestClientException;
 import org.springframework.web.client.RestTemplate;
 
 import java.util.*;
 
+import static 
org.apache.griffin.core.measure.MeasureTestHelper.createATestMeasure;
 import static 
org.apache.griffin.core.measure.MeasureTestHelper.createJobDetail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -78,11 +83,22 @@ public class JobServiceImplTest {
     @MockBean
     private SchedulerFactoryBean factory;
 
+    @MockBean
+    private Properties sparkJobProps;
+
+    @MockBean
+    private RestTemplate restTemplate;
+
     @Autowired
-    public JobServiceImpl service;
+    private JobServiceImpl service;
+
+    @MockBean
+    private MeasureRepo measureRepo;
+
 
     @Before
     public void setup() {
+
     }
 
     @Test
@@ -128,12 +144,13 @@ public class JobServiceImplTest {
     }
 
     @Test
-    public void testAddJobForSuccess() {
+    public void testAddJobForSuccess() throws Exception {
         JobRequestBody jobRequestBody = new JobRequestBody("YYYYMMdd-HH", 
"YYYYMMdd-HH",
                 String.valueOf(System.currentTimeMillis()), 
String.valueOf(System.currentTimeMillis()), "1000");
         Scheduler scheduler = Mockito.mock(Scheduler.class);
         given(factory.getObject()).willReturn(scheduler);
-        assertEquals(service.addJob("BA", "jobName", 0L, jobRequestBody), 
GriffinOperationMessage.CREATE_JOB_SUCCESS);
+        
given(measureRepo.findOne(1L)).willReturn(createATestMeasure("measureName","org"));
+        assertEquals(service.addJob("BA", "jobName", 1L, jobRequestBody), 
GriffinOperationMessage.CREATE_JOB_SUCCESS);
     }
 
     @Test
@@ -208,19 +225,55 @@ public class JobServiceImplTest {
         assertEquals(service.findInstancesOfJob(groupName, jobName, page, 
size).size(), 1);
     }
 
-//    @Test
-//    public void testSyncInstancesOfJob() {
-//        JobInstance instance = newJobInstance();
-//        instance.setSessionId(1234564);
-//        String group = "groupName";
-//        String jobName = "jobName";
-//        RestTemplate restTemplate = mock(RestTemplate.class);
-//        
given(jobInstanceRepo.findGroupWithJobName()).willReturn(Arrays.asList((Object) 
(new Object[]{group, jobName})));
-//        given(jobInstanceRepo.findByGroupNameAndJobName(group, 
jobName)).willReturn(Arrays.asList(instance));
-//        given(restTemplate.getForObject("uri", 
String.class)).willThrow(RestClientException.class);
-//        RestClientException restClientException = 
getJobInstanceStatusExpectException();
-//        assert (restClientException != null);
-//    }
+    @Test
+    public void testSyncInstancesOfJobForSuccess() {
+        JobInstance instance = newJobInstance();
+        String group = "groupName";
+        String jobName = "jobName";
+        
given(jobInstanceRepo.findGroupWithJobName()).willReturn(Arrays.asList((Object) 
(new Object[]{group, jobName})));
+        given(jobInstanceRepo.findByGroupNameAndJobName(group, 
jobName)).willReturn(Arrays.asList(instance));
+        Whitebox.setInternalState(service, "restTemplate", restTemplate);
+        String result = 
"{\"id\":1,\"state\":\"starting\",\"appId\":123,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
+        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willReturn(result);
+        service.syncInstancesOfAllJobs();
+    }
+
+
+    @Test
+    public void testSyncInstancesOfJobForRestClientException() {
+        JobInstance instance = newJobInstance();
+        instance.setSessionId(1234564);
+        String group = "groupName";
+        String jobName = "jobName";
+        
given(jobInstanceRepo.findGroupWithJobName()).willReturn(Arrays.asList((Object) 
(new Object[]{group, jobName})));
+        given(jobInstanceRepo.findByGroupNameAndJobName(group, 
jobName)).willReturn(Arrays.asList(instance));
+        
given(sparkJobProps.getProperty("livy.uri")).willReturn(PropertiesUtil.getProperties("/sparkJob.properties").getProperty("livy.uri"));
+        service.syncInstancesOfAllJobs();
+    }
+
+    @Test
+    public void testSyncInstancesOfJobForIOException() throws Exception {
+        JobInstance instance = newJobInstance();
+        String group = "groupName";
+        String jobName = "jobName";
+        
given(jobInstanceRepo.findGroupWithJobName()).willReturn(Arrays.asList((Object) 
(new Object[]{group, jobName})));
+        given(jobInstanceRepo.findByGroupNameAndJobName(group, 
jobName)).willReturn(Arrays.asList(instance));
+        Whitebox.setInternalState(service, "restTemplate", restTemplate);
+        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willReturn("result");
+        service.syncInstancesOfAllJobs();
+    }
+
+    @Test
+    public void testSyncInstancesOfJobForIllegalArgumentException() throws 
Exception {
+        JobInstance instance = newJobInstance();
+        String group = "groupName";
+        String jobName = "jobName";
+        
given(jobInstanceRepo.findGroupWithJobName()).willReturn(Arrays.asList((Object) 
(new Object[]{group, jobName})));
+        given(jobInstanceRepo.findByGroupNameAndJobName(group, 
jobName)).willReturn(Arrays.asList(instance));
+        Whitebox.setInternalState(service, "restTemplate", restTemplate);
+        given(restTemplate.getForObject(Matchers.anyString(), 
Matchers.any())).willReturn("{\"state\":\"wrong\"}");
+        service.syncInstancesOfAllJobs();
+    }
 
     @Test
     public void testGetHealthInfoWithHealthy() throws SchedulerException {
@@ -228,6 +281,15 @@ public class JobServiceImplTest {
         given(factory.getObject()).willReturn(scheduler);
         given(scheduler.getJobGroupNames()).willReturn(Arrays.asList("BA"));
         JobKey jobKey = new JobKey("test");
+        SimpleTrigger trigger = new SimpleTriggerImpl();
+        List<Trigger> triggers = new ArrayList<>();
+        triggers.add(trigger);
+        given((List<Trigger>) 
scheduler.getTriggersOfJob(jobKey)).willReturn(triggers);
+        JobDataMap jobDataMap = mock(JobDataMap.class);
+        JobDetailImpl jobDetail = new JobDetailImpl();
+        jobDetail.setJobDataMap(jobDataMap);
+        given(scheduler.getJobDetail(jobKey)).willReturn(jobDetail);
+        given(jobDataMap.getBooleanFromString("deleted")).willReturn(false);
         Set<JobKey> jobKeySet = new HashSet<>();
         jobKeySet.add(jobKey);
         
given(scheduler.getJobKeys(GroupMatcher.anyGroup())).willReturn((jobKeySet));
@@ -266,15 +328,6 @@ public class JobServiceImplTest {
                         .repeatForever()).startAt(new Date()).build();
     }
 
-    private RestClientException getJobInstanceStatusExpectException() {
-        RestClientException exception = null;
-        try {
-            service.syncInstancesOfAllJobs();
-        } catch (RestClientException e) {
-            exception = e;
-        }
-        return exception;
-    }
 
     private GriffinException.GetJobsFailureException 
getTriggersOfJobExpectException(Scheduler scheduler, JobKey jobKey) {
         GriffinException.GetJobsFailureException exception = null;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
----------------------------------------------------------------------
diff --git 
a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java 
b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
index 92072e3..6433d04 100644
--- a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java
@@ -28,6 +28,8 @@ import org.apache.griffin.core.util.PropertiesUtil;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.powermock.reflect.Whitebox;
 import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -81,13 +83,13 @@ public class SparkSubmitJobTest {
 
     @Test
     public void testExecute() throws Exception {
-        String livyUri = null;
         String result = 
"{\"id\":1,\"state\":\"starting\",\"appId\":null,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}";
         JobExecutionContext context = mock(JobExecutionContext.class);
         JobDetail jd = createJobDetail();
         given(context.getJobDetail()).willReturn(jd);
-        
given(measureRepo.findOne(Long.valueOf(jd.getJobDataMap().getString("measureId")))).willReturn(createATestMeasure("view_item_hourly",
 "test"));
-        given(restTemplate.postForObject(livyUri, new SparkJobDO(), 
String.class)).willReturn(result);
+        
given(measureRepo.findOne(Long.valueOf(jd.getJobDataMap().getString("measureId")))).willReturn(createATestMeasure("view_item_hourly",
 "ebay"));
+        Whitebox.setInternalState(sparkSubmitJob,"restTemplate",restTemplate);
+        given(restTemplate.postForObject(Matchers.anyString(), Matchers.any(), 
Matchers.any())).willReturn(result);
         given(jobInstanceRepo.save(new JobInstance())).willReturn(new 
JobInstance());
         sparkSubmitJob.execute(context);
         assertTrue(true);

Reply via email to