Repository: incubator-griffin Updated Branches: refs/heads/master 6a309cd08 -> 0b100a16c
fix measure id not exist , job health and update job service ut Author: ahutsunshine <[email protected]> Closes #167 from ahutsunshine/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/0b100a16 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/0b100a16 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/0b100a16 Branch: refs/heads/master Commit: 0b100a16c61fc26c44ea59b2d5b22cc85ad911f0 Parents: 6a309cd Author: ahutsunshine <[email protected]> Authored: Fri Nov 3 17:59:25 2017 +0800 Committer: Lionel Liu <[email protected]> Committed: Fri Nov 3 17:59:25 2017 +0800 ---------------------------------------------------------------------- griffin-doc/postman/griffin.json | 2 +- .../core/config/jobConfig/SparkJobConfig.java | 1 - .../apache/griffin/core/job/JobServiceImpl.java | 62 ++++++++--- .../core/job/entity/LivySessionStates.java | 2 +- .../griffin/core/job/JobServiceImplTest.java | 107 ++++++++++++++----- .../griffin/core/job/SparkSubmitJobTest.java | 8 +- 6 files changed, 133 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/griffin-doc/postman/griffin.json ---------------------------------------------------------------------- diff --git a/griffin-doc/postman/griffin.json b/griffin-doc/postman/griffin.json index f401f09..c692125 100644 --- a/griffin-doc/postman/griffin.json +++ b/griffin-doc/postman/griffin.json @@ -1968,7 +1968,7 @@ "helperAttributes": {}, "time": 1509333184841, "name": "Add job", - "description": "`POST /api/v1/jobs`\n\n#### Request Header\nkey | value\n--- | ---\nContent-Type | application/json\n\n#### Request Parameters\nname | description | type | example value\n--- | --- | --- | ---\ngroup | job group name | String | BA\njobName | job name | String | measure-BA-0-1508466621000 \nmeasureId | measure id | Long | 4\n\n#### Request Body\nname | description | type | example value\n--- | --- | --- | ---\njobRequestBody | custom class composed of job key parameters | JobRequestBody | `{\"sourcePattern\":\"YYYYMMdd-HH\",\"targetPattern\":\"YYYYMMdd-HH\",\"jobStartTime\":1508428800000,\"interval\":36000,\"groupName\":\"BA\"}`\n\n\n#### Response Body Sample\n```\n{\n \"code\": 205,\n \"description\": \"Create Job Succeed\"\n}\n```\nIt may return failed messages.Such as,\n\n```\n{\n \"code\": 405,\n \"description\": \"Create Job Failed\"\n}\n```\n\nThe reason for failure may be that trigger key already exists.You should rename group and job name to make trigge r key unique.", + "description": "`POST /api/v1/jobs`\n\n#### Request Header\nkey | value\n--- | ---\nContent-Type | application/json\n\n#### Request Parameters\nname | description | type | example value\n--- | --- | --- | ---\ngroup | job group name | String | BA\njobName | job name | String | measure-BA-0-1508466621000 \nmeasureId | measure id | Long | 4\n\n#### Request Body\nname | description | type | example value\n--- | --- | --- | ---\njobRequestBody | custom class composed of job key parameters | JobRequestBody | `{\"sourcePattern\":\"YYYYMMdd-HH\",\"targetPattern\":\"YYYYMMdd-HH\",\"jobStartTime\":1508428800000,\"interval\":36000,\"groupName\":\"BA\"}`\n\n\n#### Response Body Sample\n```\n{\n \"code\": 205,\n \"description\": \"Create Job Succeed\"\n}\n```\nIt may return failed messages.Such as,\n\n```\n{\n \"code\": 405,\n \"description\": \"Create Job Failed\"\n}\n```\n\nThe reason for failure may be that trigger key already exists or the measure id associated with job may not exist . Firstly,You should check group and job name to make trigger key unique. Secondly,you should check whether your measure id exists.", "collectionId": "689bb3f2-1c6a-b45e-5409-4df1ef07554c", "responses": [ { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java b/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java index e089872..ffaef70 100644 --- a/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java +++ b/service/src/main/java/org/apache/griffin/core/config/jobConfig/SparkJobConfig.java @@ -19,7 +19,6 @@ under the License. package org.apache.griffin.core.config.jobConfig; -import org.apache.griffin.core.util.JsonUtil; import org.apache.griffin.core.util.PropertiesUtil; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java index 6e76658..425368c 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java @@ -66,9 +66,14 @@ public class JobServiceImpl implements JobService { private JobInstanceRepo jobInstanceRepo; @Autowired private Properties sparkJobProps; + @Autowired + private MeasureRepo measureRepo; + + private RestTemplate restTemplate; public JobServiceImpl() { + restTemplate = new RestTemplate(); } @Override @@ -145,6 +150,11 @@ public class JobServiceImpl implements JobService { return CREATE_JOB_FAIL; } + if (!isMeasureIdAvailable(measureId)) { + LOGGER.error("The measure id {} does't exist.", measureId); + return CREATE_JOB_FAIL; + } + JobDetail jobDetail = addJobDetail(scheduler, groupName, jobName, measureId, jobRequestBody); scheduler.scheduleJob(newTriggerInstance(triggerKey, jobDetail, interval, jobStartTime)); return GriffinOperationMessage.CREATE_JOB_SUCCESS; @@ -157,6 +167,14 @@ public class JobServiceImpl implements JobService { } } + private Boolean isMeasureIdAvailable(long measureId) { + Measure measure = measureRepo.findOne(measureId); + if (measure != null && !measure.getDeleted()) { + return true; + } + return false; + } + private JobDetail addJobDetail(Scheduler scheduler, String groupName, String jobName, Long measureId, JobRequestBody jobRequestBody) throws SchedulerException { JobKey jobKey = jobKey(jobName, groupName); JobDetail jobDetail; @@ -262,8 +280,7 @@ public class JobServiceImpl implements JobService { * 2. deleteJob * * @param measure measure data quality between source and target dataset - * @throws SchedulerException quartz throws if schedule has problem - * + * @throws SchedulerException quartz throws if schedule has problem */ public void deleteJobsRelateToMeasure(Measure measure) throws SchedulerException { Scheduler scheduler = factory.getObject(); @@ -320,7 +337,6 @@ public class JobServiceImpl implements JobService { } private void setJobInstanceInfo(JobInstance jobInstance, String uri, String group, String jobName) { - RestTemplate restTemplate = new RestTemplate(); TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() { }; try { @@ -366,18 +382,12 @@ public class JobServiceImpl implements JobService { int jobCount = 0; int notHealthyCount = 0; try { - for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.anyGroup())) { - jobCount++; - String jobName = jobKey.getName(); - String jobGroup = jobKey.getGroup(); - Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, "timestamp"); - JobInstance latestJobInstance; - List<JobInstance> jobInstances = jobInstanceRepo.findByGroupNameAndJobName(jobGroup, jobName, pageRequest); - if (jobInstances != null && jobInstances.size() > 0) { - latestJobInstance = jobInstances.get(0); - if (!LivySessionStates.isHeathy(latestJobInstance.getState())) { - notHealthyCount++; - } + Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.anyGroup()); + for (JobKey jobKey : jobKeys) { + List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey); + if (triggers != null && triggers.size() != 0 && !isJobDeleted(scheduler, jobKey)) { + jobCount++; + notHealthyCount = getJobNotHealthyCount(notHealthyCount, jobKey); } } } catch (SchedulerException e) { @@ -387,13 +397,33 @@ public class JobServiceImpl implements JobService { return new JobHealth(jobCount - notHealthyCount, jobCount); } + private int getJobNotHealthyCount(int notHealthyCount, JobKey jobKey) { + if (!isJobHealthy(jobKey)) { + notHealthyCount++; + } + return notHealthyCount; + } + + private Boolean isJobHealthy(JobKey jobKey) { + Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, "timestamp"); + JobInstance latestJobInstance; + List<JobInstance> jobInstances = jobInstanceRepo.findByGroupNameAndJobName(jobKey.getGroup(), jobKey.getName(), pageRequest); + if (jobInstances != null && jobInstances.size() > 0) { + latestJobInstance = jobInstances.get(0); + if (LivySessionStates.isHealthy(latestJobInstance.getState())) { + return true; + } + } + return false; + } + @Override public Map<String, List<Map<String, Serializable>>> getJobDetailsGroupByMeasureId() { Map<String, List<Map<String, Serializable>>> jobDetailsMap = new HashMap<>(); List<Map<String, Serializable>> jobInfoList = getAliveJobs(); for (Map<String, Serializable> jobInfo : jobInfoList) { String measureId = (String) jobInfo.get("measureId"); - List<Map<String, Serializable>> jobs = jobDetailsMap.getOrDefault(measureId, new ArrayList<Map<String, Serializable>>()); + List<Map<String, Serializable>> jobs = jobDetailsMap.getOrDefault(measureId, new ArrayList<>()); jobs.add(jobInfo); jobDetailsMap.put(measureId, jobs); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java index 5839fb5..773bd98 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java @@ -81,7 +81,7 @@ public class LivySessionStates { } } - public static boolean isHeathy(State state) { + public static boolean isHealthy(State state) { if (State.error.equals(state) || State.dead.equals(state) || State.shutting_down.equals(state)) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java index a838933..ef9b34b 100644 --- a/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java +++ b/service/src/test/java/org/apache/griffin/core/job/JobServiceImplTest.java @@ -24,14 +24,20 @@ import org.apache.griffin.core.job.entity.JobInstance; import org.apache.griffin.core.job.entity.JobRequestBody; import org.apache.griffin.core.job.entity.LivySessionStates; import org.apache.griffin.core.job.repo.JobInstanceRepo; +import org.apache.griffin.core.measure.repo.MeasureRepo; import org.apache.griffin.core.util.GriffinOperationMessage; +import org.apache.griffin.core.util.PropertiesUtil; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Matchers; +import org.mockito.Mock; import org.mockito.Mockito; +import org.powermock.reflect.Whitebox; import org.quartz.*; import org.quartz.impl.JobDetailImpl; import org.quartz.impl.matchers.GroupMatcher; +import org.quartz.impl.triggers.SimpleTriggerImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.mock.mockito.MockBean; @@ -41,12 +47,11 @@ import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.web.client.HttpClientErrorException; -import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; import java.util.*; +import static org.apache.griffin.core.measure.MeasureTestHelper.createATestMeasure; import static org.apache.griffin.core.measure.MeasureTestHelper.createJobDetail; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -78,11 +83,22 @@ public class JobServiceImplTest { @MockBean private SchedulerFactoryBean factory; + @MockBean + private Properties sparkJobProps; + + @MockBean + private RestTemplate restTemplate; + @Autowired - public JobServiceImpl service; + private JobServiceImpl service; + + @MockBean + private MeasureRepo measureRepo; + @Before public void setup() { + } @Test @@ -128,12 +144,13 @@ public class JobServiceImplTest { } @Test - public void testAddJobForSuccess() { + public void testAddJobForSuccess() throws Exception { JobRequestBody jobRequestBody = new JobRequestBody("YYYYMMdd-HH", "YYYYMMdd-HH", String.valueOf(System.currentTimeMillis()), String.valueOf(System.currentTimeMillis()), "1000"); Scheduler scheduler = Mockito.mock(Scheduler.class); given(factory.getObject()).willReturn(scheduler); - assertEquals(service.addJob("BA", "jobName", 0L, jobRequestBody), GriffinOperationMessage.CREATE_JOB_SUCCESS); + given(measureRepo.findOne(1L)).willReturn(createATestMeasure("measureName","org")); + assertEquals(service.addJob("BA", "jobName", 1L, jobRequestBody), GriffinOperationMessage.CREATE_JOB_SUCCESS); } @Test @@ -208,19 +225,55 @@ public class JobServiceImplTest { assertEquals(service.findInstancesOfJob(groupName, jobName, page, size).size(), 1); } -// @Test -// public void testSyncInstancesOfJob() { -// JobInstance instance = newJobInstance(); -// instance.setSessionId(1234564); -// String group = "groupName"; -// String jobName = "jobName"; -// RestTemplate restTemplate = mock(RestTemplate.class); -// given(jobInstanceRepo.findGroupWithJobName()).willReturn(Arrays.asList((Object) (new Object[]{group, jobName}))); -// given(jobInstanceRepo.findByGroupNameAndJobName(group, jobName)).willReturn(Arrays.asList(instance)); -// given(restTemplate.getForObject("uri", String.class)).willThrow(RestClientException.class); -// RestClientException restClientException = getJobInstanceStatusExpectException(); -// assert (restClientException != null); -// } + @Test + public void testSyncInstancesOfJobForSuccess() { + JobInstance instance = newJobInstance(); + String group = "groupName"; + String jobName = "jobName"; + given(jobInstanceRepo.findGroupWithJobName()).willReturn(Arrays.asList((Object) (new Object[]{group, jobName}))); + given(jobInstanceRepo.findByGroupNameAndJobName(group, jobName)).willReturn(Arrays.asList(instance)); + Whitebox.setInternalState(service, "restTemplate", restTemplate); + String result = "{\"id\":1,\"state\":\"starting\",\"appId\":123,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}"; + given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willReturn(result); + service.syncInstancesOfAllJobs(); + } + + + @Test + public void testSyncInstancesOfJobForRestClientException() { + JobInstance instance = newJobInstance(); + instance.setSessionId(1234564); + String group = "groupName"; + String jobName = "jobName"; + given(jobInstanceRepo.findGroupWithJobName()).willReturn(Arrays.asList((Object) (new Object[]{group, jobName}))); + given(jobInstanceRepo.findByGroupNameAndJobName(group, jobName)).willReturn(Arrays.asList(instance)); + given(sparkJobProps.getProperty("livy.uri")).willReturn(PropertiesUtil.getProperties("/sparkJob.properties").getProperty("livy.uri")); + service.syncInstancesOfAllJobs(); + } + + @Test + public void testSyncInstancesOfJobForIOException() throws Exception { + JobInstance instance = newJobInstance(); + String group = "groupName"; + String jobName = "jobName"; + given(jobInstanceRepo.findGroupWithJobName()).willReturn(Arrays.asList((Object) (new Object[]{group, jobName}))); + given(jobInstanceRepo.findByGroupNameAndJobName(group, jobName)).willReturn(Arrays.asList(instance)); + Whitebox.setInternalState(service, "restTemplate", restTemplate); + given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willReturn("result"); + service.syncInstancesOfAllJobs(); + } + + @Test + public void testSyncInstancesOfJobForIllegalArgumentException() throws Exception { + JobInstance instance = newJobInstance(); + String group = "groupName"; + String jobName = "jobName"; + given(jobInstanceRepo.findGroupWithJobName()).willReturn(Arrays.asList((Object) (new Object[]{group, jobName}))); + given(jobInstanceRepo.findByGroupNameAndJobName(group, jobName)).willReturn(Arrays.asList(instance)); + Whitebox.setInternalState(service, "restTemplate", restTemplate); + given(restTemplate.getForObject(Matchers.anyString(), Matchers.any())).willReturn("{\"state\":\"wrong\"}"); + service.syncInstancesOfAllJobs(); + } @Test public void testGetHealthInfoWithHealthy() throws SchedulerException { @@ -228,6 +281,15 @@ public class JobServiceImplTest { given(factory.getObject()).willReturn(scheduler); given(scheduler.getJobGroupNames()).willReturn(Arrays.asList("BA")); JobKey jobKey = new JobKey("test"); + SimpleTrigger trigger = new SimpleTriggerImpl(); + List<Trigger> triggers = new ArrayList<>(); + triggers.add(trigger); + given((List<Trigger>) scheduler.getTriggersOfJob(jobKey)).willReturn(triggers); + JobDataMap jobDataMap = mock(JobDataMap.class); + JobDetailImpl jobDetail = new JobDetailImpl(); + jobDetail.setJobDataMap(jobDataMap); + given(scheduler.getJobDetail(jobKey)).willReturn(jobDetail); + given(jobDataMap.getBooleanFromString("deleted")).willReturn(false); Set<JobKey> jobKeySet = new HashSet<>(); jobKeySet.add(jobKey); given(scheduler.getJobKeys(GroupMatcher.anyGroup())).willReturn((jobKeySet)); @@ -266,15 +328,6 @@ public class JobServiceImplTest { .repeatForever()).startAt(new Date()).build(); } - private RestClientException getJobInstanceStatusExpectException() { - RestClientException exception = null; - try { - service.syncInstancesOfAllJobs(); - } catch (RestClientException e) { - exception = e; - } - return exception; - } private GriffinException.GetJobsFailureException getTriggersOfJobExpectException(Scheduler scheduler, JobKey jobKey) { GriffinException.GetJobsFailureException exception = null; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/0b100a16/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java index 92072e3..6433d04 100644 --- a/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java +++ b/service/src/test/java/org/apache/griffin/core/job/SparkSubmitJobTest.java @@ -28,6 +28,8 @@ import org.apache.griffin.core.util.PropertiesUtil; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Matchers; +import org.powermock.reflect.Whitebox; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.springframework.beans.factory.annotation.Autowired; @@ -81,13 +83,13 @@ public class SparkSubmitJobTest { @Test public void testExecute() throws Exception { - String livyUri = null; String result = "{\"id\":1,\"state\":\"starting\",\"appId\":null,\"appInfo\":{\"driverLogUrl\":null,\"sparkUiUrl\":null},\"log\":[]}"; JobExecutionContext context = mock(JobExecutionContext.class); JobDetail jd = createJobDetail(); given(context.getJobDetail()).willReturn(jd); - given(measureRepo.findOne(Long.valueOf(jd.getJobDataMap().getString("measureId")))).willReturn(createATestMeasure("view_item_hourly", "test")); - given(restTemplate.postForObject(livyUri, new SparkJobDO(), String.class)).willReturn(result); + given(measureRepo.findOne(Long.valueOf(jd.getJobDataMap().getString("measureId")))).willReturn(createATestMeasure("view_item_hourly", "ebay")); + Whitebox.setInternalState(sparkSubmitJob,"restTemplate",restTemplate); + given(restTemplate.postForObject(Matchers.anyString(), Matchers.any(), Matchers.any())).willReturn(result); given(jobInstanceRepo.save(new JobInstance())).willReturn(new JobInstance()); sparkSubmitJob.execute(context); assertTrue(true);
